try calling stuff in logic thread from event loop

pull/830/head
jeff 5 years ago
parent ac2a2aed1d
commit 14c9ef15ed

@ -36,6 +36,7 @@ void
llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev, llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev,
std::shared_ptr< llarp::Logic > logic) std::shared_ptr< llarp::Logic > logic)
{ {
ev->give_logic(logic);
while(ev->running()) while(ev->running())
{ {
ev->update_time(); ev->update_time();

@ -783,6 +783,9 @@ struct llarp_ev_loop
return false; return false;
} }
/// give this event loop a logic thread for calling
virtual void give_logic(std::shared_ptr< llarp::Logic >) = 0;
/// register event listener /// register event listener
virtual bool virtual bool
add_ev(llarp::ev_io* ev, bool write) = 0; add_ev(llarp::ev_io* ev, bool write) = 0;

@ -1,10 +1,19 @@
#include <ev/ev_libuv.hpp> #include <ev/ev_libuv.hpp>
#include <net/net_addr.hpp> #include <net/net_addr.hpp>
#include <util/thread/logic.hpp>
#include <cstring> #include <cstring>
namespace libuv namespace libuv
{ {
/// call a function in logic thread via a handle
template < typename Handle, typename Func >
void
Call(Handle* h, Func&& f)
{
static_cast< Loop* >(h->loop->data)->Call(f);
}
struct glue struct glue
{ {
virtual ~glue() = default; virtual ~glue() = default;
@ -65,8 +74,9 @@ namespace libuv
static void static void
OnOutboundConnect(uv_connect_t* c, int status) OnOutboundConnect(uv_connect_t* c, int status)
{ {
auto* self = static_cast< conn_glue* >(c->data); conn_glue* self = static_cast< conn_glue* >(c->data);
self->HandleConnectResult(status); Call(self->Stream(),
std::bind(&conn_glue::HandleConnectResult, self, status));
c->data = nullptr; c->data = nullptr;
} }
@ -165,13 +175,14 @@ namespace libuv
static void static void
OnWritten(uv_write_t* req, int status) OnWritten(uv_write_t* req, int status)
{ {
conn_glue* conn = static_cast< conn_glue* >(req->data);
if(status) if(status)
{ {
llarp::LogError("write failed on tcp: ", uv_strerror(status)); llarp::LogError("write failed on tcp: ", uv_strerror(status));
static_cast< conn_glue* >(req->data)->Close(); conn->Close();
} }
else else
static_cast< conn_glue* >(req->data)->DrainOne(); Call(conn->Stream(), std::bind(&conn_glue::DrainOne, conn));
delete req; delete req;
} }
@ -195,7 +206,8 @@ namespace libuv
static void static void
OnClosed(uv_handle_t* h) OnClosed(uv_handle_t* h)
{ {
static_cast< conn_glue* >(h->data)->HandleClosed(); conn_glue* conn = static_cast< conn_glue* >(h->data);
Call(h, std::bind(&conn_glue::HandleClosed, conn));
} }
static void static void
@ -251,7 +263,8 @@ namespace libuv
{ {
if(status == 0) if(status == 0)
{ {
static_cast< conn_glue* >(stream->data)->Accept(); conn_glue* conn = static_cast< conn_glue* >(stream->data);
Call(stream, std::bind(&conn_glue::Accept, conn));
} }
else else
{ {
@ -262,8 +275,8 @@ namespace libuv
static void static void
OnTick(uv_check_t* t) OnTick(uv_check_t* t)
{ {
auto* conn = static_cast< conn_glue* >(t->data); conn_glue* conn = static_cast< conn_glue* >(t->data);
conn->Tick(); Call(t, std::bind(&conn_glue::Tick, conn));
} }
void void
@ -331,7 +344,8 @@ namespace libuv
static void static void
OnTick(uv_check_t* t) OnTick(uv_check_t* t)
{ {
static_cast< ticker_glue* >(t->data)->func(); ticker_glue* ticker = static_cast< ticker_glue* >(t->data);
Call(&ticker->m_Ticker, [ticker]() { ticker->func(); });
} }
bool bool
@ -393,14 +407,14 @@ namespace libuv
const size_t pktsz = sz; const size_t pktsz = sz;
const llarp_buffer_t pkt{(const byte_t*)buf->base, pktsz}; const llarp_buffer_t pkt{(const byte_t*)buf->base, pktsz};
m_UDP->recvfrom(m_UDP, fromaddr, ManagedBuffer{pkt}); m_UDP->recvfrom(m_UDP, fromaddr, ManagedBuffer{pkt});
gotpkts = true;
} }
} }
static void static void
OnTick(uv_check_t* t) OnTick(uv_check_t* t)
{ {
static_cast< udp_glue* >(t->data)->Tick(); udp_glue* udp = static_cast< udp_glue* >(t->data);
udp->Tick();
} }
void void
@ -408,7 +422,6 @@ namespace libuv
{ {
if(m_UDP && m_UDP->tick) if(m_UDP && m_UDP->tick)
m_UDP->tick(m_UDP); m_UDP->tick(m_UDP);
gotpkts = false;
} }
static int static int
@ -443,6 +456,7 @@ namespace libuv
if(uv_fileno((const uv_handle_t*)&m_Handle, &m_UDP->fd)) if(uv_fileno((const uv_handle_t*)&m_Handle, &m_UDP->fd))
return false; return false;
m_UDP->sendto = &SendTo; m_UDP->sendto = &SendTo;
m_UDP->impl = this;
return true; return true;
} }
@ -452,8 +466,7 @@ namespace libuv
auto* glue = static_cast< udp_glue* >(h->data); auto* glue = static_cast< udp_glue* >(h->data);
if(glue) if(glue)
{ {
h->data = nullptr; h->data = nullptr;
glue->m_UDP->impl = nullptr;
delete glue; delete glue;
} }
} }
@ -461,6 +474,7 @@ namespace libuv
void void
Close() override Close() override
{ {
m_UDP->impl = nullptr;
uv_check_stop(&m_Ticker); uv_check_stop(&m_Ticker);
uv_close((uv_handle_t*)&m_Handle, &OnClosed); uv_close((uv_handle_t*)&m_Handle, &OnClosed);
} }
@ -481,7 +495,7 @@ namespace libuv
void void
Tick() Tick()
{ {
m_Pipe->tick(); Call(&m_Handle, std::bind(&llarp_ev_pkt_pipe::tick, m_Pipe));
} }
static void static void
@ -520,7 +534,8 @@ namespace libuv
static void static void
OnTick(uv_check_t* h) OnTick(uv_check_t* h)
{ {
static_cast< pipe_glue* >(h->data)->Tick(); pipe_glue* pipe = static_cast< pipe_glue* >(h->data);
Call(h, std::bind(&pipe_glue::Tick, pipe));
} }
bool bool
@ -561,7 +576,8 @@ namespace libuv
static void static void
OnTick(uv_check_t* timer) OnTick(uv_check_t* timer)
{ {
static_cast< tun_glue* >(timer->data)->Tick(); tun_glue* tun = static_cast< tun_glue* >(timer->data);
Call(timer, std::bind(&tun_glue::Tick, tun));
} }
static void static void
@ -601,8 +617,7 @@ namespace libuv
auto* self = static_cast< tun_glue* >(h->data); auto* self = static_cast< tun_glue* >(h->data);
if(self) if(self)
{ {
self->m_Tun->impl = nullptr; h->data = nullptr;
h->data = nullptr;
delete self; delete self;
} }
} }
@ -610,6 +625,7 @@ namespace libuv
void void
Close() override Close() override
{ {
m_Tun->impl = nullptr;
uv_check_stop(&m_Ticker); uv_check_stop(&m_Ticker);
uv_close((uv_handle_t*)&m_Handle, &OnClosed); uv_close((uv_handle_t*)&m_Handle, &OnClosed);
} }
@ -623,7 +639,8 @@ namespace libuv
static bool static bool
WritePkt(llarp_tun_io* tun, const byte_t* pkt, size_t sz) WritePkt(llarp_tun_io* tun, const byte_t* pkt, size_t sz)
{ {
return static_cast< tun_glue* >(tun->impl)->Write(pkt, sz); tun_glue* glue = static_cast< tun_glue* >(tun->impl);
return glue && glue->Write(pkt, sz);
} }
bool bool
@ -670,6 +687,7 @@ namespace libuv
return false; return false;
} }
m_Tun->writepkt = &WritePkt; m_Tun->writepkt = &WritePkt;
m_Tun->impl = this;
return true; return true;
} }
}; };
@ -680,6 +698,7 @@ namespace libuv
m_Impl.reset(uv_loop_new()); m_Impl.reset(uv_loop_new());
if(uv_loop_init(m_Impl.get()) == -1) if(uv_loop_init(m_Impl.get()) == -1)
return false; return false;
m_Impl->data = this;
uv_loop_configure(m_Impl.get(), UV_LOOP_BLOCK_SIGNAL, SIGPIPE); uv_loop_configure(m_Impl.get(), UV_LOOP_BLOCK_SIGNAL, SIGPIPE);
m_TickTimer.data = this; m_TickTimer.data = this;
m_Run.store(true); m_Run.store(true);

@ -5,6 +5,7 @@
#include <uv.h> #include <uv.h>
#include <vector> #include <vector>
#include <functional> #include <functional>
#include <util/thread/logic.hpp>
namespace libuv namespace libuv
{ {
@ -88,6 +89,20 @@ namespace libuv
return false; return false;
} }
void
give_logic(std::shared_ptr< llarp::Logic > l) override
{
m_Logic = l;
}
/// call function in logic thread
template < typename F >
void
Call(F f)
{
m_Logic->queue_func(f);
}
private: private:
struct DestructLoop struct DestructLoop
{ {
@ -101,6 +116,7 @@ namespace libuv
std::unique_ptr< uv_loop_t, DestructLoop > m_Impl; std::unique_ptr< uv_loop_t, DestructLoop > m_Impl;
uv_timer_t m_TickTimer; uv_timer_t m_TickTimer;
std::atomic< bool > m_Run; std::atomic< bool > m_Run;
std::shared_ptr< llarp::Logic > m_Logic;
}; };
} // namespace libuv } // namespace libuv

@ -314,11 +314,7 @@ namespace llarp
RouterLogic()->queue_func([=] { RouterLogic()->queue_func([=] {
self->m_ExitMap.ForEachValue( self->m_ExitMap.ForEachValue(
[](const auto &exit) { exit->FlushUpstream(); }); [](const auto &exit) { exit->FlushUpstream(); });
self->Router()->PumpLL();
});
RouterLogic()->queue_func([=]() {
self->Pump(self->Now()); self->Pump(self->Now());
self->Router()->PumpLL();
}); });
} }
@ -788,6 +784,7 @@ namespace llarp
} }
llarp::LogWarn(Name(), " did not flush packets"); llarp::LogWarn(Name(), " did not flush packets");
}); });
SendAllDownstream(Router());
} }
bool bool

