Fix link layer delivery race condition (fix random ping delays)

We trigger a pump immediately, but this is racy because we add to our
plaintext data in a worker thread; if the worker thread runs after the
pump then it ends up leaving plaintext to be handled, but there's no
wakeup until the next one.

This was the cause of seeing a random +1s and bunching added to ping
responses sometimes: it wasn't until the *next* ping goes through the
network that the plaintext queue gets processed, at which point it
flushes the old one and often the new one together.

The fix here gets rid of the map of sessions needing wakeups and instead
adds an atomic flag to all of them to let us figure out which ones
need to be flushed.
pull/1795/head
Jason Rhinelander 3 years ago
parent aa1dc83459
commit 74215fc44c

@ -23,9 +23,7 @@ namespace llarp::iwp
: ILinkLayer(
keyManager, getrc, h, sign, before, est, reneg, timeout, closed, pumpDone, worker)
, m_Wakeup{ev->make_waker([this]() { HandleWakeupPlaintext(); })}
, m_PlaintextRecv{1024}
, m_Inbound{allowInbound}
{}
const char*
@ -81,6 +79,7 @@ namespace llarp::iwp
LogWarn("Brand new session failed; removing from pending sessions list");
m_Pending.erase(from);
}
WakeupPlaintext();
}
}
@ -107,13 +106,6 @@ namespace llarp::iwp
return std::make_shared<Session>(this, rc, ai);
}
void
// Record this session (weak ref, keyed by its remote SockAddr) in
// m_PlaintextRecv so the next HandleWakeupPlaintext() pass can flush its
// decrypted plaintext queue. A dead session (lock() fails) is silently
// dropped rather than registered.
// NOTE(review): this appears inside a rendered diff — the surrounding commit
// deletes AddWakeup entirely, replacing the wakeup map with a per-session
// std::atomic_flag (m_PlaintextEmpty) to close the worker-thread race
// described in the commit message.
LinkLayer::AddWakeup(std::weak_ptr<Session> session)
{
if (auto ptr = session.lock())
m_PlaintextRecv[ptr->GetRemoteEndpoint()] = session;
}
void
LinkLayer::WakeupPlaintext()
{
@ -123,13 +115,15 @@ namespace llarp::iwp
void
LinkLayer::HandleWakeupPlaintext()
{
for (const auto& [addr, session] : m_PlaintextRecv)
{
auto ptr = session.lock();
if (ptr)
ptr->HandlePlaintext();
}
m_PlaintextRecv.clear();
// Copy bare pointers out first because HandlePlaintext can end up removing themselves from the
// structures.
m_WakingUp.clear(); // Reused to minimize allocations.
for (const auto& [router_id, session] : m_AuthedLinks)
m_WakingUp.push_back(session.get());
for (const auto& [addr, session] : m_Pending)
m_WakingUp.push_back(session.get());
for (auto* session : m_WakingUp)
session->HandlePlaintext();
PumpDone();
}

@ -53,9 +53,6 @@ namespace llarp::iwp
void
WakeupPlaintext();
void
AddWakeup(std::weak_ptr<Session> peer);
std::string
PrintableName() const;
@ -64,8 +61,8 @@ namespace llarp::iwp
HandleWakeupPlaintext();
const std::shared_ptr<EventLoopWakeup> m_Wakeup;
std::unordered_map<SockAddr, std::weak_ptr<Session>> m_PlaintextRecv;
std::unordered_map<SockAddr, RouterID> m_AuthedAddrs;
std::vector<ILinkSession*> m_WakingUp;
const bool m_Inbound;
};

@ -40,6 +40,7 @@ namespace llarp
, m_PlaintextRecv{PlaintextQueueSize}
{
token.Zero();
m_PlaintextEmpty.test_and_set();
GotLIM = util::memFn(&Session::GotOutboundLIM, this);
CryptoManager::instance()->shorthash(m_SessionKey, llarp_buffer_t(rc.pubkey));
}
@ -53,6 +54,7 @@ namespace llarp
, m_PlaintextRecv{PlaintextQueueSize}
{
token.Randomize();
m_PlaintextEmpty.test_and_set();
GotLIM = util::memFn(&Session::GotInboundLIM, this);
const PubKey pk = m_Parent->GetOurRC().pubkey;
CryptoManager::instance()->shorthash(m_SessionKey, llarp_buffer_t(pk));
@ -267,7 +269,6 @@ namespace llarp
if (not m_DecryptNext.empty())
{
m_Parent->AddWakeup(weak_from_this());
m_Parent->QueueWork([self, data = m_DecryptNext] { self->DecryptWorker(data); });
m_DecryptNext.clear();
}
@ -648,12 +649,15 @@ namespace llarp
++itr;
}
m_PlaintextRecv.tryPushBack(std::move(msgs));
m_PlaintextEmpty.clear();
m_Parent->WakeupPlaintext();
}
void
Session::HandlePlaintext()
{
if (m_PlaintextEmpty.test_and_set())
return;
while (auto maybe_queue = m_PlaintextRecv.tryPopFront())
{
for (auto& result : *maybe_queue)
@ -688,7 +692,7 @@ namespace llarp
}
}
SendMACK();
TriggerPump();
m_Parent->WakeupPlaintext();
}
void

@ -132,7 +132,7 @@ namespace llarp
return m_Inbound;
}
void
HandlePlaintext();
HandlePlaintext() override;
private:
enum class State
@ -201,6 +201,7 @@ namespace llarp
CryptoQueue_t m_EncryptNext;
CryptoQueue_t m_DecryptNext;
std::atomic_flag m_PlaintextEmpty;
llarp::thread::Queue<CryptoQueue_t> m_PlaintextRecv;
void

@ -408,8 +408,7 @@ namespace llarp
Lock_t l(m_AuthedLinksMutex);
RouterID r = remote;
llarp::LogInfo("Closing all to ", r);
auto [itr, end] = m_AuthedLinks.equal_range(r);
while (itr != end)
for (auto [itr, end] = m_AuthedLinks.equal_range(r); itr != end;)
{
itr->second->Close();
m_RecentlyClosed.emplace(itr->second->GetRemoteEndpoint(), now + CloseGraceWindow);
@ -493,7 +492,6 @@ namespace llarp
if (m_Pending.count(address))
return false;
m_Pending.emplace(address, s);
m_Router->PumpLL();
return true;
}

@ -130,5 +130,8 @@ namespace llarp
virtual util::StatusObject
ExtractStatus() const = 0;
virtual void
HandlePlaintext() = 0;
};
} // namespace llarp

@ -1248,6 +1248,7 @@ namespace llarp
#else
_loop->set_pump_function([this] { PumpLLNonIdempotent(); });
#endif
//_loop->call_every(10ms, weak_from_this(), [this] { PumpLLNonIdempotent(); });
_loop->call_every(ROUTER_TICK_INTERVAL, weak_from_this(), [this] { Tick(); });
_running.store(true);
_startedAt = Now();

Loading…
Cancel
Save