protect queue access with mutex

This commit is contained in:
Michel Promonet 2015-03-30 20:09:24 +00:00
parent c01325cabe
commit 99b4c42a07
2 changed files with 11 additions and 6 deletions

View File

@ -98,6 +98,7 @@ class V4L2DeviceSource: public FramedSource
V4l2Capture * m_device; V4l2Capture * m_device;
unsigned int m_queueSize; unsigned int m_queueSize;
pthread_t m_thid; pthread_t m_thid;
pthread_mutex_t m_mutex;
std::string m_auxLine; std::string m_auxLine;
}; };

View File

@ -63,10 +63,12 @@ V4L2DeviceSource::V4L2DeviceSource(UsageEnvironment& env, V4L2DeviceParameters p
{ {
m_eventTriggerId = envir().taskScheduler().createEventTrigger(V4L2DeviceSource::deliverFrameStub); m_eventTriggerId = envir().taskScheduler().createEventTrigger(V4L2DeviceSource::deliverFrameStub);
memset(&m_thid, 0, sizeof(m_thid)); memset(&m_thid, 0, sizeof(m_thid));
memset(&m_mutex, 0, sizeof(m_mutex));
if (m_device) if (m_device)
{ {
if (useThread) if (useThread)
{ {
pthread_mutex_init(&m_mutex, NULL);
pthread_create(&m_thid, NULL, threadStub, this); pthread_create(&m_thid, NULL, threadStub, this);
} }
else else
@ -82,6 +84,7 @@ V4L2DeviceSource::~V4L2DeviceSource()
envir().taskScheduler().deleteEventTrigger(m_eventTriggerId); envir().taskScheduler().deleteEventTrigger(m_eventTriggerId);
m_device->captureStop(); m_device->captureStop();
pthread_join(m_thid, NULL); pthread_join(m_thid, NULL);
pthread_mutex_destroy(&m_mutex);
} }
// thread mainloop // thread mainloop
@ -123,10 +126,7 @@ void* V4L2DeviceSource::thread()
// getting FrameSource callback // getting FrameSource callback
void V4L2DeviceSource::doGetNextFrame() void V4L2DeviceSource::doGetNextFrame()
{ {
if (!m_captureQueue.empty())
{
deliverFrame(); deliverFrame();
}
} }
// stopping FrameSource callback // stopping FrameSource callback
@ -144,6 +144,7 @@ void V4L2DeviceSource::deliverFrame()
fDurationInMicroseconds = 0; fDurationInMicroseconds = 0;
fFrameSize = 0; fFrameSize = 0;
pthread_mutex_lock (&m_mutex);
if (m_captureQueue.empty()) if (m_captureQueue.empty())
{ {
LOG(DEBUG) << "Queue is empty"; LOG(DEBUG) << "Queue is empty";
@ -172,6 +173,7 @@ void V4L2DeviceSource::deliverFrame()
memcpy(fTo, frame->m_buffer, fFrameSize); memcpy(fTo, frame->m_buffer, fFrameSize);
delete frame; delete frame;
} }
pthread_mutex_unlock (&m_mutex);
// send Frame to the consumer // send Frame to the consumer
FramedSource::afterGetting(this); FramedSource::afterGetting(this);
@ -203,7 +205,7 @@ int V4L2DeviceSource::getNextFrame()
timeval diff; timeval diff;
timersub(&tv,&ref,&diff); timersub(&tv,&ref,&diff);
m_in.notify(tv.tv_sec, frameSize); m_in.notify(tv.tv_sec, frameSize);
LOG(DEBUG) << "getNextFrame\ttimestamp:" << ref.tv_sec << "." << ref.tv_usec << "\tsize:" << frameSize <<"\tdiff:" << (diff.tv_sec*1000+diff.tv_usec/1000) << "ms\tqueue:" << m_captureQueue.size(); LOG(DEBUG) << "getNextFrame\ttimestamp:" << ref.tv_sec << "." << ref.tv_usec << "\tsize:" << frameSize <<"\tdiff:" << (diff.tv_sec*1000+diff.tv_usec/1000) << "ms";
processFrame(buffer,frameSize,ref); processFrame(buffer,frameSize,ref);
} }
return frameSize; return frameSize;
@ -226,7 +228,7 @@ void V4L2DeviceSource::processFrame(char * frame, int frameSize, const timeval &
memcpy(buf, frame.first, size); memcpy(buf, frame.first, size);
queueFrame(buf,size,ref); queueFrame(buf,size,ref);
LOG(DEBUG) << "queueFrame\ttimestamp:" << ref.tv_sec << "." << ref.tv_usec << "\tsize:" << size <<"\tdiff:" << (diff.tv_sec*1000+diff.tv_usec/1000) << "ms\tqueue:" << m_captureQueue.size(); LOG(DEBUG) << "queueFrame\ttimestamp:" << ref.tv_sec << "." << ref.tv_usec << "\tsize:" << size <<"\tdiff:" << (diff.tv_sec*1000+diff.tv_usec/1000) << "ms";
if (m_outfd != -1) write(m_outfd, buf, size); if (m_outfd != -1) write(m_outfd, buf, size);
frameList.pop_front(); frameList.pop_front();
@ -236,6 +238,7 @@ void V4L2DeviceSource::processFrame(char * frame, int frameSize, const timeval &
// post a frame to fifo // post a frame to fifo
void V4L2DeviceSource::queueFrame(char * frame, int frameSize, const timeval &tv) void V4L2DeviceSource::queueFrame(char * frame, int frameSize, const timeval &tv)
{ {
pthread_mutex_lock (&m_mutex);
while (m_captureQueue.size() >= m_queueSize) while (m_captureQueue.size() >= m_queueSize)
{ {
LOG(DEBUG) << "Queue full size drop frame size:" << (int)m_captureQueue.size() ; LOG(DEBUG) << "Queue full size drop frame size:" << (int)m_captureQueue.size() ;
@ -243,6 +246,7 @@ void V4L2DeviceSource::queueFrame(char * frame, int frameSize, const timeval &tv
m_captureQueue.pop_front(); m_captureQueue.pop_front();
} }
m_captureQueue.push_back(new Frame(frame, frameSize, tv)); m_captureQueue.push_back(new Frame(frame, frameSize, tv));
pthread_mutex_unlock (&m_mutex);
// post an event to ask to deliver the frame // post an event to ask to deliver the frame
envir().taskScheduler().triggerEvent(m_eventTriggerId, this); envir().taskScheduler().triggerEvent(m_eventTriggerId, this);