diff --git a/include/llarp/service/endpoint.hpp b/include/llarp/service/endpoint.hpp index ea8253a94..9907180b8 100644 --- a/include/llarp/service/endpoint.hpp +++ b/include/llarp/service/endpoint.hpp @@ -161,6 +161,8 @@ namespace llarp IntroSet currentIntroSet; /// the current selected intro Introduction selectedIntro; + /// set to true if we are updating the remote introset right now + bool updatingIntroSet; /// update the current selected intro to be a new best introduction void diff --git a/llarp/link/utp.cpp b/llarp/link/utp.cpp index 1bc33f1e6..84ea5433d 100644 --- a/llarp/link/utp.cpp +++ b/llarp/link/utp.cpp @@ -31,6 +31,8 @@ namespace llarp FragmentOverheadSize + FragmentBodySize; typedef llarp::AlignedBuffer< FragmentBufferSize > FragmentBuffer; + constexpr size_t MaxSend = 64; + typedef llarp::AlignedBuffer< MAX_LINK_MSG_SIZE > MessageBuffer; struct LinkLayer; @@ -47,8 +49,8 @@ namespace llarp llarp_time_t lastActive; const static llarp_time_t sessionTimeout = 30 * 1000; + std::deque< utp_iovec > vecq; std::deque< FragmentBuffer > sendq; - size_t sendBufOffset; FragmentBuffer recvBuf; size_t recvBufOffset; @@ -101,11 +103,41 @@ namespace llarp void PumpWrite() { + if(!sock) + return; + ssize_t expect = 0; + std::vector< utp_iovec > vecs; + for(const auto& vec : vecq) + { + expect += vec.iov_len; + vecs.push_back(vec); + } + if(expect) + { + ssize_t s = utp_writev(sock, vecs.data(), vecs.size()); + llarp::LogDebug("utp_writev wrote=", s, " expect=", expect, + " to=", remoteAddr); + + while(s > vecq.front().iov_len) + { + s -= vecq.front().iov_len; + vecq.pop_front(); + sendq.pop_front(); + } + if(vecq.size()) + { + auto& front = vecq.front(); + front.iov_len -= s; + front.iov_base = ((byte_t*)front.iov_base) + s; + } + } + + /* while(sendq.size() > 0 && !stalled) { ssize_t expect = FragmentBufferSize - sendBufOffset; - ssize_t s = write_ll(sendq.front().data() + sendBufOffset, expect); - if(s != expect) + ssize_t s = write_ll(sendq.front().data() + sendBufOffset, + expect); if(s != expect) { llarp::LogDebug("stalled at offset=", sendBufOffset, " sz=", s, " to ", remoteAddr); @@ -118,6 +150,7 @@ namespace llarp sendq.pop_front(); } } + */ } ssize_t @@ -540,14 +573,14 @@ namespace llarp return true; }; gotLIM = false; - sendBufOffset = 0; recvBufOffset = 0; TimedOut = [&](llarp_time_t now) -> bool { return this->IsTimedOut(now) || this->state == eClose; }; GetPubKey = std::bind(&BaseSession::RemotePubKey, this); lastActive = llarp_time_now_ms(); - Pump = std::bind(&BaseSession::PumpWrite, this); + // Pump = []() {}; + Pump = std::bind(&BaseSession::PumpWrite, this); Tick = std::bind(&BaseSession::TickImpl, this, std::placeholders::_1); SendMessageBuffer = std::bind(&BaseSession::QueueWriteBuffers, this, std::placeholders::_1); @@ -743,12 +776,7 @@ namespace llarp } else if(arg->state == UTP_STATE_WRITABLE) { - if(session->IsEstablished()) - { - llarp::LogDebug("write resumed for ", session->remoteAddr); - session->stalled = false; - session->PumpWrite(); - } + session->PumpWrite(); } else if(arg->state == UTP_STATE_EOF) { @@ -779,6 +807,10 @@ namespace llarp { sendq.emplace_back(); auto& buf = sendq.back(); + vecq.emplace_back(); + auto& vec = vecq.back(); + vec.iov_base = buf.data(); + vec.iov_len = FragmentBufferSize; llarp::LogDebug("encrypt then hash ", sz, " bytes last=", isLastFragment); buf.Randomize(); byte_t* nonce = buf.data() + FragmentHashSize; diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 209522e03..4446b243e 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -720,8 +720,8 @@ namespace llarp auto itr = m_PendingServiceLookups.find(addr); if(itr != m_PendingServiceLookups.end()) { - itr->second(addr, nullptr); m_PendingServiceLookups.erase(itr); + itr->second(addr, nullptr); } return false; } @@ -778,6 +778,7 @@ namespace llarp , m_Parent(parent) { + updatingIntroSet = false; selectedIntro.Clear(); ShiftIntroduction(); } @@ -795,6 +796,7 @@ namespace llarp currentIntroSet = *i; ShiftIntroduction(); } + updatingIntroSet = false; return true; } @@ -843,7 +845,7 @@ namespace llarp for(const auto& intro : currentIntroSet.I) { m_Parent->EnsureRouterIsKnown(selectedIntro.router); - if(intro.expiresAt > selectedIntro.expiresAt) + if(selectedIntro < intro) { selectedIntro = intro; shifted = true; @@ -1008,6 +1010,8 @@ namespace llarp void Endpoint::OutboundContext::UpdateIntroSet() { + if(updatingIntroSet) + return; auto addr = currentIntroSet.A.Addr(); auto path = m_Parent->PickRandomEstablishedPath(); if(path) @@ -1018,8 +1022,7 @@ namespace llarp std::placeholders::_1, std::placeholders::_2), addr, m_Parent->GenTXID()); - if(!job->SendRequestViaPath(path, m_Parent->Router())) - llarp::LogError("send via path failed"); + updatingIntroSet = job->SendRequestViaPath(path, m_Parent->Router()); } else { @@ -1032,10 +1035,9 @@ namespace llarp bool Endpoint::OutboundContext::Tick(llarp_time_t now) { - if(selectedIntro.expiresAt <= now - || selectedIntro.expiresAt - now < 30000) + if(selectedIntro.ExpiresSoon(now)) { - UpdateIntroSet(); + ShiftIntroduction(); } m_Parent->EnsureRouterIsKnown(selectedIntro.router); // TODO: check for expiration of outbound context