Fix packet delay creep

- Make the main PumpLL also pump hidden services, rather than using
  separate wakers in each TunEndpoint.  It seems there is some
  interactions that just one or the other is not enough.

- Eliminate TunEndpoint send queue -- it isn't needed as we can just
  send directly.
pull/1795/head
Jason Rhinelander 3 years ago
parent 99b12940ad
commit 9844d358e6

@ -86,13 +86,7 @@ namespace llarp
TunEndpoint::TunEndpoint(AbstractRouter* r, service::Context* parent)
: service::Endpoint(r, parent)
, m_UserToNetworkPktQueue("endpoint_sendq", r->loop(), r->loop())
{
m_PacketSendWaker = r->loop()->make_waker([this]() { FlushWrite(); });
m_MessageSendWaker = r->loop()->make_waker([this]() {
FlushSend();
Pump(Now());
});
m_PacketRouter = std::make_unique<vpn::PacketRouter>(
[this](net::IPPacket pkt) { HandleGotUserPacket(std::move(pkt)); });
#if defined(ANDROID) || defined(__APPLE__)
@ -362,15 +356,7 @@ namespace llarp
}
void
TunEndpoint::Flush()
{
FlushSend();
Pump(Now());
FlushWrite();
}
void
TunEndpoint::FlushWrite()
TunEndpoint::Pump(llarp_time_t now)
{
// flush network to user
while (not m_NetworkToUserPktQueue.empty())
@ -378,6 +364,8 @@ namespace llarp
m_NetIf->WritePacket(m_NetworkToUserPktQueue.top().pkt);
m_NetworkToUserPktQueue.pop();
}
service::Endpoint::Pump(now);
}
static bool
@ -952,7 +940,6 @@ namespace llarp
LogInfo(Name(), " has ipv6 address ", m_OurIPv6);
}
#endif
Router()->loop()->add_ticker([this] { Flush(); });
// Attempt to register DNS on the interface
systemd_resolved_set_dns(
@ -1037,149 +1024,151 @@ namespace llarp
}
void
TunEndpoint::FlushSend()
TunEndpoint::HandleGotUserPacket(net::IPPacket pkt)
{
m_UserToNetworkPktQueue.Process([&](net::IPPacket& pkt) {
huint128_t dst, src;
if (pkt.IsV4())
{
dst = pkt.dst4to6();
src = pkt.src4to6();
}
else
{
dst = pkt.dstv6();
src = pkt.srcv6();
}
// this is for ipv6 slaac on ipv6 exits
/*
constexpr huint128_t ipv6_multicast_all_nodes =
huint128_t{uint128_t{0xff01'0000'0000'0000UL, 1UL}};
constexpr huint128_t ipv6_multicast_all_routers =
huint128_t{uint128_t{0xff01'0000'0000'0000UL, 2UL}};
if (dst == ipv6_multicast_all_nodes and m_state->m_ExitEnabled)
huint128_t dst, src;
if (pkt.IsV4())
{
dst = pkt.dst4to6();
src = pkt.src4to6();
}
else
{
dst = pkt.dstv6();
src = pkt.srcv6();
}
// this is for ipv6 slaac on ipv6 exits
/*
constexpr huint128_t ipv6_multicast_all_nodes =
huint128_t{uint128_t{0xff01'0000'0000'0000UL, 1UL}};
constexpr huint128_t ipv6_multicast_all_routers =
huint128_t{uint128_t{0xff01'0000'0000'0000UL, 2UL}};
if (dst == ipv6_multicast_all_nodes and m_state->m_ExitEnabled)
{
// send ipv6 multicast
for (const auto& [ip, addr] : m_IPToAddr)
{
// send ipv6 multicast
for (const auto& [ip, addr] : m_IPToAddr)
{
(void)ip;
SendToOrQueue(
service::Address{addr.as_array()}, pkt.ConstBuffer(), service::ProtocolType::Exit);
}
return;
(void)ip;
SendToOrQueue(
service::Address{addr.as_array()}, pkt.ConstBuffer(), service::ProtocolType::Exit);
}
return;
}
*/
if (m_state->m_ExitEnabled)
{
dst = net::ExpandV4(net::TruncateV6(dst));
}
auto itr = m_IPToAddr.find(dst);
if (itr == m_IPToAddr.end())
*/
if (m_state->m_ExitEnabled)
{
dst = net::ExpandV4(net::TruncateV6(dst));
}
auto itr = m_IPToAddr.find(dst);
if (itr == m_IPToAddr.end())
{
// find all ranges that match the destination ip
const auto exitEntries = m_ExitMap.FindAllEntries(dst);
if (exitEntries.empty())
{
// find all ranges that match the destination ip
const auto exitEntries = m_ExitMap.FindAllEntries(dst);
if (exitEntries.empty())
// send icmp unreachable as we dont have any exits for this ip
if (const auto icmp = pkt.MakeICMPUnreachable())
{
// send icmp unreachable as we dont have any exits for this ip
if (const auto icmp = pkt.MakeICMPUnreachable())
{
HandleWriteIPPacket(icmp->ConstBuffer(), dst, src, 0);
}
return;
}
service::Address addr{};
for (const auto& [range, exitAddr] : exitEntries)
{
if (
// we permit this because it matches our rules and we allow bogons:
(range.BogonRange() and range.Contains(dst))
// allow because the destination is not a bogon and the mapped range is not a bogon
or not IsBogon(dst))
{
addr = exitAddr;
}
// we do not permit bogons when they don't explicitly match a permitted bogon range
/// ?
HandleWriteIPPacket(icmp->ConstBuffer(), dst, src, 0);
}
if (addr.IsZero()) // drop becase no exit was found that matches our rules
return;
pkt.ZeroSourceAddress();
MarkAddressOutbound(addr);
EnsurePathToService(
addr,
[pkt](service::Address addr, service::OutboundContext* ctx) {
if (ctx)
{
ctx->SendPacketToRemote(pkt.ConstBuffer(), service::ProtocolType::Exit);
return;
}
LogWarn("cannot ensure path to exit ", addr, " so we drop some packets");
},
PathAlignmentTimeout());
return;
}
std::variant<service::Address, RouterID> to;
service::ProtocolType type;
if (m_SNodes.at(itr->second))
{
to = RouterID{itr->second.as_array()};
type = service::ProtocolType::TrafficV4;
}
else
{
to = service::Address{itr->second.as_array()};
type = m_state->m_ExitEnabled and src != m_OurIP ? service::ProtocolType::Exit
: pkt.ServiceProtocol();
}
// prepare packet for insertion into network
// this includes clearing IP addresses, recalculating checksums, etc
// this does not happen for exits because the point is they don't rewrite addresses
if (type != service::ProtocolType::Exit)
{
if (pkt.IsV4())
pkt.UpdateIPv4Address({0}, {0});
else
pkt.UpdateIPv6Address({0}, {0});
}
// try sending it on an existing convotag
// this succeds for inbound convos, probably.
if (auto maybe = GetBestConvoTagFor(to))
service::Address addr{};
for (const auto& [range, exitAddr] : exitEntries)
{
if (SendToOrQueue(*maybe, pkt.ConstBuffer(), type))
if (
// we permit this because it matches our rules and we allow bogons:
(range.BogonRange() and range.Contains(dst))
// allow because the destination is not a bogon and the mapped range is not a bogon
or not IsBogon(dst))
{
MarkIPActive(dst);
return;
addr = exitAddr;
}
// we do not permit bogons when they don't explicitly match a permitted bogon range
}
// try establishing a path to this guy
// will fail if it's an inbound convo
EnsurePathTo(
to,
[pkt, type, dst, to, this](auto maybe) {
if (not maybe)
{
var::visit(
[this](auto&& addr) {
LogWarn(Name(), " failed to ensure path to ", addr, " no convo tag found");
},
to);
}
if (SendToOrQueue(*maybe, pkt.ConstBuffer(), type))
{
MarkIPActive(dst);
}
else
if (addr.IsZero()) // drop becase no exit was found that matches our rules
return;
pkt.ZeroSourceAddress();
MarkAddressOutbound(addr);
EnsurePathToService(
addr,
[pkt, this](service::Address addr, service::OutboundContext* ctx) {
if (ctx)
{
var::visit(
[this](auto&& addr) {
LogWarn(Name(), " failed to send to ", addr, ", SendToOrQueue failed");
},
to);
ctx->SendPacketToRemote(pkt.ConstBuffer(), service::ProtocolType::Exit);
Router()->TriggerPump();
return;
}
LogWarn("cannot ensure path to exit ", addr, " so we drop some packets");
},
PathAlignmentTimeout());
});
return;
}
std::variant<service::Address, RouterID> to;
service::ProtocolType type;
if (m_SNodes.at(itr->second))
{
to = RouterID{itr->second.as_array()};
type = service::ProtocolType::TrafficV4;
}
else
{
to = service::Address{itr->second.as_array()};
type = m_state->m_ExitEnabled and src != m_OurIP ? service::ProtocolType::Exit
: pkt.ServiceProtocol();
}
// prepare packet for insertion into network
// this includes clearing IP addresses, recalculating checksums, etc
// this does not happen for exits because the point is they don't rewrite addresses
if (type != service::ProtocolType::Exit)
{
if (pkt.IsV4())
pkt.UpdateIPv4Address({0}, {0});
else
pkt.UpdateIPv6Address({0}, {0});
}
// try sending it on an existing convotag
// this succeds for inbound convos, probably.
if (auto maybe = GetBestConvoTagFor(to))
{
if (SendToOrQueue(*maybe, pkt.ConstBuffer(), type))
{
MarkIPActive(dst);
Router()->TriggerPump();
return;
}
}
// try establishing a path to this guy
// will fail if it's an inbound convo
EnsurePathTo(
to,
[pkt, type, dst, to, this](auto maybe) {
if (not maybe)
{
var::visit(
[this](auto&& addr) {
LogWarn(Name(), " failed to ensure path to ", addr, " no convo tag found");
},
to);
}
if (SendToOrQueue(*maybe, pkt.ConstBuffer(), type))
{
MarkIPActive(dst);
Router()->TriggerPump();
}
else
{
var::visit(
[this](auto&& addr) {
LogWarn(Name(), " failed to send to ", addr, ", SendToOrQueue failed");
},
to);
}
},
PathAlignmentTimeout());
}
bool
@ -1333,8 +1322,8 @@ namespace llarp
pkt.UpdateIPv6Address(src, dst);
}
m_NetworkToUserPktQueue.push(std::move(write));
// wake up packet flushing event so we ensure that all packets are written to user
m_PacketSendWaker->Trigger();
// wake up so we ensure that all packets are written to user
Router()->TriggerPump();
return true;
}
@ -1439,13 +1428,6 @@ namespace llarp
m_IPActivity[ip] = std::numeric_limits<llarp_time_t>::max();
}
void
TunEndpoint::HandleGotUserPacket(net::IPPacket pkt)
{
m_UserToNetworkPktQueue.Emplace(std::move(pkt));
m_MessageSendWaker->Trigger();
}
TunEndpoint::~TunEndpoint() = default;
} // namespace handlers

