diff --git a/llarp/link/utp.cpp b/llarp/link/utp.cpp index 267c6ec6c..d68be1b12 100644 --- a/llarp/link/utp.cpp +++ b/llarp/link/utp.cpp @@ -595,7 +595,7 @@ namespace llarp lastActive = parent->Now(); - Pump = std::bind(&Session::PumpWrite, this); + Pump = std::bind(&Session::DoPump, this); Tick = std::bind(&Session::TickImpl, this, std::placeholders::_1); SendMessageBuffer = std::bind(&Session::QueueWriteBuffers, this, std::placeholders::_1); @@ -717,6 +717,15 @@ namespace llarp return true; } + void + Session::DoPump() + { + // pump write queue + PumpWrite(); + // prune inbound messages + PruneInboundMessages(parent->Now()); + } + bool Session::QueueWriteBuffers(llarp_buffer_t buf) { @@ -894,8 +903,14 @@ namespace llarp Addr remote(*arg->address); llarp::LogDebug("utp accepted from ", remote); Session* session = new Session(self, arg->socket, remote); - self->PutSession(session); - session->OnLinkEstablished(self); + if(!self->PutSession(session)) + { + session->Close(); + delete session; + } + else + session->OnLinkEstablished(self); + return 0; } diff --git a/llarp/link/utp_internal.hpp b/llarp/link/utp_internal.hpp index e73a400a4..bb1bba35b 100644 --- a/llarp/link/utp_internal.hpp +++ b/llarp/link/utp_internal.hpp @@ -174,6 +174,9 @@ namespace llarp void PumpWrite(); + void + DoPump(); + /// verify a fragment buffer and the decrypt it /// buf is assumed to be FragmentBufferSize bytes long bool