diff --git a/libi2pd_client/SAM.cpp b/libi2pd_client/SAM.cpp index 1a281c57..54da4654 100644 --- a/libi2pd_client/SAM.cpp +++ b/libi2pd_client/SAM.cpp @@ -601,10 +601,15 @@ namespace client SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); session->GetLocalDestination ()->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, shared_from_this (), std::placeholders::_1)); } - else // already accepting + else if (session->acceptQueue.size () < SAM_SESSION_MAX_ACCEPT_QUEUE_SIZE) + { + // already accepting, queue up + SendMessageReply (SAM_STREAM_STATUS_OK, strlen(SAM_STREAM_STATUS_OK), false); + session->acceptQueue.push_back (shared_from_this()); + } + else { - // TODO: implement queue - LogPrint (eLogInfo, "SAM: Session ", m_ID, " is already accepting"); + LogPrint (eLogInfo, "SAM: Session ", m_ID, " accept queue is full ", session->acceptQueue.size ()); SendStreamI2PError ("Already accepting"); } } @@ -1057,16 +1062,16 @@ namespace client m_Stream = stream; context.GetAddressBook ().InsertFullAddress (stream->GetRemoteIdentity ()); auto session = m_Owner.FindSession (m_ID); - if (session) + if (session && !session->acceptQueue.empty ()) { - // find more pending acceptors - for (auto & it: m_Owner.ListSockets (m_ID)) - if (it->m_SocketType == eSAMSocketTypeAcceptor) - { - it->m_IsAccepting = true; - session->GetLocalDestination ()->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, it, std::placeholders::_1)); - break; - } + // pending acceptors + auto socket = session->acceptQueue.front (); + session->acceptQueue.pop_front (); + if (socket && socket->GetSocketType () == eSAMSocketTypeAcceptor) + { + socket->m_IsAccepting = true; + session->GetLocalDestination ()->AcceptOnce (std::bind (&SAMSocket::HandleI2PAccept, socket, std::placeholders::_1)); + } } if (!m_IsSilent) { diff --git a/libi2pd_client/SAM.h b/libi2pd_client/SAM.h index 736c0945..2841afb9 100644 --- a/libi2pd_client/SAM.h +++ b/libi2pd_client/SAM.h @@ -31,6 +31,8 @@ namespace client const size_t SAM_SOCKET_BUFFER_SIZE = 8192; const int SAM_SOCKET_CONNECTION_MAX_IDLE = 3600; // in seconds const int SAM_SESSION_READINESS_CHECK_INTERVAL = 3; // in seconds + const size_t SAM_SESSION_MAX_ACCEPT_QUEUE_SIZE = 64; + const char SAM_HANDSHAKE[] = "HELLO VERSION"; const char SAM_HANDSHAKE_REPLY[] = "HELLO REPLY RESULT=OK VERSION=%s\n"; const char SAM_HANDSHAKE_NOVERSION[] = "HELLO REPLY RESULT=NOVERSION\n"; @@ -189,7 +191,8 @@ namespace client std::string Name; SAMSessionType Type; std::shared_ptr UDPEndpoint; // TODO: move - + std::list > acceptQueue; + SAMSession (SAMBridge & parent, const std::string & name, SAMSessionType type); virtual ~SAMSession () {};