From b11877d002b02807deca2c137aeea580a099e095 Mon Sep 17 00:00:00 2001 From: orignal Date: Wed, 22 Oct 2014 14:01:23 -0400 Subject: [PATCH] create streams through ClientDestination --- Destination.cpp | 33 +++++++++++++++++++++++++++++---- Destination.h | 12 +++++++++++- HTTPServer.cpp | 4 ++-- I2PTunnel.cpp | 8 ++++---- SAM.cpp | 17 ++++++++--------- SOCKS.cpp | 2 +- Streaming.cpp | 6 ++++++ Streaming.h | 2 ++ 8 files changed, 63 insertions(+), 21 deletions(-) diff --git a/Destination.cpp b/Destination.cpp index 254f9829..9053c271 100644 --- a/Destination.cpp +++ b/Destination.cpp @@ -244,7 +244,7 @@ namespace client uint32_t length = be32toh (*(uint32_t *)buf); buf += 4; // we assume I2CP payload - if (buf[9] == 6) // streaming protocol + if (buf[9] == PROTOCOL_TYPE_STREAMING && m_StreamingDestination) // streaming protocol { // unzip it CryptoPP::Gunzip decompressor; @@ -261,7 +261,6 @@ namespace client else { LogPrint ("Received packet size ", uncompressed->len, " exceeds max packet size. Skipped"); - decompressor.Skip (); delete uncompressed; } } @@ -285,11 +284,37 @@ namespace client buf += 4; compressor.Get (buf, size); memset (buf + 4, 0, 4); // source and destination ports. TODO: fill with proper values later - buf[9] = 6; // streaming protocol + buf[9] = PROTOCOL_TYPE_STREAMING; // streaming protocol. TODO: msg->len += size + 4; FillI2NPMessageHeader (msg, eI2NPData); return msg; - } + } + + i2p::stream::Stream * ClientDestination::CreateStream (const i2p::data::LeaseSet& remote) + { + if (m_StreamingDestination) + return m_StreamingDestination->CreateNewOutgoingStream (remote); + return nullptr; + } + + void ClientDestination::AcceptStreams (const std::function& acceptor) + { + if (m_StreamingDestination) + m_StreamingDestination->SetAcceptor (acceptor); + } + + void ClientDestination::StopAcceptingStreams () + { + if (m_StreamingDestination) + m_StreamingDestination->ResetAcceptor (); + } + + bool ClientDestination::IsAcceptingStreams () const + { + if (m_StreamingDestination) + return m_StreamingDestination->IsAcceptorSet (); + return false; + } } } diff --git a/Destination.h b/Destination.h index c7f0b981..f15c1588 100644 --- a/Destination.h +++ b/Destination.h @@ -14,6 +14,10 @@ namespace i2p { namespace client { + const uint8_t PROTOCOL_TYPE_STREAMING = 6; + const uint8_t PROTOCOL_TYPE_DATAGRAM = 17; + const uint8_t PROTOCOL_TYPE_RAW = 18; + class ClientDestination: public i2p::garlic::GarlicDestination { public: @@ -28,13 +32,19 @@ namespace client bool IsRunning () const { return m_IsRunning; }; boost::asio::io_service * GetService () { return m_Service; }; i2p::tunnel::TunnelPool * GetTunnelPool () { return m_Pool; }; - i2p::stream::StreamingDestination * GetStreamingDestination () const { return m_StreamingDestination; }; bool IsReady () const { return m_LeaseSet && m_LeaseSet->HasNonExpiredLeases (); }; void ResetCurrentOutboundTunnel () { m_CurrentOutboundTunnel = nullptr; }; const i2p::data::LeaseSet * FindLeaseSet (const i2p::data::IdentHash& ident); void SendTunnelDataMsgs (const std::vector& msgs); + // streaming + i2p::stream::StreamingDestination * GetStreamingDestination () const { return m_StreamingDestination; }; + i2p::stream::Stream * CreateStream (const i2p::data::LeaseSet& remote); + void AcceptStreams (const std::function& acceptor); + void StopAcceptingStreams (); + bool IsAcceptingStreams () const; + // implements LocalDestination const i2p::data::PrivateKeys& GetPrivateKeys () const { return m_Keys; }; const uint8_t * GetEncryptionPrivateKey () const { return m_EncryptionPrivateKey; }; diff --git a/HTTPServer.cpp b/HTTPServer.cpp index 78e3bd38..7ab07005 100644 --- a/HTTPServer.cpp +++ b/HTTPServer.cpp @@ -517,7 +517,7 @@ namespace util if (m_Stream) { m_Stream->Close (); - i2p::client::context.GetSharedLocalDestination ()->GetStreamingDestination ()->DeleteStream (m_Stream); + i2p::stream::DeleteStream (m_Stream); m_Stream = nullptr; } m_Socket->close (); @@ -880,7 +880,7 @@ namespace util void HTTPConnection::SendToDestination (const i2p::data::LeaseSet * remote, const char * buf, size_t len) { if (!m_Stream) - m_Stream = i2p::client::context.GetSharedLocalDestination ()->GetStreamingDestination ()->CreateNewOutgoingStream (*remote); + m_Stream = i2p::client::context.GetSharedLocalDestination ()->CreateStream (*remote); if (m_Stream) { m_Stream->Send ((uint8_t *)buf, len); diff --git a/I2PTunnel.cpp b/I2PTunnel.cpp index 9569a25e..a8193bdd 100644 --- a/I2PTunnel.cpp +++ b/I2PTunnel.cpp @@ -14,7 +14,7 @@ namespace client boost::asio::ip::tcp::socket * socket, const i2p::data::LeaseSet * leaseSet): m_Socket (socket), m_Owner (owner) { - m_Stream = m_Owner->GetLocalDestination ()->GetStreamingDestination ()->CreateNewOutgoingStream (*leaseSet); + m_Stream = m_Owner->GetLocalDestination ()->CreateStream (*leaseSet); m_Stream->Send (m_Buffer, 0); // connect StreamReceive (); Receive (); @@ -39,7 +39,7 @@ namespace client if (m_Stream) { m_Stream->Close (); - m_Owner->GetLocalDestination ()->GetStreamingDestination ()->DeleteStream (m_Stream); + i2p::stream::DeleteStream (m_Stream); m_Stream = nullptr; } m_Socket->close (); @@ -115,7 +115,7 @@ namespace client if (ecode != boost::asio::error::operation_aborted) { if (m_Stream) m_Stream->Close (); - m_Owner->GetLocalDestination ()->GetStreamingDestination ()->DeleteStream (m_Stream); + i2p::stream::DeleteStream (m_Stream); m_Stream = nullptr; } } @@ -270,7 +270,7 @@ namespace client { auto localDestination = GetLocalDestination (); if (localDestination) - localDestination->GetStreamingDestination ()->SetAcceptor (std::bind (&I2PServerTunnel::HandleAccept, this, std::placeholders::_1)); + localDestination->AcceptStreams (std::bind (&I2PServerTunnel::HandleAccept, this, std::placeholders::_1)); else LogPrint ("Local destination not set for server tunnel"); } diff --git a/SAM.cpp b/SAM.cpp index dd5ed5b7..897fe5a0 100644 --- a/SAM.cpp +++ b/SAM.cpp @@ -25,8 +25,8 @@ namespace client if (m_Stream) { m_Stream->Close (); - if (m_Session && m_Session->localDestination) - m_Session->localDestination->GetStreamingDestination ()->DeleteStream (m_Stream); + i2p::stream::DeleteStream (m_Stream); + m_Stream = nullptr; } } @@ -35,8 +35,7 @@ namespace client if (m_Stream) { m_Stream->Close (); - if (m_Session && m_Session->localDestination) - m_Session->localDestination->GetStreamingDestination ()->DeleteStream (m_Stream); + i2p::stream::DeleteStream (m_Stream); m_Stream = nullptr; } switch (m_SocketType) @@ -55,7 +54,7 @@ namespace client if (m_Session) { m_Session->sockets.remove (this); - m_Session->localDestination->GetStreamingDestination ()->ResetAcceptor (); + m_Session->localDestination->StopAcceptingStreams (); } break; } @@ -295,7 +294,7 @@ namespace client { m_SocketType = eSAMSocketTypeStream; m_Session->sockets.push_back (this); - m_Stream = m_Session->localDestination->GetStreamingDestination ()->CreateNewOutgoingStream (remote); + m_Stream = m_Session->localDestination->CreateStream (remote); m_Stream->Send ((uint8_t *)m_Buffer, 0); // connect I2PReceive (); SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); @@ -344,11 +343,11 @@ namespace client m_Session = m_Owner.FindSession (id); if (m_Session) { - if (!m_Session->localDestination->GetStreamingDestination ()->IsAcceptorSet ()) + if (!m_Session->localDestination->IsAcceptingStreams ()) { m_SocketType = eSAMSocketTypeAcceptor; m_Session->sockets.push_back (this); - m_Session->localDestination->GetStreamingDestination ()->SetAcceptor (std::bind (&SAMSocket::HandleI2PAccept, this, std::placeholders::_1)); + m_Session->localDestination->AcceptStreams (std::bind (&SAMSocket::HandleI2PAccept, this, std::placeholders::_1)); SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); } else @@ -507,7 +506,7 @@ namespace client m_Stream = stream; auto session = m_Owner.FindSession (m_ID); if (session) - session->localDestination->GetStreamingDestination ()->ResetAcceptor (); + session->localDestination->StopAcceptingStreams (); if (!m_IsSilent) { // send remote peer address diff --git a/SOCKS.cpp b/SOCKS.cpp index 9d5a5209..f2386321 100644 --- a/SOCKS.cpp +++ b/SOCKS.cpp @@ -224,7 +224,7 @@ namespace proxy void SOCKS4AHandler::SentConnectionSuccess(const boost::system::error_code & ecode) { LogPrint("--- socks4a making connection"); - m_stream = i2p::client::context.GetSharedLocalDestination ()->GetStreamingDestination ()->CreateNewOutgoingStream(*m_ls); + m_stream = i2p::client::context.GetSharedLocalDestination ()->CreateStream(*m_ls); m_state = OKAY; LogPrint("--- socks4a state is ", m_state); AsyncSockRead(); diff --git a/Streaming.cpp b/Streaming.cpp index f6762370..82bd90dd 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -605,5 +605,11 @@ namespace stream } } } + + void DeleteStream (Stream * stream) + { + if (stream) + stream->GetLocalDestination ().DeleteStream (stream); + } } } diff --git a/Streaming.h b/Streaming.h index 29f98088..ee7d9eb0 100644 --- a/Streaming.h +++ b/Streaming.h @@ -182,6 +182,8 @@ namespace stream const decltype(m_Streams)& GetStreams () const { return m_Streams; }; }; + void DeleteStream (Stream * stream); + //------------------------------------------------- template