From aac1141ca6884bd4693127f800cc911c81c5d359 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Sun, 11 Feb 2018 06:05:41 -0500 Subject: [PATCH 01/10] fix issue #1107 --- libi2pd/HTTP.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libi2pd/HTTP.cpp b/libi2pd/HTTP.cpp index 24e55457..9bca11d7 100644 --- a/libi2pd/HTTP.cpp +++ b/libi2pd/HTTP.cpp @@ -84,7 +84,7 @@ namespace http { pos_c = url.find('@', pos_p); /* find end of 'user' or 'user:pass' part */ if (pos_c != std::string::npos && (pos_s == std::string::npos || pos_s > pos_c)) { std::size_t delim = url.find(':', pos_p); - if (delim != std::string::npos && delim < pos_c) { + if (delim && delim != std::string::npos && delim < pos_c) { user = url.substr(pos_p, delim - pos_p); delim += 1; pass = url.substr(delim, pos_c - delim); From 4643c92d33a1d43861b0aac74dec2f01689d01bf Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 24 Apr 2018 09:45:16 -0400 Subject: [PATCH 02/10] Initial SAM cleanup --- daemon/HTTPServer.cpp | 2 +- daemon/I2PControl.cpp | 2 +- libi2pd/Streaming.cpp | 4 +- libi2pd/Streaming.h | 3 + libi2pd_client/SAM.cpp | 208 ++++++++++++++++++----------------------- libi2pd_client/SAM.h | 54 +++++------ 6 files changed, 119 insertions(+), 154 deletions(-) diff --git a/daemon/HTTPServer.cpp b/daemon/HTTPServer.cpp index 6f884a9b..554ba45d 100644 --- a/daemon/HTTPServer.cpp +++ b/daemon/HTTPServer.cpp @@ -649,7 +649,7 @@ namespace http { s << i2p::client::context.GetAddressBook ().ToAddress(ident) << "
\r\n"; s << "
\r\n"; s << "Streams:
\r\n"; - for (const auto& it: session->ListSockets()) + for (const auto& it: sam->ListSockets(id)) { switch (it->GetSocketType ()) { diff --git a/daemon/I2PControl.cpp b/daemon/I2PControl.cpp index fcff78cd..6ac87cbb 100644 --- a/daemon/I2PControl.cpp +++ b/daemon/I2PControl.cpp @@ -727,7 +727,7 @@ namespace client sam_session.put("name", name); sam_session.put("address", i2p::client::context.GetAddressBook ().ToAddress(ident)); - for (const auto& socket: it.second->ListSockets()) + for (const auto& socket: sam->ListSockets(it.first)) { boost::property_tree::ptree stream; stream.put("type", socket->GetSocketType ()); diff --git a/libi2pd/Streaming.cpp b/libi2pd/Streaming.cpp index dd8e3634..e8386b61 100644 --- a/libi2pd/Streaming.cpp +++ b/libi2pd/Streaming.cpp @@ -578,9 +578,7 @@ namespace stream if (m_SentPackets.empty () && m_SendBuffer.IsEmpty ()) // nothing to send { m_Status = eStreamStatusClosed; - // close could be called from another thread so do SendClose from the destination thread - // this is so m_LocalDestination.NewPacket () does not trigger a race condition - m_Service.post(std::bind(&Stream::SendClose, shared_from_this())); + SendClose(); } break; case eStreamStatusClosed: diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index 47f99833..7f2598c0 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -165,6 +165,9 @@ namespace stream void AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout = 0); size_t ReadSome (uint8_t * buf, size_t len) { return ConcatenatePackets (buf, len); }; + void AsyncClose() { m_Service.post(std::bind(&Stream::Close, shared_from_this())); }; + + /** only call close from destination thread, use Stream::AsyncClose for other threads */ void Close (); void Cancel () { m_ReceiveTimer.cancel (); }; diff --git a/libi2pd_client/SAM.cpp b/libi2pd_client/SAM.cpp index 05943981..b8a72f0c 100644 --- a/libi2pd_client/SAM.cpp +++ b/libi2pd_client/SAM.cpp @@ -15,8 +15,8 @@ namespace i2p { namespace client { - SAMSocket::SAMSocket (SAMBridge& owner, std::shared_ptr socket): - m_Owner (owner), m_Socket(socket), m_Timer (m_Owner.GetService ()), + SAMSocket::SAMSocket (SAMBridge& owner): + m_Owner (owner), m_Socket(owner.GetService()), m_Timer (m_Owner.GetService ()), m_BufferOffset (0), m_SocketType (eSAMSocketTypeUnknown), m_IsSilent (false), m_IsAccepting (false), m_Stream (nullptr) @@ -25,51 +25,18 @@ namespace client SAMSocket::~SAMSocket () { - if(m_Stream) - { - m_Stream->Close (); - m_Stream.reset (); - } - auto Session = m_Owner.FindSession(m_ID); - - switch (m_SocketType) - { - case eSAMSocketTypeSession: - m_Owner.CloseSession (m_ID); - break; - case eSAMSocketTypeStream: - { - if (Session) - Session->DelSocket (this); - break; - } - case eSAMSocketTypeAcceptor: - { - if (Session) - { - Session->DelSocket (this); - if (m_IsAccepting && Session->localDestination) - Session->localDestination->StopAcceptingStreams (); - } - break; - } - default: - ; - } - m_SocketType = eSAMSocketTypeTerminated; - if (m_Socket && m_Socket->is_open()) m_Socket->close (); - m_Socket.reset (); + m_Stream.reset (); + if (m_Socket.is_open()) m_Socket.close (); } void SAMSocket::Terminate (const char* reason) { if(m_Stream) { - m_Stream->Close (); + m_Stream->AsyncClose (); m_Stream.reset (); } auto Session = m_Owner.FindSession(m_ID); - switch (m_SocketType) { case eSAMSocketTypeSession: @@ -77,15 +44,12 @@ namespace client break; case eSAMSocketTypeStream: { - if (Session) - Session->DelSocket (this); break; } case eSAMSocketTypeAcceptor: { if (Session) { - Session->DelSocket (this); if (m_IsAccepting && Session->localDestination) Session->localDestination->StopAcceptingStreams (); } @@ -95,16 +59,15 @@ namespace client ; } m_SocketType = eSAMSocketTypeTerminated; - if (m_Socket && m_Socket->is_open()) m_Socket->close (); - m_Socket.reset (); + if (m_Socket.is_open()) m_Socket.close (); + m_Owner.RemoveSocket(this); } void SAMSocket::ReceiveHandshake () - { - if(m_Socket) - m_Socket->async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), - std::bind(&SAMSocket::HandleHandshakeReceived, shared_from_this (), - std::placeholders::_1, std::placeholders::_2)); + { + m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), + std::bind(&SAMSocket::HandleHandshakeReceived, shared_from_this (), + std::placeholders::_1, std::placeholders::_2)); } static bool SAMVersionAcceptable(const std::string & ver) @@ -125,7 +88,7 @@ namespace client void SAMSocket::HandleHandshakeReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred) { if (ecode) - { + { LogPrint (eLogError, "SAM: handshake read error: ", ecode.message ()); if (ecode != boost::asio::error::operation_aborted) Terminate ("SAM: handshake read error"); @@ -184,7 +147,7 @@ namespace client #else size_t l = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_HANDSHAKE_REPLY, version.c_str ()); #endif - boost::asio::async_write (*m_Socket, boost::asio::buffer (m_Buffer, l), boost::asio::transfer_all (), + boost::asio::async_write (m_Socket, boost::asio::buffer (m_Buffer, l), boost::asio::transfer_all (), std::bind(&SAMSocket::HandleHandshakeReplySent, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); } @@ -199,17 +162,22 @@ namespace client } } + bool SAMSocket::IsSession(const std::string & id) const + { + return id == m_ID; + } + void SAMSocket::HandleHandshakeReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred) { if (ecode) - { + { LogPrint (eLogError, "SAM: handshake reply send error: ", ecode.message ()); if (ecode != boost::asio::error::operation_aborted) Terminate ("SAM: handshake reply send error"); } - else if(m_Socket) + else { - m_Socket->async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), + m_Socket.async_read_some (boost::asio::buffer(m_Buffer, SAM_SOCKET_BUFFER_SIZE), std::bind(&SAMSocket::HandleMessage, shared_from_this (), std::placeholders::_1, std::placeholders::_2)); } @@ -220,7 +188,7 @@ namespace client LogPrint (eLogDebug, "SAMSocket::SendMessageReply, close=",close?"true":"false", " reason: ", msg); if (!m_IsSilent) - boost::asio::async_write (*m_Socket, boost::asio::buffer (msg, len), boost::asio::transfer_all (), + boost::asio::async_write (m_Socket, boost::asio::buffer (msg, len), boost::asio::transfer_all (), std::bind(&SAMSocket::HandleMessageReplySent, shared_from_this (), std::placeholders::_1, std::placeholders::_2, close)); else @@ -501,7 +469,6 @@ namespace client if(session) { m_SocketType = eSAMSocketTypeStream; - session->AddSocket (shared_from_this ()); m_Stream = session->localDestination->CreateStream (remote); m_Stream->Send ((uint8_t *)m_Buffer, m_BufferOffset); // connect and send m_BufferOffset = 0; @@ -534,7 +501,6 @@ namespace client if (session) { m_SocketType = eSAMSocketTypeAcceptor; - session->AddSocket (shared_from_this ()); if (!session->localDestination->IsAcceptingStreams ()) { m_IsAccepting = true; @@ -704,17 +670,9 @@ namespace client void SAMSocket::Receive () { - if (m_BufferOffset >= SAM_SOCKET_BUFFER_SIZE) - { - LogPrint (eLogError, "SAM: Buffer is full, terminate"); - Terminate ("Buffer is full"); - return; - } else if (m_Socket) - m_Socket->async_read_some (boost::asio::buffer(m_Buffer + m_BufferOffset, SAM_SOCKET_BUFFER_SIZE - m_BufferOffset), - std::bind((m_SocketType == eSAMSocketTypeStream) ? &SAMSocket::HandleReceived : &SAMSocket::HandleMessage, - shared_from_this (), std::placeholders::_1, std::placeholders::_2)); - else - LogPrint(eLogError, "SAM: receive with no native socket"); + m_Socket.async_read_some (boost::asio::buffer(m_Buffer + m_BufferOffset, SAM_SOCKET_BUFFER_SIZE - m_BufferOffset), + std::bind((m_SocketType == eSAMSocketTypeStream) ? &SAMSocket::HandleReceived : &SAMSocket::HandleMessage, + shared_from_this (), std::placeholders::_1, std::placeholders::_2)); } void SAMSocket::HandleReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred) @@ -731,15 +689,12 @@ namespace client { bytes_transferred += m_BufferOffset; m_BufferOffset = 0; - auto s = shared_from_this (); m_Stream->AsyncSend ((uint8_t *)m_Buffer, bytes_transferred, - [s](const boost::system::error_code& ecode) - { - if (!ecode) - s->m_Owner.GetService ().post ([s] { s->Receive (); }); - else - s->m_Owner.GetService ().post ([s] { s->Terminate ("AsyncSend failed"); }); - }); + std::bind(&SAMSocket::HandleStreamSend, shared_from_this(), std::placeholders::_1)); + } + else + { + Terminate("No Stream Remaining"); } } } @@ -773,14 +728,11 @@ namespace client void SAMSocket::WriteI2PDataImmediate(uint8_t * buff, size_t sz) { - if(m_Socket) - boost::asio::async_write ( - *m_Socket, - boost::asio::buffer (buff, sz), - boost::asio::transfer_all(), - std::bind (&SAMSocket::HandleWriteI2PDataImmediate, shared_from_this (), std::placeholders::_1, buff)); // postpone termination - else - LogPrint(eLogError, "SAM: no native socket"); + boost::asio::async_write ( + m_Socket, + boost::asio::buffer (buff, sz), + boost::asio::transfer_all(), + std::bind (&SAMSocket::HandleWriteI2PDataImmediate, shared_from_this (), std::placeholders::_1, buff)); // postpone termination } void SAMSocket::HandleWriteI2PDataImmediate(const boost::system::error_code & ec, uint8_t * buff) @@ -858,7 +810,7 @@ namespace client if (session) { // find more pending acceptors - for (auto it: session->ListSockets ()) + for (auto it: m_Owner.ListSockets (m_ID)) if (it->m_SocketType == eSAMSocketTypeAcceptor) { it->m_IsAccepting = true; @@ -930,12 +882,19 @@ namespace client } } - SAMSession::SAMSession (std::shared_ptr dest): + void SAMSocket::HandleStreamSend(const boost::system::error_code & ec) + { + m_Owner.GetService ().post (std::bind( !ec ? &SAMSocket::Receive : &SAMSocket::TerminateClose, shared_from_this())); + } + + SAMSession::SAMSession (SAMBridge & parent, const std::string & id, std::shared_ptr dest): + m_Bridge(parent), localDestination (dest), - UDPEndpoint(nullptr) + UDPEndpoint(nullptr), + Name(id) { } - + SAMSession::~SAMSession () { CloseStreams(); @@ -944,15 +903,10 @@ namespace client void SAMSession::CloseStreams () { - std::vector > socks; + for(const auto & itr : m_Bridge.ListSockets(Name)) { - std::lock_guard lock(m_SocketsMutex); - for (const auto& sock : m_Sockets) { - socks.push_back(sock); - } + itr->Terminate(nullptr); } - for (auto & sock : socks ) sock->Terminate("SAMSession::CloseStreams()"); - m_Sockets.clear(); } SAMBridge::SAMBridge (const std::string& address, int port): @@ -1009,12 +963,16 @@ namespace client void SAMBridge::Accept () { - auto native = std::make_shared(m_Service); - auto newSocket = std::make_shared (*this, native); - m_Acceptor.async_accept (*native, std::bind (&SAMBridge::HandleAccept, this, + auto newSocket = std::make_shared(*this); + m_Acceptor.async_accept (newSocket->GetSocket(), std::bind (&SAMBridge::HandleAccept, this, std::placeholders::_1, newSocket)); } + void SAMBridge::RemoveSocket(const SAMSocket * socket) + { + m_OpenSockets.remove_if([socket](const std::shared_ptr & item) -> bool { return item.get() == socket; }); + } + void SAMBridge::HandleAccept(const boost::system::error_code& ecode, std::shared_ptr socket) { if (!ecode) @@ -1024,6 +982,7 @@ namespace client if (!ec) { LogPrint (eLogDebug, "SAM: new connection from ", ep); + m_OpenSockets.push_back(socket); socket->ReceiveHandshake (); } else @@ -1066,7 +1025,7 @@ namespace client if (localDestination) { localDestination->Acquire (); - auto session = std::make_shared(localDestination); + auto session = std::make_shared(*this, id, localDestination); std::unique_lock l(m_SessionsMutex); auto ret = m_Sessions.insert (std::make_pair(id, session)); if (!ret.second) @@ -1105,6 +1064,18 @@ namespace client return nullptr; } + std::list > SAMBridge::ListSockets(const std::string & id) const + { + std::list > list; + { + std::unique_lock l(m_SessionsMutex); + for (const auto & itr : m_OpenSockets) + if (itr->IsSession(id)) + list.push_back(itr); + } + return list; + } + void SAMBridge::SendTo(const uint8_t * buf, size_t len, std::shared_ptr remote) { if(remote) @@ -1127,33 +1098,38 @@ namespace client { m_DatagramReceiveBuffer[bytes_transferred] = 0; char * eol = strchr ((char *)m_DatagramReceiveBuffer, '\n'); - *eol = 0; eol++; - size_t payloadLen = bytes_transferred - ((uint8_t *)eol - m_DatagramReceiveBuffer); - LogPrint (eLogDebug, "SAM: datagram received ", m_DatagramReceiveBuffer," size=", payloadLen); - char * sessionID = strchr ((char *)m_DatagramReceiveBuffer, ' '); - if (sessionID) + if(eol) { - sessionID++; - char * destination = strchr (sessionID, ' '); - if (destination) + *eol = 0; eol++; + size_t payloadLen = bytes_transferred - ((uint8_t *)eol - m_DatagramReceiveBuffer); + LogPrint (eLogDebug, "SAM: datagram received ", m_DatagramReceiveBuffer," size=", payloadLen); + char * sessionID = strchr ((char *)m_DatagramReceiveBuffer, ' '); + if (sessionID) { - *destination = 0; destination++; - auto session = FindSession (sessionID); - if (session) + sessionID++; + char * destination = strchr (sessionID, ' '); + if (destination) { - i2p::data::IdentityEx dest; - dest.FromBase64 (destination); - session->localDestination->GetDatagramDestination ()-> - SendDatagramTo ((uint8_t *)eol, payloadLen, dest.GetIdentHash ()); + *destination = 0; destination++; + auto session = FindSession (sessionID); + if (session) + { + i2p::data::IdentityEx dest; + dest.FromBase64 (destination); + session->localDestination->GetDatagramDestination ()-> + SendDatagramTo ((uint8_t *)eol, payloadLen, dest.GetIdentHash ()); + } + else + LogPrint (eLogError, "SAM: Session ", sessionID, " not found"); } else - LogPrint (eLogError, "SAM: Session ", sessionID, " not found"); + LogPrint (eLogError, "SAM: Missing destination key"); } else - LogPrint (eLogError, "SAM: Missing destination key"); + LogPrint (eLogError, "SAM: Missing sessionID"); } else - LogPrint (eLogError, "SAM: Missing sessionID"); + LogPrint(eLogError, "SAM: invalid datagram"); ReceiveDatagram (); } else diff --git a/libi2pd_client/SAM.h b/libi2pd_client/SAM.h index 6ecd14a4..d14e5e39 100644 --- a/libi2pd_client/SAM.h +++ b/libi2pd_client/SAM.h @@ -77,24 +77,27 @@ namespace client class SAMBridge; struct SAMSession; - class SAMSocket: public std::enable_shared_from_this + class SAMSocket :public std::enable_shared_from_this { public: typedef boost::asio::ip::tcp::socket Socket_t; - SAMSocket (SAMBridge& owner, std::shared_ptr socket); + SAMSocket (SAMBridge& owner); ~SAMSocket (); - boost::asio::ip::tcp::socket& GetSocket () { return *m_Socket; }; + Socket_t& GetSocket () { return m_Socket; }; void ReceiveHandshake (); void SetSocketType (SAMSocketType socketType) { m_SocketType = socketType; }; SAMSocketType GetSocketType () const { return m_SocketType; }; void Terminate (const char* reason); + bool IsSession(const std::string & id) const; + private: - - void HandleHandshakeReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred); + void TerminateClose() { Terminate(nullptr); } + + void HandleHandshakeReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleHandshakeReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleMessage (const boost::system::error_code& ecode, std::size_t bytes_transferred); void SendMessageReply (const char * msg, size_t len, bool close); @@ -128,10 +131,12 @@ namespace client void WriteI2PDataImmediate(uint8_t * ptr, size_t sz); void HandleWriteI2PDataImmediate(const boost::system::error_code & ec, uint8_t * buff); + void HandleStreamSend(const boost::system::error_code & ec); + private: SAMBridge& m_Owner; - std::shared_ptr m_Socket; + Socket_t m_Socket; boost::asio::deadline_timer m_Timer; char m_Buffer[SAM_SOCKET_BUFFER_SIZE + 1]; size_t m_BufferOffset; @@ -145,34 +150,12 @@ namespace client struct SAMSession { + SAMBridge & m_Bridge; std::shared_ptr localDestination; - std::list > m_Sockets; std::shared_ptr UDPEndpoint; - std::mutex m_SocketsMutex; - - /** safely add a socket to this session */ - void AddSocket(std::shared_ptr sock) { - std::lock_guard lock(m_SocketsMutex); - m_Sockets.push_back(sock); - } - - /** safely remove a socket from this session */ - void DelSocket(SAMSocket * sock) { - std::lock_guard lock(m_SocketsMutex); - m_Sockets.remove_if([sock](const std::shared_ptr s) -> bool { return s.get() == sock; }); - } - - /** get a list holding a copy of all sam sockets from this session */ - std::list > ListSockets() { - std::list > l; - { - std::lock_guard lock(m_SocketsMutex); - for(const auto& sock : m_Sockets ) l.push_back(sock); - } - return l; - } - - SAMSession (std::shared_ptr dest); + std::string Name; + + SAMSession (SAMBridge & parent, const std::string & name, std::shared_ptr dest); ~SAMSession (); void CloseStreams (); @@ -194,15 +177,19 @@ namespace client void CloseSession (const std::string& id); std::shared_ptr FindSession (const std::string& id) const; + std::list > ListSockets(const std::string & id) const; + /** send raw data to remote endpoint from our UDP Socket */ void SendTo(const uint8_t * buf, size_t len, std::shared_ptr remote); + void RemoveSocket(const SAMSocket * socket); + private: void Run (); void Accept (); - void HandleAccept(const boost::system::error_code& ecode, std::shared_ptr socket); + void HandleAccept(const boost::system::error_code& ecode, std::shared_ptr socket); void ReceiveDatagram (); void HandleReceivedDatagram (const boost::system::error_code& ecode, std::size_t bytes_transferred); @@ -217,6 +204,7 @@ namespace client boost::asio::ip::udp::socket m_DatagramSocket; mutable std::mutex m_SessionsMutex; std::map > m_Sessions; + std::list > m_OpenSockets; uint8_t m_DatagramReceiveBuffer[i2p::datagram::MAX_DATAGRAM_SIZE+1]; public: From b7a67b4b03c319fd9fc4432e2ac8751c75d5ff39 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 24 Apr 2018 09:56:24 -0400 Subject: [PATCH 03/10] use refernce not copy --- libi2pd_client/SAM.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libi2pd_client/SAM.cpp b/libi2pd_client/SAM.cpp index b8a72f0c..71b5bea1 100644 --- a/libi2pd_client/SAM.cpp +++ b/libi2pd_client/SAM.cpp @@ -810,7 +810,7 @@ namespace client if (session) { // find more pending acceptors - for (auto it: m_Owner.ListSockets (m_ID)) + for (auto & it: m_Owner.ListSockets (m_ID)) if (it->m_SocketType == eSAMSocketTypeAcceptor) { it->m_IsAccepting = true; From 60463fdafa97ba68f41430c78bb946ff41bbc1d7 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 24 Apr 2018 11:11:48 -0400 Subject: [PATCH 04/10] shut down socket and don't allocate buffer for each write in WriteI2PData --- libi2pd_client/SAM.cpp | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/libi2pd_client/SAM.cpp b/libi2pd_client/SAM.cpp index 71b5bea1..b0ffda51 100644 --- a/libi2pd_client/SAM.cpp +++ b/libi2pd_client/SAM.cpp @@ -25,8 +25,7 @@ namespace client SAMSocket::~SAMSocket () { - m_Stream.reset (); - if (m_Socket.is_open()) m_Socket.close (); + m_Stream.reset (); } void SAMSocket::Terminate (const char* reason) @@ -59,7 +58,11 @@ namespace client ; } m_SocketType = eSAMSocketTypeTerminated; - if (m_Socket.is_open()) m_Socket.close (); + if (m_Socket.is_open ()) + { + m_Socket.shutdown (); + m_Socket.close (); + } m_Owner.RemoveSocket(this); } @@ -742,9 +745,11 @@ namespace client void SAMSocket::WriteI2PData(size_t sz) { - uint8_t * sendbuff = new uint8_t[sz]; - memcpy(sendbuff, m_StreamBuffer, sz); - WriteI2PDataImmediate(sendbuff, sz); + boost::asio::async_write ( + m_Socket, + boost::asio::buffer (m_StreamBuffer, sz), + boost::asio::transfer_all(), + std::bind(&SAMSocket::HandleWriteI2PData, shared_from_this(), std::placeholders::_1)); } void SAMSocket::HandleI2PReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred) @@ -778,7 +783,8 @@ namespace client { WriteI2PData(bytes_transferred); } - I2PReceive(); + else + I2PReceive(); } } } @@ -897,7 +903,6 @@ namespace client SAMSession::~SAMSession () { - CloseStreams(); i2p::client::context.DeleteLocalDestination (localDestination); } From 5f525d0e4344bb5d062bb22be285d62aa58d4e52 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 24 Apr 2018 11:16:15 -0400 Subject: [PATCH 05/10] fix previous commit --- libi2pd_client/SAM.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/libi2pd_client/SAM.cpp b/libi2pd_client/SAM.cpp index b0ffda51..6511d8ba 100644 --- a/libi2pd_client/SAM.cpp +++ b/libi2pd_client/SAM.cpp @@ -60,7 +60,8 @@ namespace client m_SocketType = eSAMSocketTypeTerminated; if (m_Socket.is_open ()) { - m_Socket.shutdown (); + boost::system::error_code ec; + m_Socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec); m_Socket.close (); } m_Owner.RemoveSocket(this); @@ -749,7 +750,7 @@ namespace client m_Socket, boost::asio::buffer (m_StreamBuffer, sz), boost::asio::transfer_all(), - std::bind(&SAMSocket::HandleWriteI2PData, shared_from_this(), std::placeholders::_1)); + std::bind(&SAMSocket::HandleWriteI2PData, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); } void SAMSocket::HandleI2PReceive (const boost::system::error_code& ecode, std::size_t bytes_transferred) From 73b3fbc2daefc3258d3d1bd1c16f9fd17d528866 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 24 Apr 2018 11:42:37 -0400 Subject: [PATCH 06/10] wrap m_OpenSockets with mutex --- libi2pd_client/SAM.cpp | 16 ++++++++++------ libi2pd_client/SAM.h | 3 ++- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/libi2pd_client/SAM.cpp b/libi2pd_client/SAM.cpp index 6511d8ba..0edc8252 100644 --- a/libi2pd_client/SAM.cpp +++ b/libi2pd_client/SAM.cpp @@ -33,7 +33,7 @@ namespace client if(m_Stream) { m_Stream->AsyncClose (); - m_Stream.reset (); + m_Stream = nullptr; } auto Session = m_Owner.FindSession(m_ID); switch (m_SocketType) @@ -64,7 +64,7 @@ namespace client m_Socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec); m_Socket.close (); } - m_Owner.RemoveSocket(this); + m_Owner.RemoveSocket(shared_from_this()); } void SAMSocket::ReceiveHandshake () @@ -974,9 +974,10 @@ namespace client std::placeholders::_1, newSocket)); } - void SAMBridge::RemoveSocket(const SAMSocket * socket) + void SAMBridge::RemoveSocket(const std::shared_ptr & socket) { - m_OpenSockets.remove_if([socket](const std::shared_ptr & item) -> bool { return item.get() == socket; }); + std::unique_lock lock(m_OpenSocketsMutex); + m_OpenSockets.remove_if([socket](const std::shared_ptr & item) -> bool { return item == socket; }); } void SAMBridge::HandleAccept(const boost::system::error_code& ecode, std::shared_ptr socket) @@ -988,7 +989,10 @@ namespace client if (!ec) { LogPrint (eLogDebug, "SAM: new connection from ", ep); - m_OpenSockets.push_back(socket); + { + std::unique_lock l(m_OpenSocketsMutex); + m_OpenSockets.push_back(socket); + } socket->ReceiveHandshake (); } else @@ -1074,7 +1078,7 @@ namespace client { std::list > list; { - std::unique_lock l(m_SessionsMutex); + std::unique_lock l(m_OpenSocketsMutex); for (const auto & itr : m_OpenSockets) if (itr->IsSession(id)) list.push_back(itr); diff --git a/libi2pd_client/SAM.h b/libi2pd_client/SAM.h index d14e5e39..23cdf170 100644 --- a/libi2pd_client/SAM.h +++ b/libi2pd_client/SAM.h @@ -182,7 +182,7 @@ namespace client /** send raw data to remote endpoint from our UDP Socket */ void SendTo(const uint8_t * buf, size_t len, std::shared_ptr remote); - void RemoveSocket(const SAMSocket * socket); + void RemoveSocket(const std::shared_ptr & socket); private: @@ -204,6 +204,7 @@ namespace client boost::asio::ip::udp::socket m_DatagramSocket; mutable std::mutex m_SessionsMutex; std::map > m_Sessions; + mutable std::mutex m_OpenSocketsMutex; std::list > m_OpenSockets; uint8_t m_DatagramReceiveBuffer[i2p::datagram::MAX_DATAGRAM_SIZE+1]; From 623433099b2bd8456adad3f40abeda4ce8dee26d Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 24 Apr 2018 11:50:51 -0400 Subject: [PATCH 07/10] don't use reset --- libi2pd_client/SAM.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libi2pd_client/SAM.cpp b/libi2pd_client/SAM.cpp index 0edc8252..41db644d 100644 --- a/libi2pd_client/SAM.cpp +++ b/libi2pd_client/SAM.cpp @@ -25,7 +25,7 @@ namespace client SAMSocket::~SAMSocket () { - m_Stream.reset (); + m_Stream = nullptr; } void SAMSocket::Terminate (const char* reason) From 1e1e4da14471b190be0997b9be3948183fb7fbd7 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 24 Apr 2018 14:02:48 -0400 Subject: [PATCH 08/10] delete buffer --- libi2pd_client/SAM.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/libi2pd_client/SAM.cpp b/libi2pd_client/SAM.cpp index 41db644d..d228e317 100644 --- a/libi2pd_client/SAM.cpp +++ b/libi2pd_client/SAM.cpp @@ -725,7 +725,10 @@ namespace client WriteI2PDataImmediate(buff, len); } else // no more data + { + delete [] buff; Terminate ("no more data"); + } } } } From b046c45a9e7db86906684441818cc7153f269314 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Wed, 25 Apr 2018 11:25:49 -0400 Subject: [PATCH 09/10] tabify --- libi2pd_client/SAM.cpp | 6 +++--- libi2pd_client/SAM.h | 32 ++++++++++++++++---------------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/libi2pd_client/SAM.cpp b/libi2pd_client/SAM.cpp index d228e317..ac2dd853 100644 --- a/libi2pd_client/SAM.cpp +++ b/libi2pd_client/SAM.cpp @@ -207,7 +207,7 @@ namespace client void SAMSocket::HandleMessageReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred, bool close) { if (ecode) - { + { LogPrint (eLogError, "SAM: reply send error: ", ecode.message ()); if (ecode != boost::asio::error::operation_aborted) Terminate ("SAM: reply send error"); @@ -224,7 +224,7 @@ namespace client void SAMSocket::HandleMessage (const boost::system::error_code& ecode, std::size_t bytes_transferred) { if (ecode) - { + { LogPrint (eLogError, "SAM: read error: ", ecode.message ()); if (ecode != boost::asio::error::operation_aborted) Terminate ("SAM: read error"); @@ -569,7 +569,7 @@ namespace client keys.GetPublic ()->ToBase64 ().c_str (), keys.ToBase64 ().c_str ()); #else size_t l = snprintf (m_Buffer, SAM_SOCKET_BUFFER_SIZE, SAM_DEST_REPLY, - keys.GetPublic ()->ToBase64 ().c_str (), keys.ToBase64 ().c_str ()); + keys.GetPublic ()->ToBase64 ().c_str (), keys.ToBase64 ().c_str ()); #endif SendMessageReply (m_Buffer, l, false); } diff --git a/libi2pd_client/SAM.h b/libi2pd_client/SAM.h index 23cdf170..0c70758f 100644 --- a/libi2pd_client/SAM.h +++ b/libi2pd_client/SAM.h @@ -92,12 +92,12 @@ namespace client void Terminate (const char* reason); - bool IsSession(const std::string & id) const; - + bool IsSession(const std::string & id) const; + private: - void TerminateClose() { Terminate(nullptr); } - - void HandleHandshakeReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred); + void TerminateClose() { Terminate(nullptr); } + + void HandleHandshakeReceived (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleHandshakeReplySent (const boost::system::error_code& ecode, std::size_t bytes_transferred); void HandleMessage (const boost::system::error_code& ecode, std::size_t bytes_transferred); void SendMessageReply (const char * msg, size_t len, bool close); @@ -131,8 +131,8 @@ namespace client void WriteI2PDataImmediate(uint8_t * ptr, size_t sz); void HandleWriteI2PDataImmediate(const boost::system::error_code & ec, uint8_t * buff); - void HandleStreamSend(const boost::system::error_code & ec); - + void HandleStreamSend(const boost::system::error_code & ec); + private: SAMBridge& m_Owner; @@ -150,10 +150,10 @@ namespace client struct SAMSession { - SAMBridge & m_Bridge; + SAMBridge & m_Bridge; std::shared_ptr localDestination; std::shared_ptr UDPEndpoint; - std::string Name; + std::string Name; SAMSession (SAMBridge & parent, const std::string & name, std::shared_ptr dest); ~SAMSession (); @@ -172,24 +172,24 @@ namespace client void Stop (); boost::asio::io_service& GetService () { return m_Service; }; - std::shared_ptr CreateSession (const std::string& id, const std::string& destination, // empty string means transient + std::shared_ptr CreateSession (const std::string& id, const std::string& destination, // empty string means transient const std::map * params); void CloseSession (const std::string& id); std::shared_ptr FindSession (const std::string& id) const; - std::list > ListSockets(const std::string & id) const; + std::list > ListSockets(const std::string & id) const; /** send raw data to remote endpoint from our UDP Socket */ void SendTo(const uint8_t * buf, size_t len, std::shared_ptr remote); - void RemoveSocket(const std::shared_ptr & socket); - + void RemoveSocket(const std::shared_ptr & socket); + private: void Run (); void Accept (); - void HandleAccept(const boost::system::error_code& ecode, std::shared_ptr socket); + void HandleAccept(const boost::system::error_code& ecode, std::shared_ptr socket); void ReceiveDatagram (); void HandleReceivedDatagram (const boost::system::error_code& ecode, std::size_t bytes_transferred); @@ -204,8 +204,8 @@ namespace client boost::asio::ip::udp::socket m_DatagramSocket; mutable std::mutex m_SessionsMutex; std::map > m_Sessions; - mutable std::mutex m_OpenSocketsMutex; - std::list > m_OpenSockets; + mutable std::mutex m_OpenSocketsMutex; + std::list > m_OpenSockets; uint8_t m_DatagramReceiveBuffer[i2p::datagram::MAX_DATAGRAM_SIZE+1]; public: From 0ced38cdcbcae6f6bfe1ab653d85f201f194c247 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Wed, 25 Apr 2018 11:27:56 -0400 Subject: [PATCH 10/10] tabify --- libi2pd/Streaming.h | 4 ++-- libi2pd_client/SAM.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/libi2pd/Streaming.h b/libi2pd/Streaming.h index 7f2598c0..3db8d760 100644 --- a/libi2pd/Streaming.h +++ b/libi2pd/Streaming.h @@ -165,9 +165,9 @@ namespace stream void AsyncReceive (const Buffer& buffer, ReceiveHandler handler, int timeout = 0); size_t ReadSome (uint8_t * buf, size_t len) { return ConcatenatePackets (buf, len); }; - void AsyncClose() { m_Service.post(std::bind(&Stream::Close, shared_from_this())); }; + void AsyncClose() { m_Service.post(std::bind(&Stream::Close, shared_from_this())); }; - /** only call close from destination thread, use Stream::AsyncClose for other threads */ + /** only call close from destination thread, use Stream::AsyncClose for other threads */ void Close (); void Cancel () { m_ReceiveTimer.cancel (); }; diff --git a/libi2pd_client/SAM.h b/libi2pd_client/SAM.h index 0c70758f..953af1cd 100644 --- a/libi2pd_client/SAM.h +++ b/libi2pd_client/SAM.h @@ -77,7 +77,7 @@ namespace client class SAMBridge; struct SAMSession; - class SAMSocket :public std::enable_shared_from_this + class SAMSocket: public std::enable_shared_from_this { public: