diff --git a/llarp/handlers/null.hpp b/llarp/handlers/null.hpp index c89b596f0..e03a546ed 100644 --- a/llarp/handlers/null.hpp +++ b/llarp/handlers/null.hpp @@ -16,7 +16,7 @@ namespace llarp virtual bool HandleInboundPacket( - const service::ConvoTag, const llarp_buffer_t&, service::ProtocolType) override + const service::ConvoTag, const llarp_buffer_t&, service::ProtocolType, uint64_t) override { return true; } diff --git a/llarp/handlers/tun.cpp b/llarp/handlers/tun.cpp index 1e4b9a497..354d25789 100644 --- a/llarp/handlers/tun.cpp +++ b/llarp/handlers/tun.cpp @@ -31,10 +31,14 @@ namespace llarp namespace handlers { void - TunEndpoint::FlushToUser(std::function send) + TunEndpoint::FlushToUser(std::function send) { // flush network to user - m_NetworkToUserPktQueue.Process(send); + while (not m_NetworkToUserPktQueue.empty()) + { + send(m_NetworkToUserPktQueue.top().pkt); + m_NetworkToUserPktQueue.pop(); + } } bool @@ -54,7 +58,6 @@ namespace llarp TunEndpoint::TunEndpoint(AbstractRouter* r, service::Context* parent, bool lazyVPN) : service::Endpoint(r, parent) , m_UserToNetworkPktQueue("endpoint_sendq", r->netloop(), r->netloop()) - , m_NetworkToUserPktQueue("endpoint_recvq", r->netloop(), r->netloop()) , m_Resolver(std::make_shared( r->netloop(), r->logic(), r->netloop(), r->logic(), this)) { @@ -223,13 +226,6 @@ namespace llarp return m_IPToAddr.find(ip) != m_IPToAddr.end(); } - bool - TunEndpoint::QueueOutboundTraffic(llarp::net::IPPacket&& pkt) - { - return m_NetworkToUserPktQueue.EmplaceIf( - [](llarp::net::IPPacket&) -> bool { return true; }, std::move(pkt)); - } - void TunEndpoint::Flush() { @@ -669,7 +665,7 @@ namespace llarp llarp::LogInfo(Name(), " got vpn interface"); auto self = shared_from_this(); // function to queue a packet to send to vpn interface - auto sendpkt = [self](net::IPPacket& pkt) -> bool { + auto sendpkt = [self](const net::IPPacket& pkt) -> bool { // drop if no endpoint auto impl = self->GetVPNImpl(); // drop if no vpn interface @@ -860,7 +856,7 @@ namespace llarp const auto icmp = pkt.MakeICMPUnreachable(); if (icmp.has_value()) { - HandleWriteIPPacket(icmp->ConstBuffer(), dst, src); + HandleWriteIPPacket(icmp->ConstBuffer(), dst, src, 0); } } else @@ -927,7 +923,10 @@ namespace llarp bool TunEndpoint::HandleInboundPacket( - const service::ConvoTag tag, const llarp_buffer_t& buf, service::ProtocolType t) + const service::ConvoTag tag, + const llarp_buffer_t& buf, + service::ProtocolType t, + uint64_t seqno) { if (t != service::eProtocolTrafficV4 && t != service::eProtocolTrafficV6 && t != service::eProtocolExit) @@ -972,28 +971,31 @@ namespace llarp src = ObtainIPForAddr(addr, snode); dst = m_OurIP; } - HandleWriteIPPacket(buf, src, dst); + HandleWriteIPPacket(buf, src, dst, seqno); return true; } bool - TunEndpoint::HandleWriteIPPacket(const llarp_buffer_t& b, huint128_t src, huint128_t dst) + TunEndpoint::HandleWriteIPPacket( + const llarp_buffer_t& b, huint128_t src, huint128_t dst, uint64_t seqno) { ManagedBuffer buf(b); - return m_NetworkToUserPktQueue.EmplaceIf([buf, src, dst](net::IPPacket& pkt) -> bool { - // load - if (!pkt.Load(buf)) - return false; - if (pkt.IsV4()) - { - pkt.UpdateIPv4Address(xhtonl(net::TruncateV6(src)), xhtonl(net::TruncateV6(dst))); - } - else if (pkt.IsV6()) - { - pkt.UpdateIPv6Address(src, dst); - } - return true; - }); + WritePacket write; + write.seqno = seqno; + auto& pkt = write.pkt; + // load + if (!pkt.Load(buf)) + return false; + if (pkt.IsV4()) + { + pkt.UpdateIPv4Address(xhtonl(net::TruncateV6(src)), xhtonl(net::TruncateV6(dst))); + } + else if (pkt.IsV6()) + { + pkt.UpdateIPv6Address(src, dst); + } + m_NetworkToUserPktQueue.push(std::move(write)); + return true; } huint128_t @@ -1097,8 +1099,8 @@ namespace llarp // called in the isolated network thread auto* self = static_cast(tun->user); self->Flush(); - self->FlushToUser([self, tun](net::IPPacket& pkt) -> bool { - if (not llarp_ev_tun_async_write(tun, pkt.Buffer())) + self->FlushToUser([self, tun](const net::IPPacket& pkt) -> bool { + if (not llarp_ev_tun_async_write(tun, pkt.ConstBuffer())) { llarp::LogWarn(self->Name(), " packet dropped"); } diff --git a/llarp/handlers/tun.hpp b/llarp/handlers/tun.hpp index d8a1bbe1d..b994d7302 100644 --- a/llarp/handlers/tun.hpp +++ b/llarp/handlers/tun.hpp @@ -12,6 +12,7 @@ #include #include +#include namespace llarp { @@ -87,11 +88,15 @@ namespace llarp /// overrides Endpoint bool HandleInboundPacket( - const service::ConvoTag tag, const llarp_buffer_t& pkt, service::ProtocolType t) override; + const service::ConvoTag tag, + const llarp_buffer_t& pkt, + service::ProtocolType t, + uint64_t seqno) override; /// handle inbound traffic bool - HandleWriteIPPacket(const llarp_buffer_t& buf, huint128_t src, huint128_t dst); + HandleWriteIPPacket( + const llarp_buffer_t& buf, huint128_t src, huint128_t dst, uint64_t seqno); /// queue outbound packet to the world bool @@ -190,8 +195,21 @@ namespace llarp /// queue for sending packets over the network from us PacketQueue_t m_UserToNetworkPktQueue; + + struct WritePacket + { + uint64_t seqno; + net::IPPacket pkt; + + bool + operator<(const WritePacket& other) const + { + return other.seqno < seqno; + } + }; + /// queue for sending packets to user from network - PacketQueue_t m_NetworkToUserPktQueue; + std::priority_queue m_NetworkToUserPktQueue; /// return true if we have a remote loki address for this ip address bool HasRemoteForIP(huint128_t ipv4) const; @@ -279,7 +297,7 @@ namespace llarp /// send function returns true to indicate stop iteration and do codel /// drop void - FlushToUser(std::function sendfunc); + FlushToUser(std::function sendfunc); }; } // namespace handlers diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 08d2d6d3d..d4d4d82d4 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -1189,7 +1189,7 @@ namespace llarp return false; pkt.UpdateIPv4Address(src, dst); /// TODO: V6 - return HandleInboundPacket(tag, pkt.ConstBuffer(), eProtocolTrafficV4); + return HandleInboundPacket(tag, pkt.ConstBuffer(), eProtocolTrafficV4, 0); }, Router(), numPaths, @@ -1244,7 +1244,7 @@ namespace llarp { auto msg = queue.popFront(); const llarp_buffer_t buf(msg->payload); - HandleInboundPacket(msg->tag, buf, msg->proto); + HandleInboundPacket(msg->tag, buf, msg->proto, msg->seqno); } }; @@ -1352,7 +1352,7 @@ namespace llarp PutReplyIntroFor(f.T, m->introReply); m->sender = m_Identity.pub; m->seqno = GetSeqNoForConvo(f.T); - f.S = 1; + f.S = m->seqno; f.F = m->introReply.pathID; transfer->P = remoteIntro.pathID; auto self = this; @@ -1420,7 +1420,8 @@ namespace llarp auto itr = Sessions().find(tag); if (itr == Sessions().end()) return 0; - return ++(itr->second.seqno); + itr->second.seqno += 1; + return itr->second.seqno; } bool diff --git a/llarp/service/endpoint.hpp b/llarp/service/endpoint.hpp index 11f8adfa8..ecc05d42d 100644 --- a/llarp/service/endpoint.hpp +++ b/llarp/service/endpoint.hpp @@ -208,7 +208,8 @@ namespace llarp /// handle packet io from service node or hidden service to frontend virtual bool - HandleInboundPacket(const ConvoTag tag, const llarp_buffer_t& pkt, ProtocolType t) = 0; + HandleInboundPacket( + const ConvoTag tag, const llarp_buffer_t& pkt, ProtocolType t, uint64_t seqno) = 0; // virtual bool // HandleWriteIPPacket(const llarp_buffer_t& pkt, diff --git a/pybind/llarp/handlers/pyhandler.hpp b/pybind/llarp/handlers/pyhandler.hpp index 49bee11cd..0e32b748b 100644 --- a/pybind/llarp/handlers/pyhandler.hpp +++ b/pybind/llarp/handlers/pyhandler.hpp @@ -25,7 +25,8 @@ namespace llarp HandleInboundPacket( const service::ConvoTag tag, const llarp_buffer_t& pktbuf, - service::ProtocolType proto) override + service::ProtocolType proto, + uint64_t) override { if (handlePacket) {