pull/576/head
Jeff Becker 5 years ago
parent 0b68d3db5d
commit bb47d612b3
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -337,8 +337,9 @@ llarp_epoll_loop::tick(int ms)
{ {
epoll_event events[1024]; epoll_event events[1024];
int result; int result;
result = epoll_wait(epollfd, events, 1024, ms); result = epoll_wait(epollfd, events, 1024, ms);
bool didIO = false; bool didRead = false;
bool didWrite = false;
if(result > 0) if(result > 0)
{ {
int idx = 0; int idx = 0;
@ -362,6 +363,7 @@ llarp_epoll_loop::tick(int ms)
// write THEN READ don't revert me // write THEN READ don't revert me
if(events[idx].events & EPOLLOUT) if(events[idx].events & EPOLLOUT)
{ {
didWrite = true;
IO([&]() -> ssize_t { IO([&]() -> ssize_t {
llarp::LogDebug("epoll out"); llarp::LogDebug("epoll out");
ev->flush_write(); ev->flush_write();
@ -370,12 +372,11 @@ llarp_epoll_loop::tick(int ms)
} }
if(events[idx].events & EPOLLIN) if(events[idx].events & EPOLLIN)
{ {
ssize_t amount = IO([&]() -> ssize_t { didRead = true;
IO([&]() -> ssize_t {
llarp::LogDebug("epoll in"); llarp::LogDebug("epoll in");
return ev->read(readbuf, sizeof(readbuf)); return ev->read(readbuf, sizeof(readbuf));
}); });
if(amount > 0)
didIO = true;
} }
} }
} }
@ -385,7 +386,7 @@ llarp_epoll_loop::tick(int ms)
if(result != -1) if(result != -1)
tick_listeners(); tick_listeners();
/// if we didn't get an io events we sleep to avoid 100% cpu use /// if we didn't get an io events we sleep to avoid 100% cpu use
if(!didIO) if(didWrite && !didRead)
std::this_thread::sleep_for(std::chrono::milliseconds(5)); std::this_thread::sleep_for(std::chrono::milliseconds(5));
return result; return result;
} }

@ -257,6 +257,7 @@ namespace llarp
++itr; ++itr;
} }
} }
m_Router->PumpLL();
} }
bool bool

@ -228,12 +228,19 @@ namespace llarp
void void
TunEndpoint::Flush() TunEndpoint::Flush()
{ {
auto self = shared_from_this();
FlushSend(); FlushSend();
if(m_Exit) if(m_Exit)
{ {
llarp::exit::BaseSession_ptr ex = m_Exit; RouterLogic()->queue_func([=] {
RouterLogic()->queue_func([=] { ex->FlushUpstream(); }); self->m_Exit->FlushUpstream();
self->Router()->PumpLL();
});
} }
RouterLogic()->queue_func([=]() {
self->Pump(self->Now());
self->Router()->PumpLL();
});
} }
static bool static bool
@ -778,13 +785,14 @@ namespace llarp
self->FlushSend(); self->FlushSend();
// flush exit traffic queues if it's there // flush exit traffic queues if it's there
if(self->m_Exit) if(self->m_Exit)
{
self->m_Exit->FlushDownstream(); self->m_Exit->FlushDownstream();
}
// flush network to user // flush network to user
self->m_NetworkToUserPktQueue.Process([tun](net::IPv4Packet &pkt) { self->m_NetworkToUserPktQueue.Process([tun](net::IPv4Packet &pkt) {
if(!llarp_ev_tun_async_write(tun, pkt.Buffer())) if(!llarp_ev_tun_async_write(tun, pkt.Buffer()))
llarp::LogWarn("packet dropped"); llarp::LogWarn("packet dropped");
}); });
self->Pump(self->Now());
} }
void void

@ -117,6 +117,10 @@ namespace llarp
virtual void virtual void
Stop() = 0; Stop() = 0;
/// pump low level links
virtual void
PumpLL() = 0;
virtual bool virtual bool
IsBootstrapNode(RouterID r) const = 0; IsBootstrapNode(RouterID r) const = 0;

@ -293,6 +293,19 @@ namespace llarp
return true; return true;
} }
void
Router::PumpLL()
{
for(const auto &link : inboundLinks)
{
link->Pump();
}
for(const auto &link : outboundLinks)
{
link->Pump();
}
}
constexpr size_t MaxPendingSendQueueSize = 8; constexpr size_t MaxPendingSendQueueSize = 8;
bool bool

@ -271,6 +271,9 @@ namespace llarp
return IsTrueValue(itr->second.c_str()); return IsTrueValue(itr->second.c_str());
} }
void
PumpLL() override;
bool bool
CreateDefaultHiddenService(); CreateDefaultHiddenService();

@ -1019,24 +1019,22 @@ namespace llarp
return false; return false;
} }
void void Endpoint::Pump(llarp_time_t)
Endpoint::Pump(llarp_time_t)
{ {
EndpointLogic()->queue_func([&]() { EndpointLogic()->queue_func([&]() {
for(const auto& item : m_SNodeSessions) for(const auto& item : m_SNodeSessions)
item.second->FlushDownstream(); item.second->FlushDownstream();
}); });
RouterLogic()->queue_func([&]() {
auto router = Router(); auto router = Router();
for(const auto & item : m_RemoteSessions) for(const auto& item : m_RemoteSessions)
item.second->FlushUpstream(); item.second->FlushUpstream();
for(const auto& item : m_SNodeSessions) for(const auto& item : m_SNodeSessions)
item.second->FlushUpstream(); item.second->FlushUpstream();
util::Lock lock(&m_SendQueueMutex); util::Lock lock(&m_SendQueueMutex);
for(const auto& item : m_SendQueue) for(const auto& item : m_SendQueue)
item.second->SendRoutingMessage(*item.first, router); item.second->SendRoutingMessage(*item.first, router);
m_SendQueue.clear(); m_SendQueue.clear();
});
} }
bool bool

Loading…
Cancel
Save