diff --git a/CMakeLists.txt b/CMakeLists.txt
index baf1c5b..c08f065 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -5,7 +5,7 @@ project(v4l2rtspserver)
 option(COVERAGE "Coverage" OFF)
 set(ALSA ON CACHE BOOL "use ALSA is available")
-set(LIVE555URL https://download.videolan.org/pub/contrib/live555/live.2020.03.06.tar.gz CACHE STRING "live555 url")
+set(LIVE555URL http://www.live555.com/liveMedia/public/live555-latest.tar.gz CACHE STRING "live555 url")
 set(LIVE555CFLAGS -DBSD=1 -DSOCKLEN_T=socklen_t -D_FILE_OFFSET_BITS=64 -D_LARGEFILE_SOURCE=1 -DALLOW_RTSP_SERVER_PORT_REUSE=1 -DNO_OPENSSL=1 CACHE STRING "live555 CFGLAGS")
 set(CMAKE_CXX_STANDARD 11)
diff --git a/inc/HTTPServer.h b/inc/HTTPServer.h
index 4508701..1d88f12 100644
--- a/inc/HTTPServer.h
+++ b/inc/HTTPServer.h
@@ -14,6 +14,104 @@
 #include "RTSPServer.hh"
 #include "RTSPCommon.hh"
+#include <GroupsockHelper.hh> // for "ignoreSigPipeOnSocket()"
+
+
+#define TCP_STREAM_SINK_MIN_READ_SIZE 1000
+#define TCP_STREAM_SINK_BUFFER_SIZE 10000
+
+class TCPStreamSink: public MediaSink {
+    public:
+        TCPStreamSink(UsageEnvironment& env, int socketNum) : MediaSink(env), fUnwrittenBytesStart(0), fUnwrittenBytesEnd(0), fInputSourceIsOpen(False), fOutputSocketIsWritable(True),fOutputSocketNum(socketNum) {
+            ignoreSigPipeOnSocket(socketNum);
+        }
+
+    protected:
+        virtual ~TCPStreamSink() {
+            envir().taskScheduler().disableBackgroundHandling(fOutputSocketNum);
+            if (fSource != NULL) {
+                Medium::close(fSource);
+            }
+        }
+
+    protected:
+        virtual Boolean continuePlaying() {
+            fInputSourceIsOpen = fSource != NULL;
+            processBuffer();
+            return True;
+        }
+
+    private:
+        void processBuffer(){
+            // First, try writing data to our output socket, if we can:
+            if (fOutputSocketIsWritable && numUnwrittenBytes() > 0) {
+                int numBytesWritten = send(fOutputSocketNum, (const char*)&fBuffer[fUnwrittenBytesStart], numUnwrittenBytes(), 0);
+                if (numBytesWritten < (int)numUnwrittenBytes()) {
+                    // The output socket is no longer writable. Set a handler to be called when it becomes writable again.
+                    fOutputSocketIsWritable = False;
+                    if (envir().getErrno() != EPIPE) { // on this error, the socket might still be writable, but no longer usable
+                        envir().taskScheduler().setBackgroundHandling(fOutputSocketNum, SOCKET_WRITABLE, socketWritableHandler, this);
+                    }
+                }
+                if (numBytesWritten > 0) {
+                    // We wrote at least some of our data. Update our buffer pointers:
+                    fUnwrittenBytesStart += numBytesWritten;
+                    if (fUnwrittenBytesStart > fUnwrittenBytesEnd) fUnwrittenBytesStart = fUnwrittenBytesEnd; // sanity check
+                    if (fUnwrittenBytesStart == fUnwrittenBytesEnd && (!fInputSourceIsOpen || !fSource->isCurrentlyAwaitingData())) {
+                        fUnwrittenBytesStart = fUnwrittenBytesEnd = 0; // reset the buffer to empty
+                    }
+                }
+            }
+
+            // Then, read from our input source, if we can (& we're not already reading from it):
+            if (fInputSourceIsOpen && freeBufferSpace() >= TCP_STREAM_SINK_MIN_READ_SIZE && !fSource->isCurrentlyAwaitingData()) {
+                fSource->getNextFrame(&fBuffer[fUnwrittenBytesEnd], freeBufferSpace(), afterGettingFrame, this, ourOnSourceClosure, this);
+            } else if (!fInputSourceIsOpen && numUnwrittenBytes() == 0) {
+                // We're now done:
+                onSourceClosure();
+            }
+        }
+
+        static void socketWritableHandler(void* clientData, int mask) { ((TCPStreamSink*)clientData)->socketWritableHandler(); }
+        void socketWritableHandler() {
+            envir().taskScheduler().disableBackgroundHandling(fOutputSocketNum); // disable this handler until the next time it's needed
+            fOutputSocketIsWritable = True;
+            processBuffer();
+        }
+
+        static void afterGettingFrame(void* clientData, unsigned frameSize, unsigned numTruncatedBytes,
+                struct timeval presentationTime, unsigned durationInMicroseconds) {
+            ((TCPStreamSink*)clientData)->afterGettingFrame(frameSize, numTruncatedBytes);
+        }
+
+
+        void afterGettingFrame(unsigned frameSize, unsigned numTruncatedBytes) {
+            if (numTruncatedBytes > 0) {
+                envir() << "TCPStreamSink::afterGettingFrame(): The input frame data was too large for our buffer. "
+                        << numTruncatedBytes
+                        << " bytes of trailing data was dropped! Correct this by increasing the definition of \"TCP_STREAM_SINK_BUFFER_SIZE\" in \"include/TCPStreamSink.hh\".\n";
+            }
+            fUnwrittenBytesEnd += frameSize;
+            processBuffer();
+        }
+
+        static void ourOnSourceClosure(void* clientData) { ((TCPStreamSink*)clientData)->ourOnSourceClosure(); }
+
+        void ourOnSourceClosure() {
+            // The input source has closed:
+            fInputSourceIsOpen = False;
+            processBuffer();
+        }
+
+        unsigned numUnwrittenBytes() const { return fUnwrittenBytesEnd - fUnwrittenBytesStart; }
+        unsigned freeBufferSpace() const { return TCP_STREAM_SINK_BUFFER_SIZE - fUnwrittenBytesEnd; }
+
+    private:
+        unsigned char fBuffer[TCP_STREAM_SINK_BUFFER_SIZE];
+        unsigned fUnwrittenBytesStart, fUnwrittenBytesEnd;
+        Boolean fInputSourceIsOpen, fOutputSocketIsWritable;
+        int fOutputSocketNum;
+};
 // ---------------------------------------------------------
 // Extend RTSP server to add support for HLS and MPEG-DASH
@@ -25,7 +123,7 @@ class HTTPServer : public RTSPServer
     {
     public:
         HTTPClientConnection(RTSPServer& ourServer, int clientSocket, struct sockaddr_in clientAddr)
-            : RTSPServer::RTSPClientConnection(ourServer, clientSocket, clientAddr), fTCPSink(NULL), fStreamToken(NULL), fSubsession(NULL), fSource(NULL) {
+            : RTSPServer::RTSPClientConnection(ourServer, clientSocket, clientAddr), m_TCPSink(NULL), m_StreamToken(NULL), m_Subsession(NULL) {
         }
         virtual ~HTTPClientConnection();
@@ -43,11 +141,10 @@ class HTTPServer : public RTSPServer
         static void afterStreaming(void* clientData);
     private:
-        static u_int32_t fClientSessionId;
-        TCPStreamSink* fTCPSink;
-        void* fStreamToken;
-        ServerMediaSubsession* fSubsession;
-        FramedSource* fSource;
+        static u_int32_t m_ClientSessionId;
+        TCPStreamSink* m_TCPSink;
+        void* m_StreamToken;
+        ServerMediaSubsession* m_Subsession;
     };
     public:
diff --git a/inc/MemoryBufferSink.h b/inc/MemoryBufferSink.h
index a3c8729..1c1fa98 100644
--- a/inc/MemoryBufferSink.h
+++ b/inc/MemoryBufferSink.h
@@ -19,13 +19,13 @@ class MemoryBufferSink : public MediaSink
 {
     public:
-        static MemoryBufferSink* createNew(UsageEnvironment& env, unsigned int bufferSize, unsigned int sliceDuration)
+        static MemoryBufferSink* createNew(UsageEnvironment& env, unsigned int bufferSize, unsigned int sliceDuration, unsigned int nbSlices = 5)
         {
-            return new MemoryBufferSink(env, bufferSize, sliceDuration);
+            return new MemoryBufferSink(env, bufferSize, sliceDuration, nbSlices);
         }
    protected:
-        MemoryBufferSink(UsageEnvironment& env, unsigned bufferSize, unsigned int sliceDuration);
+        MemoryBufferSink(UsageEnvironment& env, unsigned bufferSize, unsigned int sliceDuration, unsigned int nbSlices);
         virtual ~MemoryBufferSink();
         virtual Boolean continuePlaying();
@@ -53,5 +53,6 @@ class MemoryBufferSink : public MediaSink
         std::map<unsigned int, std::string> m_outputBuffers;
         unsigned int m_refTime;
         unsigned int m_sliceDuration;
+        unsigned int m_nbSlices;
 };
diff --git a/src/DeviceSource.cpp b/src/DeviceSource.cpp
index 2e5e02d..7895c05 100644
--- a/src/DeviceSource.cpp
+++ b/src/DeviceSource.cpp
@@ -102,6 +102,7 @@ void* V4L2DeviceSource::thread()
     {
         if (FD_ISSET(fd, &fdset))
         {
+            LOG(DEBUG) << "waitingFrame\tdelay:" << (1000-(tv.tv_usec/1000)) << "ms";
             if (this->getNextFrame() <= 0)
             {
                 LOG(ERROR) << "error:" << strerror(errno);
@@ -213,6 +214,7 @@ int V4L2DeviceSource::getNextFrame()
         timersub(&tv,&ref,&diff);
         m_in.notify(tv.tv_sec, frameSize);
         LOG(DEBUG) << "getNextFrame\ttimestamp:" << ref.tv_sec << "." << ref.tv_usec << "\tsize:" << frameSize <<"\tdiff:" << (diff.tv_sec*1000+diff.tv_usec/1000) << "ms";
+        processFrame(buffer,frameSize,ref);
         if (m_outfd != -1)
         {
@@ -238,9 +240,9 @@ void V4L2DeviceSource::processFrame(char * frame, int frameSize, const timeval &
         char* buf = new char[size];
         memcpy(buf, frame.first, size);
         queueFrame(buf,size,ref);
+        frameList.pop_front();
         LOG(DEBUG) << "queueFrame\ttimestamp:" << ref.tv_sec << "." << ref.tv_usec << "\tsize:" << size <<"\tdiff:" << (diff.tv_sec*1000+diff.tv_usec/1000) << "ms";
-        frameList.pop_front();
     }
 }
diff --git a/src/HTTPServer.cpp b/src/HTTPServer.cpp
index 1301c2c..3d11262 100644
--- a/src/HTTPServer.cpp
+++ b/src/HTTPServer.cpp
@@ -20,11 +20,10 @@
 #include "RTSPCommon.hh"
 #include
 #include "ByteStreamMemoryBufferSource.hh"
-#include "TCPStreamSink.hh"
 #include "HTTPServer.h"
-u_int32_t HTTPServer::HTTPClientConnection::fClientSessionId = 0;
+u_int32_t HTTPServer::HTTPClientConnection::m_ClientSessionId = 0;
 void HTTPServer::HTTPClientConnection::sendHeader(const char* contentType, unsigned int contentLength)
 {
@@ -56,22 +55,16 @@ void HTTPServer::HTTPClientConnection::streamSource(const std::string & content)
 void HTTPServer::HTTPClientConnection::streamSource(FramedSource* source)
 {
-    if (fTCPSink != NULL)
+    if (m_TCPSink != NULL)
     {
-        fTCPSink->stopPlaying();
-        Medium::close(fTCPSink);
-        fTCPSink = NULL;
-    }
-    if (fSource != NULL)
-    {
-        Medium::close(fSource);
-        fSource = NULL;
+        m_TCPSink->stopPlaying();
+        Medium::close(m_TCPSink);
+        m_TCPSink = NULL;
     }
     if (source != NULL)
     {
-        fTCPSink = TCPStreamSink::createNew(envir(), fClientOutputSocket);
-        fTCPSink->startPlaying(*source, afterStreaming, this);
-        fSource = source; // we need to keep tracking of source, because sink do not release it
+        m_TCPSink = new TCPStreamSink(envir(), fClientOutputSocket);
+        m_TCPSink->startPlaying(*source, afterStreaming, this);
     }
 }
@@ -322,17 +315,17 @@ void HTTPServer::HTTPClientConnection::handleHTTPCmd_StreamingGET(char const* ur
         // Call "getStreamParameters()" to create the stream's source. (Because we're not actually streaming via RTP/RTCP, most
         // of the parameters to the call are dummy.)
-        ++fClientSessionId;
+        ++m_ClientSessionId;
         Port clientRTPPort(0), clientRTCPPort(0), serverRTPPort(0), serverRTCPPort(0);
         netAddressBits destinationAddress = 0;
         u_int8_t destinationTTL = 0;
         Boolean isMulticast = False;
-        subsession->getStreamParameters(fClientSessionId, 0, clientRTPPort,clientRTCPPort, -1,0,0, destinationAddress,destinationTTL, isMulticast, serverRTPPort,serverRTCPPort, fStreamToken);
+        subsession->getStreamParameters(m_ClientSessionId, 0, clientRTPPort,clientRTCPPort, -1,0,0, destinationAddress,destinationTTL, isMulticast, serverRTPPort,serverRTCPPort, m_StreamToken);
         // Seek the stream source to the desired place, with the desired duration, and (as a side effect) get the number of bytes:
         double dOffsetInSeconds = (double)offsetInSeconds;
         u_int64_t numBytes = 0;
-        subsession->seekStream(fClientSessionId, fStreamToken, dOffsetInSeconds, 0.0, numBytes);
+        subsession->seekStream(m_ClientSessionId, m_StreamToken, dOffsetInSeconds, 0.0, numBytes);
         if (numBytes == 0)
         {
@@ -346,10 +339,10 @@ void HTTPServer::HTTPClientConnection::handleHTTPCmd_StreamingGET(char const* ur
             this->sendHeader("video/mp2t", numBytes);
             // stream body
-            this->streamSource(subsession->getStreamSource(fStreamToken));
+            this->streamSource(subsession->getStreamSource(m_StreamToken));
             // pointer to subsession to close it
-            fSubsession = subsession;
+            m_Subsession = subsession;
         }
     }
 }
@@ -385,7 +378,7 @@ HTTPServer::HTTPClientConnection::~HTTPClientConnection()
 {
     this->streamSource(NULL);
-    if (fSubsession) {
-        fSubsession->deleteStream(fClientSessionId, fStreamToken);
+    if (m_Subsession) {
+        m_Subsession->deleteStream(m_ClientSessionId, m_StreamToken);
     }
 }
diff --git a/src/MemoryBufferSink.cpp b/src/MemoryBufferSink.cpp
index 61f9aaa..50d8bc9 100644
--- a/src/MemoryBufferSink.cpp
+++ b/src/MemoryBufferSink.cpp
@@ -12,7 +12,7 @@
 // -----------------------------------------
 // MemoryBufferSink
 // -----------------------------------------
-MemoryBufferSink::MemoryBufferSink(UsageEnvironment& env, unsigned bufferSize, unsigned int sliceDuration) : MediaSink(env), m_bufferSize(bufferSize), m_refTime(0), m_sliceDuration(sliceDuration)
+MemoryBufferSink::MemoryBufferSink(UsageEnvironment& env, unsigned bufferSize, unsigned int sliceDuration, unsigned int nbSlices) : MediaSink(env), m_bufferSize(bufferSize), m_refTime(0), m_sliceDuration(sliceDuration), m_nbSlices(nbSlices)
 {
     m_buffer = new unsigned char[m_bufferSize];
 }
@@ -59,7 +59,7 @@ void MemoryBufferSink::afterGettingFrame(unsigned frameSize, unsigned numTruncat
         outputBuffer.append((const char*)m_buffer, frameSize);
         // remove old buffers
-        while (m_outputBuffers.size()>3)
+        while (m_outputBuffers.size()>m_nbSlices)
         {
             m_outputBuffers.erase(m_outputBuffers.begin());
         }
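
For reviewers, a minimal, hypothetical sketch (not part of the patch) of how the extended MemoryBufferSink::createNew() factory might be exercised once this change is applied: the new last argument caps how many HLS/MPEG-DASH slices stay cached, replacing the previously hard-coded limit of 3 and defaulting to 5 when omitted. The buffer size, slice duration, slice count, and the use of ByteStreamMemoryBufferSource as a stand-in input are illustrative assumptions, not values taken from the repository.

// Hypothetical usage sketch -- not part of this patch.
#include "BasicUsageEnvironment.hh"
#include "ByteStreamMemoryBufferSource.hh"
#include "MemoryBufferSink.h"

static char stopEventLoop = 0;

static void afterPlaying(void* /*clientData*/) {
    stopEventLoop = 1; // input exhausted: leave the event loop
}

int main() {
    TaskScheduler* scheduler = BasicTaskScheduler::createNew();
    UsageEnvironment* env = BasicUsageEnvironment::createNew(*scheduler);

    // Illustrative input: 64 KiB of MPEG-TS data already held in memory.
    u_int8_t* tsData = new u_int8_t[65536]();
    FramedSource* source = ByteStreamMemoryBufferSource::createNew(*env, tsData, 65536);

    // Keep the 10 most recent slices instead of the old hard-coded 3;
    // calling createNew(*env, 65536, 2) would keep the new default of 5.
    MemoryBufferSink* sink = MemoryBufferSink::createNew(*env, 65536, 2, 10);
    sink->startPlaying(*source, afterPlaying, NULL);

    env->taskScheduler().doEventLoop(&stopEventLoop);

    sink->stopPlaying();
    Medium::close(sink);
    Medium::close(source);
    return 0;
}

On the HTTP side, the hunks above show the matching design change: the inlined TCPStreamSink in inc/HTTPServer.h is now constructed directly with new TCPStreamSink(envir(), fClientOutputSocket) and closes its source itself, which removes both the TCPStreamSink.hh include and the separate fSource bookkeeping in src/HTTPServer.cpp.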