@ -22,54 +22,8 @@
# define HEADER_SIZE 12
# define HEADER_SIZE 12
# define NO_PTS UINT64_C(-1)
# define NO_PTS UINT64_C(-1)
static struct frame_meta *
frame_meta_new ( uint64_t pts ) {
struct frame_meta * meta = SDL_malloc ( sizeof ( * meta ) ) ;
if ( ! meta ) {
return meta ;
}
meta - > pts = pts ;
meta - > next = NULL ;
return meta ;
}
static void
frame_meta_delete ( struct frame_meta * frame_meta ) {
SDL_free ( frame_meta ) ;
}
static bool
static bool
receiver_state_push_meta ( struct receiver_state * state , uint64_t pts ) {
stream_recv_packet ( struct stream * stream , AVPacket * packet ) {
struct frame_meta * frame_meta = frame_meta_new ( pts ) ;
if ( ! frame_meta ) {
return false ;
}
// append to the list
// (iterate to find the last item, in practice the list should be tiny)
struct frame_meta * * p = & state - > frame_meta_queue ;
while ( * p ) {
p = & ( * p ) - > next ;
}
* p = frame_meta ;
return true ;
}
static uint64_t
receiver_state_take_meta ( struct receiver_state * state ) {
struct frame_meta * frame_meta = state - > frame_meta_queue ; // first item
SDL_assert ( frame_meta ) ; // must not be empty
uint64_t pts = frame_meta - > pts ;
state - > frame_meta_queue = frame_meta - > next ; // remove the item
frame_meta_delete ( frame_meta ) ;
return pts ;
}
static int
read_packet_with_meta ( void * opaque , uint8_t * buf , int buf_size ) {
struct stream * stream = opaque ;
struct receiver_state * state = & stream - > receiver_state ;
// The video stream contains raw packets, without time information. When we
// The video stream contains raw packets, without time information. When we
// record, we retrieve the timestamps separately, from a "meta" header
// record, we retrieve the timestamps separately, from a "meta" header
// added by the server before each raw packet.
// added by the server before each raw packet.
@ -82,60 +36,30 @@ read_packet_with_meta(void *opaque, uint8_t *buf, int buf_size) {
//
//
// It is followed by <packet_size> bytes containing the packet/frame.
// It is followed by <packet_size> bytes containing the packet/frame.
if ( ! state - > remaining ) {
# define HEADER_SIZE 12
uint8_t header [ HEADER_SIZE ] ;
uint8_t header [ HEADER_SIZE ] ;
ssize_t r = net_recv_all ( stream - > socket , header , HEADER_SIZE ) ;
ssize_t r = net_recv_all ( stream - > socket , header , HEADER_SIZE ) ;
if ( r = = - 1 ) {
if ( r < HEADER_SIZE ) {
return AVERROR ( errno ) ;
return false ;
}
if ( r = = 0 ) {
return AVERROR_EOF ;
}
}
// no partial read (net_recv_all())
SDL_assert_release ( r = = HEADER_SIZE ) ;
uint64_t pts = buffer_read64be ( header ) ;
uint64_t pts = buffer_read64be ( header ) ;
state - > remaining = buffer_read32be ( & header [ 8 ] ) ;
uint32_t len = buffer_read32be ( & header [ 8 ] ) ;
SDL_assert ( len ) ;
if ( pts ! = NO_PTS & & ! receiver_state_push_meta ( state , pts ) ) {
LOGE ( " Could not store PTS for recording " ) ;
// we could not save the PTS, the recording would be broken
return AVERROR ( ENOMEM ) ;
}
}
SDL_assert ( state - > remaining ) ;
if ( buf_size > state - > remaining ) {
if ( av_new_packet ( packet , len ) ) {
buf_size = state - > remaining ;
LOGE ( " Could not allocate packet " ) ;
return false ;
}
}
ssize_t r = net_recv ( stream - > socket , buf , buf_size ) ;
r = net_recv_all ( stream - > socket , packet - > data , len ) ;
if ( r = = - 1 ) {
if ( r < len ) {
return errno ? AVERROR ( errno ) : AVERROR_EOF ;
av_packet_unref ( packet ) ;
}
return false ;
if ( r = = 0 ) {
return AVERROR_EOF ;
}
}
SDL_assert ( state - > remaining > = r ) ;
packet - > pts = pts ! = NO_PTS ? pts : AV_NOPTS_VALUE ;
state - > remaining - = r ;
return r ;
}
static int
return true ;
read_raw_packet ( void * opaque , uint8_t * buf , int buf_size ) {
struct stream * stream = opaque ;
ssize_t r = net_recv ( stream - > socket , buf , buf_size ) ;
if ( r = = - 1 ) {
return errno ? AVERROR ( errno ) : AVERROR_EOF ;
}
if ( r = = 0 ) {
return AVERROR_EOF ;
}
return r ;
}
}
static void
static void
@ -145,55 +69,136 @@ notify_stopped(void) {
SDL_PushEvent ( & stop_event ) ;
SDL_PushEvent ( & stop_event ) ;
}
}
static int
static bool
run_stream ( void * data ) {
process_config_packet ( struct stream * stream , AVPacket * packet ) {
struct stream * stream = data ;
if ( stream - > recorder & & ! recorder_write ( stream - > recorder , packet ) ) {
LOGE ( " Could not send config packet to recorder " ) ;
return false ;
}
return true ;
}
AVFormatContext * format_ctx = avformat_alloc_context ( ) ;
static bool
if ( ! format_ctx ) {
process_frame ( struct stream * stream , AVPacket * packet ) {
LOGC ( " Could not allocate format context " ) ;
if ( stream - > decoder & & ! decoder_push ( stream - > decoder , packet ) ) {
goto end ;
return false ;
}
if ( stream - > recorder ) {
packet - > dts = packet - > pts ;
if ( ! recorder_write ( stream - > recorder , packet ) ) {
LOGE ( " Could not write frame to output file " ) ;
return false ;
}
}
}
unsigned char * buffer = av_malloc ( BUFSIZE ) ;
return true ;
if ( ! buffer ) {
}
LOGC ( " Could not allocate buffer " ) ;
goto finally_free_format_ctx ;
static bool
stream_parse ( struct stream * stream , AVPacket * packet ) {
uint8_t * in_data = packet - > data ;
int in_len = packet - > size ;
uint8_t * out_data = NULL ;
int out_len = 0 ;
int r = av_parser_parse2 ( stream - > parser , stream - > codec_ctx ,
& out_data , & out_len , in_data , in_len ,
AV_NOPTS_VALUE , AV_NOPTS_VALUE , - 1 ) ;
// PARSER_FLAG_COMPLETE_FRAMES is set
SDL_assert ( r = = in_len ) ;
SDL_assert ( out_len = = in_len ) ;
if ( stream - > parser - > key_frame = = 1 ) {
packet - > flags | = AV_PKT_FLAG_KEY ;
}
}
// initialize the receiver state
bool ok = process_frame ( stream , packet ) ;
stream - > receiver_state . frame_meta_queue = NULL ;
if ( ! ok ) {
stream - > receiver_state . remaining = 0 ;
LOGE ( " Could not process frame " ) ;
return false ;
// if recording is enabled, a "header" is sent between raw packets
int ( * read_packet ) ( void * , uint8_t * , int ) =
stream - > recorder ? read_packet_with_meta : read_raw_packet ;
AVIOContext * avio_ctx = avio_alloc_context ( buffer , BUFSIZE , 0 , stream ,
read_packet , NULL , NULL ) ;
if ( ! avio_ctx ) {
LOGC ( " Could not allocate avio context " ) ;
// avformat_open_input takes ownership of 'buffer'
// so only free the buffer before avformat_open_input()
av_free ( buffer ) ;
goto finally_free_format_ctx ;
}
}
format_ctx - > pb = avio_ctx ;
return true ;
}
if ( avformat_open_input ( & format_ctx , NULL , NULL , NULL ) < 0 ) {
static bool
LOGE ( " Could not open video stream " ) ;
stream_push_packet ( struct stream * stream , AVPacket * packet ) {
goto finally_free_avio_ctx ;
bool is_config = packet - > pts = = AV_NOPTS_VALUE ;
// A config packet must not be decoded immetiately (it contains no
// frame); instead, it must be concatenated with the future data packet.
if ( stream - > has_pending | | is_config ) {
size_t offset ;
if ( stream - > has_pending ) {
offset = stream - > pending . size ;
if ( av_grow_packet ( & stream - > pending , packet - > size ) ) {
LOGE ( " Could not grow packet " ) ;
return false ;
}
} else {
offset = 0 ;
if ( av_new_packet ( & stream - > pending , packet - > size ) ) {
LOGE ( " Could not create packet " ) ;
return false ;
}
stream - > has_pending = true ;
}
memcpy ( stream - > pending . data + offset , packet - > data , packet - > size ) ;
if ( ! is_config ) {
// prepare the concat packet to send to the decoder
stream - > pending . pts = packet - > pts ;
stream - > pending . dts = packet - > dts ;
stream - > pending . flags = packet - > flags ;
packet = & stream - > pending ;
}
}
if ( is_config ) {
// config packet
bool ok = process_config_packet ( stream , packet ) ;
if ( ! ok ) {
return false ;
}
} else {
// data packet
bool ok = stream_parse ( stream , packet ) ;
if ( stream - > has_pending ) {
// the pending packet must be discarded (consumed or error)
stream - > has_pending = false ;
av_packet_unref ( & stream - > pending ) ;
}
}
if ( ! ok ) {
return false ;
}
}
return true ;
}
static int
run_stream ( void * data ) {
struct stream * stream = data ;
AVCodec * codec = avcodec_find_decoder ( AV_CODEC_ID_H264 ) ;
AVCodec * codec = avcodec_find_decoder ( AV_CODEC_ID_H264 ) ;
if ( ! codec ) {
if ( ! codec ) {
LOGE ( " H.264 decoder not found " ) ;
LOGE ( " H.264 decoder not found " ) ;
goto end ;
goto end ;
}
}
stream - > codec_ctx = avcodec_alloc_context3 ( codec ) ;
if ( ! stream - > codec_ctx ) {
LOGC ( " Could not allocate codec context " ) ;
goto end ;
}
if ( stream - > decoder & & ! decoder_open ( stream - > decoder , codec ) ) {
if ( stream - > decoder & & ! decoder_open ( stream - > decoder , codec ) ) {
LOGE ( " Could not open decoder " ) ;
LOGE ( " Could not open decoder " ) ;
goto finally_close_input ;
goto finally_ free_codec_ctx ;
}
}
if ( stream - > recorder & & ! recorder_open ( stream - > recorder , codec ) ) {
if ( stream - > recorder & & ! recorder_open ( stream - > recorder , codec ) ) {
@ -201,50 +206,40 @@ run_stream(void *data) {
goto finally_close_decoder ;
goto finally_close_decoder ;
}
}
AVPacket packet ;
stream - > parser = av_parser_init ( AV_CODEC_ID_H264 ) ;
av_init_packet ( & packet ) ;
if ( ! stream - > parser ) {
packet . data = NULL ;
LOGE ( " Could not initialize parser " ) ;
packet . size = 0 ;
goto finally_close_recorder ;
while ( ! av_read_frame ( format_ctx , & packet ) ) {
if ( SDL_AtomicGet ( & stream - > stopped ) ) {
// if the stream is stopped, the socket had been shutdown, so the
// last packet is probably corrupted (but not detected as such by
// FFmpeg) and will not be decoded correctly
av_packet_unref ( & packet ) ;
goto quit ;
}
if ( stream - > decoder & & ! decoder_push ( stream - > decoder , & packet ) ) {
av_packet_unref ( & packet ) ;
goto quit ;
}
}
if ( stream - > recorder ) {
// We must only pass complete frames to av_parser_parse2()!
// we retrieve the PTS in order they were received, so they will
// It's more complicated, but this allows to reduce the latency by 1 frame!
// be assigned to the correct frame
stream - > parser - > flags | = PARSER_FLAG_COMPLETE_FRAMES ;
uint64_t pts = receiver_state_take_meta ( & stream - > receiver_state ) ;
packet . pts = pts ;
for ( ; ; ) {
packet . dts = pts ;
AVPacket packet ;
bool ok = stream_recv_packet ( stream , & packet ) ;
// no need to rescale with av_packet_rescale_ts(), the timestamps
if ( ! ok ) {
// are in microseconds both in input and output
// end of stream
if ( ! recorder_write ( stream - > recorder , & packet ) ) {
break ;
LOGE ( " Could not write frame to output file " ) ;
av_packet_unref ( & packet ) ;
goto quit ;
}
}
}
ok = stream_push_packet ( stream , & packet ) ;
av_packet_unref ( & packet ) ;
av_packet_unref ( & packet ) ;
if ( ! ok ) {
if ( avio_ctx - > eof_reached ) {
// cannot process packet (error already logged)
break ;
break ;
}
}
}
}
LOGD ( " End of frames " ) ;
LOGD ( " End of frames " ) ;
quit :
if ( stream - > has_pending ) {
av_packet_unref ( & stream - > pending ) ;
}
av_parser_close ( stream - > parser ) ;
finally_close_recorder :
if ( stream - > recorder ) {
if ( stream - > recorder ) {
recorder_close ( stream - > recorder ) ;
recorder_close ( stream - > recorder ) ;
}
}
@ -252,13 +247,8 @@ finally_close_decoder:
if ( stream - > decoder ) {
if ( stream - > decoder ) {
decoder_close ( stream - > decoder ) ;
decoder_close ( stream - > decoder ) ;
}
}
finally_close_input :
finally_free_codec_ctx :
avformat_close_input ( & format_ctx ) ;
avcodec_free_context ( & stream - > codec_ctx ) ;
finally_free_avio_ctx :
av_free ( avio_ctx - > buffer ) ;
av_free ( avio_ctx ) ;
finally_free_format_ctx :
avformat_free_context ( format_ctx ) ;
end :
end :
notify_stopped ( ) ;
notify_stopped ( ) ;
return 0 ;
return 0 ;
@ -270,7 +260,7 @@ stream_init(struct stream *stream, socket_t socket,
stream - > socket = socket ;
stream - > socket = socket ;
stream - > decoder = decoder ,
stream - > decoder = decoder ,
stream - > recorder = recorder ;
stream - > recorder = recorder ;
SDL_AtomicSet ( & stream - > stopped , 0 ) ;
stream - > has_pending = false ;
}
}
bool
bool
@ -287,7 +277,6 @@ stream_start(struct stream *stream) {
void
void
stream_stop ( struct stream * stream ) {
stream_stop ( struct stream * stream ) {
SDL_AtomicSet ( & stream - > stopped , 1 ) ;
if ( stream - > decoder ) {
if ( stream - > decoder ) {
decoder_interrupt ( stream - > decoder ) ;
decoder_interrupt ( stream - > decoder ) ;
}
}