diff --git a/llarp/ev/ev_epoll.cpp b/llarp/ev/ev_epoll.cpp index 9f92bf4b5..b22ccaf89 100644 --- a/llarp/ev/ev_epoll.cpp +++ b/llarp/ev/ev_epoll.cpp @@ -337,8 +337,9 @@ llarp_epoll_loop::tick(int ms) { epoll_event events[1024]; int result; - result = epoll_wait(epollfd, events, 1024, ms); - bool didIO = false; + result = epoll_wait(epollfd, events, 1024, ms); + bool didRead = false; + bool didWrite = false; if(result > 0) { int idx = 0; @@ -362,6 +363,7 @@ llarp_epoll_loop::tick(int ms) // write THEN READ don't revert me if(events[idx].events & EPOLLOUT) { + didWrite = true; IO([&]() -> ssize_t { llarp::LogDebug("epoll out"); ev->flush_write(); @@ -370,12 +372,11 @@ llarp_epoll_loop::tick(int ms) } if(events[idx].events & EPOLLIN) { - ssize_t amount = IO([&]() -> ssize_t { + didRead = true; + IO([&]() -> ssize_t { llarp::LogDebug("epoll in"); return ev->read(readbuf, sizeof(readbuf)); }); - if(amount > 0) - didIO = true; } } } @@ -385,7 +386,7 @@ llarp_epoll_loop::tick(int ms) if(result != -1) tick_listeners(); /// if we didn't get an io events we sleep to avoid 100% cpu use - if(!didIO) + if(didWrite && !didRead) std::this_thread::sleep_for(std::chrono::milliseconds(5)); return result; } diff --git a/llarp/handlers/exit.cpp b/llarp/handlers/exit.cpp index 15d584daf..ff9f1f378 100644 --- a/llarp/handlers/exit.cpp +++ b/llarp/handlers/exit.cpp @@ -257,6 +257,7 @@ namespace llarp ++itr; } } + m_Router->PumpLL(); } bool diff --git a/llarp/handlers/tun.cpp b/llarp/handlers/tun.cpp index de809f38b..4dd2b01ab 100644 --- a/llarp/handlers/tun.cpp +++ b/llarp/handlers/tun.cpp @@ -228,12 +228,19 @@ namespace llarp void TunEndpoint::Flush() { + auto self = shared_from_this(); FlushSend(); if(m_Exit) { - llarp::exit::BaseSession_ptr ex = m_Exit; - RouterLogic()->queue_func([=] { ex->FlushUpstream(); }); + RouterLogic()->queue_func([=] { + self->m_Exit->FlushUpstream(); + self->Router()->PumpLL(); + }); } + RouterLogic()->queue_func([=]() { + self->Pump(self->Now()); + self->Router()->PumpLL(); + }); } static bool @@ -778,13 +785,14 @@ namespace llarp self->FlushSend(); // flush exit traffic queues if it's there if(self->m_Exit) + { self->m_Exit->FlushDownstream(); + } // flush network to user self->m_NetworkToUserPktQueue.Process([tun](net::IPv4Packet &pkt) { if(!llarp_ev_tun_async_write(tun, pkt.Buffer())) llarp::LogWarn("packet dropped"); }); - self->Pump(self->Now()); } void diff --git a/llarp/router/abstractrouter.hpp b/llarp/router/abstractrouter.hpp index 968e42184..8507f4d3a 100644 --- a/llarp/router/abstractrouter.hpp +++ b/llarp/router/abstractrouter.hpp @@ -117,6 +117,10 @@ namespace llarp virtual void Stop() = 0; + /// pump low level links + virtual void + PumpLL() = 0; + virtual bool IsBootstrapNode(RouterID r) const = 0; diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 1e7e7b371..dbf1fe4c4 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -293,6 +293,19 @@ namespace llarp return true; } + void + Router::PumpLL() + { + for(const auto &link : inboundLinks) + { + link->Pump(); + } + for(const auto &link : outboundLinks) + { + link->Pump(); + } + } + constexpr size_t MaxPendingSendQueueSize = 8; bool diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp index cd5923cc2..b6fca9efe 100644 --- a/llarp/router/router.hpp +++ b/llarp/router/router.hpp @@ -271,6 +271,9 @@ namespace llarp return IsTrueValue(itr->second.c_str()); } + void + PumpLL() override; + bool CreateDefaultHiddenService(); diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 0e031eb73..4b05c47d3 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -1019,24 +1019,22 @@ namespace llarp return false; } - void - Endpoint::Pump(llarp_time_t) + void Endpoint::Pump(llarp_time_t) { EndpointLogic()->queue_func([&]() { for(const auto& item : m_SNodeSessions) item.second->FlushDownstream(); }); - RouterLogic()->queue_func([&]() { - auto router = Router(); - for(const auto & item : m_RemoteSessions) - item.second->FlushUpstream(); - for(const auto& item : m_SNodeSessions) - item.second->FlushUpstream(); - util::Lock lock(&m_SendQueueMutex); - for(const auto& item : m_SendQueue) - item.second->SendRoutingMessage(*item.first, router); - m_SendQueue.clear(); - }); + + auto router = Router(); + for(const auto& item : m_RemoteSessions) + item.second->FlushUpstream(); + for(const auto& item : m_SNodeSessions) + item.second->FlushUpstream(); + util::Lock lock(&m_SendQueueMutex); + for(const auto& item : m_SendQueue) + item.second->SendRoutingMessage(*item.first, router); + m_SendQueue.clear(); } bool