pull/830/head
Jeff Becker 5 years ago
parent bcf9135da6
commit e3bb59707e
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -43,6 +43,7 @@ namespace llarp
bool
Context::Configure()
{
logic = std::make_shared< Logic >();
// llarp::LogInfo("loading config at ", configfile);
if(!config->Load(configfile.c_str()))
{
@ -197,7 +198,6 @@ __ ___ ____ _ _ ___ _ _ ____
llarp::LogInfo(LLARP_VERSION, " ", LLARP_RELEASE_MOTTO);
llarp::LogInfo("starting up");
mainloop = llarp_make_ev_loop();
logic = std::make_shared< Logic >();
if(debug)
{

@ -44,7 +44,6 @@ llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev,
{
ev->update_time();
logic->tick_async(ev->time_now());
llarp_threadpool_tick(logic->thread);
}
llarp::LogContext::Instance().logStream->Tick(ev->time_now());
}

@ -98,9 +98,14 @@ namespace libuv
OnRead(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
{
if(nread >= 0)
static_cast< conn_glue* >(stream->data)->Read(buf->base, nread);
{
auto* conn = static_cast< conn_glue* >(stream->data);
conn->Read(buf->base, nread);
}
else if(nread < 0)
{
static_cast< conn_glue* >(stream->data)->Close();
}
delete[] buf->base;
}
@ -257,7 +262,8 @@ namespace libuv
static void
OnTick(uv_check_t* t)
{
static_cast< conn_glue* >(t->data)->Tick();
auto* conn = static_cast< conn_glue* >(t->data);
conn->Tick();
}
void

@ -22,7 +22,8 @@ namespace llarp
static void
ExitHandlerFlush(llarp_tun_io *tun)
{
static_cast< ExitEndpoint * >(tun->user)->Flush();
auto *ep = static_cast< ExitEndpoint * >(tun->user);
ep->GetRouter()->logic()->queue_func(std::bind(&ExitEndpoint::Flush, ep));
}
ExitEndpoint::ExitEndpoint(const std::string &name, AbstractRouter *r)

@ -709,13 +709,13 @@ namespace llarp
void
TunEndpoint::Tick(llarp_time_t now)
{
// call tun code in endpoint logic in case of network isolation
// EndpointLogic()->queue_job({this, handleTickTun});
m_ExitMap.ForEachValue([&](const auto &exit) {
EnsureRouterIsKnown(exit->Endpoint());
exit->Tick(now);
EndpointLogic()->queue_func([&]() {
m_ExitMap.ForEachValue([&](const auto &exit) {
this->EnsureRouterIsKnown(exit->Endpoint());
exit->Tick(now);
});
Endpoint::Tick(now);
});
Endpoint::Tick(now);
}
bool
@ -934,10 +934,13 @@ namespace llarp
// called in the isolated network thread
auto *self = static_cast< TunEndpoint * >(tun->user);
// flush user to network
self->FlushSend();
self->EndpointLogic()->queue_func(
std::bind(&TunEndpoint::FlushSend, self));
// flush exit traffic queues if it's there
self->m_ExitMap.ForEachValue(
[](const auto &exit) { exit->FlushDownstream(); });
self->EndpointLogic()->queue_func([self] {
self->m_ExitMap.ForEachValue(
[](const auto &exit) { exit->FlushDownstream(); });
});
// flush network to user
self->m_NetworkToUserPktQueue.Process([tun](net::IPPacket &pkt) {
if(!llarp_ev_tun_async_write(tun, pkt.Buffer()))
@ -950,9 +953,9 @@ namespace llarp
{
// called for every packet read from user in isolated network thread
auto *self = static_cast< TunEndpoint * >(tun->user);
const ManagedBuffer buf(b);
const ManagedBuffer pkt(b);
self->m_UserToNetworkPktQueue.EmplaceIf(
[&buf](net::IPPacket &pkt) -> bool { return pkt.Load(buf); });
[&pkt](net::IPPacket &p) -> bool { return p.Load(pkt); });
}
TunEndpoint::~TunEndpoint() = default;

@ -79,7 +79,8 @@ namespace llarp
static void
udp_tick(llarp_udp_io* udp)
{
static_cast< ILinkLayer* >(udp->user)->Pump();
ILinkLayer* link = static_cast< ILinkLayer* >(udp->user);
link->logic()->queue_func([link]() { link->Pump(); });
}
static void
@ -90,11 +91,15 @@ namespace llarp
llarp::LogWarn("no udp set");
return;
}
std::vector< byte_t > pkt(buf.underlying.sz);
std::copy_n(buf.underlying.base, buf.underlying.sz, pkt.begin());
const llarp::Addr srcaddr(*from);
// maybe check from too?
// no it's never null
static_cast< ILinkLayer* >(udp->user)->RecvFrom(
srcaddr, buf.underlying.base, buf.underlying.sz);
ILinkLayer* link = static_cast< ILinkLayer* >(udp->user);
link->logic()->queue_func([link, srcaddr, pkt]() {
link->RecvFrom(srcaddr, pkt.data(), pkt.size());
});
}
void

@ -294,7 +294,7 @@ namespace llarp
return;
auto *self = static_cast< Router * >(user);
self->ticker_job_id = 0;
self->Tick();
self->logic()->queue_func(std::bind(&Router::Tick, self));
self->ScheduleTicker(orig);
}
@ -998,7 +998,6 @@ namespace llarp
}
LogInfo("have ", nodedb->num_loaded(), " routers");
_netloop->add_ticker(std::bind(&Router::PumpLL, this));
ScheduleTicker(1000);
_running.store(true);
_startedAt = Now();

@ -13,11 +13,17 @@ namespace llarp
llarp_threadpool_tick(this->thread);
}
Logic::~Logic()
{
llarp_threadpool_stop(this->thread);
llarp_threadpool_join(this->thread);
llarp_free_threadpool(&this->thread);
}
void
Logic::tick_async(llarp_time_t now)
{
llarp_timer_tick_all_async(this->timer, this->thread, now);
llarp_threadpool_tick(this->thread);
}
void
@ -97,7 +103,7 @@ namespace llarp
bool
Logic::can_flush() const
{
return ourID == std::this_thread::get_id();
return false;
}
} // namespace llarp

@ -12,15 +12,16 @@ namespace llarp
public:
struct llarp_threadpool* thread;
struct llarp_timer_context* timer;
const std::thread::id ourID;
Logic()
: thread(llarp_init_same_process_threadpool())
: thread(llarp_init_threadpool(1, "llarp-logic"))
, timer(llarp_init_timer())
, ourID(std::this_thread::get_id())
{
llarp_threadpool_start(thread);
}
~Logic();
/// single threaded tick
void
tick(llarp_time_t now);

Loading…
Cancel
Save