work in progress

pull/916/head
Jeff Becker 5 years ago
parent ac686a9329
commit 6f95fbfece
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -424,8 +424,11 @@ namespace libuv
Close() override Close() override
{ {
uv_check_stop(&m_Ticker); uv_check_stop(&m_Ticker);
m_Ticker.data = nullptr; uv_close((uv_handle_t*)&m_Ticker, [](auto h) {
delete this; ticker_glue* self = (ticker_glue*)h->data;
h->data = nullptr;
delete self;
});
} }
uv_check_t m_Ticker; uv_check_t m_Ticker;
@ -663,7 +666,7 @@ namespace libuv
OnTick(uv_check_t* timer) OnTick(uv_check_t* timer)
{ {
tun_glue* tun = static_cast< tun_glue* >(timer->data); tun_glue* tun = static_cast< tun_glue* >(timer->data);
LoopCall(timer, std::bind(&tun_glue::Tick, tun)); tun->Tick();
} }
static void static void
@ -682,7 +685,7 @@ namespace libuv
if(sz > 0) if(sz > 0)
{ {
llarp::LogDebug("tun read ", sz); llarp::LogDebug("tun read ", sz);
llarp_buffer_t pkt(m_Buffer, sz); const llarp_buffer_t pkt(m_Buffer, sz);
if(m_Tun && m_Tun->recvpkt) if(m_Tun && m_Tun->recvpkt)
m_Tun->recvpkt(m_Tun, pkt); m_Tun->recvpkt(m_Tun, pkt);
} }
@ -711,9 +714,14 @@ namespace libuv
void void
Close() override Close() override
{ {
if(m_Tun->impl == nullptr)
return;
m_Tun->impl = nullptr; m_Tun->impl = nullptr;
uv_check_stop(&m_Ticker); uv_check_stop(&m_Ticker);
uv_close((uv_handle_t*)&m_Handle, &OnClosed); uv_close((uv_handle_t*)&m_Ticker, [](uv_handle_t* h) {
tun_glue* glue = static_cast< tun_glue* >(h->data);
uv_close((uv_handle_t*)&glue->m_Handle, &OnClosed);
});
} }
bool bool
@ -824,18 +832,24 @@ namespace libuv
int int
Loop::tick(int ms) Loop::tick(int ms)
{ {
uv_timer_start(&m_TickTimer, &OnTickTimeout, ms, 0); if(m_Run)
uv_run(&m_Impl, UV_RUN_ONCE); {
uv_timer_start(&m_TickTimer, &OnTickTimeout, ms, 0);
uv_run(&m_Impl, UV_RUN_ONCE);
}
return 0; return 0;
} }
void void
Loop::stop() Loop::stop()
{ {
uv_stop(&m_Impl); if(m_Run)
llarp::LogInfo("stopping event loop"); {
llarp::LogInfo("stopping event loop");
CloseAll();
// uv_stop(&m_Impl);
}
m_Run.store(false); m_Run.store(false);
CloseAll();
} }
void void

@ -16,7 +16,13 @@ namespace llarp
static void static void
ExitHandlerRecvPkt(llarp_tun_io *tun, const llarp_buffer_t &buf) ExitHandlerRecvPkt(llarp_tun_io *tun, const llarp_buffer_t &buf)
{ {
static_cast< ExitEndpoint * >(tun->user)->OnInetPacket(buf); std::vector< byte_t > pkt;
pkt.resize(buf.sz);
std::copy_n(buf.base, buf.sz, pkt.data());
auto self = static_cast< ExitEndpoint * >(tun->user);
LogicCall(self->GetRouter()->logic(), [self, pktbuf = std::move(pkt)]() {
self->OnInetPacket(std::move(pktbuf));
});
} }
static void static void
@ -457,10 +463,13 @@ namespace llarp
} }
void void
ExitEndpoint::OnInetPacket(const llarp_buffer_t &buf) ExitEndpoint::OnInetPacket(std::vector< byte_t > buf)
{ {
const llarp_buffer_t buffer(buf);
m_InetToNetwork.EmplaceIf( m_InetToNetwork.EmplaceIf(
[b = ManagedBuffer(buf)](Pkt_t &pkt) -> bool { return pkt.Load(b); }); [b = ManagedBuffer(buffer)](Pkt_t &pkt) -> bool {
return pkt.Load(b);
});
} }
bool bool

@ -57,7 +57,7 @@ namespace llarp
/// handle ip packet from outside /// handle ip packet from outside
void void
OnInetPacket(const llarp_buffer_t& buf); OnInetPacket(std::vector< byte_t > buf);
AbstractRouter* AbstractRouter*
GetRouter(); GetRouter();

@ -32,11 +32,11 @@ namespace llarp
m_NetworkToUserPktQueue.Process(send); m_NetworkToUserPktQueue.Process(send);
} }
static void void
tunifTick(llarp_tun_io *tun) TunEndpoint::tunifTick(llarp_tun_io *tun)
{ {
auto *self = static_cast< TunEndpoint * >(tun->user); auto *self = static_cast< TunEndpoint * >(tun->user);
self->Flush(); LogicCall(self->m_router->logic(), [self]() { self->Flush(); });
} }
TunEndpoint::TunEndpoint(const std::string &nickname, AbstractRouter *r, TunEndpoint::TunEndpoint(const std::string &nickname, AbstractRouter *r,
@ -308,13 +308,20 @@ namespace llarp
void void
TunEndpoint::Flush() TunEndpoint::Flush()
{ {
auto self = shared_from_this(); static const auto func = [](auto self) {
FlushSend(); self->FlushSend();
LogicCall(RouterLogic(), [=] {
self->m_ExitMap.ForEachValue( self->m_ExitMap.ForEachValue(
[](const auto &exit) { exit->FlushUpstream(); }); [](const auto &exit) { exit->FlushUpstream(); });
self->Pump(self->Now()); self->Pump(self->Now());
}); };
if(NetworkIsIsolated())
{
LogicCall(RouterLogic(), std::bind(func, shared_from_this()));
}
else
{
func(this);
}
} }
static bool static bool
@ -749,13 +756,11 @@ namespace llarp
void void
TunEndpoint::Tick(llarp_time_t now) TunEndpoint::Tick(llarp_time_t now)
{ {
LogicCall(EndpointLogic(), [&]() { m_ExitMap.ForEachValue([&](const auto &exit) {
m_ExitMap.ForEachValue([&](const auto &exit) { this->EnsureRouterIsKnown(exit->Endpoint());
this->EnsureRouterIsKnown(exit->Endpoint()); exit->Tick(now);
exit->Tick(now);
});
Endpoint::Tick(now);
}); });
Endpoint::Tick(now);
} }
bool bool
@ -983,8 +988,7 @@ namespace llarp
return false; return false;
}; };
LogicCall(self->EndpointLogic(), LogicCall(self->EndpointLogic(),
std::bind(&TunEndpoint::FlushToUser, self->shared_from_this(), std::bind(&TunEndpoint::FlushToUser, self, sendpkt));
sendpkt));
} }
void void
@ -992,9 +996,14 @@ namespace llarp
{ {
// called for every packet read from user in isolated network thread // called for every packet read from user in isolated network thread
auto *self = static_cast< TunEndpoint * >(tun->user); auto *self = static_cast< TunEndpoint * >(tun->user);
const ManagedBuffer pkt(b); std::vector< byte_t > pkt;
self->m_UserToNetworkPktQueue.EmplaceIf( pkt.resize(b.sz);
[&pkt](net::IPPacket &p) -> bool { return p.Load(pkt); }); std::copy_n(b.base, b.sz, pkt.data());
LogicCall(self->RouterLogic(), [self, buffer = std::move(pkt)]() {
const llarp_buffer_t pbuf(buffer);
self->m_UserToNetworkPktQueue.EmplaceIf(
[&pbuf](net::IPPacket &p) -> bool { return p.Load(pbuf); });
});
} }
TunEndpoint::~TunEndpoint() = default; TunEndpoint::~TunEndpoint() = default;

@ -56,6 +56,9 @@ namespace llarp
void void
TickTun(llarp_time_t now); TickTun(llarp_time_t now);
static void
tunifTick(llarp_tun_io*);
bool bool
MapAddress(const service::Address& remote, huint128_t ip, bool SNode); MapAddress(const service::Address& remote, huint128_t ip, bool SNode);

Loading…
Cancel
Save