From 41ef22cf09d093bb23634b0f89fe9467a95710e1 Mon Sep 17 00:00:00 2001 From: orignal Date: Fri, 10 Jan 2014 20:21:38 -0500 Subject: [PATCH] stream recieve queue --- Queue.h | 23 +++++++++++++++++-- Streaming.cpp | 63 +++++++++++++++++++++++++++++++++++++++++++-------- Streaming.h | 20 +++++++++++++--- 3 files changed, 92 insertions(+), 14 deletions(-) diff --git a/Queue.h b/Queue.h index 31e8e0a4..ae60f099 100644 --- a/Queue.h +++ b/Queue.h @@ -45,6 +45,18 @@ namespace util } return el; } + + bool Wait (int sec, int usec) + { + std::unique_lock l(m_QueueMutex); + return m_NonEmpty.wait_for (l, std::chrono::seconds (sec) + std::chrono::milliseconds (usec)) != std::cv_status::timeout; + } + + bool IsEmpty () + { + std::unique_lock l(m_QueueMutex); + return m_Queue.empty (); + } void WakeUp () { m_NonEmpty.notify_one (); }; @@ -54,14 +66,21 @@ namespace util return GetNonThreadSafe (); } + Element * Peek () + { + std::unique_lock l(m_QueueMutex); + return GetNonThreadSafe (true); + } + private: - Element * GetNonThreadSafe () + Element * GetNonThreadSafe (bool peek = false) { if (!m_Queue.empty ()) { Element * el = m_Queue.front (); - m_Queue.pop (); + if (!peek) + m_Queue.pop (); return el; } return nullptr; diff --git a/Streaming.cpp b/Streaming.cpp index fc746ebc..d0df63de 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -1,5 +1,6 @@ #include "I2PEndian.h" #include +#include #include #include "Log.h" #include "RouterInfo.h" @@ -20,9 +21,15 @@ namespace stream m_RecvStreamID = i2p::context.GetRandomNumberGenerator ().GenerateWord32 (); } - void Stream::HandleNextPacket (const uint8_t * buf, size_t len) + Stream::~Stream () { - const uint8_t * end = buf + len; + while (auto packet = m_ReceiveQueue.Get ()) + delete packet; + } + + void Stream::HandleNextPacket (Packet * packet) + { + const uint8_t * end = packet->buf + packet->len, * buf = packet->buf; buf += 4; // sendStreamID if (!m_SendStreamID) m_SendStreamID = be32toh (*(uint32_t *)buf); @@ -61,6 +68,9 @@ namespace stream // we have reached payload section std::string str((const char *)buf, end-buf); LogPrint ("Payload: ", str); + + packet->offset = buf - packet->buf; + m_ReceiveQueue.Put (packet); } size_t Stream::Send (uint8_t * buf, size_t len, int timeout) @@ -105,6 +115,37 @@ namespace stream DeleteI2NPMessage (msg); return len; } + + size_t Stream::Receive (uint8_t * buf, size_t len, int timeout) + { + if (m_ReceiveQueue.IsEmpty ()) + { + if (!m_ReceiveQueue.Wait (timeout, 0)) + return 0; + } + + // either non-empty or we have received empty + size_t pos = 0; + while (pos < len) + { + Packet * packet = m_ReceiveQueue.Peek (); + if (packet) + { + size_t l = std::min (packet->GetLength (), len - pos); + memcpy (buf + pos, packet->GetBuffer (), l); + pos += l; + packet->offset += l; + if (!packet->GetLength ()) + { + m_ReceiveQueue.Get (); + delete packet; + } + } + else // no more data available + break; + } + return pos; + } StreamingDestination * sharedLocalDestination = nullptr; @@ -124,14 +165,17 @@ namespace stream DeleteI2NPMessage (m_LeaseSet); } - void StreamingDestination::HandleNextPacket (const uint8_t * buf, size_t len) + void StreamingDestination::HandleNextPacket (Packet * packet) { - uint32_t sendStreamID = be32toh (*(uint32_t *)(buf)); + uint32_t sendStreamID = be32toh (*(uint32_t *)(packet->buf)); auto it = m_Streams.find (sendStreamID); if (it != m_Streams.end ()) - it->second->HandleNextPacket (buf, len); + it->second->HandleNextPacket (packet); else + { LogPrint ("Unknown stream ", sendStreamID); + delete packet; + } } Stream * StreamingDestination::CreateNewStream (const i2p::data::LeaseSet * remote) @@ -232,13 +276,14 @@ namespace stream CryptoPP::Gunzip decompressor; decompressor.Put (buf, length); decompressor.MessageEnd(); - uint8_t uncompressed[2048]; - int uncompressedSize = decompressor.MaxRetrievable (); - decompressor.Get (uncompressed, uncompressedSize); + Packet * uncompressed = new Packet; + uncompressed->offset = 0; + uncompressed->len = decompressor.MaxRetrievable (); + decompressor.Get (uncompressed->buf, uncompressed->len); // then forward to streaming engine // TODO: we have onle one destination, might be more if (sharedLocalDestination) - sharedLocalDestination->HandleNextPacket (uncompressed, uncompressedSize); + sharedLocalDestination->HandleNextPacket (uncompressed); } else LogPrint ("Data: protocol ", buf[9], " is not supported"); diff --git a/Streaming.h b/Streaming.h index 26e84e3a..5f0ab5f4 100644 --- a/Streaming.h +++ b/Streaming.h @@ -4,6 +4,7 @@ #include #include #include +#include "Queue.h" #include "Identity.h" #include "LeaseSet.h" #include "I2NPProtocol.h" @@ -25,26 +26,39 @@ namespace stream const uint16_t PACKET_FLAG_NO_ACK = 0x0400; const size_t STREAMING_MTU = 1730; - + + struct Packet + { + uint8_t buf[STREAMING_MTU]; + size_t len, offset; + + Packet (): len (0), offset (0) {}; + uint8_t * GetBuffer () { return buf + offset; }; + size_t GetLength () const { return len - offset; }; + }; + class StreamingDestination; class Stream { public: Stream (StreamingDestination * local, const i2p::data::LeaseSet * remote); + ~Stream (); uint32_t GetSendStreamID () const { return m_SendStreamID; }; uint32_t GetRecvStreamID () const { return m_RecvStreamID; }; const i2p::data::LeaseSet * GetRemoteLeaseSet () const { return m_RemoteLeaseSet; }; bool IsEstablished () const { return m_SendStreamID; }; - void HandleNextPacket (const uint8_t * buf, size_t len); + void HandleNextPacket (Packet * packet); size_t Send (uint8_t * buf, size_t len, int timeout); // timeout in seconds + size_t Receive (uint8_t * buf, size_t len, int timeout); // returns 0 if timeout expired private: uint32_t m_SendStreamID, m_RecvStreamID, m_SequenceNumber; StreamingDestination * m_LocalDestination; const i2p::data::LeaseSet * m_RemoteLeaseSet; + i2p::util::Queue m_ReceiveQueue; }; class StreamingDestination @@ -61,7 +75,7 @@ namespace stream Stream * CreateNewStream (const i2p::data::LeaseSet * remote); void DeleteStream (Stream * stream); - void HandleNextPacket (const uint8_t * buf, size_t len); + void HandleNextPacket (Packet * packet); private: