diff --git a/llarp/handlers/tun.cpp b/llarp/handlers/tun.cpp index 97ef79f06..5c49ca216 100644 --- a/llarp/handlers/tun.cpp +++ b/llarp/handlers/tun.cpp @@ -86,13 +86,7 @@ namespace llarp TunEndpoint::TunEndpoint(AbstractRouter* r, service::Context* parent) : service::Endpoint(r, parent) - , m_UserToNetworkPktQueue("endpoint_sendq", r->loop(), r->loop()) { - m_PacketSendWaker = r->loop()->make_waker([this]() { FlushWrite(); }); - m_MessageSendWaker = r->loop()->make_waker([this]() { - FlushSend(); - Pump(Now()); - }); m_PacketRouter = std::make_unique( [this](net::IPPacket pkt) { HandleGotUserPacket(std::move(pkt)); }); #if defined(ANDROID) || defined(__APPLE__) @@ -362,15 +356,7 @@ namespace llarp } void - TunEndpoint::Flush() - { - FlushSend(); - Pump(Now()); - FlushWrite(); - } - - void - TunEndpoint::FlushWrite() + TunEndpoint::Pump(llarp_time_t now) { // flush network to user while (not m_NetworkToUserPktQueue.empty()) @@ -378,6 +364,8 @@ namespace llarp m_NetIf->WritePacket(m_NetworkToUserPktQueue.top().pkt); m_NetworkToUserPktQueue.pop(); } + + service::Endpoint::Pump(now); } static bool @@ -952,7 +940,6 @@ namespace llarp LogInfo(Name(), " has ipv6 address ", m_OurIPv6); } #endif - Router()->loop()->add_ticker([this] { Flush(); }); // Attempt to register DNS on the interface systemd_resolved_set_dns( @@ -1037,149 +1024,151 @@ namespace llarp } void - TunEndpoint::FlushSend() + TunEndpoint::HandleGotUserPacket(net::IPPacket pkt) { - m_UserToNetworkPktQueue.Process([&](net::IPPacket& pkt) { - huint128_t dst, src; - if (pkt.IsV4()) - { - dst = pkt.dst4to6(); - src = pkt.src4to6(); - } - else - { - dst = pkt.dstv6(); - src = pkt.srcv6(); - } - // this is for ipv6 slaac on ipv6 exits - /* - constexpr huint128_t ipv6_multicast_all_nodes = - huint128_t{uint128_t{0xff01'0000'0000'0000UL, 1UL}}; - constexpr huint128_t ipv6_multicast_all_routers = - huint128_t{uint128_t{0xff01'0000'0000'0000UL, 2UL}}; - if (dst == ipv6_multicast_all_nodes and m_state->m_ExitEnabled) + huint128_t dst, src; + if (pkt.IsV4()) + { + dst = pkt.dst4to6(); + src = pkt.src4to6(); + } + else + { + dst = pkt.dstv6(); + src = pkt.srcv6(); + } + // this is for ipv6 slaac on ipv6 exits + /* + constexpr huint128_t ipv6_multicast_all_nodes = + huint128_t{uint128_t{0xff01'0000'0000'0000UL, 1UL}}; + constexpr huint128_t ipv6_multicast_all_routers = + huint128_t{uint128_t{0xff01'0000'0000'0000UL, 2UL}}; + if (dst == ipv6_multicast_all_nodes and m_state->m_ExitEnabled) + { + // send ipv6 multicast + for (const auto& [ip, addr] : m_IPToAddr) { - // send ipv6 multicast - for (const auto& [ip, addr] : m_IPToAddr) - { - (void)ip; - SendToOrQueue( - service::Address{addr.as_array()}, pkt.ConstBuffer(), service::ProtocolType::Exit); - } - return; + (void)ip; + SendToOrQueue( + service::Address{addr.as_array()}, pkt.ConstBuffer(), service::ProtocolType::Exit); } + return; + } - */ - if (m_state->m_ExitEnabled) - { - dst = net::ExpandV4(net::TruncateV6(dst)); - } - auto itr = m_IPToAddr.find(dst); - if (itr == m_IPToAddr.end()) + */ + if (m_state->m_ExitEnabled) + { + dst = net::ExpandV4(net::TruncateV6(dst)); + } + auto itr = m_IPToAddr.find(dst); + if (itr == m_IPToAddr.end()) + { + // find all ranges that match the destination ip + const auto exitEntries = m_ExitMap.FindAllEntries(dst); + if (exitEntries.empty()) { - // find all ranges that match the destination ip - const auto exitEntries = m_ExitMap.FindAllEntries(dst); - if (exitEntries.empty()) + // send icmp unreachable as we dont have any exits for this ip + if (const auto icmp = pkt.MakeICMPUnreachable()) { - // send icmp unreachable as we dont have any exits for this ip - if (const auto icmp = pkt.MakeICMPUnreachable()) - { - HandleWriteIPPacket(icmp->ConstBuffer(), dst, src, 0); - } - return; - } - service::Address addr{}; - for (const auto& [range, exitAddr] : exitEntries) - { - if ( - // we permit this because it matches our rules and we allow bogons: - (range.BogonRange() and range.Contains(dst)) - // allow because the destination is not a bogon and the mapped range is not a bogon - or not IsBogon(dst)) - { - addr = exitAddr; - } - // we do not permit bogons when they don't explicitly match a permitted bogon range + /// ? + HandleWriteIPPacket(icmp->ConstBuffer(), dst, src, 0); } - if (addr.IsZero()) // drop becase no exit was found that matches our rules - return; - pkt.ZeroSourceAddress(); - MarkAddressOutbound(addr); - EnsurePathToService( - addr, - [pkt](service::Address addr, service::OutboundContext* ctx) { - if (ctx) - { - ctx->SendPacketToRemote(pkt.ConstBuffer(), service::ProtocolType::Exit); - return; - } - LogWarn("cannot ensure path to exit ", addr, " so we drop some packets"); - }, - PathAlignmentTimeout()); return; } - std::variant to; - service::ProtocolType type; - if (m_SNodes.at(itr->second)) - { - to = RouterID{itr->second.as_array()}; - type = service::ProtocolType::TrafficV4; - } - else - { - to = service::Address{itr->second.as_array()}; - type = m_state->m_ExitEnabled and src != m_OurIP ? service::ProtocolType::Exit - : pkt.ServiceProtocol(); - } - - // prepare packet for insertion into network - // this includes clearing IP addresses, recalculating checksums, etc - // this does not happen for exits because the point is they don't rewrite addresses - if (type != service::ProtocolType::Exit) - { - if (pkt.IsV4()) - pkt.UpdateIPv4Address({0}, {0}); - else - pkt.UpdateIPv6Address({0}, {0}); - } - // try sending it on an existing convotag - // this succeds for inbound convos, probably. - if (auto maybe = GetBestConvoTagFor(to)) + service::Address addr{}; + for (const auto& [range, exitAddr] : exitEntries) { - if (SendToOrQueue(*maybe, pkt.ConstBuffer(), type)) + if ( + // we permit this because it matches our rules and we allow bogons: + (range.BogonRange() and range.Contains(dst)) + // allow because the destination is not a bogon and the mapped range is not a bogon + or not IsBogon(dst)) { - MarkIPActive(dst); - return; + addr = exitAddr; } + // we do not permit bogons when they don't explicitly match a permitted bogon range } - // try establishing a path to this guy - // will fail if it's an inbound convo - EnsurePathTo( - to, - [pkt, type, dst, to, this](auto maybe) { - if (not maybe) - { - var::visit( - [this](auto&& addr) { - LogWarn(Name(), " failed to ensure path to ", addr, " no convo tag found"); - }, - to); - } - if (SendToOrQueue(*maybe, pkt.ConstBuffer(), type)) - { - MarkIPActive(dst); - } - else + if (addr.IsZero()) // drop becase no exit was found that matches our rules + return; + pkt.ZeroSourceAddress(); + MarkAddressOutbound(addr); + EnsurePathToService( + addr, + [pkt, this](service::Address addr, service::OutboundContext* ctx) { + if (ctx) { - var::visit( - [this](auto&& addr) { - LogWarn(Name(), " failed to send to ", addr, ", SendToOrQueue failed"); - }, - to); + ctx->SendPacketToRemote(pkt.ConstBuffer(), service::ProtocolType::Exit); + Router()->TriggerPump(); + return; } + LogWarn("cannot ensure path to exit ", addr, " so we drop some packets"); }, PathAlignmentTimeout()); - }); + return; + } + std::variant to; + service::ProtocolType type; + if (m_SNodes.at(itr->second)) + { + to = RouterID{itr->second.as_array()}; + type = service::ProtocolType::TrafficV4; + } + else + { + to = service::Address{itr->second.as_array()}; + type = m_state->m_ExitEnabled and src != m_OurIP ? service::ProtocolType::Exit + : pkt.ServiceProtocol(); + } + + // prepare packet for insertion into network + // this includes clearing IP addresses, recalculating checksums, etc + // this does not happen for exits because the point is they don't rewrite addresses + if (type != service::ProtocolType::Exit) + { + if (pkt.IsV4()) + pkt.UpdateIPv4Address({0}, {0}); + else + pkt.UpdateIPv6Address({0}, {0}); + } + // try sending it on an existing convotag + // this succeds for inbound convos, probably. + if (auto maybe = GetBestConvoTagFor(to)) + { + if (SendToOrQueue(*maybe, pkt.ConstBuffer(), type)) + { + MarkIPActive(dst); + Router()->TriggerPump(); + return; + } + } + // try establishing a path to this guy + // will fail if it's an inbound convo + EnsurePathTo( + to, + [pkt, type, dst, to, this](auto maybe) { + if (not maybe) + { + var::visit( + [this](auto&& addr) { + LogWarn(Name(), " failed to ensure path to ", addr, " no convo tag found"); + }, + to); + } + if (SendToOrQueue(*maybe, pkt.ConstBuffer(), type)) + { + MarkIPActive(dst); + Router()->TriggerPump(); + } + else + { + var::visit( + [this](auto&& addr) { + LogWarn(Name(), " failed to send to ", addr, ", SendToOrQueue failed"); + }, + to); + } + }, + PathAlignmentTimeout()); } bool @@ -1333,8 +1322,8 @@ namespace llarp pkt.UpdateIPv6Address(src, dst); } m_NetworkToUserPktQueue.push(std::move(write)); - // wake up packet flushing event so we ensure that all packets are written to user - m_PacketSendWaker->Trigger(); + // wake up so we ensure that all packets are written to user + Router()->TriggerPump(); return true; } @@ -1439,13 +1428,6 @@ namespace llarp m_IPActivity[ip] = std::numeric_limits::max(); } - void - TunEndpoint::HandleGotUserPacket(net::IPPacket pkt) - { - m_UserToNetworkPktQueue.Emplace(std::move(pkt)); - m_MessageSendWaker->Trigger(); - } - TunEndpoint::~TunEndpoint() = default; } // namespace handlers diff --git a/llarp/handlers/tun.hpp b/llarp/handlers/tun.hpp index 6ea44b794..434a3314b 100644 --- a/llarp/handlers/tun.hpp +++ b/llarp/handlers/tun.hpp @@ -172,10 +172,6 @@ namespace llarp huint128_t ObtainIPForAddr(std::variant addr) override; - /// flush network traffic - void - Flush(); - void ResetInternalState() override; @@ -187,9 +183,6 @@ namespace llarp net::IPPacket::CompareOrder, net::IPPacket::GetNow>; - /// queue for sending packets over the network from us - PacketQueue_t m_UserToNetworkPktQueue; - struct WritePacket { uint64_t seqno; @@ -204,6 +197,10 @@ namespace llarp /// queue for sending packets to user from network std::priority_queue m_NetworkToUserPktQueue; + + void + Pump(llarp_time_t now) override; + /// return true if we have a remote loki address for this ip address bool HasRemoteForIP(huint128_t ipv4) const; @@ -216,10 +213,6 @@ namespace llarp void MarkIPActiveForever(huint128_t ip); - /// flush ip packets - virtual void - FlushSend(); - /// flush writing ip packets to interface void FlushWrite(); @@ -292,11 +285,6 @@ namespace llarp std::set m_OwnedRanges; /// how long to wait for path alignment llarp_time_t m_PathAlignmentTimeout; - /// idempotent wakeup for writing packets to user - std::shared_ptr m_PacketSendWaker; - - /// idempotent wakeup for writing messages to network - std::shared_ptr m_MessageSendWaker; /// a file to load / store the ephemeral address map to std::optional m_PersistAddrMapFile; diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index cad2d0ae5..e7ca84ad8 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -89,6 +89,7 @@ namespace llarp return; _outboundMessageHandler.Pump(); _linkManager.PumpLinks(); + _hiddenServiceContext.Pump(); llarp::LogTrace("Router::PumpLL() end"); } diff --git a/llarp/service/context.cpp b/llarp/service/context.cpp index 3f75ca828..a376b14b5 100644 --- a/llarp/service/context.cpp +++ b/llarp/service/context.cpp @@ -80,6 +80,14 @@ namespace llarp } } + void + Context::Pump() + { + auto now = time_now_ms(); + for (auto& [name, endpoint] : m_Endpoints) + endpoint->Pump(now); + } + bool Context::RemoveEndpoint(const std::string& name) { diff --git a/llarp/service/context.hpp b/llarp/service/context.hpp index e7bbdc7dc..a3ef35f3f 100644 --- a/llarp/service/context.hpp +++ b/llarp/service/context.hpp @@ -36,6 +36,10 @@ namespace llarp void ForEachService(std::function visit) const; + /// Pumps the hidden service endpoints, called during Router::PumpLL + void + Pump(); + /// add endpoint via config void AddEndpoint(const Config& conf, bool autostart = false); diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 0b4faac33..40537b27d 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -1596,7 +1596,6 @@ namespace llarp } if (not SendToOrQueue(*maybe, pkt, t)) return false; - m_state->m_Router->TriggerPump(); return true; } LogDebug("SendToOrQueue failed: no endpoint for convo tag ", tag); @@ -1610,12 +1609,14 @@ namespace llarp auto pkt = std::make_shared(); if (!pkt->Load(buf)) return false; - EnsurePathToSNode(addr, [=](RouterID, exit::BaseSession_ptr s, ConvoTag) { - if (s) - { - s->SendPacketToRemote(pkt->ConstBuffer(), t); - } - }); + EnsurePathToSNode( + addr, [this, t, pkt = std::move(pkt)](RouterID, exit::BaseSession_ptr s, ConvoTag) { + if (s) + { + s->SendPacketToRemote(pkt->ConstBuffer(), t); + Router()->TriggerPump(); + } + }); return true; } @@ -1673,7 +1674,6 @@ namespace llarp } UpstreamFlush(router); - router->TriggerPump(); } std::optional @@ -1885,6 +1885,7 @@ namespace llarp return; } self->m_SendQueue.tryPushBack(SendEvent_t{transfer, p}); + self->Router()->TriggerPump(); }); return true; } diff --git a/llarp/service/endpoint.hpp b/llarp/service/endpoint.hpp index cfb5a2ef9..56f255f69 100644 --- a/llarp/service/endpoint.hpp +++ b/llarp/service/endpoint.hpp @@ -244,6 +244,10 @@ namespace llarp std::string service, std::function)> resultHandler) override; + /// called when something needs a pump to trigger the pump idempotently + void + TriggerPump(); + /// called on event loop pump virtual void Pump(llarp_time_t now);