diff --git a/I2PTunnel.cpp b/I2PTunnel.cpp index ae265c3b..2a857228 100644 --- a/I2PTunnel.cpp +++ b/I2PTunnel.cpp @@ -1,4 +1,7 @@ #include +#include "base64.h" +#include "Log.h" +#include "NetDb.h" #include "I2PTunnel.h" namespace i2p @@ -9,26 +12,129 @@ namespace stream const i2p::data::LeaseSet * leaseSet): m_Socket (socket) { m_Stream = i2p::stream::CreateStream (*leaseSet); + m_Stream->Send (m_Buffer, 0, 0); // connect + StreamReceive (); + Receive (); } I2PTunnelConnection::~I2PTunnelConnection () { + Terminate (); + } + + void I2PTunnelConnection::Terminate () + { if (m_Stream) { m_Stream->Close (); DeleteStream (m_Stream); + m_Stream = nullptr; } - delete m_Socket; + // TODO: remove from I2PTunnel + } + + void I2PTunnelConnection::Receive () + { + m_Socket->async_read_some (boost::asio::buffer(m_Buffer, I2P_TUNNEL_CONNECTION_BUFFER_SIZE), + boost::bind(&I2PTunnelConnection::HandleReceived, this, + boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); + } + + void I2PTunnelConnection::HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred) + { + if (ecode) + { + LogPrint ("I2PTunnel read error: ", ecode.message ()); + Terminate (); + } + else + { + if (m_Stream) + m_Stream->Send (m_Buffer, bytes_transferred, 0); + Receive (); + } + } + + void I2PTunnelConnection::HandleWrite (const boost::system::error_code& ecode) + { + StreamReceive (); + } + + void I2PTunnelConnection::StreamReceive () + { + if (m_Stream) + m_Stream->AsyncReceive (boost::asio::buffer (m_StreamBuffer, I2P_TUNNEL_CONNECTION_BUFFER_SIZE), + boost::bind (&I2PTunnelConnection::HandleStreamReceive, this, + boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred), + I2P_TUNNEL_CONNECTION_MAX_IDLE); } + void I2PTunnelConnection::HandleStreamReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred) + { + if (ecode) + { + LogPrint ("I2PTunnel stream read error: ", ecode.message ()); + Terminate (); + } + else + { + boost::asio::async_write (*m_Socket, boost::asio::buffer (m_StreamBuffer, bytes_transferred), + boost::bind (&I2PTunnelConnection::HandleWrite, this, boost::asio::placeholders::error)); + } + } + I2PClientTunnel::I2PClientTunnel (boost::asio::io_service& service, const std::string& destination, int port): m_Service (service), m_Acceptor (m_Service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), port)), - m_Destination (destination), m_RemoteLeaseSet (nullptr) + m_Destination (destination), m_DestinationIdentHash (nullptr), m_RemoteLeaseSet (nullptr) { } + I2PClientTunnel::~I2PClientTunnel () + { + Stop (); + } + void I2PClientTunnel::Start () + { + auto pos = m_Destination.find(".b32.i2p"); + if (pos != std::string::npos) + { + uint8_t hash[32]; + i2p::data::Base32ToByteStream (m_Destination.c_str(), pos, hash, 32); + m_DestinationIdentHash = new i2p::data::IdentHash (hash); + } + else + { + pos = m_Destination.find (".i2p"); + if (pos != std::string::npos) + { + auto identHash = i2p::data::netdb.FindAddress (m_Destination.substr (0, pos)); + if (identHash) + m_DestinationIdentHash = new i2p::data::IdentHash (*identHash); + } + } + if (m_DestinationIdentHash) + { + i2p::data::netdb.Subscribe (*m_DestinationIdentHash); + m_RemoteLeaseSet = i2p::data::netdb.FindLeaseSet (*m_DestinationIdentHash); + } + else + LogPrint ("I2PTunnel unknown destination ", m_Destination); + m_Acceptor.listen (); + Accept (); + } + + void I2PClientTunnel::Stop () + { + m_Acceptor.close(); + for (auto it: m_Connections) + delete it; + m_Connections.clear (); + delete m_DestinationIdentHash; + m_DestinationIdentHash = nullptr; + } + void I2PClientTunnel::Accept () { auto newSocket = new boost::asio::ip::tcp::socket (m_Service); @@ -40,8 +146,13 @@ namespace stream { if (!ecode) { + if (!m_RemoteLeaseSet && m_DestinationIdentHash) + m_RemoteLeaseSet = i2p::data::netdb.FindLeaseSet (*m_DestinationIdentHash); if (m_RemoteLeaseSet) - new I2PTunnelConnection (socket, m_RemoteLeaseSet); + { + auto connection = new I2PTunnelConnection (socket, m_RemoteLeaseSet); + m_Connections.insert (connection); + } else delete socket; Accept (); diff --git a/I2PTunnel.h b/I2PTunnel.h index 62f78a9e..9882d4a1 100644 --- a/I2PTunnel.h +++ b/I2PTunnel.h @@ -3,6 +3,7 @@ #include #include +#include #include #include "Identity.h" #include "Streaming.h" @@ -11,6 +12,8 @@ namespace i2p { namespace stream { + const size_t I2P_TUNNEL_CONNECTION_BUFFER_SIZE = 8192; + const int I2P_TUNNEL_CONNECTION_MAX_IDLE = 3600; // in seconds class I2PTunnelConnection { public: @@ -18,9 +21,21 @@ namespace stream I2PTunnelConnection (boost::asio::ip::tcp::socket * socket, const i2p::data::LeaseSet * leaseSet); ~I2PTunnelConnection (); - + private: + void Terminate (); + + void Receive (); + void HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred); + void HandleWrite (const boost::system::error_code& ecode); + + void StreamReceive (); + void HandleStreamReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred); + + private: + + uint8_t m_Buffer[I2P_TUNNEL_CONNECTION_BUFFER_SIZE], m_StreamBuffer[I2P_TUNNEL_CONNECTION_BUFFER_SIZE]; boost::asio::ip::tcp::socket * m_Socket; Stream * m_Stream; }; @@ -30,7 +45,11 @@ namespace stream public: I2PClientTunnel (boost::asio::io_service& service, const std::string& destination, int port); - + ~I2PClientTunnel (); + + void Start (); + void Stop (); + private: void Accept (); @@ -41,8 +60,9 @@ namespace stream boost::asio::io_service& m_Service; boost::asio::ip::tcp::acceptor m_Acceptor; std::string m_Destination; - i2p::data::IdentHash m_DestinationIdentHash; + const i2p::data::IdentHash * m_DestinationIdentHash; const i2p::data::LeaseSet * m_RemoteLeaseSet; + std::set m_Connections; }; } }