@ -28,7 +28,7 @@ namespace llarp
auto itr = m_AuthedLinks.begin(); auto itr = m_AuthedLinks.begin();
while(itr != m_AuthedLinks.end()) while(itr != m_AuthedLinks.end())
{ {
sessions.insert(itr->first); sessions.emplace(itr->first);
++itr; ++itr;
} }
} }

@ -121,7 +121,11 @@ namespace llarp
llarp_time_t now) llarp_time_t now)
{ {
if(idx + buf.sz > m_Data.size()) if(idx + buf.sz > m_Data.size())
{
LogWarn("invalid fragment offset ", idx);
return; return;
}
auto *dst = m_Data.data() + idx; auto *dst = m_Data.data() + idx;
std::copy_n(buf.base, buf.sz, dst); std::copy_n(buf.base, buf.sz, dst);
m_Acks.set(idx / FragmentSize); m_Acks.set(idx / FragmentSize);
@ -134,7 +138,7 @@ namespace llarp
{ {
auto acks = CreatePacket(Command::eACKS, 9, 0, 0); auto acks = CreatePacket(Command::eACKS, 9, 0, 0);
htobe64buf(acks.data() + 2 + PacketOverhead, m_MsgID); htobe64buf(acks.data() + 2 + PacketOverhead, m_MsgID);
acks[PacketOverhead + 8] = AcksBitmask(); acks[PacketOverhead + 10] = AcksBitmask();
return acks; return acks;
} }

@ -198,19 +198,21 @@ namespace llarp
Session::SendMACK() Session::SendMACK()
{ {
// send multi acks // send multi acks
while(!m_SendMACKS.empty()) while(m_SendMACKS.size() > 0)
{ {
const auto sz = m_SendMACKS.size(); const auto sz = m_SendMACKS.size();
const auto max = Session::MaxACKSInMACK; const auto max = Session::MaxACKSInMACK;
auto numAcks = std::min(sz, max); auto numAcks = std::min(sz, max);
auto mack = auto mack = CreatePacket(Command::eMACK,
CreatePacket(Command::eMACK, numAcks * sizeof(uint64_t), 0, 0); 1 + (numAcks * sizeof(uint64_t)), 0, 0);
mack[PacketOverhead + 2] = byte_t{numAcks}; mack[PacketOverhead + 2] = byte_t{numAcks};
byte_t* ptr = mack.data() + 3 + PacketOverhead; byte_t* ptr = mack.data() + 3 + PacketOverhead;
LogDebug("send ", numAcks, " macks to ", m_RemoteAddr);
auto itr = m_SendMACKS.begin();
while(numAcks > 0) while(numAcks > 0)
{ {
htobe64buf(ptr, m_SendMACKS.back()); htobe64buf(ptr, *itr);
m_SendMACKS.pop_back(); itr = m_SendMACKS.erase(itr);
numAcks--; numAcks--;
ptr += sizeof(uint64_t); ptr += sizeof(uint64_t);
} }
@ -226,7 +228,6 @@ namespace llarp
{ {
if(ShouldPing()) if(ShouldPing())
SendKeepAlive(); SendKeepAlive();
SendMACK();
for(auto& item : m_RXMsgs) for(auto& item : m_RXMsgs)
{ {
if(item.second.ShouldSendACKS(now)) if(item.second.ShouldSendACKS(now))
@ -562,6 +563,7 @@ namespace llarp
{ {
for(auto& result : msgs) for(auto& result : msgs)
{ {
LogDebug("Command ", int(result[PacketOverhead + 1]));
switch(result[PacketOverhead + 1]) switch(result[PacketOverhead + 1])
{ {
case Command::eXMIT: case Command::eXMIT:
@ -590,6 +592,7 @@ namespace llarp
" from ", m_RemoteAddr); " from ", m_RemoteAddr);
} }
} }
SendMACK();
} }
void void
@ -601,16 +604,18 @@ namespace llarp
return; return;
} }
byte_t numAcks = data[2 + PacketOverhead]; byte_t numAcks = data[2 + PacketOverhead];
if(data.size() < ((numAcks * sizeof(uint64_t)) + 3)) if(data.size() < 3 + PacketOverhead + (numAcks * sizeof(uint64_t)))
{ {
LogError("short mack from ", m_RemoteAddr); LogError("short mack from ", m_RemoteAddr);
return; return;
} }
LogDebug("got ", int(numAcks), " mack from ", m_RemoteAddr);
byte_t* ptr = data.data() + 3 + PacketOverhead; byte_t* ptr = data.data() + 3 + PacketOverhead;
while(numAcks > 0) while(numAcks > 0)
{ {
uint64_t acked = bufbe64toh(ptr); uint64_t acked = bufbe64toh(ptr);
auto itr = m_TXMsgs.find(acked); LogDebug("mack containing txid=", acked, " from ", m_RemoteAddr);
auto itr = m_TXMsgs.find(acked);
if(itr != m_TXMsgs.end()) if(itr != m_TXMsgs.end())
{ {
itr->second.Completed(); itr->second.Completed();
@ -697,6 +702,11 @@ namespace llarp
htobe64buf(nack.data() + PacketOverhead + 2, rxid); htobe64buf(nack.data() + PacketOverhead + 2, rxid);
EncryptAndSend(std::move(nack)); EncryptAndSend(std::move(nack));
} }
else
{
LogDebug("replay hit for rxid=", rxid, " for ", m_RemoteAddr);
m_SendMACKS.emplace(rxid);
}
return; return;
} }
@ -714,7 +724,7 @@ namespace llarp
const llarp_buffer_t buf(msg.m_Data.data(), msg.m_Size); const llarp_buffer_t buf(msg.m_Data.data(), msg.m_Size);
m_Parent->HandleMessage(this, buf); m_Parent->HandleMessage(this, buf);
m_ReplayFilter.emplace(itr->first, m_Parent->Now()); m_ReplayFilter.emplace(itr->first, m_Parent->Now());
m_SendMACKS.emplace_back(itr->first); m_SendMACKS.emplace(itr->first);
} }
else else
{ {

@ -27,8 +27,7 @@ namespace llarp
/// How often to acks RX messages /// How often to acks RX messages
static constexpr llarp_time_t ACKResendInterval = 250; static constexpr llarp_time_t ACKResendInterval = 250;
/// How often to retransmit TX fragments /// How often to retransmit TX fragments
static constexpr llarp_time_t TXFlushInterval = static constexpr llarp_time_t TXFlushInterval = ACKResendInterval * 2;
(ACKResendInterval * 3) / 2;
/// How often we send a keepalive /// How often we send a keepalive
static constexpr llarp_time_t PingInterval = 2000; static constexpr llarp_time_t PingInterval = 2000;
/// How long we wait for a session to die with no tx from them /// How long we wait for a session to die with no tx from them
@ -165,7 +164,7 @@ namespace llarp
/// maps rxid to time recieved /// maps rxid to time recieved
std::unordered_map< uint64_t, llarp_time_t > m_ReplayFilter; std::unordered_map< uint64_t, llarp_time_t > m_ReplayFilter;
/// list of rx messages to send in next set of multiacks /// list of rx messages to send in next set of multiacks
std::vector< uint64_t > m_SendMACKS; std::set< uint64_t > m_SendMACKS;
using CryptoQueue_t = std::vector< Packet_t >; using CryptoQueue_t = std::vector< Packet_t >;
using CryptoQueue_ptr = std::shared_ptr< CryptoQueue_t >; using CryptoQueue_ptr = std::shared_ptr< CryptoQueue_t >;

@ -484,7 +484,8 @@ namespace llarp
ILinkLayer* link = static_cast< ILinkLayer* >(udp->user); ILinkLayer* link = static_cast< ILinkLayer* >(udp->user);
if(link->m_Recv == nullptr) if(link->m_Recv == nullptr)
return; return;
link->m_Recv->emplace_back(std::make_pair(*from, buf.underlying.sz)); link->m_Recv->emplace_back(
std::make_pair(Addr(*from), ILinkSession::Packet_t(buf.underlying.sz)));
std::copy_n(buf.underlying.base, buf.underlying.sz, std::copy_n(buf.underlying.base, buf.underlying.sz,
link->m_Recv->back().second.begin()); link->m_Recv->back().second.begin());
} }

@ -24,8 +24,9 @@ namespace llarp
{ {
struct IHopHandler struct IHopHandler
{ {
using TrafficEvent_t = std::pair< std::vector< byte_t >, TunnelNonce >; using TrafficEvent_t = std::pair< std::vector< byte_t >, TunnelNonce >;
using TrafficQueue_t = std::vector< TrafficEvent_t >; using TrafficQueue_t = std::vector< TrafficEvent_t >;
using TrafficQueue_ptr = std::shared_ptr< TrafficQueue_t >;
virtual ~IHopHandler() = default; virtual ~IHopHandler() = default;
@ -44,8 +45,10 @@ namespace llarp
HandleUpstream(const llarp_buffer_t& X, const TunnelNonce& Y, HandleUpstream(const llarp_buffer_t& X, const TunnelNonce& Y,
AbstractRouter*) AbstractRouter*)
{ {
m_UpstreamQueue.emplace_back(); if(m_UpstreamQueue == nullptr)
auto& pkt = m_UpstreamQueue.back(); m_UpstreamQueue = std::make_shared< TrafficQueue_t >();
m_UpstreamQueue->emplace_back();
auto& pkt = m_UpstreamQueue->back();
pkt.first.resize(X.sz); pkt.first.resize(X.sz);
std::copy_n(X.base, X.sz, pkt.first.begin()); std::copy_n(X.base, X.sz, pkt.first.begin());
pkt.second = Y; pkt.second = Y;
@ -57,8 +60,10 @@ namespace llarp
HandleDownstream(const llarp_buffer_t& X, const TunnelNonce& Y, HandleDownstream(const llarp_buffer_t& X, const TunnelNonce& Y,
AbstractRouter*) AbstractRouter*)
{ {
m_DownstreamQueue.emplace_back(); if(m_DownstreamQueue == nullptr)
auto& pkt = m_DownstreamQueue.back(); m_DownstreamQueue = std::make_shared< TrafficQueue_t >();
m_DownstreamQueue->emplace_back();
auto& pkt = m_DownstreamQueue->back();
pkt.first.resize(X.sz); pkt.first.resize(X.sz);
std::copy_n(X.base, X.sz, pkt.first.begin()); std::copy_n(X.base, X.sz, pkt.first.begin());
pkt.second = Y; pkt.second = Y;
@ -79,18 +84,21 @@ namespace llarp
return m_SequenceNum++; return m_SequenceNum++;
} }
virtual void virtual void
FlushQueues(AbstractRouter* r) = 0; FlushUpstream(AbstractRouter* r) = 0;
virtual void
FlushDownstream(AbstractRouter* r) = 0;
protected: protected:
uint64_t m_SequenceNum = 0; uint64_t m_SequenceNum = 0;
TrafficQueue_t m_UpstreamQueue; TrafficQueue_ptr m_UpstreamQueue;
TrafficQueue_t m_DownstreamQueue; TrafficQueue_ptr m_DownstreamQueue;
virtual void virtual void
UpstreamWork(TrafficQueue_t queue, AbstractRouter* r) = 0; UpstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) = 0;
virtual void virtual void
DownstreamWork(TrafficQueue_t queue, AbstractRouter* r) = 0; DownstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) = 0;
virtual void virtual void
HandleAllUpstream(std::vector< RelayUpstreamMessage > msgs, HandleAllUpstream(std::vector< RelayUpstreamMessage > msgs,

@ -377,11 +377,11 @@ namespace llarp
} }
void void
Path::UpstreamWork(TrafficQueue_t msgs, AbstractRouter* r) Path::UpstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
{ {
std::vector< RelayUpstreamMessage > sendmsgs(msgs.size()); std::vector< RelayUpstreamMessage > sendmsgs(msgs->size());
size_t idx = 0; size_t idx = 0;
for(const auto& ev : msgs) for(auto& ev : *msgs)
{ {
const llarp_buffer_t buf(ev.first); const llarp_buffer_t buf(ev.first);
TunnelNonce n = ev.second; TunnelNonce n = ev.second;
@ -402,16 +402,27 @@ namespace llarp
} }
void void
Path::FlushQueues(AbstractRouter* r) Path::FlushUpstream(AbstractRouter* r)
{ {
if(!m_UpstreamQueue.empty()) if(m_UpstreamQueue && !m_UpstreamQueue->empty())
{
r->threadpool()->addJob(std::bind(&Path::UpstreamWork, r->threadpool()->addJob(std::bind(&Path::UpstreamWork,
shared_from_this(), shared_from_this(),
std::move(m_UpstreamQueue), r)); std::move(m_UpstreamQueue), r));
if(!m_DownstreamQueue.empty()) }
m_UpstreamQueue = nullptr;
}
void
Path::FlushDownstream(AbstractRouter* r)
{
if(m_DownstreamQueue && !m_DownstreamQueue->empty())
{
r->threadpool()->addJob(std::bind(&Path::DownstreamWork, r->threadpool()->addJob(std::bind(&Path::DownstreamWork,
shared_from_this(), shared_from_this(),
std::move(m_DownstreamQueue), r)); std::move(m_DownstreamQueue), r));
}
m_DownstreamQueue = nullptr;
} }
bool bool
@ -438,11 +449,11 @@ namespace llarp
} }
void void
Path::DownstreamWork(TrafficQueue_t msgs, AbstractRouter* r) Path::DownstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
{ {
std::vector< RelayDownstreamMessage > sendMsgs(msgs.size()); std::vector< RelayDownstreamMessage > sendMsgs(msgs->size());
size_t idx = 0; size_t idx = 0;
for(auto& ev : msgs) for(auto& ev : *msgs)
{ {
const llarp_buffer_t buf(ev.first); const llarp_buffer_t buf(ev.first);
sendMsgs[idx].Y = ev.second; sendMsgs[idx].Y = ev.second;

@ -325,14 +325,17 @@ namespace llarp
SendExitClose(const routing::CloseExitMessage& msg, AbstractRouter* r); SendExitClose(const routing::CloseExitMessage& msg, AbstractRouter* r);
void void
FlushQueues(AbstractRouter* r) override; FlushUpstream(AbstractRouter* r) override;
void
FlushDownstream(AbstractRouter* r) override;
protected: protected:
void void
UpstreamWork(TrafficQueue_t queue, AbstractRouter* r) override; UpstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override;
void void
DownstreamWork(TrafficQueue_t queue, AbstractRouter* r) override; DownstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override;
void void
HandleAllUpstream(std::vector< RelayUpstreamMessage > msgs, HandleAllUpstream(std::vector< RelayUpstreamMessage > msgs,

@ -257,10 +257,18 @@ namespace llarp
} }
void void
PathContext::Pump() PathContext::PumpUpstream()
{ {
m_TransitPaths.ForEach([&](auto& ptr) { ptr->FlushQueues(m_Router); }); m_TransitPaths.ForEach([&](auto& ptr) { ptr->FlushUpstream(m_Router); });
m_OurPaths.ForEach([&](auto& ptr) { ptr->FlushQueues(m_Router); }); m_OurPaths.ForEach([&](auto& ptr) { ptr->SendAllUpstream(m_Router); });
}
void
PathContext::PumpDownstream()
{
m_TransitPaths.ForEach(
[&](auto& ptr) { ptr->FlushDownstream(m_Router); });
m_OurPaths.ForEach([&](auto& ptr) { ptr->SendAllDownstream(m_Router); });
} }
void void

@ -38,7 +38,10 @@ namespace llarp
ExpirePaths(llarp_time_t now); ExpirePaths(llarp_time_t now);
void void
Pump(); PumpUpstream();
void
PumpDownstream();
void void
AllowTransit(); AllowTransit();

@ -396,7 +396,7 @@ namespace llarp
for(size_t idx = 0; idx < hops.size(); ++idx) for(size_t idx = 0; idx < hops.size(); ++idx)
{ {
hops[idx].Clear(); hops[idx].Clear();
size_t tries = 4; size_t tries = 32;
while(tries > 0 && !SelectHop(nodedb, exclude, hops[idx], idx, roles)) while(tries > 0 && !SelectHop(nodedb, exclude, hops[idx], idx, roles))
{ {
--tries; --tries;
@ -406,7 +406,7 @@ namespace llarp
LogWarn(Name(), " failed to select hop ", idx); LogWarn(Name(), " failed to select hop ", idx);
return false; return false;
} }
exclude.insert(hops[idx].pubkey); exclude.emplace(hops[idx].pubkey);
} }
return true; return true;
} }

@ -376,10 +376,15 @@ namespace llarp
} }
void void
PathSet::FlushQueues(AbstractRouter* r) PathSet::SendAllUpstream(AbstractRouter* r)
{
ForEachPath([r](const Path_ptr& p) { p->FlushUpstream(r); });
}
void
PathSet::SendAllDownstream(AbstractRouter* r)
{ {
ForEachPath([r](const Path_ptr& ptr) { ptr->FlushQueues(r); }); ForEachPath([r](const Path_ptr& p) { p->FlushDownstream(r); });
} }
} // namespace path } // namespace path

@ -276,7 +276,10 @@ namespace llarp
} }
void void
FlushQueues(AbstractRouter* r); SendAllUpstream(AbstractRouter* r);
void
SendAllDownstream(AbstractRouter* r);
size_t numPaths; size_t numPaths;

@ -115,11 +115,11 @@ namespace llarp
} }
void void
TransitHop::DownstreamWork(TrafficQueue_t msgs, AbstractRouter* r) TransitHop::DownstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
{ {
std::vector< RelayDownstreamMessage > sendmsgs(msgs.size()); std::vector< RelayDownstreamMessage > sendmsgs(msgs->size());
size_t idx = 0; size_t idx = 0;
for(auto& ev : msgs) for(auto& ev : *msgs)
{ {
const llarp_buffer_t buf(ev.first); const llarp_buffer_t buf(ev.first);
auto& msg = sendmsgs[idx]; auto& msg = sendmsgs[idx];
@ -137,11 +137,11 @@ namespace llarp
} }
void void
TransitHop::UpstreamWork(TrafficQueue_t msgs, AbstractRouter* r) TransitHop::UpstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
{ {
std::vector< RelayUpstreamMessage > sendmsgs(msgs.size()); std::vector< RelayUpstreamMessage > sendmsgs(msgs->size());
size_t idx = 0; size_t idx = 0;
for(auto& ev : msgs) for(auto& ev : *msgs)
{ {
const llarp_buffer_t buf(ev.first); const llarp_buffer_t buf(ev.first);
auto& msg = sendmsgs[idx]; auto& msg = sendmsgs[idx];
@ -166,7 +166,9 @@ namespace llarp
{ {
const llarp_buffer_t buf(msg.X); const llarp_buffer_t buf(msg.X);
if(!r->ParseRoutingMessageBuffer(buf, this, info.rxID)) if(!r->ParseRoutingMessageBuffer(buf, this, info.rxID))
continue; {
LogWarn("invalid upstream data on endpoint ", info);
}
m_LastActivity = r->Now(); m_LastActivity = r->Now();
} }
} }
@ -188,22 +190,30 @@ namespace llarp
for(const auto& msg : msgs) for(const auto& msg : msgs)
{ {
llarp::LogDebug("relay ", msg.X.size(), " bytes downstream from ", llarp::LogDebug("relay ", msg.X.size(), " bytes downstream from ",
info.downstream, " to ", info.upstream); info.upstream, " to ", info.downstream);
r->SendToOrQueue(info.downstream, &msg); r->SendToOrQueue(info.downstream, &msg);
} }
} }
void void
TransitHop::FlushQueues(AbstractRouter* r) TransitHop::FlushUpstream(AbstractRouter* r)
{ {
if(!m_UpstreamQueue.empty()) if(m_UpstreamQueue && !m_UpstreamQueue->empty())
r->threadpool()->addJob(std::bind(&TransitHop::UpstreamWork, r->threadpool()->addJob(std::bind(&TransitHop::UpstreamWork,
shared_from_this(), shared_from_this(),
std::move(m_UpstreamQueue), r)); std::move(m_UpstreamQueue), r));
if(!m_DownstreamQueue.empty())
m_UpstreamQueue = nullptr;
}
void
TransitHop::FlushDownstream(AbstractRouter* r)
{
if(m_DownstreamQueue && !m_DownstreamQueue->empty())
r->threadpool()->addJob(std::bind(&TransitHop::DownstreamWork, r->threadpool()->addJob(std::bind(&TransitHop::DownstreamWork,
shared_from_this(), shared_from_this(),
std::move(m_DownstreamQueue), r)); std::move(m_DownstreamQueue), r));
m_DownstreamQueue = nullptr;
} }
bool bool

@ -196,14 +196,17 @@ namespace llarp
HandleDHTMessage(const dht::IMessage& msg, AbstractRouter* r) override; HandleDHTMessage(const dht::IMessage& msg, AbstractRouter* r) override;
void void
FlushQueues(AbstractRouter* r) override; FlushUpstream(AbstractRouter* r) override;
void
FlushDownstream(AbstractRouter* r) override;
protected: protected:
void void
UpstreamWork(TrafficQueue_t queue, AbstractRouter* r) override; UpstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override;
void void
DownstreamWork(TrafficQueue_t queue, AbstractRouter* r) override; DownstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override;
void void
HandleAllUpstream(std::vector< RelayUpstreamMessage > msgs, HandleAllUpstream(std::vector< RelayUpstreamMessage > msgs,

@ -164,10 +164,9 @@ namespace llarp
void void
Router::PumpLL() Router::PumpLL()
{ {
_logic->tick(time_now_ms()); paths.PumpDownstream();
paths.Pump();
_logic->tick(time_now_ms());
_linkManager.PumpLinks(); _linkManager.PumpLinks();
paths.PumpUpstream();
} }
bool bool
@ -1063,7 +1062,7 @@ namespace llarp
_exitContext.Stop(); _exitContext.Stop();
if(rpcServer) if(rpcServer)
rpcServer->Stop(); rpcServer->Stop();
paths.Pump(); paths.PumpUpstream();
_linkManager.PumpLinks(); _linkManager.PumpLinks();
_logic->call_later({200, this, &RouterAfterStopIssued}); _logic->call_later({200, this, &RouterAfterStopIssued});
} }

@ -1065,6 +1065,7 @@ namespace llarp
for(const auto& item : m_state->m_SendQueue) for(const auto& item : m_state->m_SendQueue)
item.second->SendRoutingMessage(*item.first, router); item.second->SendRoutingMessage(*item.first, router);
m_state->m_SendQueue.clear(); m_state->m_SendQueue.clear();
SendAllUpstream(Router());
} }
bool bool

@ -13,6 +13,15 @@ namespace llarp
llarp_threadpool_tick(this->thread); llarp_threadpool_tick(this->thread);
} }
Logic::Logic()
: thread(llarp_init_threadpool(1, "llarp-logic"))
, timer(llarp_init_timer())
{
llarp_threadpool_start(thread);
/// set thread id
thread->impl->addJob([&]() { id.emplace(std::this_thread::get_id()); });
}
Logic::~Logic() Logic::~Logic()
{ {
llarp_threadpool_stop(this->thread); llarp_threadpool_stop(this->thread);
@ -64,7 +73,11 @@ namespace llarp
bool bool
Logic::queue_func(std::function< void(void) >&& f) Logic::queue_func(std::function< void(void) >&& f)
{ {
return this->thread->impl->addJob(f); if(!this->thread->impl->tryAddJob(f))
{
call_later(0, f);
}
return true;
} }
void void
@ -98,7 +111,7 @@ namespace llarp
bool bool
Logic::can_flush() const Logic::can_flush() const
{ {
return false; return id.has_value() && id.value() == std::this_thread::get_id();
} }
} // namespace llarp } // namespace llarp

@ -12,13 +12,9 @@ namespace llarp
public: public:
struct llarp_threadpool* thread; struct llarp_threadpool* thread;
struct llarp_timer_context* timer; struct llarp_timer_context* timer;
std::optional< std::thread::id > id;
Logic() Logic();
: thread(llarp_init_threadpool(1, "llarp-logic"))
, timer(llarp_init_timer())
{
llarp_threadpool_start(thread);
}
~Logic(); ~Logic();

Loading…
Cancel
Save