diff --git a/llarp/exit/session.cpp b/llarp/exit/session.cpp index 98228f740..b836cf157 100644 --- a/llarp/exit/session.cpp +++ b/llarp/exit/session.cpp @@ -59,9 +59,11 @@ namespace llarp p->SetDropHandler(std::bind(&BaseSession::HandleTrafficDrop, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); - p->SetExitTrafficHandler(std::bind(&BaseSession::HandleTraffic, this, - std::placeholders::_1, - std::placeholders::_2)); + + p->SetExitTrafficHandler( + std::bind(&BaseSession::HandleTraffic, this, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3)); + p->AddObtainExitHandler(std::bind(&BaseSession::HandleGotExit, this, std::placeholders::_1, std::placeholders::_2)); @@ -91,13 +93,16 @@ namespace llarp } bool - BaseSession::HandleTraffic(llarp::path::Path* p, llarp_buffer_t pkt) + BaseSession::HandleTraffic(llarp::path::Path* p, llarp_buffer_t buf, + uint64_t counter) { (void)p; if(m_WritePacket) { - if(!m_WritePacket(pkt)) + llarp::net::IPv4Packet pkt; + if(!pkt.Load(buf)) return false; + m_Downstream.emplace(counter, pkt); m_LastUse = router->Now(); return true; } @@ -153,7 +158,7 @@ namespace llarp } bool - BaseSession::FlushUpstreamTraffic() + BaseSession::Flush() { auto now = router->Now(); auto path = PickRandomEstablishedPath(llarp::path::ePathRoleExit); @@ -176,6 +181,12 @@ namespace llarp queue.pop_front(); } } + while(m_Downstream.size()) + { + if(m_WritePacket) + m_WritePacket(m_Downstream.top().second.ConstBuffer()); + m_Downstream.pop(); + } return true; } diff --git a/llarp/exit/session.hpp b/llarp/exit/session.hpp index 720e468c5..255ff0038 100644 --- a/llarp/exit/session.hpp +++ b/llarp/exit/session.hpp @@ -7,6 +7,7 @@ #include #include +#include namespace llarp { @@ -37,8 +38,9 @@ namespace llarp bool QueueUpstreamTraffic(llarp::net::IPv4Packet pkt, const size_t packSize); + /// flush upstream and downstream traffic bool - FlushUpstreamTraffic(); + Flush(); bool IsReady() const; @@ -71,13 +73,30 @@ namespace llarp HandleGotExit(llarp::path::Path* p, llarp_time_t b); bool - HandleTraffic(llarp::path::Path* p, llarp_buffer_t buf); + HandleTraffic(llarp::path::Path* p, llarp_buffer_t buf, uint64_t seqno); private: using UpstreamTrafficQueue_t = std::deque< llarp::routing::TransferTrafficMessage >; using TieredQueue_t = std::map< uint8_t, UpstreamTrafficQueue_t >; TieredQueue_t m_Upstream; + + using DownstreamPkt = std::pair< uint64_t, llarp::net::IPv4Packet >; + + struct DownstreamPktSorter + { + bool + operator()(const DownstreamPkt& left, const DownstreamPkt& right) const + { + return left.first < right.first; + } + }; + + using DownstreamTrafficQueue_t = + std::priority_queue< DownstreamPkt, std::vector< DownstreamPkt >, + DownstreamPktSorter >; + DownstreamTrafficQueue_t m_Downstream; + uint64_t m_Counter; llarp_time_t m_LastUse; }; diff --git a/llarp/handlers/exit.cpp b/llarp/handlers/exit.cpp index 5f1301cfd..3f56264d7 100644 --- a/llarp/handlers/exit.cpp +++ b/llarp/handlers/exit.cpp @@ -206,7 +206,7 @@ namespace llarp auto itr = m_SNodeSessions.begin(); while(itr != m_SNodeSessions.end()) { - if(!itr->second->FlushUpstreamTraffic()) + if(!itr->second->Flush()) { llarp::LogWarn("failed to flushsnode traffic to ", itr->first, " via outbound session"); diff --git a/llarp/handlers/tun.cpp b/llarp/handlers/tun.cpp index ff0e7dfd7..f44dbdfdf 100644 --- a/llarp/handlers/tun.cpp +++ b/llarp/handlers/tun.cpp @@ -490,7 +490,7 @@ namespace llarp return true; }); if(m_Exit) - m_Exit->FlushUpstreamTraffic(); + m_Exit->Flush(); FlushSNodeTraffic(); } diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 1af523a0e..6312d5b7b 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -129,7 +129,7 @@ namespace llarp auto itr = m_SNodeSessions.begin(); while(itr != m_SNodeSessions.end()) { - itr->second->FlushUpstreamTraffic(); + itr->second->Flush(); ++itr; } }