diff --git a/libi2pd_client/HTTPProxy.cpp b/libi2pd_client/HTTPProxy.cpp index 8f9f8ada..01678f4a 100644 --- a/libi2pd_client/HTTPProxy.cpp +++ b/libi2pd_client/HTTPProxy.cpp @@ -638,7 +638,7 @@ namespace proxy { void HTTPReqHandler::HandoverToUpstreamProxy() { LogPrint(eLogDebug, "HTTPProxy: Handover to SOCKS proxy"); - auto connection = std::make_shared(GetOwner(), m_proxysock, m_sock); + auto connection = CreateSocketsPipe (GetOwner(), m_proxysock, m_sock); m_sock = nullptr; m_proxysock = nullptr; GetOwner()->AddHandler(connection); diff --git a/libi2pd_client/I2PService.cpp b/libi2pd_client/I2PService.cpp index 9b0b8028..27e34b69 100644 --- a/libi2pd_client/I2PService.cpp +++ b/libi2pd_client/I2PService.cpp @@ -148,25 +148,25 @@ namespace client } } - TCPIPPipe::TCPIPPipe(I2PService * owner, std::shared_ptr upstream, std::shared_ptr downstream) : I2PServiceHandler(owner), m_up(upstream), m_down(downstream) + SocketsPipe::SocketsPipe(I2PService * owner, std::shared_ptr upstream, std::shared_ptr downstream) : I2PServiceHandler(owner), m_up(upstream), m_down(downstream) { - boost::asio::socket_base::receive_buffer_size option(TCP_IP_PIPE_BUFFER_SIZE); + boost::asio::socket_base::receive_buffer_size option(SOCKETS_PIPE_BUFFER_SIZE); upstream->set_option(option); downstream->set_option(option); } - TCPIPPipe::~TCPIPPipe() + SocketsPipe::~SocketsPipe() { Terminate(); } - void TCPIPPipe::Start() + void SocketsPipe::Start() { AsyncReceiveUpstream(); AsyncReceiveDownstream(); } - void TCPIPPipe::Terminate() + void SocketsPipe::Terminate() { if(Kill()) return; if (m_up) @@ -184,52 +184,52 @@ namespace client Done(shared_from_this()); } - void TCPIPPipe::AsyncReceiveUpstream() + void SocketsPipe::AsyncReceiveUpstream() { if (m_up) { - m_up->async_read_some(boost::asio::buffer(m_upstream_to_down_buf, TCP_IP_PIPE_BUFFER_SIZE), - std::bind(&TCPIPPipe::HandleUpstreamReceived, shared_from_this(), + m_up->async_read_some(boost::asio::buffer(m_upstream_to_down_buf, SOCKETS_PIPE_BUFFER_SIZE), + std::bind(&SocketsPipe::HandleUpstreamReceived, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); } else - LogPrint(eLogError, "TCPIPPipe: Upstream receive: No socket"); + LogPrint(eLogError, "SocketsPipe: Upstream receive: No socket"); } - void TCPIPPipe::AsyncReceiveDownstream() + void SocketsPipe::AsyncReceiveDownstream() { if (m_down) { - m_down->async_read_some(boost::asio::buffer(m_downstream_to_up_buf, TCP_IP_PIPE_BUFFER_SIZE), - std::bind(&TCPIPPipe::HandleDownstreamReceived, shared_from_this(), + m_down->async_read_some(boost::asio::buffer(m_downstream_to_up_buf, SOCKETS_PIPE_BUFFER_SIZE), + std::bind(&SocketsPipe::HandleDownstreamReceived, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); } else - LogPrint(eLogError, "TCPIPPipe: Downstream receive: No socket"); + LogPrint(eLogError, "SocketsPipe: Downstream receive: No socket"); } - void TCPIPPipe::UpstreamWrite(size_t len) + void SocketsPipe::UpstreamWrite(size_t len) { if (m_up) { - LogPrint(eLogDebug, "TCPIPPipe: Upstream: ", (int) len, " bytes written"); + LogPrint(eLogDebug, "SocketsPipe: Upstream: ", (int) len, " bytes written"); boost::asio::async_write(*m_up, boost::asio::buffer(m_upstream_buf, len), boost::asio::transfer_all(), - std::bind(&TCPIPPipe::HandleUpstreamWrite, + std::bind(&SocketsPipe::HandleUpstreamWrite, shared_from_this(), std::placeholders::_1)); } else - LogPrint(eLogError, "TCPIPPipe: Upstream write: no socket"); + LogPrint(eLogError, "SocketsPipe: Upstream write: no socket"); } - void TCPIPPipe::DownstreamWrite(size_t len) + void SocketsPipe::DownstreamWrite(size_t len) { if (m_down) { LogPrint(eLogDebug, "TCPIPPipe: Downstream: ", (int) len, " bytes written"); boost::asio::async_write(*m_down, boost::asio::buffer(m_downstream_buf, len), boost::asio::transfer_all(), - std::bind(&TCPIPPipe::HandleDownstreamWrite, + std::bind(&SocketsPipe::HandleDownstreamWrite, shared_from_this(), std::placeholders::_1)); } @@ -238,7 +238,7 @@ namespace client } - void TCPIPPipe::HandleDownstreamReceived(const boost::system::error_code & ecode, std::size_t bytes_transfered) + void SocketsPipe::HandleDownstreamReceived(const boost::system::error_code & ecode, std::size_t bytes_transfered) { if (ecode != boost::asio::error::operation_aborted) { @@ -257,7 +257,7 @@ namespace client } } - void TCPIPPipe::HandleDownstreamWrite(const boost::system::error_code & ecode) + void SocketsPipe::HandleDownstreamWrite(const boost::system::error_code & ecode) { if (ecode != boost::asio::error::operation_aborted) { @@ -271,13 +271,13 @@ namespace client } } - void TCPIPPipe::HandleUpstreamWrite(const boost::system::error_code & ecode) + void SocketsPipe::HandleUpstreamWrite(const boost::system::error_code & ecode) { if (ecode != boost::asio::error::operation_aborted) { if (ecode) { - LogPrint(eLogWarning, "TCPIPPipe: Upstream write error:" , ecode.message()); + LogPrint(eLogWarning, "SocketsPipe: Upstream write error:" , ecode.message()); Terminate(); } else @@ -285,14 +285,14 @@ namespace client } } - void TCPIPPipe::HandleUpstreamReceived(const boost::system::error_code & ecode, std::size_t bytes_transfered) + void SocketsPipe::HandleUpstreamReceived(const boost::system::error_code & ecode, std::size_t bytes_transfered) { if (ecode != boost::asio::error::operation_aborted) { - LogPrint(eLogDebug, "TCPIPPipe: Upstream ", (int)bytes_transfered, " bytes received"); + LogPrint(eLogDebug, "SocketsPipe: Upstream ", (int)bytes_transfered, " bytes received"); if (ecode) { - LogPrint(eLogWarning, "TCPIPPipe: Upstream read error:" , ecode.message()); + LogPrint(eLogWarning, "SocketsPipe: Upstream read error:" , ecode.message()); Terminate(); } else diff --git a/libi2pd_client/I2PService.h b/libi2pd_client/I2PService.h index 4f67e19c..4be7f2ca 100644 --- a/libi2pd_client/I2PService.h +++ b/libi2pd_client/I2PService.h @@ -1,5 +1,5 @@ /* -* Copyright (c) 2013-2023, The PurpleI2P Project +* Copyright (c) 2013-2024, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * @@ -99,6 +99,7 @@ namespace client virtual ~I2PServiceHandler() { } //If you override this make sure you call it from the children virtual void Handle() {}; //Start handling the socket + virtual void Start () {}; void Terminate () { Kill (); }; @@ -119,16 +120,16 @@ namespace client std::atomic m_Dead; //To avoid cleaning up multiple times }; - const size_t TCP_IP_PIPE_BUFFER_SIZE = 8192 * 8; + const size_t SOCKETS_PIPE_BUFFER_SIZE = 8192 * 8; - // bidirectional pipe for 2 tcp/ip sockets - class TCPIPPipe: public I2PServiceHandler, public std::enable_shared_from_this + // bidirectional pipe for 2 stream sockets + class SocketsPipe: public I2PServiceHandler, public std::enable_shared_from_this { public: - TCPIPPipe(I2PService * owner, std::shared_ptr upstream, std::shared_ptr downstream); - ~TCPIPPipe(); - void Start(); + SocketsPipe(I2PService * owner, std::shared_ptr upstream, std::shared_ptr downstream); + ~SocketsPipe(); + void Start() override; protected: @@ -144,11 +145,18 @@ namespace client private: - uint8_t m_upstream_to_down_buf[TCP_IP_PIPE_BUFFER_SIZE], m_downstream_to_up_buf[TCP_IP_PIPE_BUFFER_SIZE]; - uint8_t m_upstream_buf[TCP_IP_PIPE_BUFFER_SIZE], m_downstream_buf[TCP_IP_PIPE_BUFFER_SIZE]; - std::shared_ptr m_up, m_down; + uint8_t m_upstream_to_down_buf[SOCKETS_PIPE_BUFFER_SIZE], m_downstream_to_up_buf[SOCKETS_PIPE_BUFFER_SIZE]; + uint8_t m_upstream_buf[SOCKETS_PIPE_BUFFER_SIZE], m_downstream_buf[SOCKETS_PIPE_BUFFER_SIZE]; + std::shared_ptr m_up; + std::shared_ptr m_down; }; + template + std::shared_ptr CreateSocketsPipe (I2PService * owner, std::shared_ptr upstream, std::shared_ptr downstream) + { + return std::make_shared(owner, upstream, downstream); + } + /* TODO: support IPv6 too */ //This is a service that listens for connections on the IP network and interacts with I2P class TCPIPAcceptor: public I2PService diff --git a/libi2pd_client/SOCKS.cpp b/libi2pd_client/SOCKS.cpp index 49a04b81..65bad1cf 100644 --- a/libi2pd_client/SOCKS.cpp +++ b/libi2pd_client/SOCKS.cpp @@ -700,13 +700,12 @@ namespace proxy break; } m_sock->send(response); - auto forwarder = std::make_shared(GetOwner(), m_sock, m_upstreamSock); + auto forwarder = CreateSocketsPipe (GetOwner(), m_sock, m_upstreamSock); m_upstreamSock = nullptr; m_sock = nullptr; GetOwner()->AddHandler(forwarder); forwarder->Start(); Terminate(); - }