/* * Copyright (c) 2013-2021, The PurpleI2P Project * * This file is part of Purple i2pd project and licensed under BSD3 * * See full license text in LICENSE file at top of project tree */ #include #include "Log.h" #include "Timestamp.h" #include "RouterContext.h" #include "NetDb.hpp" #include "SSU.h" #include "util.h" #ifdef _WIN32 #include #endif namespace i2p { namespace transport { SSUServer::SSUServer (int port): m_IsRunning(false), m_Thread (nullptr), m_ReceiversThread (nullptr), m_ReceiversThreadV6 (nullptr), m_Work (m_Service), m_ReceiversWork (m_ReceiversService), m_ReceiversWorkV6 (m_ReceiversServiceV6), m_Endpoint (boost::asio::ip::udp::v4 (), port), m_EndpointV6 (boost::asio::ip::udp::v6 (), port), m_Socket (m_ReceiversService), m_SocketV6 (m_ReceiversServiceV6), m_IntroducersUpdateTimer (m_Service), m_PeerTestsCleanupTimer (m_Service), m_TerminationTimer (m_Service), m_TerminationTimerV6 (m_Service) { } SSUServer::~SSUServer () { } void SSUServer::OpenSocket () { try { m_Socket.open (boost::asio::ip::udp::v4()); m_Socket.set_option (boost::asio::socket_base::receive_buffer_size (SSU_SOCKET_RECEIVE_BUFFER_SIZE)); m_Socket.set_option (boost::asio::socket_base::send_buffer_size (SSU_SOCKET_SEND_BUFFER_SIZE)); m_Socket.bind (m_Endpoint); LogPrint (eLogInfo, "SSU: Start listening v4 port ", m_Endpoint.port()); } catch ( std::exception & ex ) { LogPrint (eLogError, "SSU: failed to bind to v4 port ", m_Endpoint.port(), ": ", ex.what()); ThrowFatal ("Unable to start IPv4 SSU transport at port ", m_Endpoint.port(), ": ", ex.what ()); } } void SSUServer::OpenSocketV6 () { try { m_SocketV6.open (boost::asio::ip::udp::v6()); m_SocketV6.set_option (boost::asio::ip::v6_only (true)); m_SocketV6.set_option (boost::asio::socket_base::receive_buffer_size (SSU_SOCKET_RECEIVE_BUFFER_SIZE)); m_SocketV6.set_option (boost::asio::socket_base::send_buffer_size (SSU_SOCKET_SEND_BUFFER_SIZE)); m_SocketV6.bind (m_EndpointV6); LogPrint (eLogInfo, "SSU: Start listening v6 port ", m_EndpointV6.port()); } catch ( std::exception & ex ) { LogPrint (eLogError, "SSU: failed to bind to v6 port ", m_EndpointV6.port(), ": ", ex.what()); ThrowFatal ("Unable to start IPv6 SSU transport at port ", m_Endpoint.port(), ": ", ex.what ()); } } void SSUServer::Start () { m_IsRunning = true; m_Thread = new std::thread (std::bind (&SSUServer::Run, this)); if (context.SupportsV4 ()) { OpenSocket (); m_ReceiversThread = new std::thread (std::bind (&SSUServer::RunReceivers, this)); m_ReceiversService.post (std::bind (&SSUServer::Receive, this)); ScheduleTermination (); } if (context.SupportsV6 ()) { OpenSocketV6 (); m_ReceiversThreadV6 = new std::thread (std::bind (&SSUServer::RunReceiversV6, this)); m_ReceiversServiceV6.post (std::bind (&SSUServer::ReceiveV6, this)); ScheduleTerminationV6 (); } SchedulePeerTestsCleanupTimer (); ScheduleIntroducersUpdateTimer (); // wait for 30 seconds and decide if we need introducers } void SSUServer::Stop () { DeleteAllSessions (); m_IsRunning = false; m_TerminationTimer.cancel (); m_TerminationTimerV6.cancel (); m_Service.stop (); m_Socket.close (); m_SocketV6.close (); m_ReceiversService.stop (); m_ReceiversServiceV6.stop (); if (m_ReceiversThread) { m_ReceiversThread->join (); delete m_ReceiversThread; m_ReceiversThread = nullptr; } if (m_ReceiversThreadV6) { m_ReceiversThreadV6->join (); delete m_ReceiversThreadV6; m_ReceiversThreadV6 = nullptr; } if (m_Thread) { m_Thread->join (); delete m_Thread; m_Thread = nullptr; } } void SSUServer::Run () { i2p::util::SetThreadName("SSU"); while (m_IsRunning) { try { m_Service.run (); } catch (std::exception& ex) { LogPrint (eLogError, "SSU: server runtime exception: ", ex.what ()); } } } void SSUServer::RunReceivers () { i2p::util::SetThreadName("SSUv4"); while (m_IsRunning) { try { m_ReceiversService.run (); } catch (std::exception& ex) { LogPrint (eLogError, "SSU: receivers runtime exception: ", ex.what ()); if (m_IsRunning) { // restart socket m_Socket.close (); OpenSocket (); Receive (); } } } } void SSUServer::RunReceiversV6 () { i2p::util::SetThreadName("SSUv6"); while (m_IsRunning) { try { m_ReceiversServiceV6.run (); } catch (std::exception& ex) { LogPrint (eLogError, "SSU: v6 receivers runtime exception: ", ex.what ()); if (m_IsRunning) { m_SocketV6.close (); OpenSocketV6 (); ReceiveV6 (); } } } } void SSUServer::SetLocalAddress (const boost::asio::ip::address& localAddress) { if (localAddress.is_v6 ()) m_EndpointV6.address (localAddress); else if (localAddress.is_v4 ()) m_Endpoint.address (localAddress); } void SSUServer::AddRelay (uint32_t tag, std::shared_ptr relay) { m_Relays[tag] = relay; } void SSUServer::RemoveRelay (uint32_t tag) { m_Relays.erase (tag); } std::shared_ptr SSUServer::FindRelaySession (uint32_t tag) { auto it = m_Relays.find (tag); if (it != m_Relays.end ()) { if (it->second->GetState () == eSessionStateEstablished) return it->second; else m_Relays.erase (it); } return nullptr; } void SSUServer::Send (const uint8_t * buf, size_t len, const boost::asio::ip::udp::endpoint& to) { boost::system::error_code ec; if (to.protocol () == boost::asio::ip::udp::v4()) m_Socket.send_to (boost::asio::buffer (buf, len), to, 0, ec); else m_SocketV6.send_to (boost::asio::buffer (buf, len), to, 0, ec); if (ec) { LogPrint (eLogError, "SSU: send exception: ", ec.message (), " while trying to send data to ", to.address (), ":", to.port (), " (length: ", len, ")"); } } void SSUServer::Receive () { SSUPacket * packet = new SSUPacket (); m_Socket.async_receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V4), packet->from, std::bind (&SSUServer::HandleReceivedFrom, this, std::placeholders::_1, std::placeholders::_2, packet)); } void SSUServer::ReceiveV6 () { SSUPacket * packet = new SSUPacket (); m_SocketV6.async_receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V6), packet->from, std::bind (&SSUServer::HandleReceivedFromV6, this, std::placeholders::_1, std::placeholders::_2, packet)); } void SSUServer::HandleReceivedFrom (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet) { if (!ecode || ecode == boost::asio::error::connection_refused || ecode == boost::asio::error::connection_reset || ecode == boost::asio::error::network_unreachable || ecode == boost::asio::error::host_unreachable #ifdef _WIN32 // windows can throw WinAPI error, which is not handled by ASIO || ecode.value() == boost::winapi::ERROR_CONNECTION_REFUSED_ || ecode.value() == boost::winapi::ERROR_NETWORK_UNREACHABLE_ || ecode.value() == boost::winapi::ERROR_HOST_UNREACHABLE_ #endif ) // just try continue reading when received ICMP response otherwise socket can crash, // but better to find out which host were sent it and mark that router as unreachable { packet->len = bytes_transferred; std::vector packets; packets.push_back (packet); boost::system::error_code ec; size_t moreBytes = m_Socket.available(ec); if (!ec) { while (moreBytes && packets.size () < 25) { packet = new SSUPacket (); packet->len = m_Socket.receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V4), packet->from, 0, ec); if (!ec) { packets.push_back (packet); moreBytes = m_Socket.available(ec); if (ec) break; } else { LogPrint (eLogError, "SSU: receive_from error: code ", ec.value(), ": ", ec.message ()); delete packet; break; } } } m_Service.post (std::bind (&SSUServer::HandleReceivedPackets, this, packets, &m_Sessions)); Receive (); } else { delete packet; if (ecode != boost::asio::error::operation_aborted) { LogPrint (eLogError, "SSU: receive error: code ", ecode.value(), ": ", ecode.message ()); m_Socket.close (); OpenSocket (); Receive (); } } } void SSUServer::HandleReceivedFromV6 (const boost::system::error_code& ecode, std::size_t bytes_transferred, SSUPacket * packet) { if (!ecode || ecode == boost::asio::error::connection_refused || ecode == boost::asio::error::connection_reset || ecode == boost::asio::error::network_unreachable || ecode == boost::asio::error::host_unreachable #ifdef _WIN32 // windows can throw WinAPI error, which is not handled by ASIO || ecode.value() == boost::winapi::ERROR_CONNECTION_REFUSED_ || ecode.value() == boost::winapi::ERROR_NETWORK_UNREACHABLE_ || ecode.value() == boost::winapi::ERROR_HOST_UNREACHABLE_ #endif ) // just try continue reading when received ICMP response otherwise socket can crash, // but better to find out which host were sent it and mark that router as unreachable { packet->len = bytes_transferred; std::vector packets; packets.push_back (packet); boost::system::error_code ec; size_t moreBytes = m_SocketV6.available (ec); if (!ec) { while (moreBytes && packets.size () < 25) { packet = new SSUPacket (); packet->len = m_SocketV6.receive_from (boost::asio::buffer (packet->buf, SSU_MTU_V6), packet->from, 0, ec); if (!ec) { packets.push_back (packet); moreBytes = m_SocketV6.available(ec); if (ec) break; } else { LogPrint (eLogError, "SSU: v6 receive_from error: code ", ec.value(), ": ", ec.message ()); delete packet; break; } } } m_Service.post (std::bind (&SSUServer::HandleReceivedPackets, this, packets, &m_SessionsV6)); ReceiveV6 (); } else { delete packet; if (ecode != boost::asio::error::operation_aborted) { LogPrint (eLogError, "SSU: v6 receive error: code ", ecode.value(), ": ", ecode.message ()); m_SocketV6.close (); OpenSocketV6 (); ReceiveV6 (); } } } void SSUServer::HandleReceivedPackets (std::vector packets, std::map > * sessions) { if (!m_IsRunning) return; std::shared_ptr session; for (auto& packet: packets) { try { if (!session || session->GetRemoteEndpoint () != packet->from) // we received packet for other session than previous { if (session) { session->FlushData (); session = nullptr; } auto it = sessions->find (packet->from); if (it != sessions->end ()) session = it->second; if (!session && packet->len > 0) { session = std::make_shared (*this, packet->from); session->WaitForConnect (); (*sessions)[packet->from] = session; LogPrint (eLogDebug, "SSU: new session from ", packet->from.address ().to_string (), ":", packet->from.port (), " created"); } } if (session) session->ProcessNextMessage (packet->buf, packet->len, packet->from); } catch (std::exception& ex) { LogPrint (eLogError, "SSU: HandleReceivedPackets ", ex.what ()); if (session) session->FlushData (); session = nullptr; } delete packet; } if (session) session->FlushData (); } std::shared_ptr SSUServer::FindSession (std::shared_ptr router) const { if (!router) return nullptr; auto address = router->GetSSUAddress (true); // v4 only if (!address) return nullptr; auto session = FindSession (boost::asio::ip::udp::endpoint (address->host, address->port)); if (session || !context.SupportsV6 ()) return session; // try v6 address = router->GetSSUV6Address (); if (!address) return nullptr; return FindSession (boost::asio::ip::udp::endpoint (address->host, address->port)); } std::shared_ptr SSUServer::FindSession (const boost::asio::ip::udp::endpoint& e) const { auto& sessions = e.address ().is_v6 () ? m_SessionsV6 : m_Sessions; auto it = sessions.find (e); if (it != sessions.end ()) return it->second; else return nullptr; } void SSUServer::CreateSession (std::shared_ptr router, bool peerTest, bool v4only) { auto address = router->GetSSUAddress (v4only || !context.SupportsV6 ()); if (address) CreateSession (router, address, peerTest); else LogPrint (eLogWarning, "SSU: Router ", i2p::data::GetIdentHashAbbreviation (router->GetIdentHash ()), " doesn't have SSU address"); } void SSUServer::CreateSession (std::shared_ptr router, std::shared_ptr address, bool peerTest) { if (router && address) { if (address->UsesIntroducer ()) m_Service.post (std::bind (&SSUServer::CreateSessionThroughIntroducer, this, router, address, peerTest)); // always V4 thread else { boost::asio::ip::udp::endpoint remoteEndpoint (address->host, address->port); m_Service.post (std::bind (&SSUServer::CreateDirectSession, this, router, remoteEndpoint, peerTest)); } } } void SSUServer::CreateDirectSession (std::shared_ptr router, boost::asio::ip::udp::endpoint remoteEndpoint, bool peerTest) { auto& sessions = remoteEndpoint.address ().is_v6 () ? m_SessionsV6 : m_Sessions; auto it = sessions.find (remoteEndpoint); if (it != sessions.end ()) { auto session = it->second; if (peerTest && session->GetState () == eSessionStateEstablished) session->SendPeerTest (); } else { // otherwise create new session auto session = std::make_shared (*this, remoteEndpoint, router, peerTest); sessions[remoteEndpoint] = session; // connect LogPrint (eLogDebug, "SSU: Creating new session to [", i2p::data::GetIdentHashAbbreviation (router->GetIdentHash ()), "] ", remoteEndpoint.address ().to_string (), ":", remoteEndpoint.port ()); session->Connect (); } } void SSUServer::CreateSessionThroughIntroducer (std::shared_ptr router, std::shared_ptr address, bool peerTest) { if (router && address && address->UsesIntroducer ()) { if (address->IsV4 () && !i2p::context.SupportsV4 ()) return; if (address->IsV6 () && !i2p::context.SupportsV6 ()) return; if (!address->host.is_unspecified () && address->port) { // we rarely come here auto& sessions = address->host.is_v6 () ? m_SessionsV6 : m_Sessions; boost::asio::ip::udp::endpoint remoteEndpoint (address->host, address->port); auto it = sessions.find (remoteEndpoint); // check if session is presented already if (it != sessions.end ()) { auto session = it->second; if (peerTest && session->GetState () == eSessionStateEstablished) session->SendPeerTest (); return; } } // create new session int numIntroducers = address->ssu->introducers.size (); if (numIntroducers > 0) { uint32_t ts = i2p::util::GetSecondsSinceEpoch (); std::shared_ptr introducerSession; const i2p::data::RouterInfo::Introducer * introducer = nullptr; // we might have a session to introducer already for (int i = 0; i < numIntroducers; i++) { auto intr = &(address->ssu->introducers[i]); if (intr->iExp > 0 && ts > intr->iExp) continue; // skip expired introducer boost::asio::ip::udp::endpoint ep (intr->iHost, intr->iPort); if (ep.address ().is_v4 () && address->IsV4 ()) // ipv4 { if (!introducer) introducer = intr; auto it = m_Sessions.find (ep); if (it != m_Sessions.end ()) { introducerSession = it->second; break; } } if (ep.address ().is_v6 () && address->IsV6 ()) // ipv6 { if (!introducer) introducer = intr; auto it = m_SessionsV6.find (ep); if (it != m_SessionsV6.end ()) { introducerSession = it->second; break; } } } if (!introducer) { LogPrint (eLogWarning, "SSU: Can't connect to unreachable router and no compatibe non-expired introducers presented"); return; } if (introducerSession) // session found LogPrint (eLogWarning, "SSU: Session to introducer already exists"); else // create new { LogPrint (eLogDebug, "SSU: Creating new session to introducer ", introducer->iHost); boost::asio::ip::udp::endpoint introducerEndpoint (introducer->iHost, introducer->iPort); introducerSession = std::make_shared (*this, introducerEndpoint, router); if (introducerEndpoint.address ().is_v4 ()) m_Sessions[introducerEndpoint] = introducerSession; else if (introducerEndpoint.address ().is_v6 ()) m_SessionsV6[introducerEndpoint] = introducerSession; } if (!address->host.is_unspecified () && address->port) { // create session boost::asio::ip::udp::endpoint remoteEndpoint (address->host, address->port); auto session = std::make_shared (*this, remoteEndpoint, router, peerTest); if (address->host.is_v4 ()) m_Sessions[remoteEndpoint] = session; else if (address->host.is_v6 ()) m_SessionsV6[remoteEndpoint] = session; // introduce LogPrint (eLogInfo, "SSU: Introduce new session to [", i2p::data::GetIdentHashAbbreviation (router->GetIdentHash ()), "] through introducer ", introducer->iHost, ":", introducer->iPort); session->WaitForIntroduction (); if (i2p::context.GetRouterInfo ().HasUnreachableCap ()) // if we are unreachable. TODO: ipv4 and ipv6 { uint8_t buf[1]; Send (buf, 0, remoteEndpoint); // send HolePunch } } introducerSession->Introduce (*introducer, router); } else LogPrint (eLogWarning, "SSU: Can't connect to unreachable router and no introducers present"); } } void SSUServer::DeleteSession (std::shared_ptr session) { if (session) { session->Close (); auto& ep = session->GetRemoteEndpoint (); if (ep.address ().is_v6 ()) m_SessionsV6.erase (ep); else m_Sessions.erase (ep); } } void SSUServer::DeleteAllSessions () { for (auto& it: m_Sessions) it.second->Close (); m_Sessions.clear (); for (auto& it: m_SessionsV6) it.second->Close (); m_SessionsV6.clear (); } template std::shared_ptr SSUServer::GetRandomV4Session (Filter filter) // v4 only { std::vector > filteredSessions; for (const auto& s :m_Sessions) if (filter (s.second)) filteredSessions.push_back (s.second); if (filteredSessions.size () > 0) { auto ind = rand () % filteredSessions.size (); return filteredSessions[ind]; } return nullptr; } std::shared_ptr SSUServer::GetRandomEstablishedV4Session (std::shared_ptr excluded) // v4 only { return GetRandomV4Session ( [excluded](std::shared_ptr session)->bool { return session->GetState () == eSessionStateEstablished && session != excluded; } ); } template std::shared_ptr SSUServer::GetRandomV6Session (Filter filter) // v6 only { std::vector > filteredSessions; for (const auto& s :m_SessionsV6) if (filter (s.second)) filteredSessions.push_back (s.second); if (filteredSessions.size () > 0) { auto ind = rand () % filteredSessions.size (); return filteredSessions[ind]; } return nullptr; } std::shared_ptr SSUServer::GetRandomEstablishedV6Session (std::shared_ptr excluded) // v6 only { return GetRandomV6Session ( [excluded](std::shared_ptr session)->bool { return session->GetState () == eSessionStateEstablished && session != excluded; } ); } std::set SSUServer::FindIntroducers (int maxNumIntroducers) { uint32_t ts = i2p::util::GetSecondsSinceEpoch (); std::set ret; for (int i = 0; i < maxNumIntroducers; i++) { auto session = GetRandomV4Session ( [&ret, ts](std::shared_ptr session)->bool { return session->GetRelayTag () && !ret.count (session.get ()) && session->GetState () == eSessionStateEstablished && ts < session->GetCreationTime () + SSU_TO_INTRODUCER_SESSION_DURATION; } ); if (session) { ret.insert (session.get ()); break; } } return ret; } void SSUServer::RescheduleIntroducersUpdateTimer () { m_IntroducersUpdateTimer.cancel (); m_IntroducersUpdateTimer.expires_from_now (boost::posix_time::seconds(SSU_KEEP_ALIVE_INTERVAL/2)); m_IntroducersUpdateTimer.async_wait (std::bind (&SSUServer::HandleIntroducersUpdateTimer, this, std::placeholders::_1)); } void SSUServer::ScheduleIntroducersUpdateTimer () { m_IntroducersUpdateTimer.expires_from_now (boost::posix_time::seconds(SSU_KEEP_ALIVE_INTERVAL)); m_IntroducersUpdateTimer.async_wait (std::bind (&SSUServer::HandleIntroducersUpdateTimer, this, std::placeholders::_1)); } void SSUServer::HandleIntroducersUpdateTimer (const boost::system::error_code& ecode) { if (ecode != boost::asio::error::operation_aborted) { // timeout expired if (i2p::context.GetStatus () == eRouterStatusTesting) { // we still don't know if we need introducers ScheduleIntroducersUpdateTimer (); return; } if (i2p::context.GetStatus () != eRouterStatusFirewalled) { // we don't need introducers m_Introducers.clear (); return; } // we are firewalled if (!i2p::context.IsUnreachable ()) i2p::context.SetUnreachable (true, false); // ipv4 std::list newList; size_t numIntroducers = 0; uint32_t ts = i2p::util::GetSecondsSinceEpoch (); for (const auto& it : m_Introducers) { auto session = FindSession (it); if (session && ts < session->GetCreationTime () + SSU_TO_INTRODUCER_SESSION_DURATION) { session->SendKeepAlive (); newList.push_back (it); numIntroducers++; } else i2p::context.RemoveIntroducer (it); } if (numIntroducers < SSU_MAX_NUM_INTRODUCERS) { // create new auto introducers = FindIntroducers (SSU_MAX_NUM_INTRODUCERS); for (const auto& it1: introducers) { const auto& ep = it1->GetRemoteEndpoint (); i2p::data::RouterInfo::Introducer introducer; introducer.iHost = ep.address (); introducer.iPort = ep.port (); introducer.iTag = it1->GetRelayTag (); introducer.iKey = it1->GetIntroKey (); if (i2p::context.AddIntroducer (introducer)) { newList.push_back (ep); if (newList.size () >= SSU_MAX_NUM_INTRODUCERS) break; } } } m_Introducers = newList; if (m_Introducers.size () < SSU_MAX_NUM_INTRODUCERS) { std::set > requested; for (auto i = m_Introducers.size (); i < SSU_MAX_NUM_INTRODUCERS; i++) { auto introducer = i2p::data::netdb.GetRandomIntroducer (); if (introducer && !requested.count (introducer)) // not requested already { auto address = introducer->GetSSUAddress (true); // v4 if (address && !address->host.is_unspecified ()) { boost::asio::ip::udp::endpoint ep (address->host, address->port); if (std::find (m_Introducers.begin (), m_Introducers.end (), ep) == m_Introducers.end ()) // not connected yet { CreateDirectSession (introducer, ep, false); requested.insert (introducer); } } } } } ScheduleIntroducersUpdateTimer (); } } void SSUServer::NewPeerTest (uint32_t nonce, PeerTestParticipant role, std::shared_ptr session) { m_PeerTests[nonce] = { i2p::util::GetMillisecondsSinceEpoch (), role, session }; } PeerTestParticipant SSUServer::GetPeerTestParticipant (uint32_t nonce) { auto it = m_PeerTests.find (nonce); if (it != m_PeerTests.end ()) return it->second.role; else return ePeerTestParticipantUnknown; } std::shared_ptr SSUServer::GetPeerTestSession (uint32_t nonce) { auto it = m_PeerTests.find (nonce); if (it != m_PeerTests.end ()) return it->second.session; else return nullptr; } void SSUServer::UpdatePeerTest (uint32_t nonce, PeerTestParticipant role) { auto it = m_PeerTests.find (nonce); if (it != m_PeerTests.end ()) it->second.role = role; } void SSUServer::RemovePeerTest (uint32_t nonce) { m_PeerTests.erase (nonce); } void SSUServer::SchedulePeerTestsCleanupTimer () { m_PeerTestsCleanupTimer.expires_from_now (boost::posix_time::seconds(SSU_PEER_TEST_TIMEOUT)); m_PeerTestsCleanupTimer.async_wait (std::bind (&SSUServer::HandlePeerTestsCleanupTimer, this, std::placeholders::_1)); } void SSUServer::HandlePeerTestsCleanupTimer (const boost::system::error_code& ecode) { if (ecode != boost::asio::error::operation_aborted) { int numDeleted = 0; uint64_t ts = i2p::util::GetMillisecondsSinceEpoch (); for (auto it = m_PeerTests.begin (); it != m_PeerTests.end ();) { if (ts > it->second.creationTime + SSU_PEER_TEST_TIMEOUT*1000LL) { numDeleted++; it = m_PeerTests.erase (it); } else ++it; } if (numDeleted > 0) LogPrint (eLogDebug, "SSU: ", numDeleted, " peer tests have been expired"); SchedulePeerTestsCleanupTimer (); } } void SSUServer::ScheduleTermination () { m_TerminationTimer.expires_from_now (boost::posix_time::seconds(SSU_TERMINATION_CHECK_TIMEOUT)); m_TerminationTimer.async_wait (std::bind (&SSUServer::HandleTerminationTimer, this, std::placeholders::_1)); } void SSUServer::HandleTerminationTimer (const boost::system::error_code& ecode) { if (ecode != boost::asio::error::operation_aborted) { auto ts = i2p::util::GetSecondsSinceEpoch (); for (auto& it: m_Sessions) if (it.second->IsTerminationTimeoutExpired (ts)) { auto session = it.second; if (it.first != session->GetRemoteEndpoint ()) LogPrint (eLogWarning, "SSU: remote endpoint ", session->GetRemoteEndpoint (), " doesn't match key ", it.first, " adjusted"); m_Service.post ([session] { LogPrint (eLogWarning, "SSU: no activity with ", session->GetRemoteEndpoint (), " for ", session->GetTerminationTimeout (), " seconds"); session->Failed (); }); } ScheduleTermination (); } } void SSUServer::ScheduleTerminationV6 () { m_TerminationTimerV6.expires_from_now (boost::posix_time::seconds(SSU_TERMINATION_CHECK_TIMEOUT)); m_TerminationTimerV6.async_wait (std::bind (&SSUServer::HandleTerminationTimerV6, this, std::placeholders::_1)); } void SSUServer::HandleTerminationTimerV6 (const boost::system::error_code& ecode) { if (ecode != boost::asio::error::operation_aborted) { auto ts = i2p::util::GetSecondsSinceEpoch (); for (auto& it: m_SessionsV6) if (it.second->IsTerminationTimeoutExpired (ts)) { auto session = it.second; if (it.first != session->GetRemoteEndpoint ()) LogPrint (eLogWarning, "SSU: remote endpoint ", session->GetRemoteEndpoint (), " doesn't match key ", it.first); m_Service.post ([session] { LogPrint (eLogWarning, "SSU: no activity with ", session->GetRemoteEndpoint (), " for ", session->GetTerminationTimeout (), " seconds"); session->Failed (); }); } ScheduleTerminationV6 (); } } } }