@ -172,10 +172,6 @@ namespace llarp
huint128_t
ObtainIPForAddr(std::variant<service::Address, RouterID> addr) override;
/// flush network traffic
void
Flush();
void
ResetInternalState() override;
@ -187,9 +183,6 @@ namespace llarp
net::IPPacket::CompareOrder,
net::IPPacket::GetNow>;
/// queue for sending packets over the network from us
PacketQueue_t m_UserToNetworkPktQueue;
struct WritePacket
{
uint64_t seqno;
@ -204,6 +197,10 @@ namespace llarp
/// queue for sending packets to user from network
std::priority_queue<WritePacket> m_NetworkToUserPktQueue;
void
Pump(llarp_time_t now) override;
/// return true if we have a remote loki address for this ip address
bool
HasRemoteForIP(huint128_t ipv4) const;
@ -216,10 +213,6 @@ namespace llarp
void
MarkIPActiveForever(huint128_t ip);
/// flush ip packets
virtual void
FlushSend();
/// flush writing ip packets to interface
void
FlushWrite();
@ -292,11 +285,6 @@ namespace llarp
std::set<IPRange> m_OwnedRanges;
/// how long to wait for path alignment
llarp_time_t m_PathAlignmentTimeout;
/// idempotent wakeup for writing packets to user
std::shared_ptr<EventLoopWakeup> m_PacketSendWaker;
/// idempotent wakeup for writing messages to network
std::shared_ptr<EventLoopWakeup> m_MessageSendWaker;
/// a file to load / store the ephemeral address map to
std::optional<fs::path> m_PersistAddrMapFile;

@ -89,6 +89,7 @@ namespace llarp
return;
_outboundMessageHandler.Pump();
_linkManager.PumpLinks();
_hiddenServiceContext.Pump();
llarp::LogTrace("Router::PumpLL() end");
}

