@ -14,6 +14,104 @@
# include "RTSPServer.hh"
# include "RTSPCommon.hh"
# include <GroupsockHelper.hh> // for "ignoreSigPipeOnSocket()"
# define TCP_STREAM_SINK_MIN_READ_SIZE 1000
# define TCP_STREAM_SINK_BUFFER_SIZE 10000
class TCPStreamSink : public MediaSink {
public :
TCPStreamSink ( UsageEnvironment & env , int socketNum ) : MediaSink ( env ) , fUnwrittenBytesStart ( 0 ) , fUnwrittenBytesEnd ( 0 ) , fInputSourceIsOpen ( False ) , fOutputSocketIsWritable ( True ) , fOutputSocketNum ( socketNum ) {
ignoreSigPipeOnSocket ( socketNum ) ;
}
protected :
virtual ~ TCPStreamSink ( ) {
envir ( ) . taskScheduler ( ) . disableBackgroundHandling ( fOutputSocketNum ) ;
if ( fSource ! = NULL ) {
Medium : : close ( fSource ) ;
}
}
protected :
virtual Boolean continuePlaying ( ) {
fInputSourceIsOpen = fSource ! = NULL ;
processBuffer ( ) ;
return True ;
}
private :
void processBuffer ( ) {
// First, try writing data to our output socket, if we can:
if ( fOutputSocketIsWritable & & numUnwrittenBytes ( ) > 0 ) {
int numBytesWritten = send ( fOutputSocketNum , ( const char * ) & fBuffer [ fUnwrittenBytesStart ] , numUnwrittenBytes ( ) , 0 ) ;
if ( numBytesWritten < ( int ) numUnwrittenBytes ( ) ) {
// The output socket is no longer writable. Set a handler to be called when it becomes writable again.
fOutputSocketIsWritable = False ;
if ( envir ( ) . getErrno ( ) ! = EPIPE ) { // on this error, the socket might still be writable, but no longer usable
envir ( ) . taskScheduler ( ) . setBackgroundHandling ( fOutputSocketNum , SOCKET_WRITABLE , socketWritableHandler , this ) ;
}
}
if ( numBytesWritten > 0 ) {
// We wrote at least some of our data. Update our buffer pointers:
fUnwrittenBytesStart + = numBytesWritten ;
if ( fUnwrittenBytesStart > fUnwrittenBytesEnd ) fUnwrittenBytesStart = fUnwrittenBytesEnd ; // sanity check
if ( fUnwrittenBytesStart = = fUnwrittenBytesEnd & & ( ! fInputSourceIsOpen | | ! fSource - > isCurrentlyAwaitingData ( ) ) ) {
fUnwrittenBytesStart = fUnwrittenBytesEnd = 0 ; // reset the buffer to empty
}
}
}
// Then, read from our input source, if we can (& we're not already reading from it):
if ( fInputSourceIsOpen & & freeBufferSpace ( ) > = TCP_STREAM_SINK_MIN_READ_SIZE & & ! fSource - > isCurrentlyAwaitingData ( ) ) {
fSource - > getNextFrame ( & fBuffer [ fUnwrittenBytesEnd ] , freeBufferSpace ( ) , afterGettingFrame , this , ourOnSourceClosure , this ) ;
} else if ( ! fInputSourceIsOpen & & numUnwrittenBytes ( ) = = 0 ) {
// We're now done:
onSourceClosure ( ) ;
}
}
static void socketWritableHandler ( void * clientData , int mask ) { ( ( TCPStreamSink * ) clientData ) - > socketWritableHandler ( ) ; }
void socketWritableHandler ( ) {
envir ( ) . taskScheduler ( ) . disableBackgroundHandling ( fOutputSocketNum ) ; // disable this handler until the next time it's needed
fOutputSocketIsWritable = True ;
processBuffer ( ) ;
}
static void afterGettingFrame ( void * clientData , unsigned frameSize , unsigned numTruncatedBytes ,
struct timeval presentationTime , unsigned durationInMicroseconds ) {
( ( TCPStreamSink * ) clientData ) - > afterGettingFrame ( frameSize , numTruncatedBytes ) ;
}
void afterGettingFrame ( unsigned frameSize , unsigned numTruncatedBytes ) {
if ( numTruncatedBytes > 0 ) {
envir ( ) < < " TCPStreamSink::afterGettingFrame(): The input frame data was too large for our buffer. "
< < numTruncatedBytes
< < " bytes of trailing data was dropped! Correct this by increasing the definition of \" TCP_STREAM_SINK_BUFFER_SIZE \" in \" include/TCPStreamSink.hh \" . \n " ;
}
fUnwrittenBytesEnd + = frameSize ;
processBuffer ( ) ;
}
static void ourOnSourceClosure ( void * clientData ) { ( ( TCPStreamSink * ) clientData ) - > ourOnSourceClosure ( ) ; }
void ourOnSourceClosure ( ) {
// The input source has closed:
fInputSourceIsOpen = False ;
processBuffer ( ) ;
}
unsigned numUnwrittenBytes ( ) const { return fUnwrittenBytesEnd - fUnwrittenBytesStart ; }
unsigned freeBufferSpace ( ) const { return TCP_STREAM_SINK_BUFFER_SIZE - fUnwrittenBytesEnd ; }
private :
unsigned char fBuffer [ TCP_STREAM_SINK_BUFFER_SIZE ] ;
unsigned fUnwrittenBytesStart , fUnwrittenBytesEnd ;
Boolean fInputSourceIsOpen , fOutputSocketIsWritable ;
int fOutputSocketNum ;
} ;
// ---------------------------------------------------------
// Extend RTSP server to add support for HLS and MPEG-DASH
@ -25,7 +123,7 @@ class HTTPServer : public RTSPServer
{
public :
HTTPClientConnection ( RTSPServer & ourServer , int clientSocket , struct sockaddr_in clientAddr )
: RTSPServer : : RTSPClientConnection ( ourServer , clientSocket , clientAddr ) , fTCPSink( NULL ) , fStreamToken ( NULL ) , fSubsession ( NULL ) , fSource ( NULL ) {
: RTSPServer : : RTSPClientConnection ( ourServer , clientSocket , clientAddr ) , m_TCPSink( NULL ) , m_StreamToken ( NULL ) , m_Subsession ( NULL ) {
}
virtual ~ HTTPClientConnection ( ) ;
@ -43,11 +141,10 @@ class HTTPServer : public RTSPServer
static void afterStreaming ( void * clientData ) ;
private :
static u_int32_t fClientSessionId ;
TCPStreamSink * fTCPSink ;
void * fStreamToken ;
ServerMediaSubsession * fSubsession ;
FramedSource * fSource ;
static u_int32_t m_ClientSessionId ;
TCPStreamSink * m_TCPSink ;
void * m_StreamToken ;
ServerMediaSubsession * m_Subsession ;
} ;
public :