From 14c9ef15edef1607ec93f985736f22cd5b2e82cb Mon Sep 17 00:00:00 2001 From: jeff Date: Mon, 16 Sep 2019 06:21:12 -0400 Subject: [PATCH] try calling stuff in logic thread from event loop --- llarp/ev/ev.cpp | 1 + llarp/ev/ev.hpp | 3 ++ llarp/ev/ev_libuv.cpp | 59 ++++++++++++++++++++++++------------ llarp/ev/ev_libuv.hpp | 16 ++++++++++ llarp/handlers/tun.cpp | 5 +-- llarp/iwp/linklayer.cpp | 2 +- llarp/iwp/message_buffer.cpp | 6 +++- llarp/iwp/session.cpp | 34 +++++++++++++-------- llarp/iwp/session.hpp | 5 ++- llarp/link/server.cpp | 3 +- llarp/path/ihophandler.hpp | 30 +++++++++++------- llarp/path/path.cpp | 29 ++++++++++++------ llarp/path/path.hpp | 9 ++++-- llarp/path/path_context.cpp | 14 +++++++-- llarp/path/path_context.hpp | 5 ++- llarp/path/pathbuilder.cpp | 4 +-- llarp/path/pathset.cpp | 9 ++++-- llarp/path/pathset.hpp | 5 ++- llarp/path/transit_hop.cpp | 32 ++++++++++++------- llarp/path/transit_hop.hpp | 9 ++++-- llarp/router/router.cpp | 7 ++--- llarp/service/endpoint.cpp | 1 + llarp/util/thread/logic.cpp | 17 +++++++++-- llarp/util/thread/logic.hpp | 8 ++--- 24 files changed, 213 insertions(+), 100 deletions(-) diff --git a/llarp/ev/ev.cpp b/llarp/ev/ev.cpp index f2e979a64..da98c5f32 100644 --- a/llarp/ev/ev.cpp +++ b/llarp/ev/ev.cpp @@ -36,6 +36,7 @@ void llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev, std::shared_ptr< llarp::Logic > logic) { + ev->give_logic(logic); while(ev->running()) { ev->update_time(); diff --git a/llarp/ev/ev.hpp b/llarp/ev/ev.hpp index dbbd16440..5e18fe699 100644 --- a/llarp/ev/ev.hpp +++ b/llarp/ev/ev.hpp @@ -783,6 +783,9 @@ struct llarp_ev_loop return false; } + /// give this event loop a logic thread for calling + virtual void give_logic(std::shared_ptr< llarp::Logic >) = 0; + /// register event listener virtual bool add_ev(llarp::ev_io* ev, bool write) = 0; diff --git a/llarp/ev/ev_libuv.cpp b/llarp/ev/ev_libuv.cpp index a4e0b4735..a2f35200b 100644 --- a/llarp/ev/ev_libuv.cpp +++ b/llarp/ev/ev_libuv.cpp @@ -1,10 +1,19 @@ #include #include +#include #include namespace libuv { + /// call a function in logic thread via a handle + template < typename Handle, typename Func > + void + Call(Handle* h, Func&& f) + { + static_cast< Loop* >(h->loop->data)->Call(f); + } + struct glue { virtual ~glue() = default; @@ -65,8 +74,9 @@ namespace libuv static void OnOutboundConnect(uv_connect_t* c, int status) { - auto* self = static_cast< conn_glue* >(c->data); - self->HandleConnectResult(status); + conn_glue* self = static_cast< conn_glue* >(c->data); + Call(self->Stream(), + std::bind(&conn_glue::HandleConnectResult, self, status)); c->data = nullptr; } @@ -165,13 +175,14 @@ namespace libuv static void OnWritten(uv_write_t* req, int status) { + conn_glue* conn = static_cast< conn_glue* >(req->data); if(status) { llarp::LogError("write failed on tcp: ", uv_strerror(status)); - static_cast< conn_glue* >(req->data)->Close(); + conn->Close(); } else - static_cast< conn_glue* >(req->data)->DrainOne(); + Call(conn->Stream(), std::bind(&conn_glue::DrainOne, conn)); delete req; } @@ -195,7 +206,8 @@ namespace libuv static void OnClosed(uv_handle_t* h) { - static_cast< conn_glue* >(h->data)->HandleClosed(); + conn_glue* conn = static_cast< conn_glue* >(h->data); + Call(h, std::bind(&conn_glue::HandleClosed, conn)); } static void @@ -251,7 +263,8 @@ namespace libuv { if(status == 0) { - static_cast< conn_glue* >(stream->data)->Accept(); + conn_glue* conn = static_cast< conn_glue* >(stream->data); + Call(stream, std::bind(&conn_glue::Accept, conn)); } else { @@ -262,8 +275,8 @@ namespace libuv static void OnTick(uv_check_t* t) { - auto* conn = static_cast< conn_glue* >(t->data); - conn->Tick(); + conn_glue* conn = static_cast< conn_glue* >(t->data); + Call(t, std::bind(&conn_glue::Tick, conn)); } void @@ -331,7 +344,8 @@ namespace libuv static void OnTick(uv_check_t* t) { - static_cast< ticker_glue* >(t->data)->func(); + ticker_glue* ticker = static_cast< ticker_glue* >(t->data); + Call(&ticker->m_Ticker, [ticker]() { ticker->func(); }); } bool @@ -393,14 +407,14 @@ namespace libuv const size_t pktsz = sz; const llarp_buffer_t pkt{(const byte_t*)buf->base, pktsz}; m_UDP->recvfrom(m_UDP, fromaddr, ManagedBuffer{pkt}); - gotpkts = true; } } static void OnTick(uv_check_t* t) { - static_cast< udp_glue* >(t->data)->Tick(); + udp_glue* udp = static_cast< udp_glue* >(t->data); + udp->Tick(); } void @@ -408,7 +422,6 @@ namespace libuv { if(m_UDP && m_UDP->tick) m_UDP->tick(m_UDP); - gotpkts = false; } static int @@ -443,6 +456,7 @@ namespace libuv if(uv_fileno((const uv_handle_t*)&m_Handle, &m_UDP->fd)) return false; m_UDP->sendto = &SendTo; + m_UDP->impl = this; return true; } @@ -452,8 +466,7 @@ namespace libuv auto* glue = static_cast< udp_glue* >(h->data); if(glue) { - h->data = nullptr; - glue->m_UDP->impl = nullptr; + h->data = nullptr; delete glue; } } @@ -461,6 +474,7 @@ namespace libuv void Close() override { + m_UDP->impl = nullptr; uv_check_stop(&m_Ticker); uv_close((uv_handle_t*)&m_Handle, &OnClosed); } @@ -481,7 +495,7 @@ namespace libuv void Tick() { - m_Pipe->tick(); + Call(&m_Handle, std::bind(&llarp_ev_pkt_pipe::tick, m_Pipe)); } static void @@ -520,7 +534,8 @@ namespace libuv static void OnTick(uv_check_t* h) { - static_cast< pipe_glue* >(h->data)->Tick(); + pipe_glue* pipe = static_cast< pipe_glue* >(h->data); + Call(h, std::bind(&pipe_glue::Tick, pipe)); } bool @@ -561,7 +576,8 @@ namespace libuv static void OnTick(uv_check_t* timer) { - static_cast< tun_glue* >(timer->data)->Tick(); + tun_glue* tun = static_cast< tun_glue* >(timer->data); + Call(timer, std::bind(&tun_glue::Tick, tun)); } static void @@ -601,8 +617,7 @@ namespace libuv auto* self = static_cast< tun_glue* >(h->data); if(self) { - self->m_Tun->impl = nullptr; - h->data = nullptr; + h->data = nullptr; delete self; } } @@ -610,6 +625,7 @@ namespace libuv void Close() override { + m_Tun->impl = nullptr; uv_check_stop(&m_Ticker); uv_close((uv_handle_t*)&m_Handle, &OnClosed); } @@ -623,7 +639,8 @@ namespace libuv static bool WritePkt(llarp_tun_io* tun, const byte_t* pkt, size_t sz) { - return static_cast< tun_glue* >(tun->impl)->Write(pkt, sz); + tun_glue* glue = static_cast< tun_glue* >(tun->impl); + return glue && glue->Write(pkt, sz); } bool @@ -670,6 +687,7 @@ namespace libuv return false; } m_Tun->writepkt = &WritePkt; + m_Tun->impl = this; return true; } }; @@ -680,6 +698,7 @@ namespace libuv m_Impl.reset(uv_loop_new()); if(uv_loop_init(m_Impl.get()) == -1) return false; + m_Impl->data = this; uv_loop_configure(m_Impl.get(), UV_LOOP_BLOCK_SIGNAL, SIGPIPE); m_TickTimer.data = this; m_Run.store(true); diff --git a/llarp/ev/ev_libuv.hpp b/llarp/ev/ev_libuv.hpp index ab19fec73..c04e6a370 100644 --- a/llarp/ev/ev_libuv.hpp +++ b/llarp/ev/ev_libuv.hpp @@ -5,6 +5,7 @@ #include #include #include +#include namespace libuv { @@ -88,6 +89,20 @@ namespace libuv return false; } + void + give_logic(std::shared_ptr< llarp::Logic > l) override + { + m_Logic = l; + } + + /// call function in logic thread + template < typename F > + void + Call(F f) + { + m_Logic->queue_func(f); + } + private: struct DestructLoop { @@ -101,6 +116,7 @@ namespace libuv std::unique_ptr< uv_loop_t, DestructLoop > m_Impl; uv_timer_t m_TickTimer; std::atomic< bool > m_Run; + std::shared_ptr< llarp::Logic > m_Logic; }; } // namespace libuv diff --git a/llarp/handlers/tun.cpp b/llarp/handlers/tun.cpp index 27fa27c53..57588e343 100644 --- a/llarp/handlers/tun.cpp +++ b/llarp/handlers/tun.cpp @@ -314,11 +314,7 @@ namespace llarp RouterLogic()->queue_func([=] { self->m_ExitMap.ForEachValue( [](const auto &exit) { exit->FlushUpstream(); }); - self->Router()->PumpLL(); - }); - RouterLogic()->queue_func([=]() { self->Pump(self->Now()); - self->Router()->PumpLL(); }); } @@ -788,6 +784,7 @@ namespace llarp } llarp::LogWarn(Name(), " did not flush packets"); }); + SendAllDownstream(Router()); } bool diff --git a/llarp/iwp/linklayer.cpp b/llarp/iwp/linklayer.cpp index 51bba1958..8cee5e636 100644 --- a/llarp/iwp/linklayer.cpp +++ b/llarp/iwp/linklayer.cpp @@ -28,7 +28,7 @@ namespace llarp auto itr = m_AuthedLinks.begin(); while(itr != m_AuthedLinks.end()) { - sessions.insert(itr->first); + sessions.emplace(itr->first); ++itr; } } diff --git a/llarp/iwp/message_buffer.cpp b/llarp/iwp/message_buffer.cpp index 33b3458c5..03b59a240 100644 --- a/llarp/iwp/message_buffer.cpp +++ b/llarp/iwp/message_buffer.cpp @@ -121,7 +121,11 @@ namespace llarp llarp_time_t now) { if(idx + buf.sz > m_Data.size()) + { + LogWarn("invalid fragment offset ", idx); return; + } + auto *dst = m_Data.data() + idx; std::copy_n(buf.base, buf.sz, dst); m_Acks.set(idx / FragmentSize); @@ -134,7 +138,7 @@ namespace llarp { auto acks = CreatePacket(Command::eACKS, 9, 0, 0); htobe64buf(acks.data() + 2 + PacketOverhead, m_MsgID); - acks[PacketOverhead + 8] = AcksBitmask(); + acks[PacketOverhead + 10] = AcksBitmask(); return acks; } diff --git a/llarp/iwp/session.cpp b/llarp/iwp/session.cpp index 967dc797d..5f612e3e9 100644 --- a/llarp/iwp/session.cpp +++ b/llarp/iwp/session.cpp @@ -198,19 +198,21 @@ namespace llarp Session::SendMACK() { // send multi acks - while(!m_SendMACKS.empty()) + while(m_SendMACKS.size() > 0) { - const auto sz = m_SendMACKS.size(); - const auto max = Session::MaxACKSInMACK; - auto numAcks = std::min(sz, max); - auto mack = - CreatePacket(Command::eMACK, numAcks * sizeof(uint64_t), 0, 0); + const auto sz = m_SendMACKS.size(); + const auto max = Session::MaxACKSInMACK; + auto numAcks = std::min(sz, max); + auto mack = CreatePacket(Command::eMACK, + 1 + (numAcks * sizeof(uint64_t)), 0, 0); mack[PacketOverhead + 2] = byte_t{numAcks}; byte_t* ptr = mack.data() + 3 + PacketOverhead; + LogDebug("send ", numAcks, " macks to ", m_RemoteAddr); + auto itr = m_SendMACKS.begin(); while(numAcks > 0) { - htobe64buf(ptr, m_SendMACKS.back()); - m_SendMACKS.pop_back(); + htobe64buf(ptr, *itr); + itr = m_SendMACKS.erase(itr); numAcks--; ptr += sizeof(uint64_t); } @@ -226,7 +228,6 @@ namespace llarp { if(ShouldPing()) SendKeepAlive(); - SendMACK(); for(auto& item : m_RXMsgs) { if(item.second.ShouldSendACKS(now)) @@ -562,6 +563,7 @@ namespace llarp { for(auto& result : msgs) { + LogDebug("Command ", int(result[PacketOverhead + 1])); switch(result[PacketOverhead + 1]) { case Command::eXMIT: @@ -590,6 +592,7 @@ namespace llarp " from ", m_RemoteAddr); } } + SendMACK(); } void @@ -601,16 +604,18 @@ namespace llarp return; } byte_t numAcks = data[2 + PacketOverhead]; - if(data.size() < ((numAcks * sizeof(uint64_t)) + 3)) + if(data.size() < 3 + PacketOverhead + (numAcks * sizeof(uint64_t))) { LogError("short mack from ", m_RemoteAddr); return; } + LogDebug("got ", int(numAcks), " mack from ", m_RemoteAddr); byte_t* ptr = data.data() + 3 + PacketOverhead; while(numAcks > 0) { uint64_t acked = bufbe64toh(ptr); - auto itr = m_TXMsgs.find(acked); + LogDebug("mack containing txid=", acked, " from ", m_RemoteAddr); + auto itr = m_TXMsgs.find(acked); if(itr != m_TXMsgs.end()) { itr->second.Completed(); @@ -697,6 +702,11 @@ namespace llarp htobe64buf(nack.data() + PacketOverhead + 2, rxid); EncryptAndSend(std::move(nack)); } + else + { + LogDebug("replay hit for rxid=", rxid, " for ", m_RemoteAddr); + m_SendMACKS.emplace(rxid); + } return; } @@ -714,7 +724,7 @@ namespace llarp const llarp_buffer_t buf(msg.m_Data.data(), msg.m_Size); m_Parent->HandleMessage(this, buf); m_ReplayFilter.emplace(itr->first, m_Parent->Now()); - m_SendMACKS.emplace_back(itr->first); + m_SendMACKS.emplace(itr->first); } else { diff --git a/llarp/iwp/session.hpp b/llarp/iwp/session.hpp index 1ed39eb30..4f466ca5e 100644 --- a/llarp/iwp/session.hpp +++ b/llarp/iwp/session.hpp @@ -27,8 +27,7 @@ namespace llarp /// How often to acks RX messages static constexpr llarp_time_t ACKResendInterval = 250; /// How often to retransmit TX fragments - static constexpr llarp_time_t TXFlushInterval = - (ACKResendInterval * 3) / 2; + static constexpr llarp_time_t TXFlushInterval = ACKResendInterval * 2; /// How often we send a keepalive static constexpr llarp_time_t PingInterval = 2000; /// How long we wait for a session to die with no tx from them @@ -165,7 +164,7 @@ namespace llarp /// maps rxid to time recieved std::unordered_map< uint64_t, llarp_time_t > m_ReplayFilter; /// list of rx messages to send in next set of multiacks - std::vector< uint64_t > m_SendMACKS; + std::set< uint64_t > m_SendMACKS; using CryptoQueue_t = std::vector< Packet_t >; using CryptoQueue_ptr = std::shared_ptr< CryptoQueue_t >; diff --git a/llarp/link/server.cpp b/llarp/link/server.cpp index 09d37dbc6..a8075894b 100644 --- a/llarp/link/server.cpp +++ b/llarp/link/server.cpp @@ -484,7 +484,8 @@ namespace llarp ILinkLayer* link = static_cast< ILinkLayer* >(udp->user); if(link->m_Recv == nullptr) return; - link->m_Recv->emplace_back(std::make_pair(*from, buf.underlying.sz)); + link->m_Recv->emplace_back( + std::make_pair(Addr(*from), ILinkSession::Packet_t(buf.underlying.sz))); std::copy_n(buf.underlying.base, buf.underlying.sz, link->m_Recv->back().second.begin()); } diff --git a/llarp/path/ihophandler.hpp b/llarp/path/ihophandler.hpp index c0495e57c..35e4958e4 100644 --- a/llarp/path/ihophandler.hpp +++ b/llarp/path/ihophandler.hpp @@ -24,8 +24,9 @@ namespace llarp { struct IHopHandler { - using TrafficEvent_t = std::pair< std::vector< byte_t >, TunnelNonce >; - using TrafficQueue_t = std::vector< TrafficEvent_t >; + using TrafficEvent_t = std::pair< std::vector< byte_t >, TunnelNonce >; + using TrafficQueue_t = std::vector< TrafficEvent_t >; + using TrafficQueue_ptr = std::shared_ptr< TrafficQueue_t >; virtual ~IHopHandler() = default; @@ -44,8 +45,10 @@ namespace llarp HandleUpstream(const llarp_buffer_t& X, const TunnelNonce& Y, AbstractRouter*) { - m_UpstreamQueue.emplace_back(); - auto& pkt = m_UpstreamQueue.back(); + if(m_UpstreamQueue == nullptr) + m_UpstreamQueue = std::make_shared< TrafficQueue_t >(); + m_UpstreamQueue->emplace_back(); + auto& pkt = m_UpstreamQueue->back(); pkt.first.resize(X.sz); std::copy_n(X.base, X.sz, pkt.first.begin()); pkt.second = Y; @@ -57,8 +60,10 @@ namespace llarp HandleDownstream(const llarp_buffer_t& X, const TunnelNonce& Y, AbstractRouter*) { - m_DownstreamQueue.emplace_back(); - auto& pkt = m_DownstreamQueue.back(); + if(m_DownstreamQueue == nullptr) + m_DownstreamQueue = std::make_shared< TrafficQueue_t >(); + m_DownstreamQueue->emplace_back(); + auto& pkt = m_DownstreamQueue->back(); pkt.first.resize(X.sz); std::copy_n(X.base, X.sz, pkt.first.begin()); pkt.second = Y; @@ -79,18 +84,21 @@ namespace llarp return m_SequenceNum++; } virtual void - FlushQueues(AbstractRouter* r) = 0; + FlushUpstream(AbstractRouter* r) = 0; + + virtual void + FlushDownstream(AbstractRouter* r) = 0; protected: uint64_t m_SequenceNum = 0; - TrafficQueue_t m_UpstreamQueue; - TrafficQueue_t m_DownstreamQueue; + TrafficQueue_ptr m_UpstreamQueue; + TrafficQueue_ptr m_DownstreamQueue; virtual void - UpstreamWork(TrafficQueue_t queue, AbstractRouter* r) = 0; + UpstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) = 0; virtual void - DownstreamWork(TrafficQueue_t queue, AbstractRouter* r) = 0; + DownstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) = 0; virtual void HandleAllUpstream(std::vector< RelayUpstreamMessage > msgs, diff --git a/llarp/path/path.cpp b/llarp/path/path.cpp index d24ca336b..930d6813b 100644 --- a/llarp/path/path.cpp +++ b/llarp/path/path.cpp @@ -377,11 +377,11 @@ namespace llarp } void - Path::UpstreamWork(TrafficQueue_t msgs, AbstractRouter* r) + Path::UpstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r) { - std::vector< RelayUpstreamMessage > sendmsgs(msgs.size()); + std::vector< RelayUpstreamMessage > sendmsgs(msgs->size()); size_t idx = 0; - for(const auto& ev : msgs) + for(auto& ev : *msgs) { const llarp_buffer_t buf(ev.first); TunnelNonce n = ev.second; @@ -402,16 +402,27 @@ namespace llarp } void - Path::FlushQueues(AbstractRouter* r) + Path::FlushUpstream(AbstractRouter* r) { - if(!m_UpstreamQueue.empty()) + if(m_UpstreamQueue && !m_UpstreamQueue->empty()) + { r->threadpool()->addJob(std::bind(&Path::UpstreamWork, shared_from_this(), std::move(m_UpstreamQueue), r)); - if(!m_DownstreamQueue.empty()) + } + m_UpstreamQueue = nullptr; + } + + void + Path::FlushDownstream(AbstractRouter* r) + { + if(m_DownstreamQueue && !m_DownstreamQueue->empty()) + { r->threadpool()->addJob(std::bind(&Path::DownstreamWork, shared_from_this(), std::move(m_DownstreamQueue), r)); + } + m_DownstreamQueue = nullptr; } bool @@ -438,11 +449,11 @@ namespace llarp } void - Path::DownstreamWork(TrafficQueue_t msgs, AbstractRouter* r) + Path::DownstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r) { - std::vector< RelayDownstreamMessage > sendMsgs(msgs.size()); + std::vector< RelayDownstreamMessage > sendMsgs(msgs->size()); size_t idx = 0; - for(auto& ev : msgs) + for(auto& ev : *msgs) { const llarp_buffer_t buf(ev.first); sendMsgs[idx].Y = ev.second; diff --git a/llarp/path/path.hpp b/llarp/path/path.hpp index e1a9b45c7..72a1c7918 100644 --- a/llarp/path/path.hpp +++ b/llarp/path/path.hpp @@ -325,14 +325,17 @@ namespace llarp SendExitClose(const routing::CloseExitMessage& msg, AbstractRouter* r); void - FlushQueues(AbstractRouter* r) override; + FlushUpstream(AbstractRouter* r) override; + + void + FlushDownstream(AbstractRouter* r) override; protected: void - UpstreamWork(TrafficQueue_t queue, AbstractRouter* r) override; + UpstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override; void - DownstreamWork(TrafficQueue_t queue, AbstractRouter* r) override; + DownstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override; void HandleAllUpstream(std::vector< RelayUpstreamMessage > msgs, diff --git a/llarp/path/path_context.cpp b/llarp/path/path_context.cpp index c4608bbc7..f67ecce40 100644 --- a/llarp/path/path_context.cpp +++ b/llarp/path/path_context.cpp @@ -257,10 +257,18 @@ namespace llarp } void - PathContext::Pump() + PathContext::PumpUpstream() { - m_TransitPaths.ForEach([&](auto& ptr) { ptr->FlushQueues(m_Router); }); - m_OurPaths.ForEach([&](auto& ptr) { ptr->FlushQueues(m_Router); }); + m_TransitPaths.ForEach([&](auto& ptr) { ptr->FlushUpstream(m_Router); }); + m_OurPaths.ForEach([&](auto& ptr) { ptr->SendAllUpstream(m_Router); }); + } + + void + PathContext::PumpDownstream() + { + m_TransitPaths.ForEach( + [&](auto& ptr) { ptr->FlushDownstream(m_Router); }); + m_OurPaths.ForEach([&](auto& ptr) { ptr->SendAllDownstream(m_Router); }); } void diff --git a/llarp/path/path_context.hpp b/llarp/path/path_context.hpp index 412080a15..8b7c48179 100644 --- a/llarp/path/path_context.hpp +++ b/llarp/path/path_context.hpp @@ -38,7 +38,10 @@ namespace llarp ExpirePaths(llarp_time_t now); void - Pump(); + PumpUpstream(); + + void + PumpDownstream(); void AllowTransit(); diff --git a/llarp/path/pathbuilder.cpp b/llarp/path/pathbuilder.cpp index 4ef91fb9b..b67b018d4 100644 --- a/llarp/path/pathbuilder.cpp +++ b/llarp/path/pathbuilder.cpp @@ -396,7 +396,7 @@ namespace llarp for(size_t idx = 0; idx < hops.size(); ++idx) { hops[idx].Clear(); - size_t tries = 4; + size_t tries = 32; while(tries > 0 && !SelectHop(nodedb, exclude, hops[idx], idx, roles)) { --tries; @@ -406,7 +406,7 @@ namespace llarp LogWarn(Name(), " failed to select hop ", idx); return false; } - exclude.insert(hops[idx].pubkey); + exclude.emplace(hops[idx].pubkey); } return true; } diff --git a/llarp/path/pathset.cpp b/llarp/path/pathset.cpp index 79b3af3b4..2db9377cb 100644 --- a/llarp/path/pathset.cpp +++ b/llarp/path/pathset.cpp @@ -376,10 +376,15 @@ namespace llarp } void - PathSet::FlushQueues(AbstractRouter* r) + PathSet::SendAllUpstream(AbstractRouter* r) + { + ForEachPath([r](const Path_ptr& p) { p->FlushUpstream(r); }); + } + void + PathSet::SendAllDownstream(AbstractRouter* r) { - ForEachPath([r](const Path_ptr& ptr) { ptr->FlushQueues(r); }); + ForEachPath([r](const Path_ptr& p) { p->FlushDownstream(r); }); } } // namespace path diff --git a/llarp/path/pathset.hpp b/llarp/path/pathset.hpp index ed59cb532..b5beb5242 100644 --- a/llarp/path/pathset.hpp +++ b/llarp/path/pathset.hpp @@ -276,7 +276,10 @@ namespace llarp } void - FlushQueues(AbstractRouter* r); + SendAllUpstream(AbstractRouter* r); + + void + SendAllDownstream(AbstractRouter* r); size_t numPaths; diff --git a/llarp/path/transit_hop.cpp b/llarp/path/transit_hop.cpp index 858b8f1f1..4ccaf4d9b 100644 --- a/llarp/path/transit_hop.cpp +++ b/llarp/path/transit_hop.cpp @@ -115,11 +115,11 @@ namespace llarp } void - TransitHop::DownstreamWork(TrafficQueue_t msgs, AbstractRouter* r) + TransitHop::DownstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r) { - std::vector< RelayDownstreamMessage > sendmsgs(msgs.size()); + std::vector< RelayDownstreamMessage > sendmsgs(msgs->size()); size_t idx = 0; - for(auto& ev : msgs) + for(auto& ev : *msgs) { const llarp_buffer_t buf(ev.first); auto& msg = sendmsgs[idx]; @@ -137,11 +137,11 @@ namespace llarp } void - TransitHop::UpstreamWork(TrafficQueue_t msgs, AbstractRouter* r) + TransitHop::UpstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r) { - std::vector< RelayUpstreamMessage > sendmsgs(msgs.size()); + std::vector< RelayUpstreamMessage > sendmsgs(msgs->size()); size_t idx = 0; - for(auto& ev : msgs) + for(auto& ev : *msgs) { const llarp_buffer_t buf(ev.first); auto& msg = sendmsgs[idx]; @@ -166,7 +166,9 @@ namespace llarp { const llarp_buffer_t buf(msg.X); if(!r->ParseRoutingMessageBuffer(buf, this, info.rxID)) - continue; + { + LogWarn("invalid upstream data on endpoint ", info); + } m_LastActivity = r->Now(); } } @@ -188,22 +190,30 @@ namespace llarp for(const auto& msg : msgs) { llarp::LogDebug("relay ", msg.X.size(), " bytes downstream from ", - info.downstream, " to ", info.upstream); + info.upstream, " to ", info.downstream); r->SendToOrQueue(info.downstream, &msg); } } void - TransitHop::FlushQueues(AbstractRouter* r) + TransitHop::FlushUpstream(AbstractRouter* r) { - if(!m_UpstreamQueue.empty()) + if(m_UpstreamQueue && !m_UpstreamQueue->empty()) r->threadpool()->addJob(std::bind(&TransitHop::UpstreamWork, shared_from_this(), std::move(m_UpstreamQueue), r)); - if(!m_DownstreamQueue.empty()) + + m_UpstreamQueue = nullptr; + } + + void + TransitHop::FlushDownstream(AbstractRouter* r) + { + if(m_DownstreamQueue && !m_DownstreamQueue->empty()) r->threadpool()->addJob(std::bind(&TransitHop::DownstreamWork, shared_from_this(), std::move(m_DownstreamQueue), r)); + m_DownstreamQueue = nullptr; } bool diff --git a/llarp/path/transit_hop.hpp b/llarp/path/transit_hop.hpp index c29a5b6b4..008ec7841 100644 --- a/llarp/path/transit_hop.hpp +++ b/llarp/path/transit_hop.hpp @@ -196,14 +196,17 @@ namespace llarp HandleDHTMessage(const dht::IMessage& msg, AbstractRouter* r) override; void - FlushQueues(AbstractRouter* r) override; + FlushUpstream(AbstractRouter* r) override; + + void + FlushDownstream(AbstractRouter* r) override; protected: void - UpstreamWork(TrafficQueue_t queue, AbstractRouter* r) override; + UpstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override; void - DownstreamWork(TrafficQueue_t queue, AbstractRouter* r) override; + DownstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override; void HandleAllUpstream(std::vector< RelayUpstreamMessage > msgs, diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 99e713cdd..53af76f9a 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -164,10 +164,9 @@ namespace llarp void Router::PumpLL() { - _logic->tick(time_now_ms()); - paths.Pump(); - _logic->tick(time_now_ms()); + paths.PumpDownstream(); _linkManager.PumpLinks(); + paths.PumpUpstream(); } bool @@ -1063,7 +1062,7 @@ namespace llarp _exitContext.Stop(); if(rpcServer) rpcServer->Stop(); - paths.Pump(); + paths.PumpUpstream(); _linkManager.PumpLinks(); _logic->call_later({200, this, &RouterAfterStopIssued}); } diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index c0e94bc58..2de65698e 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -1065,6 +1065,7 @@ namespace llarp for(const auto& item : m_state->m_SendQueue) item.second->SendRoutingMessage(*item.first, router); m_state->m_SendQueue.clear(); + SendAllUpstream(Router()); } bool diff --git a/llarp/util/thread/logic.cpp b/llarp/util/thread/logic.cpp index 432b73b53..22b01b2b6 100644 --- a/llarp/util/thread/logic.cpp +++ b/llarp/util/thread/logic.cpp @@ -13,6 +13,15 @@ namespace llarp llarp_threadpool_tick(this->thread); } + Logic::Logic() + : thread(llarp_init_threadpool(1, "llarp-logic")) + , timer(llarp_init_timer()) + { + llarp_threadpool_start(thread); + /// set thread id + thread->impl->addJob([&]() { id.emplace(std::this_thread::get_id()); }); + } + Logic::~Logic() { llarp_threadpool_stop(this->thread); @@ -64,7 +73,11 @@ namespace llarp bool Logic::queue_func(std::function< void(void) >&& f) { - return this->thread->impl->addJob(f); + if(!this->thread->impl->tryAddJob(f)) + { + call_later(0, f); + } + return true; } void @@ -98,7 +111,7 @@ namespace llarp bool Logic::can_flush() const { - return false; + return id.has_value() && id.value() == std::this_thread::get_id(); } } // namespace llarp diff --git a/llarp/util/thread/logic.hpp b/llarp/util/thread/logic.hpp index ec3b2fae2..4373ba0d2 100644 --- a/llarp/util/thread/logic.hpp +++ b/llarp/util/thread/logic.hpp @@ -12,13 +12,9 @@ namespace llarp public: struct llarp_threadpool* thread; struct llarp_timer_context* timer; + std::optional< std::thread::id > id; - Logic() - : thread(llarp_init_threadpool(1, "llarp-logic")) - , timer(llarp_init_timer()) - { - llarp_threadpool_start(thread); - } + Logic(); ~Logic();