@ -80,6 +80,14 @@ namespace llarp
}
}
void
Context::Pump()
{
auto now = time_now_ms();
for (auto& [name, endpoint] : m_Endpoints)
endpoint->Pump(now);
}
bool
Context::RemoveEndpoint(const std::string& name)
{

@ -36,6 +36,10 @@ namespace llarp
void
ForEachService(std::function<bool(const std::string&, const Endpoint_ptr&)> visit) const;
/// Pumps the hidden service endpoints, called during Router::PumpLL
void
Pump();
/// add endpoint via config
void
AddEndpoint(const Config& conf, bool autostart = false);

@ -1596,7 +1596,6 @@ namespace llarp
}
if (not SendToOrQueue(*maybe, pkt, t))
return false;
m_state->m_Router->TriggerPump();
return true;
}
LogDebug("SendToOrQueue failed: no endpoint for convo tag ", tag);
@ -1610,12 +1609,14 @@ namespace llarp
auto pkt = std::make_shared<net::IPPacket>();
if (!pkt->Load(buf))
return false;
EnsurePathToSNode(addr, [=](RouterID, exit::BaseSession_ptr s, ConvoTag) {
if (s)
{
s->SendPacketToRemote(pkt->ConstBuffer(), t);
}
});
EnsurePathToSNode(
addr, [this, t, pkt = std::move(pkt)](RouterID, exit::BaseSession_ptr s, ConvoTag) {
if (s)
{
s->SendPacketToRemote(pkt->ConstBuffer(), t);
Router()->TriggerPump();
}
});
return true;
}
@ -1673,7 +1674,6 @@ namespace llarp
}
UpstreamFlush(router);
router->TriggerPump();
}
std::optional<ConvoTag>
@ -1885,6 +1885,7 @@ namespace llarp
return;
}
self->m_SendQueue.tryPushBack(SendEvent_t{transfer, p});
self->Router()->TriggerPump();
});
return true;
}

@ -244,6 +244,10 @@ namespace llarp
std::string service,
std::function<void(std::vector<dns::SRVData>)> resultHandler) override;
/// called when something needs a pump to trigger the pump idempotently
void
TriggerPump();
/// called on event loop pump
virtual void
Pump(llarp_time_t now);

Loading…
Cancel
Save