From 4d97b0e20635ccb8b5fbffc5e5a4847b31972ce6 Mon Sep 17 00:00:00 2001 From: orignal Date: Wed, 22 Oct 2014 11:46:54 -0400 Subject: [PATCH] moved StreamingDestination inside ClientDestination --- ClientContext.cpp | 22 +++++----- ClientContext.h | 16 ++++---- Destination.cpp | 91 ++++------------------------------------- Destination.h | 52 ++--------------------- HTTPServer.cpp | 6 +-- I2PTunnel.cpp | 12 +++--- I2PTunnel.h | 13 +++--- SAM.cpp | 16 ++++---- SAM.h | 3 +- SOCKS.cpp | 2 +- Streaming.cpp | 102 ++++++++++++++++++++++++++++++++++++++++------ Streaming.h | 41 +++++++++++++++++++ 12 files changed, 187 insertions(+), 189 deletions(-) diff --git a/ClientContext.cpp b/ClientContext.cpp index aa8e8c95..5937e8f8 100644 --- a/ClientContext.cpp +++ b/ClientContext.cpp @@ -27,7 +27,7 @@ namespace client { if (!m_SharedLocalDestination) { - m_SharedLocalDestination = new i2p::stream::StreamingDestination (false, i2p::data::SIGNING_KEY_TYPE_DSA_SHA1); // non-public, DSA + m_SharedLocalDestination = new ClientDestination (false, i2p::data::SIGNING_KEY_TYPE_DSA_SHA1); // non-public, DSA m_Destinations[m_SharedLocalDestination->GetIdentity ().GetIdentHash ()] = m_SharedLocalDestination; m_SharedLocalDestination->Start (); } @@ -41,7 +41,7 @@ namespace client std::string ircDestination = i2p::util::config::GetArg("-ircdest", ""); if (ircDestination.length () > 0) // ircdest is presented { - i2p::stream::StreamingDestination * localDestination = nullptr; + ClientDestination * localDestination = nullptr; std::string ircKeys = i2p::util::config::GetArg("-irckeys", ""); if (ircKeys.length () > 0) localDestination = i2p::client::context.LoadLocalDestination (ircKeys, false); @@ -125,7 +125,7 @@ namespace client #else it->path(); #endif - auto localDestination = new i2p::stream::StreamingDestination (fullPath, true); + auto localDestination = new ClientDestination (fullPath, true); m_Destinations[localDestination->GetIdentHash ()] = localDestination; numDestinations++; } @@ -134,25 +134,25 @@ namespace client LogPrint (numDestinations, " local destinations loaded"); } - i2p::stream::StreamingDestination * ClientContext::LoadLocalDestination (const std::string& filename, bool isPublic) + ClientDestination * ClientContext::LoadLocalDestination (const std::string& filename, bool isPublic) { - auto localDestination = new i2p::stream::StreamingDestination (i2p::util::filesystem::GetFullPath (filename), isPublic); + auto localDestination = new ClientDestination (i2p::util::filesystem::GetFullPath (filename), isPublic); std::unique_lock l(m_DestinationsMutex); m_Destinations[localDestination->GetIdentHash ()] = localDestination; localDestination->Start (); return localDestination; } - i2p::stream::StreamingDestination * ClientContext::CreateNewLocalDestination (bool isPublic, i2p::data::SigningKeyType sigType) + ClientDestination * ClientContext::CreateNewLocalDestination (bool isPublic, i2p::data::SigningKeyType sigType) { - auto localDestination = new i2p::stream::StreamingDestination (isPublic, sigType); + auto localDestination = new ClientDestination (isPublic, sigType); std::unique_lock l(m_DestinationsMutex); m_Destinations[localDestination->GetIdentHash ()] = localDestination; localDestination->Start (); return localDestination; } - void ClientContext::DeleteLocalDestination (i2p::stream::StreamingDestination * destination) + void ClientContext::DeleteLocalDestination (ClientDestination * destination) { if (!destination) return; auto it = m_Destinations.find (destination->GetIdentHash ()); @@ -168,7 +168,7 @@ namespace client } } - i2p::stream::StreamingDestination * ClientContext::CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic) + ClientDestination * ClientContext::CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic) { auto it = m_Destinations.find (keys.GetPublic ().GetIdentHash ()); if (it != m_Destinations.end ()) @@ -181,14 +181,14 @@ namespace client } return nullptr; } - auto localDestination = new i2p::stream::StreamingDestination (keys, isPublic); + auto localDestination = new ClientDestination (keys, isPublic); std::unique_lock l(m_DestinationsMutex); m_Destinations[keys.GetPublic ().GetIdentHash ()] = localDestination; localDestination->Start (); return localDestination; } - i2p::stream::StreamingDestination * ClientContext::FindLocalDestination (const i2p::data::IdentHash& destination) const + ClientDestination * ClientContext::FindLocalDestination (const i2p::data::IdentHash& destination) const { auto it = m_Destinations.find (destination); if (it != m_Destinations.end ()) diff --git a/ClientContext.h b/ClientContext.h index 8fbef267..58c2cf36 100644 --- a/ClientContext.h +++ b/ClientContext.h @@ -22,12 +22,12 @@ namespace client void Start (); void Stop (); - i2p::stream::StreamingDestination * GetSharedLocalDestination () const { return m_SharedLocalDestination; }; - i2p::stream::StreamingDestination * CreateNewLocalDestination (bool isPublic = true, i2p::data::SigningKeyType sigType = i2p::data::SIGNING_KEY_TYPE_DSA_SHA1); // transient - i2p::stream::StreamingDestination * CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic = true); - void DeleteLocalDestination (i2p::stream::StreamingDestination * destination); - i2p::stream::StreamingDestination * FindLocalDestination (const i2p::data::IdentHash& destination) const; - i2p::stream::StreamingDestination * LoadLocalDestination (const std::string& filename, bool isPublic); + ClientDestination * GetSharedLocalDestination () const { return m_SharedLocalDestination; }; + ClientDestination * CreateNewLocalDestination (bool isPublic = true, i2p::data::SigningKeyType sigType = i2p::data::SIGNING_KEY_TYPE_DSA_SHA1); // transient + ClientDestination * CreateNewLocalDestination (const i2p::data::PrivateKeys& keys, bool isPublic = true); + void DeleteLocalDestination (ClientDestination * destination); + ClientDestination * FindLocalDestination (const i2p::data::IdentHash& destination) const; + ClientDestination * LoadLocalDestination (const std::string& filename, bool isPublic); private: @@ -36,8 +36,8 @@ namespace client private: std::mutex m_DestinationsMutex; - std::map m_Destinations; - i2p::stream::StreamingDestination * m_SharedLocalDestination; + std::map m_Destinations; + ClientDestination * m_SharedLocalDestination; i2p::proxy::HTTPProxy * m_HttpProxy; i2p::proxy::SOCKSProxy * m_SocksProxy; diff --git a/Destination.cpp b/Destination.cpp index da877f53..254f9829 100644 --- a/Destination.cpp +++ b/Destination.cpp @@ -21,6 +21,7 @@ namespace client m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (*this, 3); // 3-hops tunnel if (m_IsPublic) LogPrint ("Local address ", GetIdentHash ().ToBase32 (), ".b32.i2p created"); + m_StreamingDestination = new i2p::stream::StreamingDestination (*this); // TODO: } ClientDestination::ClientDestination (const std::string& fullPath, bool isPublic): @@ -56,6 +57,7 @@ namespace client CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg); dh.GenerateKeyPair(i2p::context.GetRandomNumberGenerator (), m_EncryptionPrivateKey, m_EncryptionPublicKey); m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (*this, 3); // 3-hops tunnel + m_StreamingDestination = new i2p::stream::StreamingDestination (*this); // TODO: } ClientDestination::ClientDestination (const i2p::data::PrivateKeys& keys, bool isPublic): @@ -67,6 +69,7 @@ namespace client m_Pool = i2p::tunnel::tunnels.CreateTunnelPool (*this, 3); // 3-hops tunnel if (m_IsPublic) LogPrint ("Local address ", GetIdentHash ().ToBase32 (), ".b32.i2p created"); + m_StreamingDestination = new i2p::stream::StreamingDestination (*this); // TODO: } ClientDestination::~ClientDestination () @@ -79,6 +82,7 @@ namespace client delete m_LeaseSet; delete m_Work; delete m_Service; + delete m_StreamingDestination; // TODO } void ClientDestination::Run () @@ -94,10 +98,12 @@ namespace client m_Pool->SetActive (true); m_IsRunning = true; m_Thread = new std::thread (std::bind (&ClientDestination::Run, this)); + m_StreamingDestination->Start (); } void ClientDestination::Stop () { + m_StreamingDestination->Stop (); if (m_Pool) i2p::tunnel::tunnels.StopTunnelPool (m_Pool); m_IsRunning = false; @@ -250,7 +256,7 @@ namespace client if (uncompressed->len <= i2p::stream::MAX_PACKET_SIZE) { decompressor.Get (uncompressed->buf, uncompressed->len); - HandleNextPacket (uncompressed); + m_StreamingDestination->HandleNextPacket (uncompressed); } else { @@ -286,87 +292,4 @@ namespace client return msg; } } - -namespace stream -{ - - void StreamingDestination::Start () - { - ClientDestination::Start (); - } - - void StreamingDestination::Stop () - { - ResetAcceptor (); - { - std::unique_lock l(m_StreamsMutex); - for (auto it: m_Streams) - delete it.second; - m_Streams.clear (); - } - ClientDestination::Stop (); - } - - - void StreamingDestination::HandleNextPacket (Packet * packet) - { - uint32_t sendStreamID = packet->GetSendStreamID (); - if (sendStreamID) - { - auto it = m_Streams.find (sendStreamID); - if (it != m_Streams.end ()) - it->second->HandleNextPacket (packet); - else - { - LogPrint ("Unknown stream ", sendStreamID); - delete packet; - } - } - else // new incoming stream - { - auto incomingStream = CreateNewIncomingStream (); - incomingStream->HandleNextPacket (packet); - if (m_Acceptor != nullptr) - m_Acceptor (incomingStream); - else - { - LogPrint ("Acceptor for incoming stream is not set"); - DeleteStream (incomingStream); - } - } - } - - Stream * StreamingDestination::CreateNewOutgoingStream (const i2p::data::LeaseSet& remote) - { - Stream * s = new Stream (*GetService (), *this, remote); - std::unique_lock l(m_StreamsMutex); - m_Streams[s->GetRecvStreamID ()] = s; - return s; - } - - Stream * StreamingDestination::CreateNewIncomingStream () - { - Stream * s = new Stream (*GetService (), *this); - std::unique_lock l(m_StreamsMutex); - m_Streams[s->GetRecvStreamID ()] = s; - return s; - } - - void StreamingDestination::DeleteStream (Stream * stream) - { - if (stream) - { - std::unique_lock l(m_StreamsMutex); - auto it = m_Streams.find (stream->GetRecvStreamID ()); - if (it != m_Streams.end ()) - { - m_Streams.erase (it); - if (GetService ()) - GetService ()->post ([stream](void) { delete stream; }); - else - delete stream; - } - } - } -} } diff --git a/Destination.h b/Destination.h index 6b41b231..c7f0b981 100644 --- a/Destination.h +++ b/Destination.h @@ -28,6 +28,7 @@ namespace client bool IsRunning () const { return m_IsRunning; }; boost::asio::io_service * GetService () { return m_Service; }; i2p::tunnel::TunnelPool * GetTunnelPool () { return m_Pool; }; + i2p::stream::StreamingDestination * GetStreamingDestination () const { return m_StreamingDestination; }; bool IsReady () const { return m_LeaseSet && m_LeaseSet->HasNonExpiredLeases (); }; void ResetCurrentOutboundTunnel () { m_CurrentOutboundTunnel = nullptr; }; @@ -52,10 +53,6 @@ namespace client void HandleDataMessage (const uint8_t * buf, size_t len); I2NPMessage * CreateDataMessage (const uint8_t * payload, size_t len); - protected: - - virtual void HandleNextPacket (i2p::stream::Packet * packet) = 0; // TODO - private: void Run (); @@ -77,55 +74,14 @@ namespace client i2p::data::LeaseSet * m_LeaseSet; bool m_IsPublic; + i2p::stream::StreamingDestination * m_StreamingDestination; + public: // for HTTP only int GetNumRemoteLeaseSets () const { return m_RemoteLeaseSets.size (); }; }; -} - -namespace stream -{ - class StreamingDestination: public i2p::client::ClientDestination - { - public: - - StreamingDestination (bool isPublic, i2p::data::SigningKeyType sigType): - ClientDestination (isPublic, sigType) {}; - StreamingDestination (const std::string& fullPath, bool isPublic): - ClientDestination (fullPath, isPublic) {}; - StreamingDestination (const i2p::data::PrivateKeys& keys, bool isPublic): - ClientDestination (keys, isPublic) {}; - ~StreamingDestination () {}; - - void Start (); - void Stop (); - - Stream * CreateNewOutgoingStream (const i2p::data::LeaseSet& remote); - void DeleteStream (Stream * stream); - void SetAcceptor (const std::function& acceptor) { m_Acceptor = acceptor; }; - void ResetAcceptor () { m_Acceptor = nullptr; }; - bool IsAcceptorSet () const { return m_Acceptor != nullptr; }; - - // ClientDestination - void HandleNextPacket (Packet * packet); - - private: - - Stream * CreateNewIncomingStream (); - - private: - - std::mutex m_StreamsMutex; - std::map m_Streams; - std::function m_Acceptor; - - public: - - // for HTTP only - const decltype(m_Streams)& GetStreams () const { return m_Streams; }; - }; -} +} } #endif diff --git a/HTTPServer.cpp b/HTTPServer.cpp index 06ed3e74..78e3bd38 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -517,7 +517,7 @@ namespace util if (m_Stream) { m_Stream->Close (); - i2p::client::context.GetSharedLocalDestination ()->DeleteStream (m_Stream); + i2p::client::context.GetSharedLocalDestination ()->GetStreamingDestination ()->DeleteStream (m_Stream); m_Stream = nullptr; } m_Socket->close (); @@ -813,7 +813,7 @@ namespace util } } s << "
Streams:
"; - for (auto it: dest->GetStreams ()) + for (auto it: dest->GetStreamingDestination ()->GetStreams ()) { s << it.first << "->" << it.second->GetRemoteIdentity ().GetIdentHash ().ToBase32 () << ".b32.i2p "; s << " [" << it.second->GetNumSentBytes () << ":" << it.second->GetNumReceivedBytes () << "]"; @@ -880,7 +880,7 @@ namespace util void HTTPConnection::SendToDestination (const i2p::data::LeaseSet * remote, const char * buf, size_t len) { if (!m_Stream) - m_Stream = i2p::client::context.GetSharedLocalDestination ()->CreateNewOutgoingStream (*remote); + m_Stream = i2p::client::context.GetSharedLocalDestination ()->GetStreamingDestination ()->CreateNewOutgoingStream (*remote); if (m_Stream) { m_Stream->Send ((uint8_t *)buf, len); diff --git a/I2PTunnel.cpp b/I2PTunnel.cpp index 3c8b9234..9569a25e 100644 --- a/I2PTunnel.cpp +++ b/I2PTunnel.cpp @@ -14,7 +14,7 @@ namespace client boost::asio::ip::tcp::socket * socket, const i2p::data::LeaseSet * leaseSet): m_Socket (socket), m_Owner (owner) { - m_Stream = m_Owner->GetLocalDestination ()->CreateNewOutgoingStream (*leaseSet); + m_Stream = m_Owner->GetLocalDestination ()->GetStreamingDestination ()->CreateNewOutgoingStream (*leaseSet); m_Stream->Send (m_Buffer, 0); // connect StreamReceive (); Receive (); @@ -39,7 +39,7 @@ namespace client if (m_Stream) { m_Stream->Close (); - m_Owner->GetLocalDestination ()->DeleteStream (m_Stream); + m_Owner->GetLocalDestination ()->GetStreamingDestination ()->DeleteStream (m_Stream); m_Stream = nullptr; } m_Socket->close (); @@ -115,7 +115,7 @@ namespace client if (ecode != boost::asio::error::operation_aborted) { if (m_Stream) m_Stream->Close (); - m_Owner->GetLocalDestination ()->DeleteStream (m_Stream); + m_Owner->GetLocalDestination ()->GetStreamingDestination ()->DeleteStream (m_Stream); m_Stream = nullptr; } } @@ -145,7 +145,7 @@ namespace client } I2PClientTunnel::I2PClientTunnel (boost::asio::io_service& service, const std::string& destination, - int port, i2p::stream::StreamingDestination * localDestination): + int port, ClientDestination * localDestination): I2PTunnel (service, localDestination ? localDestination : i2p::client::context.CreateNewLocalDestination (false, i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256)), m_Acceptor (service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), port)), @@ -251,7 +251,7 @@ namespace client } I2PServerTunnel::I2PServerTunnel (boost::asio::io_service& service, const std::string& address, int port, - i2p::stream::StreamingDestination * localDestination): I2PTunnel (service, localDestination), + ClientDestination * localDestination): I2PTunnel (service, localDestination), m_Endpoint (boost::asio::ip::address::from_string (address), port) { } @@ -270,7 +270,7 @@ namespace client { auto localDestination = GetLocalDestination (); if (localDestination) - localDestination->SetAcceptor (std::bind (&I2PServerTunnel::HandleAccept, this, std::placeholders::_1)); + localDestination->GetStreamingDestination ()->SetAcceptor (std::bind (&I2PServerTunnel::HandleAccept, this, std::placeholders::_1)); else LogPrint ("Local destination not set for server tunnel"); } diff --git a/I2PTunnel.h b/I2PTunnel.h index 47d7e212..4f99bc79 100644 --- a/I2PTunnel.h +++ b/I2PTunnel.h @@ -6,6 +6,7 @@ #include #include #include "Identity.h" +#include "Destination.h" #include "Streaming.h" namespace i2p @@ -51,22 +52,22 @@ namespace client { public: - I2PTunnel (boost::asio::io_service& service, i2p::stream::StreamingDestination * localDestination): + I2PTunnel (boost::asio::io_service& service, ClientDestination * localDestination): m_Service (service), m_LocalDestination (localDestination) {}; virtual ~I2PTunnel () { ClearConnections (); }; void AddConnection (I2PTunnelConnection * conn); void RemoveConnection (I2PTunnelConnection * conn); void ClearConnections (); - i2p::stream::StreamingDestination * GetLocalDestination () { return m_LocalDestination; }; - void SetLocalDestination (i2p::stream::StreamingDestination * dest) { m_LocalDestination = dest; }; + ClientDestination * GetLocalDestination () { return m_LocalDestination; }; + void SetLocalDestination (ClientDestination * dest) { m_LocalDestination = dest; }; boost::asio::io_service& GetService () { return m_Service; }; private: boost::asio::io_service& m_Service; - i2p::stream::StreamingDestination * m_LocalDestination; + ClientDestination * m_LocalDestination; std::set m_Connections; }; @@ -75,7 +76,7 @@ namespace client public: I2PClientTunnel (boost::asio::io_service& service, const std::string& destination, int port, - i2p::stream::StreamingDestination * localDestination = nullptr); + ClientDestination * localDestination = nullptr); ~I2PClientTunnel (); void Start (); @@ -102,7 +103,7 @@ namespace client public: I2PServerTunnel (boost::asio::io_service& service, const std::string& address, int port, - i2p::stream::StreamingDestination * localDestination); + ClientDestination * localDestination); void Start (); void Stop (); diff --git a/SAM.cpp b/SAM.cpp index 4b96fd26..dd5ed5b7 100644 --- a/SAM.cpp +++ b/SAM.cpp @@ -26,7 +26,7 @@ namespace client { m_Stream->Close (); if (m_Session && m_Session->localDestination) - m_Session->localDestination->DeleteStream (m_Stream); + m_Session->localDestination->GetStreamingDestination ()->DeleteStream (m_Stream); } } @@ -36,7 +36,7 @@ namespace client { m_Stream->Close (); if (m_Session && m_Session->localDestination) - m_Session->localDestination->DeleteStream (m_Stream); + m_Session->localDestination->GetStreamingDestination ()->DeleteStream (m_Stream); m_Stream = nullptr; } switch (m_SocketType) @@ -55,7 +55,7 @@ namespace client if (m_Session) { m_Session->sockets.remove (this); - m_Session->localDestination->ResetAcceptor (); + m_Session->localDestination->GetStreamingDestination ()->ResetAcceptor (); } break; } @@ -295,7 +295,7 @@ namespace client { m_SocketType = eSAMSocketTypeStream; m_Session->sockets.push_back (this); - m_Stream = m_Session->localDestination->CreateNewOutgoingStream (remote); + m_Stream = m_Session->localDestination->GetStreamingDestination ()->CreateNewOutgoingStream (remote); m_Stream->Send ((uint8_t *)m_Buffer, 0); // connect I2PReceive (); SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); @@ -344,11 +344,11 @@ namespace client m_Session = m_Owner.FindSession (id); if (m_Session) { - if (!m_Session->localDestination->IsAcceptorSet ()) + if (!m_Session->localDestination->GetStreamingDestination ()->IsAcceptorSet ()) { m_SocketType = eSAMSocketTypeAcceptor; m_Session->sockets.push_back (this); - m_Session->localDestination->SetAcceptor (std::bind (&SAMSocket::HandleI2PAccept, this, std::placeholders::_1)); + m_Session->localDestination->GetStreamingDestination ()->SetAcceptor (std::bind (&SAMSocket::HandleI2PAccept, this, std::placeholders::_1)); SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); } else @@ -507,7 +507,7 @@ namespace client m_Stream = stream; auto session = m_Owner.FindSession (m_ID); if (session) - session->localDestination->ResetAcceptor (); + session->localDestination->GetStreamingDestination ()->ResetAcceptor (); if (!m_IsSilent) { // send remote peer address @@ -595,7 +595,7 @@ namespace client SAMSession * SAMBridge::CreateSession (const std::string& id, const std::string& destination) { - i2p::stream::StreamingDestination * localDestination = nullptr; + ClientDestination * localDestination = nullptr; if (destination != "") { uint8_t * buf = new uint8_t[destination.length ()]; diff --git a/SAM.h b/SAM.h index dff0e8e2..00ce366d 100644 --- a/SAM.h +++ b/SAM.h @@ -11,6 +11,7 @@ #include "Identity.h" #include "LeaseSet.h" #include "Streaming.h" +#include "Destination.h" namespace i2p { @@ -115,7 +116,7 @@ namespace client struct SAMSession { - i2p::stream::StreamingDestination * localDestination; + ClientDestination * localDestination; std::list sockets; }; diff --git a/SOCKS.cpp b/SOCKS.cpp index 83f8a5d0..9d5a5209 100644 --- a/SOCKS.cpp +++ b/SOCKS.cpp @@ -224,7 +224,7 @@ namespace proxy void SOCKS4AHandler::SentConnectionSuccess(const boost::system::error_code & ecode) { LogPrint("--- socks4a making connection"); - m_stream = i2p::client::context.GetSharedLocalDestination ()->CreateNewOutgoingStream(*m_ls); + m_stream = i2p::client::context.GetSharedLocalDestination ()->GetStreamingDestination ()->CreateNewOutgoingStream(*m_ls); m_state = OKAY; LogPrint("--- socks4a state is ", m_state); AsyncSockRead(); diff --git a/Streaming.cpp b/Streaming.cpp index 5b871cf2..f6762370 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -115,7 +115,7 @@ namespace stream { // we have received duplicate. Most likely our outbound tunnel is dead LogPrint ("Duplicate message ", receivedSeqn, " received"); - m_LocalDestination.ResetCurrentOutboundTunnel (); // pick another outbound tunnel + m_LocalDestination.GetOwner ().ResetCurrentOutboundTunnel (); // pick another outbound tunnel UpdateCurrentRemoteLease (); // pick another lease SendQuickAck (); // resend ack for previous message again delete packet; // packet dropped @@ -274,11 +274,11 @@ namespace stream if (isNoAck) flags |= PACKET_FLAG_NO_ACK; *(uint16_t *)(packet + size) = htobe16 (flags); size += 2; // flags - size_t identityLen = m_LocalDestination.GetIdentity ().GetFullLen (); - size_t signatureLen = m_LocalDestination.GetIdentity ().GetSignatureLen (); + size_t identityLen = m_LocalDestination.GetOwner ().GetIdentity ().GetFullLen (); + size_t signatureLen = m_LocalDestination.GetOwner ().GetIdentity ().GetSignatureLen (); *(uint16_t *)(packet + size) = htobe16 (identityLen + signatureLen + 2); // identity + signature + packet size size += 2; // options size - m_LocalDestination.GetIdentity ().ToBuffer (packet + size, identityLen); + m_LocalDestination.GetOwner ().GetIdentity ().ToBuffer (packet + size, identityLen); size += identityLen; // from *(uint16_t *)(packet + size) = htobe16 (STREAMING_MTU); size += 2; // max packet size @@ -291,7 +291,7 @@ namespace stream buf += sentLen; len -= sentLen; size += sentLen; // payload - m_LocalDestination.Sign (packet, size, signature); + m_LocalDestination.GetOwner ().Sign (packet, size, signature); } else { @@ -362,13 +362,13 @@ namespace stream size++; // resend delay *(uint16_t *)(packet + size) = htobe16 (PACKET_FLAG_CLOSE | PACKET_FLAG_SIGNATURE_INCLUDED); size += 2; // flags - size_t signatureLen = m_LocalDestination.GetIdentity ().GetSignatureLen (); + size_t signatureLen = m_LocalDestination.GetOwner ().GetIdentity ().GetSignatureLen (); *(uint16_t *)(packet + size) = htobe16 (signatureLen); // signature only size += 2; // options size uint8_t * signature = packet + size; memset (packet + size, 0, signatureLen); size += signatureLen; // signature - m_LocalDestination.Sign (packet, size, signature); + m_LocalDestination.GetOwner ().Sign (packet, size, signature); p->len = size; SendPacket (p); @@ -441,7 +441,7 @@ namespace stream for (auto it: packets) { auto msg = m_RoutingSession->WrapSingleMessage ( - m_LocalDestination.CreateDataMessage (it->GetBuffer (), it->GetLength ())); + m_LocalDestination.GetOwner ().CreateDataMessage (it->GetBuffer (), it->GetLength ())); msgs.push_back (i2p::tunnel::TunnelMessageBlock { i2p::tunnel::eDeliveryTypeTunnel, @@ -450,7 +450,7 @@ namespace stream }); m_NumSentBytes += it->GetLength (); } - m_LocalDestination.SendTunnelDataMsgs (msgs); + m_LocalDestination.GetOwner ().SendTunnelDataMsgs (msgs); } else LogPrint ("All leases are expired"); @@ -484,7 +484,7 @@ namespace stream } if (packets.size () > 0) { - m_LocalDestination.ResetCurrentOutboundTunnel (); // pick another outbound tunnel + m_LocalDestination.GetOwner ().ResetCurrentOutboundTunnel (); // pick another outbound tunnel UpdateCurrentRemoteLease (); // pick another lease SendPackets (packets); } @@ -506,14 +506,14 @@ namespace stream { if (!m_RemoteLeaseSet) { - m_RemoteLeaseSet = m_LocalDestination.FindLeaseSet (m_RemoteIdentity.GetIdentHash ()); + m_RemoteLeaseSet = m_LocalDestination.GetOwner ().FindLeaseSet (m_RemoteIdentity.GetIdentHash ()); if (!m_RemoteLeaseSet) LogPrint ("LeaseSet ", m_RemoteIdentity.GetIdentHash ().ToBase64 (), " not found"); } if (m_RemoteLeaseSet) { if (!m_RoutingSession) - m_RoutingSession = m_LocalDestination.GetRoutingSession (*m_RemoteLeaseSet, 32); + m_RoutingSession = m_LocalDestination.GetOwner ().GetRoutingSession (*m_RemoteLeaseSet, 32); auto leases = m_RemoteLeaseSet->GetNonExpiredLeases (); if (!leases.empty ()) { @@ -522,12 +522,88 @@ namespace stream } else { - m_RemoteLeaseSet = m_LocalDestination.FindLeaseSet (m_RemoteIdentity.GetIdentHash ()); // re-request expired + m_RemoteLeaseSet = m_LocalDestination.GetOwner ().FindLeaseSet (m_RemoteIdentity.GetIdentHash ()); // re-request expired m_CurrentRemoteLease.endDate = 0; } } else m_CurrentRemoteLease.endDate = 0; + } + + void StreamingDestination::Start () + { + } + + void StreamingDestination::Stop () + { + ResetAcceptor (); + { + std::unique_lock l(m_StreamsMutex); + for (auto it: m_Streams) + delete it.second; + m_Streams.clear (); + } + } + + void StreamingDestination::HandleNextPacket (Packet * packet) + { + uint32_t sendStreamID = packet->GetSendStreamID (); + if (sendStreamID) + { + auto it = m_Streams.find (sendStreamID); + if (it != m_Streams.end ()) + it->second->HandleNextPacket (packet); + else + { + LogPrint ("Unknown stream ", sendStreamID); + delete packet; + } + } + else // new incoming stream + { + auto incomingStream = CreateNewIncomingStream (); + incomingStream->HandleNextPacket (packet); + if (m_Acceptor != nullptr) + m_Acceptor (incomingStream); + else + { + LogPrint ("Acceptor for incoming stream is not set"); + DeleteStream (incomingStream); + } + } + } + + Stream * StreamingDestination::CreateNewOutgoingStream (const i2p::data::LeaseSet& remote) + { + Stream * s = new Stream (*m_Owner.GetService (), *this, remote); + std::unique_lock l(m_StreamsMutex); + m_Streams[s->GetRecvStreamID ()] = s; + return s; + } + + Stream * StreamingDestination::CreateNewIncomingStream () + { + Stream * s = new Stream (*m_Owner.GetService (), *this); + std::unique_lock l(m_StreamsMutex); + m_Streams[s->GetRecvStreamID ()] = s; + return s; + } + + void StreamingDestination::DeleteStream (Stream * stream) + { + if (stream) + { + std::unique_lock l(m_StreamsMutex); + auto it = m_Streams.find (stream->GetRecvStreamID ()); + if (it != m_Streams.end ()) + { + m_Streams.erase (it); + if (m_Owner.GetService ()) + m_Owner.GetService ()->post ([stream](void) { delete stream; }); + else + delete stream; + } + } } } } diff --git a/Streaming.h b/Streaming.h index 5c02913d..29f98088 100644 --- a/Streaming.h +++ b/Streaming.h @@ -18,6 +18,10 @@ namespace i2p { +namespace client +{ + class ClientDestination; +} namespace stream { const uint16_t PACKET_FLAG_SYNCHRONIZE = 0x0001; @@ -141,6 +145,43 @@ namespace stream size_t m_NumSentBytes, m_NumReceivedBytes; }; + class StreamingDestination + { + public: + + StreamingDestination (i2p::client::ClientDestination& owner): m_Owner (owner) {}; + ~StreamingDestination () {}; + + void Start (); + void Stop (); + + Stream * CreateNewOutgoingStream (const i2p::data::LeaseSet& remote); + void DeleteStream (Stream * stream); + void SetAcceptor (const std::function& acceptor) { m_Acceptor = acceptor; }; + void ResetAcceptor () { m_Acceptor = nullptr; }; + bool IsAcceptorSet () const { return m_Acceptor != nullptr; }; + + // ClientDestination + i2p::client::ClientDestination& GetOwner () { return m_Owner; }; + void HandleNextPacket (Packet * packet); + + private: + + Stream * CreateNewIncomingStream (); + + private: + + i2p::client::ClientDestination& m_Owner; + std::mutex m_StreamsMutex; + std::map m_Streams; + std::function m_Acceptor; + + public: + + // for HTTP only + const decltype(m_Streams)& GetStreams () const { return m_Streams; }; + }; + //------------------------------------------------- template