Merge pull request #947 from majestrate/close-connections-when-done-2019-12-05

Close connections when done
pull/951/head
Jeff 5 years ago committed by GitHub
commit 2e9ae35af7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -170,10 +170,10 @@ namespace llarp
if(m_State == State::Closed) if(m_State == State::Closed)
return; return;
auto close_msg = CreatePacket(Command::eCLOS, 0, 16, 16); auto close_msg = CreatePacket(Command::eCLOS, 0, 16, 16);
EncryptAndSend(std::move(close_msg));
if(m_State == State::Ready) if(m_State == State::Ready)
m_Parent->UnmapAddr(m_RemoteAddr); m_Parent->UnmapAddr(m_RemoteAddr);
m_State = State::Closed; m_State = State::Closed;
EncryptAndSend(std::move(close_msg));
LogInfo("closing connection to ", m_RemoteAddr); LogInfo("closing connection to ", m_RemoteAddr);
} }

@ -302,9 +302,9 @@ namespace llarp
auto itr = m_PersistingSessions.begin(); auto itr = m_PersistingSessions.begin();
while(itr != m_PersistingSessions.end()) while(itr != m_PersistingSessions.end())
{ {
auto link = GetLinkWithSessionTo(itr->first);
if(now < itr->second) if(now < itr->second)
{ {
auto link = GetLinkWithSessionTo(itr->first);
if(link) if(link)
{ {
LogDebug("keepalive to ", itr->first); LogDebug("keepalive to ", itr->first);
@ -321,6 +321,10 @@ namespace llarp
const RouterID r(itr->first); const RouterID r(itr->first);
LogInfo("commit to ", r, " expired"); LogInfo("commit to ", r, " expired");
itr = m_PersistingSessions.erase(itr); itr = m_PersistingSessions.erase(itr);
for(const auto &link : outboundLinks)
{
link->CloseSessionTo(r);
}
} }
} }
} }

@ -11,8 +11,9 @@ namespace llarp
{ {
static constexpr size_t MaxSessionsPerKey = 16; static constexpr size_t MaxSessionsPerKey = 16;
ILinkLayer::ILinkLayer(std::shared_ptr<KeyManager> keyManager, GetRCFunc getrc, ILinkLayer::ILinkLayer(std::shared_ptr< KeyManager > keyManager,
LinkMessageHandler handler, SignBufferFunc signbuf, GetRCFunc getrc, LinkMessageHandler handler,
SignBufferFunc signbuf,
SessionEstablishedHandler establishedSession, SessionEstablishedHandler establishedSession,
SessionRenegotiateHandler reneg, SessionRenegotiateHandler reneg,
TimeoutHandler timeout, SessionClosedHandler closed, TimeoutHandler timeout, SessionClosedHandler closed,
@ -286,7 +287,6 @@ namespace llarp
ILinkLayer::Start(std::shared_ptr< Logic > l, ILinkLayer::Start(std::shared_ptr< Logic > l,
std::shared_ptr< thread::ThreadPool > worker) std::shared_ptr< thread::ThreadPool > worker)
{ {
m_Recv = std::make_shared< TrafficQueue_t >();
m_Worker = worker; m_Worker = worker;
m_Logic = l; m_Logic = l;
ScheduleTick(100); ScheduleTick(100);
@ -315,6 +315,17 @@ namespace llarp
++itr; ++itr;
} }
} }
{
// decay recently closed list
auto itr = m_RecentlyClosed.begin();
while(itr != m_RecentlyClosed.end())
{
if(itr->second >= now)
itr = m_RecentlyClosed.erase(itr);
else
++itr;
}
}
} }
void void
@ -340,12 +351,13 @@ namespace llarp
++itr; ++itr;
} }
} }
m_Recv.reset();
} }
void void
ILinkLayer::CloseSessionTo(const RouterID& remote) ILinkLayer::CloseSessionTo(const RouterID& remote)
{ {
static constexpr llarp_time_t CloseGraceWindow = 500;
const auto now = Now();
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex); ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
RouterID r = remote; RouterID r = remote;
llarp::LogInfo("Closing all to ", r); llarp::LogInfo("Closing all to ", r);
@ -354,6 +366,8 @@ namespace llarp
while(itr != range.second) while(itr != range.second)
{ {
itr->second->Close(); itr->second->Close();
m_RecentlyClosed.emplace(itr->second->GetRemoteEndpoint(),
now + CloseGraceWindow);
itr = m_AuthedLinks.erase(itr); itr = m_AuthedLinks.erase(itr);
} }
} }
@ -462,7 +476,11 @@ namespace llarp
auto itr = pkts->begin(); auto itr = pkts->begin();
while(itr != pkts->end()) while(itr != pkts->end())
{ {
link->RecvFrom(itr->remote, std::move(itr->pkt)); if(link->m_RecentlyClosed.find(itr->remote)
== link->m_RecentlyClosed.end())
{
link->RecvFrom(itr->remote, std::move(itr->pkt));
}
++itr; ++itr;
} }
link->Pump(); link->Pump();

@ -52,7 +52,7 @@ namespace llarp
struct ILinkLayer struct ILinkLayer
{ {
ILinkLayer(std::shared_ptr<KeyManager> keyManager, GetRCFunc getrc, ILinkLayer(std::shared_ptr< KeyManager > keyManager, GetRCFunc getrc,
LinkMessageHandler handler, SignBufferFunc signFunc, LinkMessageHandler handler, SignBufferFunc signFunc,
SessionEstablishedHandler sessionEstablish, SessionEstablishedHandler sessionEstablish,
SessionRenegotiateHandler renegotiate, TimeoutHandler timeout, SessionRenegotiateHandler renegotiate, TimeoutHandler timeout,
@ -179,7 +179,7 @@ namespace llarp
SessionClosedHandler SessionClosed; SessionClosedHandler SessionClosed;
SessionRenegotiateHandler SessionRenegotiate; SessionRenegotiateHandler SessionRenegotiate;
PumpDoneHandler PumpDone; PumpDoneHandler PumpDone;
std::shared_ptr<KeyManager> keyManager; std::shared_ptr< KeyManager > keyManager;
std::shared_ptr< Logic > std::shared_ptr< Logic >
logic() logic()
@ -256,10 +256,8 @@ namespace llarp
ACQUIRED_AFTER(m_AuthedLinksMutex)); ACQUIRED_AFTER(m_AuthedLinksMutex));
Pending m_Pending GUARDED_BY(m_PendingMutex); Pending m_Pending GUARDED_BY(m_PendingMutex);
using TrafficEvent_t = std::pair< Addr, ILinkSession::Packet_t >; std::unordered_map< llarp::Addr, llarp_time_t, llarp::Addr::Hash >
using TrafficQueue_t = std::vector< TrafficEvent_t >; m_RecentlyClosed;
std::shared_ptr< TrafficQueue_t > m_Recv;
}; };
using LinkLayer_ptr = std::shared_ptr< ILinkLayer >; using LinkLayer_ptr = std::shared_ptr< ILinkLayer >;

Loading…
Cancel
Save