From 8a593ab42fc1ac9c16371bb1879fb9578c2ef111 Mon Sep 17 00:00:00 2001 From: mpromonet Date: Sun, 17 Apr 2016 16:39:07 +0000 Subject: [PATCH] continue to work on HLS using memory buffers --- inc/ServerMediaSubsession.h | 245 +++++++++++++++++++--------------- src/ServerMediaSubsession.cpp | 7 - src/main.cpp | 240 +++++++++++++++------------------ 3 files changed, 246 insertions(+), 246 deletions(-) diff --git a/inc/ServerMediaSubsession.h b/inc/ServerMediaSubsession.h index bf39744..62a3166 100644 --- a/inc/ServerMediaSubsession.h +++ b/inc/ServerMediaSubsession.h @@ -10,11 +10,13 @@ #ifndef SERVER_MEDIA_SUBSESSION #define SERVER_MEDIA_SUBSESSION +#include + #include #include #include #include -#include +#include // live555 #include @@ -87,92 +89,136 @@ class UnicastServerMediaSubsession : public OnDemandServerMediaSubsession , publ // ----------------------------------------- // ServerMediaSubsession for HLS // ----------------------------------------- - -class HLSSink : public MediaSink +class HLSServerMediaSubsession : public UnicastServerMediaSubsession { - public: - static HLSSink* createNew(UsageEnvironment& env, unsigned int bufferSize) - { - return new HLSSink(env, bufferSize); - } - - protected: - HLSSink(UsageEnvironment& env, unsigned bufferSize) : MediaSink(env), m_bufferSize(bufferSize), m_slice(0), m_firstslice(0) - { - m_buffer = new unsigned char[m_bufferSize]; - } - - virtual ~HLSSink() - { - delete[] m_buffer; - } - - - virtual Boolean continuePlaying() - { - Boolean ret = False; - if (fSource != NULL) + class HLSSink : public MediaSink + { + public: + static HLSSink* createNew(UsageEnvironment& env, unsigned int bufferSize, unsigned int sliceDuration) { - fSource->getNextFrame(m_buffer, m_bufferSize, - afterGettingFrame, this, - onSourceClosure, this); - ret = True; + return new HLSSink(env, bufferSize, sliceDuration); } - return ret; - } - - static void afterGettingFrame(void* clientData, unsigned frameSize, - unsigned numTruncatedBytes, - struct timeval presentationTime, - unsigned /*durationInMicroseconds*/) - { - HLSSink* sink = (HLSSink*)clientData; - sink->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime); - } - - void afterGettingFrame(unsigned frameSize, unsigned numTruncatedBytes, struct timeval presentationTime) - { - if (numTruncatedBytes > 0) + + protected: + HLSSink(UsageEnvironment& env, unsigned bufferSize, unsigned int sliceDuration) : MediaSink(env), m_bufferSize(bufferSize), m_refTime(0), m_sliceDuration(sliceDuration) { - envir() << "FileSink::afterGettingFrame(): The input frame data was too large for our buffer size \n"; + m_buffer = new unsigned char[m_bufferSize]; } - if (m_os.is_open()) + + virtual ~HLSSink() { - if (m_slice != (presentationTime.tv_sec/10)) + delete[] m_buffer; + } + + + virtual Boolean continuePlaying() + { + Boolean ret = False; + if (fSource != NULL) { - m_os.close(); + fSource->getNextFrame(m_buffer, m_bufferSize, + afterGettingFrame, this, + onSourceClosure, this); + ret = True; } + return ret; } - if (!m_os.is_open()) + + static void afterGettingFrame(void* clientData, unsigned frameSize, + unsigned numTruncatedBytes, + struct timeval presentationTime, + unsigned durationInMicroseconds) { - m_slice = presentationTime.tv_sec/10; - if (m_firstslice == 0) + HLSSink* sink = (HLSSink*)clientData; + sink->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime); + } + + void afterGettingFrame(unsigned frameSize, unsigned numTruncatedBytes, struct timeval presentationTime) + { + if (numTruncatedBytes > 0) { - m_firstslice = m_slice; + envir() << "FileSink::afterGettingFrame(): The input frame data was too large for our buffer size \n"; + // realloc a bigger buffer + m_bufferSize += numTruncatedBytes; + delete[] m_buffer; + m_buffer = new unsigned char[m_bufferSize]; } - std::ostringstream os; - os << m_slice << ".ts"; - m_os.open(os.str().c_str()); + else + { + // append buffer to slice buffer + if (m_refTime == 0) + { + m_refTime = presentationTime.tv_sec; + } + unsigned int slice = (presentationTime.tv_sec-m_refTime)/m_sliceDuration; + std::string& outputBuffer = m_outputBuffers[slice]; + outputBuffer.append((const char*)m_buffer, frameSize); + + // remove old buffers + while (m_outputBuffers.size()>5) + { + m_outputBuffers.erase(m_outputBuffers.begin()); + } + } + + continuePlaying(); } - if (m_os.is_open()) + + public: + unsigned int getHLSBufferSize(unsigned int slice) { - m_os.write((char*)m_buffer, frameSize); + unsigned int size = 0; + std::map::iterator it = m_outputBuffers.find(slice); + if (it != m_outputBuffers.end()) + { + size = it->second.size(); + } + return size; } - - continuePlaying(); - } - - private: - unsigned char * m_buffer; - unsigned int m_bufferSize; - std::ofstream m_os; - public: - unsigned int m_slice; - unsigned int m_firstslice; -}; - -class HLSServerMediaSubsession : public OnDemandServerMediaSubsession , public BaseServerMediaSubsession -{ + + const char* getHLSBuffer(unsigned int slice) + { + const char* content = NULL; + std::map::iterator it = m_outputBuffers.find(slice); + if (it != m_outputBuffers.end()) + { + content = it->second.c_str(); + } + return content; + } + + unsigned int firstTime() + { + unsigned int firstTime = 0; + if (m_outputBuffers.size() != 0) + { + firstTime = m_outputBuffers.begin()->first; + } + return firstTime*m_sliceDuration; + } + + unsigned int duration() + { + unsigned int duration = 0; + if (m_outputBuffers.size() != 0) + { + duration = m_outputBuffers.rbegin()->first - m_outputBuffers.begin()->first; + } + return (duration)*m_sliceDuration; + } + unsigned int getSliceDuration() + { + return m_sliceDuration; + } + + private: + unsigned char * m_buffer; + unsigned int m_bufferSize; + std::map m_outputBuffers; + unsigned int m_refTime; + unsigned int m_sliceDuration; + }; + public: static HLSServerMediaSubsession* createNew(UsageEnvironment& env, StreamReplicator* replicator, const std::string& format) { @@ -181,57 +227,48 @@ class HLSServerMediaSubsession : public OnDemandServerMediaSubsession , public B protected: HLSServerMediaSubsession(UsageEnvironment& env, StreamReplicator* replicator, const std::string& format) - : OnDemandServerMediaSubsession(env, False), BaseServerMediaSubsession(replicator), m_format(format) + : UnicastServerMediaSubsession(env, replicator, format) { // Create a source FramedSource* source = replicator->createStreamReplica(); FramedSource* videoSource = createSource(env, source, format); - // Start Playing the Sink - m_videoSink = HLSSink::createNew(env, 65535); - m_videoSink->startPlaying(*videoSource, NULL, NULL); + // Start Playing the HLS Sink + m_hlsSink = HLSSink::createNew(env, OutPacketBuffer::maxSize, 10); + m_hlsSink->startPlaying(*videoSource, NULL, NULL); + } + virtual ~HLSServerMediaSubsession() + { + Medium::close(m_hlsSink); } - virtual FramedSource* createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate) + virtual float getCurrentNPT(void* streamToken) { - FramedSource* source = m_replicator->createStreamReplica(); - return createSource(envir(), source, m_format); - } - - virtual RTPSink* createNewRTPSink(Groupsock* rtpGroupsock, unsigned char rtpPayloadTypeIfDynamic, FramedSource* inputSource) - { - return createSink(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, m_format); + return (m_hlsSink->firstTime()); } - - virtual char const* getAuxSDPLine(RTPSink* rtpSink,FramedSource* inputSource); - virtual float duration() const { - std::cout << "duration " << (m_videoSink->m_slice - m_videoSink->m_firstslice)*10 << std::endl; - return (m_videoSink->m_slice - m_videoSink->m_firstslice)*10; + virtual float duration() const + { + return (m_hlsSink->duration()); } virtual void seekStream(unsigned clientSessionId, void* streamToken, double& seekNPT, double streamDuration, u_int64_t& numBytes) { - m_slice = seekNPT / 10; - seekNPT = m_slice*10; - std::ostringstream os; - os << m_slice+m_videoSink->m_firstslice << ".ts"; - struct stat sb; - int statResult = stat(os.str().c_str(), &sb); - if (statResult == 0) - { - numBytes = sb.st_size; - } + m_slice = seekNPT / m_hlsSink->getSliceDuration(); + seekNPT = m_slice * m_hlsSink->getSliceDuration(); + numBytes = m_hlsSink->getHLSBufferSize(m_slice); + std::cout << "seek seekNPT:" << seekNPT << " slice:" << m_slice << " numBytes:" << numBytes << std::endl; + } virtual FramedSource* getStreamSource(void* streamToken) { - std::ostringstream os; - os << m_slice+m_videoSink->m_firstslice << ".ts"; - return ByteStreamFileSource::createNew(envir(), os.str().c_str()); + unsigned int size = m_hlsSink->getHLSBufferSize(m_slice); + u_int8_t* content = new u_int8_t[size]; + memcpy(content, m_hlsSink->getHLSBuffer(m_slice), size); + return ByteStreamMemoryBufferSource::createNew(envir(), content, size); } protected: - const std::string m_format; unsigned int m_slice; - HLSSink * m_videoSink; + HLSSink * m_hlsSink; }; #endif diff --git a/src/ServerMediaSubsession.cpp b/src/ServerMediaSubsession.cpp index 38096dc..09d6753 100644 --- a/src/ServerMediaSubsession.cpp +++ b/src/ServerMediaSubsession.cpp @@ -145,10 +145,3 @@ char const* UnicastServerMediaSubsession::getAuxSDPLine(RTPSink* rtpSink,FramedS return this->getAuxLine(dynamic_cast(m_replicator->inputSource()), rtpSink->rtpPayloadType()); } -// ----------------------------------------- -// ServerMediaSubsession for Unicast -// ----------------------------------------- -char const* HLSServerMediaSubsession::getAuxSDPLine(RTPSink* rtpSink,FramedSource* inputSource) -{ - return this->getAuxLine(dynamic_cast(m_replicator->inputSource()), rtpSink->rtpPayloadType()); -} diff --git a/src/main.cpp b/src/main.cpp index af13ce6..2434bbb 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -13,20 +13,17 @@ ** -------------------------------------------------------------------------*/ -#include +#include #include "RTSPServer.hh" #include "RTSPCommon.hh" -#include #include -#ifndef _BYTE_STREAM_MEMORY_BUFFER_SOURCE_HH #include "ByteStreamMemoryBufferSource.hh" -#endif -#ifndef _TCP_STREAM_SINK_HH #include "TCPStreamSink.hh" -#endif - +// ----------------------------------------- +// RTSP server supporting live HLS +// ----------------------------------------- class HLSServer : public RTSPServer { @@ -34,102 +31,100 @@ class HLSServer : public RTSPServer { public: HLSClientConnection(RTSPServer& ourServer, int clientSocket, struct sockaddr_in clientAddr) - : RTSPServer::RTSPClientConnection(ourServer, clientSocket, clientAddr), fClientSessionId(0), fStreamSource(NULL), fPlaylistSource(NULL), fTCPSink(NULL) { + : RTSPServer::RTSPClientConnection(ourServer, clientSocket, clientAddr), fClientSessionId(0), fTCPSink(NULL) { } ~HLSClientConnection() { - if (fTCPSink != NULL) fTCPSink->stopPlaying(); - Medium::close(fPlaylistSource); - Medium::close(fStreamSource); - Medium::close(fTCPSink); + if (fTCPSink != NULL) + { + fTCPSink->stopPlaying(); + Medium::close(fTCPSink); + } } private: - void sendPlayList(char const* urlSuffix) + void sendHeader(const char* contentType, unsigned int contentLength) { - // First, make sure that the named file exists, and is streamable: - ServerMediaSession* session = fOurServer.lookupServerMediaSession(urlSuffix); - if (session == NULL) { - std::cout << "============no session==============" << std::endl; - handleHTTPCmd_notFound(); - return; - } - - // To be able to construct a playlist for the requested file, we need to know its duration: - float duration = session->duration(); - if (duration <= 0.0) { - std::cout << "============no duration==============" << std::endl; - handleHTTPCmd_notSupported(); - return; - } - - // Now, construct the playlist. It will consist of a prefix, one or more media file specifications, and a suffix: - unsigned const maxIntLen = 10; // >= the maximum possible strlen() of an integer in the playlist - char const* const playlistPrefixFmt = - "#EXTM3U\r\n" - "#EXT-X-ALLOW-CACHE:YES\r\n" - "#EXT-X-MEDIA-SEQUENCE:%d\r\n" - "#EXT-X-TARGETDURATION:%d\r\n"; - unsigned const playlistPrefixFmt_maxLen = strlen(playlistPrefixFmt) + maxIntLen; - - char const* const playlistMediaFileSpecFmt = - "#EXTINF:%d,\r\n" - "%s?segment=%d,%d\r\n"; - unsigned const playlistMediaFileSpecFmt_maxLen = strlen(playlistMediaFileSpecFmt) + maxIntLen + strlen(urlSuffix) + 2*maxIntLen; - - // Figure out the 'target duration' that will produce a playlist that will fit in our response buffer. (But make it at least 10s.) - unsigned const playlistMaxSize = 10000; - unsigned const mediaFileSpecsMaxSize = playlistMaxSize - (playlistPrefixFmt_maxLen /*+ playlistSuffixFmt_maxLen*/); - unsigned const maxNumMediaFileSpecs = mediaFileSpecsMaxSize/playlistMediaFileSpecFmt_maxLen; - - unsigned targetDuration = (unsigned)(duration/maxNumMediaFileSpecs + 1); - if (targetDuration < 10) targetDuration = 10; - - unsigned int startTime = 0; - char* playlist = new char[playlistMaxSize]; - char* s = playlist; - sprintf(s, playlistPrefixFmt, startTime,targetDuration); - s += strlen(s); - - unsigned durSoFar = startTime; - while (1) { - unsigned dur = targetDuration < duration ? targetDuration : (unsigned)duration; - duration -= dur; - sprintf(s, playlistMediaFileSpecFmt, dur, urlSuffix, durSoFar, dur); - s += strlen(s); - if (duration < 1.0) break; - - durSoFar += dur; - } - - unsigned playlistLen = s - playlist; - - // Construct our response: - snprintf((char*)fResponseBuffer, sizeof fResponseBuffer, + // Construct our response: + snprintf((char*)fResponseBuffer, sizeof fResponseBuffer, "HTTP/1.1 200 OK\r\n" "%s" "Server: LIVE555 Streaming Media v%s\r\n" + "Content-Type: %s\r\n" "Content-Length: %d\r\n" - "Content-Type: application/vnd.apple.mpegurl\r\n" "\r\n", dateHeader(), LIVEMEDIA_LIBRARY_VERSION_STRING, - playlistLen); + contentType, + contentLength); - // Send the response header now, because we're about to add more data (the playlist): - send(fClientOutputSocket, (char const*)fResponseBuffer, strlen((char*)fResponseBuffer), 0); - fResponseBuffer[0] = '\0'; // We've already sent the response. This tells the calling code not to send it again. + // Send the response header + send(fClientOutputSocket, (char const*)fResponseBuffer, strlen((char*)fResponseBuffer), 0); + fResponseBuffer[0] = '\0'; // We've already sent the response. This tells the calling code not to send it again. + } + + void streamSource(FramedSource* source) + { + if (fTCPSink != NULL) + { + fTCPSink->stopPlaying(); + Medium::close(fTCPSink); + fTCPSink = NULL; + } + if (source != NULL) + { + fTCPSink = TCPStreamSink::createNew(envir(), fClientOutputSocket); + fTCPSink->startPlaying(*source, afterStreaming, this); + } + } + + void sendPlayList(char const* urlSuffix) + { + // First, make sure that the named file exists, and is streamable: + ServerMediaSession* session = fOurServer.lookupServerMediaSession(urlSuffix); + if (session == NULL) { + handleHTTPCmd_notFound(); + return; + } - // Then, send the playlist. Because it's large, we don't do so using "send()", because that might not send it all at once. - // Instead, we stream the playlist over the TCP socket: - if (fPlaylistSource != NULL) { // sanity check - if (fTCPSink != NULL) fTCPSink->stopPlaying(); - Medium::close(fPlaylistSource); - } - fPlaylistSource = ByteStreamMemoryBufferSource::createNew(envir(), (u_int8_t*)playlist, playlistLen); - if (fTCPSink == NULL) fTCPSink = TCPStreamSink::createNew(envir(), fClientOutputSocket); - fTCPSink->startPlaying(*fPlaylistSource, afterStreaming, this); + // To be able to construct a playlist for the requested file, we need to know its duration: + float duration = session->duration(); + if (duration <= 0.0) { + handleHTTPCmd_notSupported(); + return; + } + + ServerMediaSubsessionIterator iter(*session); + ServerMediaSubsession* subsession = iter.next(); + if (subsession == NULL) { + handleHTTPCmd_notSupported(); + return; + } + + unsigned int startTime = subsession->getCurrentNPT(NULL); + unsigned sliceDuration = 10; + std::ostringstream os; + os << "#EXTM3U\r\n" + << "#EXT-X-ALLOW-CACHE:YES\r\n" + << "#EXT-X-MEDIA-SEQUENCE:" << startTime << "\r\n" + << "#EXT-X-TARGETDURATION:" << sliceDuration << "\r\n"; + + for (unsigned int slice=0; slice*sliceDurationsendHeader("application/vnd.apple.mpegurl", playList.size()); + + // stream body + u_int8_t* playListBuffer = new u_int8_t[playList.size()]; + memcpy(playListBuffer, playList.c_str(), playList.size()); + this->streamSource(ByteStreamMemoryBufferSource::createNew(envir(), playListBuffer, playList.size())); } void handleHTTPCmd_StreamingGET(char const* urlSuffix, char const* /*fullRequestStr*/) { @@ -138,8 +133,8 @@ class HLSServer : public RTSPServer do { char const* questionMarkPos = strrchr(urlSuffix, '?'); if (questionMarkPos == NULL) break; - unsigned offsetInSeconds, durationInSeconds; - if (sscanf(questionMarkPos, "?segment=%u,%u", &offsetInSeconds, &durationInSeconds) != 2) break; + unsigned offsetInSeconds; + if (sscanf(questionMarkPos, "?segment=%u", &offsetInSeconds) != 1) break; char* streamName = strDup(urlSuffix); streamName[questionMarkPos-urlSuffix] = '\0'; @@ -173,42 +168,21 @@ class HLSServer : public RTSPServer // Seek the stream source to the desired place, with the desired duration, and (as a side effect) get the number of bytes: double dOffsetInSeconds = (double)offsetInSeconds; - u_int64_t numBytes; - subsession->seekStream(fClientSessionId, streamToken, dOffsetInSeconds, (double)durationInSeconds, numBytes); - unsigned numTSBytesToStream = (unsigned)numBytes; - std::cout << "numTSBytesToStream:" << numTSBytesToStream << std::endl; + u_int64_t numBytes = 0; + subsession->seekStream(fClientSessionId, streamToken, dOffsetInSeconds, 0.0, numBytes); - if (numTSBytesToStream == 0) { + if (numBytes == 0) { // For some reason, we do not know the size of the requested range. We can't handle this request: handleHTTPCmd_notSupported(); break; } - // Construct our response: - snprintf((char*)fResponseBuffer, sizeof fResponseBuffer, - "HTTP/1.1 200 OK\r\n" - "%s" - "Server: LIVE555 Streaming Media v%s\r\n" - "Content-Length: %d\r\n" - "Content-Type: text/plain; charset=ISO-8859-1\r\n" - "\r\n", - dateHeader(), - LIVEMEDIA_LIBRARY_VERSION_STRING, - numTSBytesToStream); - // Send the response now, because we're about to add more data (from the source): - send(fClientOutputSocket, (char const*)fResponseBuffer, strlen((char*)fResponseBuffer), 0); - fResponseBuffer[0] = '\0'; // We've already sent the response. This tells the calling code not to send it again. + // send response header + this->sendHeader("video/mp2t", numBytes); - // Ask the media source to deliver - to the TCP sink - the desired data: - if (fStreamSource != NULL) { // sanity check - if (fTCPSink != NULL) fTCPSink->stopPlaying(); - Medium::close(fStreamSource); - } - fStreamSource = subsession->getStreamSource(streamToken); - if (fStreamSource != NULL) { - if (fTCPSink == NULL) fTCPSink = TCPStreamSink::createNew(envir(), fClientOutputSocket); - fTCPSink->startPlaying(*fStreamSource, afterStreaming, this); - } + // stream body + this->streamSource(subsession->getStreamSource(streamToken)); + } while(0); delete[] streamName; @@ -218,8 +192,8 @@ class HLSServer : public RTSPServer this->sendPlayList(urlSuffix); } - static void afterStreaming(void* clientData) { - std::cout << "afterStreaming" << std::endl; + static void afterStreaming(void* clientData) + { HLSServer::HLSClientConnection* clientConnection = (HLSServer::HLSClientConnection*)clientData; // Arrange to delete the 'client connection' object: if (clientConnection->fRecursionCount > 0) { @@ -227,33 +201,27 @@ class HLSServer : public RTSPServer clientConnection->fIsActive = False; // will cause the object to get deleted at the end of handling the request } else { // We're no longer handling a request; delete the object now: -// delete clientConnection; + delete clientConnection; } } - private: + private: u_int32_t fClientSessionId; - FramedSource* fStreamSource; - ByteStreamMemoryBufferSource* fPlaylistSource; TCPStreamSink* fTCPSink; }; public: static HLSServer* createNew(UsageEnvironment& env, Port rtspPort, UserAuthenticationDatabase* authDatabase, unsigned reclamationTestSeconds) { - int ourSocket = setUpOurSocket(env, rtspPort); - if (ourSocket == -1) return NULL; - - return new HLSServer(env, ourSocket, rtspPort, authDatabase, reclamationTestSeconds); + int ourSocket = setUpOurSocket(env, rtspPort); + if (ourSocket == -1) return NULL; + return new HLSServer(env, ourSocket, rtspPort, authDatabase, reclamationTestSeconds); } HLSServer(UsageEnvironment& env, int ourSocket, Port rtspPort, UserAuthenticationDatabase* authDatabase, unsigned reclamationTestSeconds) : RTSPServer(env, ourSocket, rtspPort, authDatabase, reclamationTestSeconds) { } - virtual ~HLSServer() { - } - RTSPServer::RTSPClientConnection* createNewClientConnection(int clientSocket, struct sockaddr_in clientAddr) { - return new HLSClientConnection(*this, clientSocket, clientAddr); + return new HLSClientConnection(*this, clientSocket, clientAddr); } }; @@ -561,12 +529,14 @@ int main(int argc, char** argv) rtcpPortNum+=2; } - // Create Unicast Session - addSession(rtspServer, baseUrl+url, UnicastServerMediaSubsession::createNew(*env,replicator,rtpFormat)); - + // Create Unicast Session if (muxTS) { - addSession(rtspServer, "hls", HLSServerMediaSubsession::createNew(*env,replicator,rtpFormat)); + addSession(rtspServer, baseUrl+url, HLSServerMediaSubsession::createNew(*env,replicator,rtpFormat)); + } + else + { + addSession(rtspServer, baseUrl+url, UnicastServerMediaSubsession::createNew(*env,replicator,rtpFormat)); } } if (out)