move stuff arround so that flushing queues are done in the correct event loops

TODO: locking
pull/576/head
Jeff Becker 5 years ago
parent 2a7ebce8f4
commit 0b68d3db5d
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -1020,27 +1020,18 @@ namespace llarp
} }
void void
Endpoint::Pump(llarp_time_t now) Endpoint::Pump(llarp_time_t)
{ {
auto itr = m_RemoteSessions.begin();
while(itr != m_RemoteSessions.end())
{
itr->second->Pump(now);
++itr;
}
RouterLogic()->queue_func([&]() {
for(const auto& item : m_SNodeSessions)
item.second->FlushUpstream();
});
EndpointLogic()->queue_func([&]() { EndpointLogic()->queue_func([&]() {
for(const auto& item : m_SNodeSessions) for(const auto& item : m_SNodeSessions)
item.second->FlushDownstream(); item.second->FlushDownstream();
}); });
RouterLogic()->queue_func([&]() { RouterLogic()->queue_func([&]() {
auto router = Router(); auto router = Router();
for(const auto & item : m_RemoteSessions)
item.second->FlushUpstream();
for(const auto& item : m_SNodeSessions)
item.second->FlushUpstream();
util::Lock lock(&m_SendQueueMutex); util::Lock lock(&m_SendQueueMutex);
for(const auto& item : m_SendQueue) for(const auto& item : m_SendQueue)
item.second->SendRoutingMessage(*item.first, router); item.second->SendRoutingMessage(*item.first, router);

@ -90,7 +90,7 @@ namespace llarp
auto itr = remoteSessions.begin(); auto itr = remoteSessions.begin();
while(itr != remoteSessions.end()) while(itr != remoteSessions.end())
{ {
if(itr->second->Tick(now)) if(itr->second->Pump(now))
{ {
itr->second->Stop(); itr->second->Stop();
deadSessions.emplace(std::move(*itr)); deadSessions.emplace(std::move(*itr));

@ -29,20 +29,13 @@ namespace llarp
msg, remoteIntro.pathID); msg, remoteIntro.pathID);
{ {
util::Lock lock(&m_SendQueueMutex); util::Lock lock(&m_SendQueueMutex);
const auto sz = m_SendQueue.size();
m_SendQueue.emplace_back(transfer, path); m_SendQueue.emplace_back(transfer, path);
if(sz == 0)
{
// TODO: use shared_from_this()
m_Endpoint->RouterLogic()->queue_func(
std::bind(&SendContext::FlushSend, this));
}
} }
return true; return true;
} }
void void
SendContext::FlushSend() SendContext::FlushUpstream()
{ {
auto r = m_Endpoint->Router(); auto r = m_Endpoint->Router();
util::Lock lock(&m_SendQueueMutex); util::Lock lock(&m_SendQueueMutex);

@ -30,9 +30,9 @@ namespace llarp
Send(const ProtocolFrame& f, path::Path_ptr path) Send(const ProtocolFrame& f, path::Path_ptr path)
LOCKS_EXCLUDED(m_SendQueueMutex); LOCKS_EXCLUDED(m_SendQueueMutex);
/// flush send when in router thread /// flush upstream traffic when in router thread
void void
FlushSend() LOCKS_EXCLUDED(m_SendQueueMutex); FlushUpstream() LOCKS_EXCLUDED(m_SendQueueMutex);
SharedSecret sharedKey; SharedSecret sharedKey;
ServiceInfo remoteIdent; ServiceInfo remoteIdent;

Loading…
Cancel
Save