diff --git a/include/llarp.hpp b/include/llarp.hpp index 71bbff39c..c723bf0ea 100644 --- a/include/llarp.hpp +++ b/include/llarp.hpp @@ -26,7 +26,7 @@ namespace llarp struct RouterContact; namespace thread { - struct ThreadPool; + class ThreadPool; } namespace metrics diff --git a/llarp/CMakeLists.txt b/llarp/CMakeLists.txt index e1ae657f2..8f9d7f41a 100644 --- a/llarp/CMakeLists.txt +++ b/llarp/CMakeLists.txt @@ -214,6 +214,7 @@ set(LIB_SRC service/async_key_exchange.cpp service/config.cpp service/context.cpp + service/endpoint_state.cpp service/endpoint_util.cpp service/endpoint.cpp service/handler.cpp @@ -226,6 +227,7 @@ set(LIB_SRC service/outbound_context.cpp service/pendingbuffer.cpp service/protocol.cpp + service/router_lookup_job.cpp service/sendcontext.cpp service/session.cpp service/tag_lookup_job.cpp diff --git a/llarp/handlers/tun.cpp b/llarp/handlers/tun.cpp index eb1007a99..ec914c6ae 100644 --- a/llarp/handlers/tun.cpp +++ b/llarp/handlers/tun.cpp @@ -552,7 +552,7 @@ namespace llarp } if(m_Exit) { - for(const auto &snode : m_SnodeBlacklist) + for(const auto &snode : SnodeBlacklist()) m_Exit->BlacklistSnode(snode); } return SetupNetworking(); diff --git a/llarp/net/net_int.hpp b/llarp/net/net_int.hpp index fec386de1..5a64093ce 100644 --- a/llarp/net/net_int.hpp +++ b/llarp/net/net_int.hpp @@ -96,9 +96,9 @@ namespace llarp template < typename H > friend H - AbslHashValue(H h, const huint_t< UInt_t >& i) + AbslHashValue(H hash, const huint_t< UInt_t >& i) { - return H::combine(std::move(h), i.h); + return H::combine(std::move(hash), i.h); } using V6Container = std::vector< uint8_t >; diff --git a/llarp/path/pathbuilder.hpp b/llarp/path/pathbuilder.hpp index 98bbc5493..5d47b3801 100644 --- a/llarp/path/pathbuilder.hpp +++ b/llarp/path/pathbuilder.hpp @@ -5,6 +5,7 @@ #include #include +#include namespace llarp { diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 1fdebf37b..77169ec3a 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -26,109 +27,41 @@ namespace llarp { Endpoint::Endpoint(const std::string& name, AbstractRouter* r, Context* parent) - : path::Builder(r, 3, path::default_len) - , context(parent) - , m_Router(r) - , m_Name(name) + : path::Builder(r, 3, path::default_len), context(parent) { - m_Tag.Zero(); + m_state = std::make_unique< EndpointState >(); + m_state->m_Router = r; + m_state->m_Name = name; + m_state->m_Tag.Zero(); } bool Endpoint::SetOption(const std::string& k, const std::string& v) { - if(k == "keyfile") - { - m_Keyfile = v; - } - if(k == "tag") - { - m_Tag = v; - LogInfo("Setting tag to ", v); - } - if(k == "prefetch-tag") - { - m_PrefetchTags.insert(v); - } - if(k == "prefetch-addr") - { - Address addr; - if(addr.FromString(v)) - m_PrefetchAddrs.insert(addr); - } - if(k == "min-latency") - { - auto val = atoi(v.c_str()); - if(val > 0) - m_MinPathLatency = val; - } - if(k == "bundle-rc") - { - m_BundleRC = IsTrueValue(v.c_str()); - } - if(k == "blacklist-snode") - { - RouterID snode; - if(!snode.FromString(v)) - { - LogError(Name(), " invalid snode value: ", v); - return false; - } - const auto result = m_SnodeBlacklist.insert(snode); - if(!result.second) - { - LogError(Name(), " duplicate blacklist-snode: ", snode.ToString()); - return false; - } - LogInfo(Name(), " adding ", snode.ToString(), " to blacklist"); - } - if(k == "on-up") - { - m_OnUp = hooks::ExecShellBackend(v); - if(m_OnUp) - LogInfo(Name(), " added on up script: ", v); - else - LogError(Name(), " failed to add on up script"); - } - if(k == "on-down") - { - m_OnDown = hooks::ExecShellBackend(v); - if(m_OnDown) - LogInfo(Name(), " added on down script: ", v); - else - LogError(Name(), " failed to add on down script"); - } - if(k == "on-ready") - { - m_OnReady = hooks::ExecShellBackend(v); - if(m_OnReady) - LogInfo(Name(), " added on ready script: ", v); - else - LogError(Name(), " failed to add on ready script"); - } - return true; + return m_state->SetOption(k, v, Name()); } llarp_ev_loop_ptr Endpoint::EndpointNetLoop() { - if(m_IsolatedNetLoop) - return m_IsolatedNetLoop; + if(m_state->m_IsolatedNetLoop) + return m_state->m_IsolatedNetLoop; - return m_Router->netloop(); + return Router()->netloop(); } bool Endpoint::NetworkIsIsolated() const { - return m_IsolatedLogic.get() != nullptr && m_IsolatedNetLoop != nullptr; + return m_state->m_IsolatedLogic.get() != nullptr + && m_state->m_IsolatedNetLoop != nullptr; } bool Endpoint::HasPendingPathToService(const Address& addr) const { - return m_PendingServiceLookups.find(addr) - != m_PendingServiceLookups.end(); + return m_state->m_PendingServiceLookups.find(addr) + != m_state->m_PendingServiceLookups.end(); } void @@ -147,25 +80,25 @@ namespace llarp ManualRebuild(1); return; } - m_IntroSet.I.clear(); + introSet().I.clear(); for(auto& intro : I) { - m_IntroSet.I.emplace_back(std::move(intro)); + introSet().I.emplace_back(std::move(intro)); } - if(m_IntroSet.I.size() == 0) + if(introSet().I.size() == 0) { LogWarn("not enough intros to publish introset for ", Name()); if(ShouldBuildMore(now) || forceRebuild) ManualRebuild(1); return; } - m_IntroSet.topic = m_Tag; - if(!m_Identity.SignIntroSet(m_IntroSet, now)) + introSet().topic = m_state->m_Tag; + if(!m_Identity.SignIntroSet(introSet(), now)) { LogWarn("failed to sign introset for endpoint ", Name()); return; } - if(PublishIntroSet(m_Router)) + if(PublishIntroSet(Router())) { LogInfo("(re)publishing introset for endpoint ", Name()); } @@ -179,9 +112,9 @@ namespace llarp Endpoint::IsReady() const { const auto now = Now(); - if(m_IntroSet.I.size() == 0) + if(introSet().I.size() == 0) return false; - if(m_IntroSet.IsExpired(now)) + if(introSet().IsExpired(now)) return false; return true; } @@ -189,7 +122,8 @@ namespace llarp bool Endpoint::HasPendingRouterLookup(const RouterID remote) const { - return m_PendingRouters.find(remote) != m_PendingRouters.end(); + const auto& routers = m_state->m_PendingRouters; + return routers.find(remote) != routers.end(); } bool @@ -197,8 +131,8 @@ namespace llarp llarp::AlignedBuffer< 32 >& addr, bool& snode) const { - auto itr = m_Sessions.find(tag); - if(itr != m_Sessions.end()) + auto itr = Sessions().find(tag); + if(itr != Sessions().end()) { snode = false; addr = itr->second.remote.Addr(); @@ -206,7 +140,7 @@ namespace llarp } else { - for(const auto& item : m_SNodeSessions) + for(const auto& item : m_state->m_SNodeSessions) { if(item.second.second == tag) { @@ -222,7 +156,7 @@ namespace llarp bool Endpoint::IntrosetIsStale() const { - return m_IntroSet.HasExpiredIntros(Now()); + return introSet().HasExpiredIntros(Now()); } util::StatusObject @@ -230,33 +164,7 @@ namespace llarp { auto obj = path::Builder::ExtractStatus(); obj.Put("identity", m_Identity.pub.Addr().ToString()); - - obj.Put("lastPublished", m_LastPublish); - obj.Put("lastPublishAttempt", m_LastPublishAttempt); - obj.Put("introset", m_IntroSet.ExtractStatus()); - - if(!m_Tag.IsZero()) - obj.Put("tag", m_Tag.ToString()); - static auto getSecond = [](const auto& item) -> const auto& - { - return item.second; - }; - obj.PutContainer("deadSessions", m_DeadSessions, getSecond); - obj.PutContainer("remoteSessions", m_RemoteSessions, getSecond); - obj.PutContainer("lookups", m_PendingLookups, getSecond); - obj.PutContainer("snodeSessions", m_SNodeSessions, - [](const auto& item) { return item.second.first; }); - - util::StatusObject sessionObj{}; - - for(const auto& item : m_Sessions) - { - std::string k = item.first.ToHex(); - sessionObj.Put(k, item.second.ExtractStatus()); - } - - obj.Put("converstations", sessionObj); - return obj; + return m_state->ExtractStatus(obj); } void @@ -270,16 +178,16 @@ namespace llarp } // expire snode sessions - EndpointUtil::ExpireSNodeSessions(now, m_SNodeSessions); + EndpointUtil::ExpireSNodeSessions(now, m_state->m_SNodeSessions); // expire pending tx - EndpointUtil::ExpirePendingTx(now, m_PendingLookups); + EndpointUtil::ExpirePendingTx(now, m_state->m_PendingLookups); // expire pending router lookups - EndpointUtil::ExpirePendingRouterLookups(now, m_PendingRouters); + EndpointUtil::ExpirePendingRouterLookups(now, m_state->m_PendingRouters); // prefetch addrs - for(const auto& addr : m_PrefetchAddrs) + for(const auto& addr : m_state->m_PrefetchAddrs) { - if(!EndpointUtil::HasPathToService(addr, m_RemoteSessions)) + if(!EndpointUtil::HasPathToService(addr, m_state->m_RemoteSessions)) { if(!EnsurePathToService( addr, @@ -293,12 +201,14 @@ namespace llarp } #ifdef TESTNET // prefetch tags - for(const auto& tag : m_PrefetchTags) + for(const auto& tag : m_state->m_PrefetchTags) { - auto itr = m_PrefetchedTags.find(tag); - if(itr == m_PrefetchedTags.end()) + auto itr = m_state->m_PrefetchedTags.find(tag); + if(itr == m_state->m_PrefetchedTags.end()) { - itr = m_PrefetchedTags.emplace(tag, CachedTagResult(tag, this)).first; + itr = + m_state->m_PrefetchedTags.emplace(tag, CachedTagResult(tag, this)) + .first; } for(const auto& introset : itr->second.result) { @@ -333,20 +243,21 @@ namespace llarp #endif // deregister dead sessions - EndpointUtil::DeregisterDeadSessions(now, m_DeadSessions); + EndpointUtil::DeregisterDeadSessions(now, m_state->m_DeadSessions); // tick remote sessions - EndpointUtil::TickRemoteSessions(now, m_RemoteSessions, m_DeadSessions); + EndpointUtil::TickRemoteSessions(now, m_state->m_RemoteSessions, + m_state->m_DeadSessions); // expire convotags - EndpointUtil::ExpireConvoSessions(now, m_Sessions); + EndpointUtil::ExpireConvoSessions(now, Sessions()); } bool Endpoint::Stop() { // stop remote sessions - EndpointUtil::StopRemoteSessions(m_RemoteSessions); + EndpointUtil::StopRemoteSessions(m_state->m_RemoteSessions); // stop snode sessions - EndpointUtil::StopSnodeSessions(m_SNodeSessions); + EndpointUtil::StopSnodeSessions(m_state->m_SNodeSessions); if(m_OnDown) m_OnDown->NotifyAsync(NotifyParams()); return path::Builder::Stop(); @@ -355,8 +266,9 @@ namespace llarp uint64_t Endpoint::GenTXID() { - uint64_t txid = randint(); - while(m_PendingLookups.find(txid) != m_PendingLookups.end()) + uint64_t txid = randint(); + const auto& lookups = m_state->m_PendingLookups; + while(lookups.find(txid) != lookups.end()) ++txid; return txid; } @@ -364,31 +276,30 @@ namespace llarp std::string Endpoint::Name() const { - return m_Name + ":" + m_Identity.pub.Name(); + return m_state->m_Name + ":" + m_Identity.pub.Name(); } void Endpoint::PutLookup(IServiceLookup* lookup, uint64_t txid) { - // std::unique_ptr< service::IServiceLookup > ptr(lookup); - // m_PendingLookups.emplace(txid, ptr); - // m_PendingLookups[txid] = std::move(ptr); - m_PendingLookups.emplace(txid, std::unique_ptr< IServiceLookup >(lookup)); + m_state->m_PendingLookups.emplace( + txid, std::unique_ptr< IServiceLookup >(lookup)); } bool Endpoint::HandleGotIntroMessage(dht::GotIntroMessage_constptr msg) { std::set< IntroSet > remote; + auto currentPub = m_state->m_CurrentPublishTX; for(const auto& introset : msg->I) { if(!introset.Verify(Now())) { - if(m_Identity.pub == introset.A && m_CurrentPublishTX == msg->T) + if(m_Identity.pub == introset.A && currentPub == msg->T) IntroSetPublishFail(); return true; } - if(m_Identity.pub == introset.A && m_CurrentPublishTX == msg->T) + if(m_Identity.pub == introset.A && currentPub == msg->T) { LogInfo( "got introset publish confirmation for hidden service endpoint ", @@ -399,15 +310,16 @@ namespace llarp remote.insert(introset); } - auto itr = m_PendingLookups.find(msg->T); - if(itr == m_PendingLookups.end()) + auto& lookups = m_state->m_PendingLookups; + auto itr = lookups.find(msg->T); + if(itr == lookups.end()) { LogWarn("invalid lookup response for hidden service endpoint ", Name(), " txid=", msg->T); return true; } std::unique_ptr< IServiceLookup > lookup = std::move(itr->second); - m_PendingLookups.erase(itr); + lookups.erase(itr); lookup->HandleResponse(remote); return true; } @@ -415,7 +327,7 @@ namespace llarp bool Endpoint::HasInboundConvo(const Address& addr) const { - for(const auto& item : m_Sessions) + for(const auto& item : Sessions()) { if(item.second.remote.Addr() == addr && item.second.inbound) return true; @@ -427,10 +339,10 @@ namespace llarp Endpoint::PutSenderFor(const ConvoTag& tag, const ServiceInfo& info, bool inbound) { - auto itr = m_Sessions.find(tag); - if(itr == m_Sessions.end()) + auto itr = Sessions().find(tag); + if(itr == Sessions().end()) { - itr = m_Sessions.emplace(tag, Session{}).first; + itr = Sessions().emplace(tag, Session{}).first; itr->second.inbound = inbound; itr->second.remote = info; } @@ -440,8 +352,8 @@ namespace llarp bool Endpoint::GetSenderFor(const ConvoTag& tag, ServiceInfo& si) const { - auto itr = m_Sessions.find(tag); - if(itr == m_Sessions.end()) + auto itr = Sessions().find(tag); + if(itr == Sessions().end()) return false; si = itr->second.remote; return true; @@ -450,8 +362,8 @@ namespace llarp void Endpoint::PutIntroFor(const ConvoTag& tag, const Introduction& intro) { - auto itr = m_Sessions.find(tag); - if(itr == m_Sessions.end()) + auto itr = Sessions().find(tag); + if(itr == Sessions().end()) { return; } @@ -462,8 +374,8 @@ namespace llarp bool Endpoint::GetIntroFor(const ConvoTag& tag, Introduction& intro) const { - auto itr = m_Sessions.find(tag); - if(itr == m_Sessions.end()) + auto itr = Sessions().find(tag); + if(itr == Sessions().end()) return false; intro = itr->second.intro; return true; @@ -472,8 +384,8 @@ namespace llarp void Endpoint::PutReplyIntroFor(const ConvoTag& tag, const Introduction& intro) { - auto itr = m_Sessions.find(tag); - if(itr == m_Sessions.end()) + auto itr = Sessions().find(tag); + if(itr == Sessions().end()) { return; } @@ -484,8 +396,8 @@ namespace llarp bool Endpoint::GetReplyIntroFor(const ConvoTag& tag, Introduction& intro) const { - auto itr = m_Sessions.find(tag); - if(itr == m_Sessions.end()) + auto itr = Sessions().find(tag); + if(itr == Sessions().end()) return false; intro = itr->second.replyIntro; return true; @@ -495,15 +407,15 @@ namespace llarp Endpoint::GetConvoTagsForService(const Address& addr, std::set< ConvoTag >& tags) const { - return EndpointUtil::GetConvoTagsForService(m_Sessions, addr, tags); + return EndpointUtil::GetConvoTagsForService(Sessions(), addr, tags); } bool Endpoint::GetCachedSessionKeyFor(const ConvoTag& tag, SharedSecret& secret) const { - auto itr = m_Sessions.find(tag); - if(itr == m_Sessions.end()) + auto itr = Sessions().find(tag); + if(itr == Sessions().end()) return false; secret = itr->second.sharedKey; return true; @@ -512,10 +424,10 @@ namespace llarp void Endpoint::PutCachedSessionKeyFor(const ConvoTag& tag, const SharedSecret& k) { - auto itr = m_Sessions.find(tag); - if(itr == m_Sessions.end()) + auto itr = Sessions().find(tag); + if(itr == Sessions().end()) { - itr = m_Sessions.emplace(tag, Session{}).first; + itr = Sessions().emplace(tag, Session{}).first; } itr->second.sharedKey = k; itr->second.lastUsed = Now(); @@ -524,11 +436,12 @@ namespace llarp bool Endpoint::LoadKeyFile() { - if(!m_Keyfile.empty()) + const auto& keyfile = m_state->m_Keyfile; + if(!keyfile.empty()) { - if(!m_Identity.EnsureKeys(m_Keyfile)) + if(!m_Identity.EnsureKeys(keyfile)) { - LogError("Can't ensure keyfile [", m_Keyfile, "]"); + LogError("Can't ensure keyfile [", keyfile, "]"); return false; } } @@ -548,10 +461,10 @@ namespace llarp m_DataHandler = this; } // this does network isolation - while(m_OnInit.size()) + while(m_state->m_OnInit.size()) { - if(m_OnInit.front()()) - m_OnInit.pop_front(); + if(m_state->m_OnInit.front()()) + m_state->m_OnInit.pop_front(); else { LogWarn("Can't call init of network isolation"); @@ -623,7 +536,7 @@ namespace llarp } else if(NumInStatus(path::ePathEstablished) < 3) { - if(m_IntroSet.HasExpiredIntros(now)) + if(introSet().HasExpiredIntros(now)) ManualRebuild(1); } } @@ -631,10 +544,10 @@ namespace llarp bool Endpoint::PublishIntroSetVia(AbstractRouter* r, path::Path_ptr path) { - auto job = new PublishIntroSetJob(this, GenTXID(), m_IntroSet); + auto job = new PublishIntroSetJob(this, GenTXID(), introSet()); if(job->SendRequestViaPath(path, r)) { - m_LastPublishAttempt = Now(); + m_state->m_LastPublishAttempt = Now(); return true; } return false; @@ -649,9 +562,9 @@ namespace llarp getter(item)->ResetInternalState(); }); }; - resetState(m_RemoteSessions, + resetState(m_state->m_RemoteSessions, [](const auto& item) { return item.second; }); - resetState(m_SNodeSessions, + resetState(m_state->m_SNodeSessions, [](const auto& item) { return item.second.first; }); } @@ -666,22 +579,26 @@ namespace llarp ForEachPath([&](const path::Path_ptr& p) { if(!p->IsReady()) return; - for(const auto& i : m_IntroSet.I) + for(const auto& i : introSet().I) { if(i == p->intro) return; } ++numNotInIntroset; }); - if(m_IntroSet.HasExpiredIntros(now) || numNotInIntroset > 1) - return now - m_LastPublishAttempt >= INTROSET_PUBLISH_RETRY_INTERVAL; - return now - m_LastPublishAttempt >= INTROSET_PUBLISH_INTERVAL; + + auto lastpub = m_state->m_LastPublishAttempt; + if(m_state->m_IntroSet.HasExpiredIntros(now) || numNotInIntroset > 1) + { + return now - lastpub >= INTROSET_PUBLISH_RETRY_INTERVAL; + } + return now - lastpub >= INTROSET_PUBLISH_INTERVAL; } void Endpoint::IntroSetPublished() { - m_LastPublish = Now(); + m_state->m_LastPublish = Now(); LogInfo(Name(), " IntroSet publish confirmed"); if(m_OnReady) m_OnReady->NotifyAsync(NotifyParams()); @@ -691,14 +608,15 @@ namespace llarp void Endpoint::IsolatedNetworkMainLoop() { - m_IsolatedNetLoop = llarp_make_ev_loop(); - m_IsolatedLogic = std::make_shared< llarp::Logic >(); + m_state->m_IsolatedNetLoop = llarp_make_ev_loop(); + m_state->m_IsolatedLogic = std::make_shared< llarp::Logic >(); if(SetupNetworking()) - llarp_ev_loop_run_single_process(m_IsolatedNetLoop, m_IsolatedLogic); + llarp_ev_loop_run_single_process(m_state->m_IsolatedNetLoop, + m_state->m_IsolatedLogic); else { - m_IsolatedNetLoop.reset(); - m_IsolatedLogic.reset(); + m_state->m_IsolatedNetLoop.reset(); + m_state->m_IsolatedLogic.reset(); } } @@ -708,7 +626,7 @@ namespace llarp { std::set< RouterID > exclude = prev; - for(const auto& snode : m_SnodeBlacklist) + for(const auto& snode : SnodeBlacklist()) exclude.insert(snode); return path::Builder::SelectHop(db, exclude, cur, hop, roles); } @@ -716,7 +634,7 @@ namespace llarp bool Endpoint::ShouldBundleRC() const { - return m_BundleRC; + return m_state->m_BundleRC; } void @@ -725,48 +643,52 @@ namespace llarp Address addr; introset.A.CalculateAddress(addr.as_array()); - if(m_RemoteSessions.count(addr) >= MAX_OUTBOUND_CONTEXT_COUNT) + auto& remoteSessions = m_state->m_RemoteSessions; + auto& serviceLookups = m_state->m_PendingServiceLookups; + + if(remoteSessions.count(addr) >= MAX_OUTBOUND_CONTEXT_COUNT) { - auto itr = m_RemoteSessions.find(addr); + auto itr = remoteSessions.find(addr); - auto range = m_PendingServiceLookups.equal_range(addr); + auto range = serviceLookups.equal_range(addr); auto i = range.first; if(i != range.second) { i->second(addr, itr->second.get()); ++i; } - m_PendingServiceLookups.erase(addr); + serviceLookups.erase(addr); return; } - auto it = m_RemoteSessions.emplace( + auto it = remoteSessions.emplace( addr, std::make_shared< OutboundContext >(introset, this)); LogInfo("Created New outbound context for ", addr.ToString()); // inform pending - auto range = m_PendingServiceLookups.equal_range(addr); + auto range = serviceLookups.equal_range(addr); auto itr = range.first; if(itr != range.second) { itr->second(addr, it->second.get()); ++itr; } - m_PendingServiceLookups.erase(addr); + serviceLookups.erase(addr); } void Endpoint::HandleVerifyGotRouter(dht::GotRouterMessage_constptr msg, llarp_async_verify_rc* j) { - auto itr = m_PendingRouters.find(msg->R[0].pubkey); - if(itr != m_PendingRouters.end()) + auto& pendingRouters = m_state->m_PendingRouters; + auto itr = pendingRouters.find(msg->R[0].pubkey); + if(itr != pendingRouters.end()) { if(j->valid) itr->second.InformResult(msg->R); else itr->second.InformResult({}); - m_PendingRouters.erase(itr); + pendingRouters.erase(itr); } delete j; } @@ -777,10 +699,10 @@ namespace llarp if(msg->R.size()) { llarp_async_verify_rc* job = new llarp_async_verify_rc; - job->nodedb = m_Router->nodedb(); - job->cryptoworker = m_Router->threadpool(); - job->diskworker = m_Router->diskworker(); - job->logic = m_Router->logic(); + job->nodedb = Router()->nodedb(); + job->cryptoworker = Router()->threadpool(); + job->diskworker = Router()->diskworker(); + job->logic = Router()->logic(); job->hook = std::bind(&Endpoint::HandleVerifyGotRouter, this, msg, std::placeholders::_1); job->rc = msg->R[0]; @@ -788,13 +710,14 @@ namespace llarp } else { - auto itr = m_PendingRouters.begin(); - while(itr != m_PendingRouters.end()) + auto& routers = m_state->m_PendingRouters; + auto itr = routers.begin(); + while(itr != routers.end()) { if(itr->second.txid == msg->txid) { itr->second.InformResult({}); - itr = m_PendingRouters.erase(itr); + itr = routers.erase(itr); } else ++itr; @@ -808,7 +731,7 @@ namespace llarp { if(router.IsZero()) return; - if(!m_Router->nodedb()->Has(router)) + if(!Router()->nodedb()->Has(router)) { LookupRouterAnon(router, nullptr); } @@ -817,7 +740,8 @@ namespace llarp bool Endpoint::LookupRouterAnon(RouterID router, RouterLookupHandler handler) { - if(m_PendingRouters.find(router) == m_PendingRouters.end()) + auto& routers = m_state->m_PendingRouters; + if(routers.find(router) == routers.end()) { auto path = GetEstablishedPathClosestTo(router); routing::DHTMessage msg; @@ -825,9 +749,9 @@ namespace llarp msg.M.emplace_back( std::make_unique< dht::FindRouterMessage >(txid, router)); - if(path && path->SendRoutingMessage(msg, m_Router)) + if(path && path->SendRoutingMessage(msg, Router())) { - m_PendingRouters.emplace(router, RouterLookupJob(this, handler)); + routers.emplace(router, RouterLookupJob(this, handler)); return true; } } @@ -876,7 +800,7 @@ namespace llarp bool Endpoint::HasPathToSNode(const RouterID ident) const { - auto range = m_SNodeSessions.equal_range(ident); + auto range = m_state->m_SNodeSessions.equal_range(ident); auto itr = range.first; while(itr != range.second) { @@ -894,8 +818,8 @@ namespace llarp { if(msg->proto == eProtocolTrafficV4 || msg->proto == eProtocolTrafficV6) { - util::Lock l(&m_InboundTrafficQueueMutex); - m_InboundTrafficQueue.emplace(msg); + util::Lock l(&m_state->m_InboundTrafficQueueMutex); + m_state->m_InboundTrafficQueue.emplace(msg); return true; } if(msg->proto == eProtocolControl) @@ -910,7 +834,7 @@ namespace llarp void Endpoint::RemoveConvoTag(const ConvoTag& t) { - m_Sessions.erase(t); + Sessions().erase(t); } bool @@ -943,8 +867,8 @@ namespace llarp if(!f.Sign(m_Identity)) return false; { - util::Lock lock(&m_SendQueueMutex); - m_SendQueue.emplace_back( + util::Lock lock(&m_state->m_SendQueueMutex); + m_state->m_SendQueue.emplace_back( std::make_shared< const routing::PathTransferMessage >(f, frame.F), p); @@ -969,18 +893,20 @@ namespace llarp Endpoint::OnLookup(const Address& addr, const IntroSet* introset, const RouterID& endpoint) { - auto now = Now(); + auto now = Now(); + auto& fails = m_state->m_ServiceLookupFails; + auto& lookups = m_state->m_PendingServiceLookups; if(introset == nullptr || introset->IsExpired(now)) { LogError(Name(), " failed to lookup ", addr.ToString(), " from ", endpoint); - m_ServiceLookupFails[endpoint] = m_ServiceLookupFails[endpoint] + 1; + fails[endpoint] = fails[endpoint] + 1; // inform one - auto itr = m_PendingServiceLookups.find(addr); - if(itr != m_PendingServiceLookups.end()) + auto itr = lookups.find(addr); + if(itr != lookups.end()) { itr->second(addr, nullptr); - m_PendingServiceLookups.erase(itr); + lookups.erase(itr); } return false; } @@ -1005,17 +931,23 @@ namespace llarp BuildOne(); return false; } + LogInfo(Name(), " Ensure Path to ", remote.ToString()); + + auto& sessions = m_state->m_RemoteSessions; + { - auto itr = m_RemoteSessions.find(remote); - if(itr != m_RemoteSessions.end()) + auto itr = sessions.find(remote); + if(itr != sessions.end()) { hook(itr->first, itr->second.get()); return true; } } - if(m_PendingServiceLookups.count(remote) >= MaxConcurrentLookups) + auto& lookups = m_state->m_PendingServiceLookups; + + if(lookups.count(remote) >= MaxConcurrentLookups) { LogWarn(Name(), " has too many pending service lookups for ", remote.ToString()); @@ -1028,7 +960,7 @@ namespace llarp LogInfo("doing lookup for ", remote, " via ", path->Endpoint()); if(job->SendRequestViaPath(path, Router())) { - m_PendingServiceLookups.emplace(remote, hook); + lookups.emplace(remote, hook); return true; } LogError("send via path failed"); @@ -1038,8 +970,9 @@ namespace llarp void Endpoint::EnsurePathToSNode(const RouterID snode, SNodeEnsureHook h) { + auto& nodeSessions = m_state->m_SNodeSessions; using namespace std::placeholders; - if(m_SNodeSessions.count(snode) == 0) + if(nodeSessions.count(snode) == 0) { ConvoTag tag; // TODO: check for collision lol no we don't but maybe we will... @@ -1051,11 +984,12 @@ namespace llarp /// TODO: V6 return HandleInboundPacket(tag, pkt, eProtocolTrafficV4); }, - m_Router, m_NumPaths, numHops, false, ShouldBundleRC()); - m_SNodeSessions.emplace(snode, std::make_pair(session, tag)); + Router(), m_NumPaths, numHops, false, ShouldBundleRC()); + + m_state->m_SNodeSessions.emplace(snode, std::make_pair(session, tag)); } EnsureRouterIsKnown(snode); - auto range = m_SNodeSessions.equal_range(snode); + auto range = nodeSessions.equal_range(snode); auto itr = range.first; while(itr != range.second) { @@ -1086,42 +1020,48 @@ namespace llarp void Endpoint::Pump(llarp_time_t) { + const auto& sessions = m_state->m_SNodeSessions; + auto& queue = m_state->m_InboundTrafficQueue; EndpointLogic()->queue_func([&]() { // send downstream packets to user for snode - for(const auto& item : m_SNodeSessions) + for(const auto& item : sessions) item.second.first->FlushDownstream(); // send downstream traffic to user for hidden service - util::Lock lock(&m_InboundTrafficQueueMutex); - while(m_InboundTrafficQueue.size()) + util::Lock lock(&m_state->m_InboundTrafficQueueMutex); + while(queue.size()) { - const auto& msg = m_InboundTrafficQueue.top(); + const auto& msg = queue.top(); llarp_buffer_t buf(msg->payload); HandleInboundPacket(msg->tag, buf, msg->proto); - m_InboundTrafficQueue.pop(); + queue.pop(); } }); auto router = Router(); // TODO: locking on this container - for(const auto& item : m_RemoteSessions) + for(const auto& item : m_state->m_RemoteSessions) item.second->FlushUpstream(); // TODO: locking on this container - for(const auto& item : m_SNodeSessions) + for(const auto& item : sessions) item.second.first->FlushUpstream(); - util::Lock lock(&m_SendQueueMutex); + util::Lock lock(&m_state->m_SendQueueMutex); // send outbound traffic - for(const auto& item : m_SendQueue) + for(const auto& item : m_state->m_SendQueue) item.second->SendRoutingMessage(*item.first, router); - m_SendQueue.clear(); + m_state->m_SendQueue.clear(); } bool - Endpoint::EnsureConvo(const AlignedBuffer< 32 > addr, bool snode, - ConvoEventListener_ptr ev) + Endpoint::EnsureConvo(ABSL_ATTRIBUTE_UNUSED const AlignedBuffer< 32 > addr, + bool snode, + ABSL_ATTRIBUTE_UNUSED ConvoEventListener_ptr ev) { if(snode) { } + + // TODO: something meaningful + return false; } bool @@ -1195,8 +1135,8 @@ namespace llarp } LogDebug(Name(), " send ", data.sz, " via ", remoteIntro.router); { - util::Lock lock(&m_SendQueueMutex); - m_SendQueue.emplace_back(transfer, p); + util::Lock lock(&m_state->m_SendQueueMutex); + m_state->m_SendQueue.emplace_back(transfer, p); } return true; } @@ -1204,9 +1144,10 @@ namespace llarp } // outbound converstation - if(EndpointUtil::HasPathToService(remote, m_RemoteSessions)) + auto& sessions = m_state->m_RemoteSessions; + if(EndpointUtil::HasPathToService(remote, sessions)) { - auto range = m_RemoteSessions.equal_range(remote); + auto range = sessions.equal_range(remote); auto itr = range.first; while(itr != range.second) { @@ -1218,7 +1159,9 @@ namespace llarp ++itr; } } - m_PendingTraffic[remote].emplace_back(data, t); + + auto& traffic = m_state->m_PendingTraffic; + traffic[remote].emplace_back(data, t); // no converstation return EnsurePathToService( remote, @@ -1226,10 +1169,12 @@ namespace llarp if(c) { c->UpdateIntroSet(true); - for(auto& pending : m_PendingTraffic[r]) + for(auto& pending : traffic[r]) + { c->AsyncEncryptAndSendTo(pending.Buffer(), pending.protocol); + } } - m_PendingTraffic.erase(r); + traffic.erase(r); }, 5000, true); } @@ -1237,14 +1182,14 @@ namespace llarp bool Endpoint::HasConvoTag(const ConvoTag& t) const { - return m_Sessions.find(t) != m_Sessions.end(); + return Sessions().find(t) != Sessions().end(); } uint64_t Endpoint::GetSeqNoForConvo(const ConvoTag& tag) { - auto itr = m_Sessions.find(tag); - if(itr == m_Sessions.end()) + auto itr = Sessions().find(tag); + if(itr == Sessions().end()) return 0; return ++(itr->second.seqno); } @@ -1270,20 +1215,56 @@ namespace llarp std::shared_ptr< Logic > Endpoint::RouterLogic() { - return m_Router->logic(); + return Router()->logic(); } std::shared_ptr< Logic > Endpoint::EndpointLogic() { - return m_IsolatedLogic ? m_IsolatedLogic : m_Router->logic(); + return m_state->m_IsolatedLogic ? m_state->m_IsolatedLogic + : Router()->logic(); } std::shared_ptr< llarp::thread::ThreadPool > Endpoint::CryptoWorker() { - return m_Router->threadpool(); + return Router()->threadpool(); } + AbstractRouter* + Endpoint::Router() + { + return m_state->m_Router; + } + + const std::set< RouterID >& + Endpoint::SnodeBlacklist() const + { + return m_state->m_SnodeBlacklist; + } + + const IntroSet& + Endpoint::introSet() const + { + return m_state->m_IntroSet; + } + + IntroSet& + Endpoint::introSet() + { + return m_state->m_IntroSet; + } + + const ConvoMap& + Endpoint::Sessions() const + { + return m_state->m_Sessions; + } + + ConvoMap& + Endpoint::Sessions() + { + return m_state->m_Sessions; + } } // namespace service } // namespace llarp diff --git a/llarp/service/endpoint.hpp b/llarp/service/endpoint.hpp index dad4597e4..d895e900a 100644 --- a/llarp/service/endpoint.hpp +++ b/llarp/service/endpoint.hpp @@ -32,6 +32,7 @@ namespace llarp { struct AsyncKeyExchange; struct Context; + struct EndpointState; struct OutboundContext; struct IConvoEventListener @@ -133,10 +134,7 @@ namespace llarp CryptoWorker(); AbstractRouter* - Router() - { - return m_Router; - } + Router(); virtual bool LoadKeyFile(); @@ -349,6 +347,9 @@ namespace llarp uint64_t GenTXID(); + const std::set< RouterID >& + SnodeBlacklist() const; + protected: bool SendToServiceOrQueue(const service::Address& addr, @@ -413,9 +414,6 @@ namespace llarp return false; } - public: - std::set< RouterID > m_SnodeBlacklist; - protected: IDataHandler* m_DataHandler = nullptr; Identity m_Identity; @@ -427,119 +425,16 @@ namespace llarp private: friend struct EndpointUtil; - struct RouterLookupJob - { - RouterLookupJob(Endpoint* p, RouterLookupHandler h) : handler(h) - { - started = p->Now(); - txid = p->GenTXID(); - } - - RouterLookupHandler handler; - uint64_t txid; - llarp_time_t started; - - bool - IsExpired(llarp_time_t now) const - { - if(now < started) - return false; - return now - started > 30000; - } - - void - InformResult(std::vector< RouterContact > result) - { - if(handler) - handler(result); - } - }; - - using Msg_ptr = std::shared_ptr< const routing::PathTransferMessage >; - using SendEvent_t = std::pair< Msg_ptr, path::Path_ptr >; - using PendingTraffic = - std::unordered_map< Address, PendingBufferQueue, Address::Hash >; - - using ProtocolMessagePtr = std::shared_ptr< ProtocolMessage >; - using RecvPacketQueue_t = - std::priority_queue< ProtocolMessagePtr, - std::vector< ProtocolMessagePtr >, - ComparePtr< ProtocolMessagePtr > >; - - util::Mutex m_InboundTrafficQueueMutex; - /// ordered queue for inbound hidden service traffic - RecvPacketQueue_t m_InboundTrafficQueue - GUARDED_BY(m_InboundTrafficQueueMutex); - - using PendingRouters = - std::unordered_map< RouterID, RouterLookupJob, RouterID::Hash >; - - using PendingLookups = - std::unordered_map< uint64_t, - std::unique_ptr< service::IServiceLookup > >; - - using Sessions = - std::unordered_multimap< Address, std::shared_ptr< OutboundContext >, - Address::Hash >; - - using SNodeSessionValue = - std::pair< std::shared_ptr< exit::BaseSession >, ConvoTag >; - - using SNodeSessions = - std::unordered_multimap< RouterID, SNodeSessionValue, - RouterID::Hash >; + // clang-format off + const IntroSet& introSet() const; + IntroSet& introSet(); using ConvoMap = std::unordered_map< ConvoTag, Session, ConvoTag::Hash >; + const ConvoMap& Sessions() const; + ConvoMap& Sessions(); + // clang-format on - AbstractRouter* m_Router; - std::shared_ptr< Logic > m_IsolatedLogic = nullptr; - llarp_ev_loop_ptr m_IsolatedNetLoop = nullptr; - std::string m_Keyfile; - std::string m_Name; - std::string m_NetNS; - bool m_BundleRC = false; - - util::Mutex m_SendQueueMutex; - std::deque< SendEvent_t > m_SendQueue GUARDED_BY(m_SendQueueMutex); - - PendingTraffic m_PendingTraffic; - - Sessions m_RemoteSessions; - Sessions m_DeadSessions; - - std::set< ConvoTag > m_InboundConvos; - - SNodeSessions m_SNodeSessions; - - std::unordered_multimap< Address, PathEnsureHook, Address::Hash > - m_PendingServiceLookups; - - std::unordered_map< RouterID, uint32_t, RouterID::Hash > - m_ServiceLookupFails; - - PendingRouters m_PendingRouters; - - uint64_t m_CurrentPublishTX = 0; - llarp_time_t m_LastPublish = 0; - llarp_time_t m_LastPublishAttempt = 0; - llarp_time_t m_MinPathLatency = (5 * 1000); - /// our introset - service::IntroSet m_IntroSet; - /// pending remote service lookups by id - PendingLookups m_PendingLookups; - /// prefetch remote address list - std::set< Address > m_PrefetchAddrs; - /// hidden service tag - Tag m_Tag; - /// prefetch descriptors for these hidden service tags - std::set< Tag > m_PrefetchTags; - /// on initialize functions - std::list< std::function< bool(void) > > m_OnInit; - - /// conversations - ConvoMap m_Sessions; - - std::unordered_map< Tag, CachedTagResult, Tag::Hash > m_PrefetchedTags; + std::unique_ptr< EndpointState > m_state; }; using Endpoint_ptr = std::shared_ptr< Endpoint >; diff --git a/llarp/service/endpoint_state.cpp b/llarp/service/endpoint_state.cpp new file mode 100644 index 000000000..867854746 --- /dev/null +++ b/llarp/service/endpoint_state.cpp @@ -0,0 +1,119 @@ +#include + +#include +#include +#include +#include + +namespace llarp +{ + namespace service + { + bool + EndpointState::SetOption(const std::string& k, const std::string& v, + const std::string& name) + { + if(k == "keyfile") + { + m_Keyfile = v; + } + if(k == "tag") + { + m_Tag = v; + LogInfo("Setting tag to ", v); + } + if(k == "prefetch-tag") + { + m_PrefetchTags.insert(v); + } + if(k == "prefetch-addr") + { + Address addr; + if(addr.FromString(v)) + m_PrefetchAddrs.insert(addr); + } + if(k == "min-latency") + { + auto val = atoi(v.c_str()); + if(val > 0) + m_MinPathLatency = val; + } + if(k == "bundle-rc") + { + m_BundleRC = IsTrueValue(v.c_str()); + } + if(k == "blacklist-snode") + { + RouterID snode; + if(!snode.FromString(v)) + { + LogError(name, " invalid snode value: ", v); + return false; + } + const auto result = m_SnodeBlacklist.insert(snode); + if(!result.second) + { + LogError(name, " duplicate blacklist-snode: ", snode.ToString()); + return false; + } + LogInfo(name, " adding ", snode.ToString(), " to blacklist"); + } + if(k == "on-up") + { + m_OnUp = hooks::ExecShellBackend(v); + if(m_OnUp) + LogInfo(name, " added on up script: ", v); + else + LogError(name, " failed to add on up script"); + } + if(k == "on-down") + { + m_OnDown = hooks::ExecShellBackend(v); + if(m_OnDown) + LogInfo(name, " added on down script: ", v); + else + LogError(name, " failed to add on down script"); + } + if(k == "on-ready") + { + m_OnReady = hooks::ExecShellBackend(v); + if(m_OnReady) + LogInfo(name, " added on ready script: ", v); + else + LogError(name, " failed to add on ready script"); + } + return true; + } + + util::StatusObject + EndpointState::ExtractStatus(util::StatusObject& obj) const + { + obj.Put("lastPublished", m_LastPublish); + obj.Put("lastPublishAttempt", m_LastPublishAttempt); + obj.Put("introset", m_IntroSet.ExtractStatus()); + + if(!m_Tag.IsZero()) + obj.Put("tag", m_Tag.ToString()); + static auto getSecond = [](const auto& item) -> const auto& + { + return item.second; + }; + obj.PutContainer("deadSessions", m_DeadSessions, getSecond); + obj.PutContainer("remoteSessions", m_RemoteSessions, getSecond); + obj.PutContainer("lookups", m_PendingLookups, getSecond); + obj.PutContainer("snodeSessions", m_SNodeSessions, + [](const auto& item) { return item.second.first; }); + + util::StatusObject sessionObj{}; + + for(const auto& item : m_Sessions) + { + std::string k = item.first.ToHex(); + sessionObj.Put(k, item.second.ExtractStatus()); + } + + obj.Put("converstations", sessionObj); + return obj; + } + } // namespace service +} // namespace llarp diff --git a/llarp/service/endpoint_state.hpp b/llarp/service/endpoint_state.hpp new file mode 100644 index 000000000..3246b5390 --- /dev/null +++ b/llarp/service/endpoint_state.hpp @@ -0,0 +1,109 @@ +#ifndef LLARP_SERVICE_ENDPOINT_STATE_HPP +#define LLARP_SERVICE_ENDPOINT_STATE_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +struct llarp_ev_loop; +using llarp_ev_loop_ptr = std::shared_ptr< llarp_ev_loop >; + +namespace llarp +{ + // clang-format off + namespace exit { struct BaseSession; } + namespace path { struct Path; using Path_ptr = std::shared_ptr< Path >; } + namespace routing { struct PathTransferMessage; } + // clang-format on + + namespace service + { + struct IServiceLookup; + struct OutboundContext; + + struct EndpointState + { + hooks::Backend_ptr m_OnUp; + hooks::Backend_ptr m_OnDown; + hooks::Backend_ptr m_OnReady; + + util::Mutex m_InboundTrafficQueueMutex; + /// ordered queue for inbound hidden service traffic + RecvPacketQueue_t m_InboundTrafficQueue + GUARDED_BY(m_InboundTrafficQueueMutex); + + std::set< RouterID > m_SnodeBlacklist; + + AbstractRouter* m_Router; + std::shared_ptr< Logic > m_IsolatedLogic = nullptr; + llarp_ev_loop_ptr m_IsolatedNetLoop = nullptr; + std::string m_Keyfile; + std::string m_Name; + std::string m_NetNS; + bool m_BundleRC = false; + + util::Mutex m_SendQueueMutex; + std::deque< SendEvent_t > m_SendQueue GUARDED_BY(m_SendQueueMutex); + + PendingTraffic m_PendingTraffic; + + Sessions m_RemoteSessions; + Sessions m_DeadSessions; + + std::set< ConvoTag > m_InboundConvos; + + SNodeSessions m_SNodeSessions; + + std::unordered_multimap< Address, PathEnsureHook, Address::Hash > + m_PendingServiceLookups; + + std::unordered_map< RouterID, uint32_t, RouterID::Hash > + m_ServiceLookupFails; + + PendingRouters m_PendingRouters; + + uint64_t m_CurrentPublishTX = 0; + llarp_time_t m_LastPublish = 0; + llarp_time_t m_LastPublishAttempt = 0; + llarp_time_t m_MinPathLatency = (5 * 1000); + /// our introset + IntroSet m_IntroSet; + /// pending remote service lookups by id + PendingLookups m_PendingLookups; + /// prefetch remote address list + std::set< Address > m_PrefetchAddrs; + /// hidden service tag + Tag m_Tag; + /// prefetch descriptors for these hidden service tags + std::set< Tag > m_PrefetchTags; + /// on initialize functions + std::list< std::function< bool(void) > > m_OnInit; + + /// conversations + ConvoMap m_Sessions; + + std::unordered_map< Tag, CachedTagResult, Tag::Hash > m_PrefetchedTags; + + bool + SetOption(const std::string& k, const std::string& v, + const std::string& name); + + util::StatusObject + ExtractStatus(util::StatusObject& obj) const; + }; + } // namespace service +} // namespace llarp + +#endif diff --git a/llarp/service/endpoint_types.hpp b/llarp/service/endpoint_types.hpp new file mode 100644 index 000000000..e635c2118 --- /dev/null +++ b/llarp/service/endpoint_types.hpp @@ -0,0 +1,62 @@ +#ifndef LLARP_SERVICE_ENDPOINT_TYPES_HPP +#define LLARP_SERVICE_ENDPOINT_TYPES_HPP + +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace llarp +{ + // clang-format off + namespace exit { struct BaseSession; } + namespace path { struct Path; using Path_ptr = std::shared_ptr< Path >; } + namespace routing { struct PathTransferMessage; } + // clang-format on + + namespace service + { + struct IServiceLookup; + struct OutboundContext; + + using Msg_ptr = std::shared_ptr< const routing::PathTransferMessage >; + using SendEvent_t = std::pair< Msg_ptr, path::Path_ptr >; + using PendingBufferQueue = std::deque< PendingBuffer >; + using PendingTraffic = + std::unordered_map< Address, PendingBufferQueue, Address::Hash >; + + using ProtocolMessagePtr = std::shared_ptr< ProtocolMessage >; + using RecvPacketQueue_t = + std::priority_queue< ProtocolMessagePtr, + std::vector< ProtocolMessagePtr >, + ComparePtr< ProtocolMessagePtr > >; + + using PendingRouters = + std::unordered_map< RouterID, RouterLookupJob, RouterID::Hash >; + + using PendingLookups = + std::unordered_map< uint64_t, std::unique_ptr< IServiceLookup > >; + + using Sessions = + std::unordered_multimap< Address, std::shared_ptr< OutboundContext >, + Address::Hash >; + + using SNodeSessionValue = + std::pair< std::shared_ptr< exit::BaseSession >, ConvoTag >; + + using SNodeSessions = + std::unordered_multimap< RouterID, SNodeSessionValue, RouterID::Hash >; + + using ConvoMap = std::unordered_map< ConvoTag, Session, ConvoTag::Hash >; + + using PathEnsureHook = std::function< void(Address, OutboundContext*) >; + + } // namespace service +} // namespace llarp + +#endif diff --git a/llarp/service/endpoint_util.cpp b/llarp/service/endpoint_util.cpp index 7d8805cf9..058de16f9 100644 --- a/llarp/service/endpoint_util.cpp +++ b/llarp/service/endpoint_util.cpp @@ -1,6 +1,8 @@ #include +#include #include +#include #include namespace llarp @@ -8,8 +10,7 @@ namespace llarp namespace service { void - EndpointUtil::ExpireSNodeSessions(llarp_time_t now, - Endpoint::SNodeSessions& sessions) + EndpointUtil::ExpireSNodeSessions(llarp_time_t now, SNodeSessions& sessions) { auto itr = sessions.begin(); while(itr != sessions.end()) @@ -34,8 +35,7 @@ namespace llarp } void - EndpointUtil::ExpirePendingTx(llarp_time_t now, - Endpoint::PendingLookups& lookups) + EndpointUtil::ExpirePendingTx(llarp_time_t now, PendingLookups& lookups) { for(auto itr = lookups.begin(); itr != lookups.end();) { @@ -54,7 +54,7 @@ namespace llarp void EndpointUtil::ExpirePendingRouterLookups(llarp_time_t now, - Endpoint::PendingRouters& routers) + PendingRouters& routers) { for(auto itr = routers.begin(); itr != routers.end();) { @@ -70,8 +70,7 @@ namespace llarp } void - EndpointUtil::DeregisterDeadSessions(llarp_time_t now, - Endpoint::Sessions& sessions) + EndpointUtil::DeregisterDeadSessions(llarp_time_t now, Sessions& sessions) { auto itr = sessions.begin(); while(itr != sessions.end()) @@ -88,9 +87,8 @@ namespace llarp } void - EndpointUtil::TickRemoteSessions(llarp_time_t now, - Endpoint::Sessions& remoteSessions, - Endpoint::Sessions& deadSessions) + EndpointUtil::TickRemoteSessions(llarp_time_t now, Sessions& remoteSessions, + Sessions& deadSessions) { auto itr = remoteSessions.begin(); while(itr != remoteSessions.end()) @@ -110,8 +108,7 @@ namespace llarp } void - EndpointUtil::ExpireConvoSessions(llarp_time_t now, - Endpoint::ConvoMap& sessions) + EndpointUtil::ExpireConvoSessions(llarp_time_t now, ConvoMap& sessions) { auto itr = sessions.begin(); while(itr != sessions.end()) @@ -126,7 +123,7 @@ namespace llarp } void - EndpointUtil::StopRemoteSessions(Endpoint::Sessions& remoteSessions) + EndpointUtil::StopRemoteSessions(Sessions& remoteSessions) { for(auto& item : remoteSessions) { @@ -135,7 +132,7 @@ namespace llarp } void - EndpointUtil::StopSnodeSessions(Endpoint::SNodeSessions& sessions) + EndpointUtil::StopSnodeSessions(SNodeSessions& sessions) { for(auto& item : sessions) { @@ -145,7 +142,7 @@ namespace llarp bool EndpointUtil::HasPathToService(const Address& addr, - const Endpoint::Sessions& remoteSessions) + const Sessions& remoteSessions) { auto range = remoteSessions.equal_range(addr); auto itr = range.first; @@ -159,7 +156,7 @@ namespace llarp } bool - EndpointUtil::GetConvoTagsForService(const Endpoint::ConvoMap& sessions, + EndpointUtil::GetConvoTagsForService(const ConvoMap& sessions, const Address& info, std::set< ConvoTag >& tags) { diff --git a/llarp/service/endpoint_util.hpp b/llarp/service/endpoint_util.hpp index 974dddfee..f5045e06c 100644 --- a/llarp/service/endpoint_util.hpp +++ b/llarp/service/endpoint_util.hpp @@ -1,7 +1,7 @@ #ifndef LLARP_SERVICE_ENDPOINT_UTIL_HPP #define LLARP_SERVICE_ENDPOINT_UTIL_HPP -#include +#include namespace llarp { @@ -10,38 +10,36 @@ namespace llarp struct EndpointUtil { static void - ExpireSNodeSessions(llarp_time_t now, Endpoint::SNodeSessions& sessions); + ExpireSNodeSessions(llarp_time_t now, SNodeSessions& sessions); static void - ExpirePendingTx(llarp_time_t now, Endpoint::PendingLookups& lookups); + ExpirePendingTx(llarp_time_t now, PendingLookups& lookups); static void - ExpirePendingRouterLookups(llarp_time_t now, - Endpoint::PendingRouters& routers); + ExpirePendingRouterLookups(llarp_time_t now, PendingRouters& routers); static void - DeregisterDeadSessions(llarp_time_t now, Endpoint::Sessions& sessions); + DeregisterDeadSessions(llarp_time_t now, Sessions& sessions); static void - TickRemoteSessions(llarp_time_t now, Endpoint::Sessions& remoteSessions, - Endpoint::Sessions& deadSessions); + TickRemoteSessions(llarp_time_t now, Sessions& remoteSessions, + Sessions& deadSessions); static void - ExpireConvoSessions(llarp_time_t now, Endpoint::ConvoMap& sessions); + ExpireConvoSessions(llarp_time_t now, ConvoMap& sessions); static void - StopRemoteSessions(Endpoint::Sessions& remoteSessions); + StopRemoteSessions(Sessions& remoteSessions); static void - StopSnodeSessions(Endpoint::SNodeSessions& sessions); + StopSnodeSessions(SNodeSessions& sessions); static bool - HasPathToService(const Address& addr, - const Endpoint::Sessions& remoteSessions); + HasPathToService(const Address& addr, const Sessions& remoteSessions); static bool - GetConvoTagsForService(const Endpoint::ConvoMap& sessions, - const Address& addr, std::set< ConvoTag >& tags); + GetConvoTagsForService(const ConvoMap& sessions, const Address& addr, + std::set< ConvoTag >& tags); }; } // namespace service diff --git a/llarp/service/handler.hpp b/llarp/service/handler.hpp index c0b2c7037..9e029e720 100644 --- a/llarp/service/handler.hpp +++ b/llarp/service/handler.hpp @@ -5,7 +5,9 @@ #include #include #include + #include +#include namespace llarp { diff --git a/llarp/service/outbound_context.cpp b/llarp/service/outbound_context.cpp index e5f9dd875..f4f5319f2 100644 --- a/llarp/service/outbound_context.cpp +++ b/llarp/service/outbound_context.cpp @@ -303,7 +303,7 @@ namespace llarp } std::set< RouterID > exclude = prev; exclude.insert(m_NextIntro.router); - for(const auto& snode : m_Endpoint->m_SnodeBlacklist) + for(const auto& snode : m_Endpoint->SnodeBlacklist()) exclude.insert(snode); if(hop == numHops - 1) { @@ -384,7 +384,7 @@ namespace llarp { if(intro.ExpiresSoon(now)) continue; - if(m_Endpoint->m_SnodeBlacklist.count(intro.router)) + if(m_Endpoint->SnodeBlacklist().count(intro.router)) continue; if(m_BadIntros.find(intro) == m_BadIntros.end() && remoteIntro.router == intro.router) @@ -402,7 +402,7 @@ namespace llarp /// pick newer intro not on same router for(const auto& intro : currentIntroSet.I) { - if(m_Endpoint->m_SnodeBlacklist.count(intro.router)) + if(m_Endpoint->SnodeBlacklist().count(intro.router)) continue; m_Endpoint->EnsureRouterIsKnown(intro.router); if(intro.ExpiresSoon(now)) diff --git a/llarp/service/router_lookup_job.cpp b/llarp/service/router_lookup_job.cpp new file mode 100644 index 000000000..1b60d2cd4 --- /dev/null +++ b/llarp/service/router_lookup_job.cpp @@ -0,0 +1,15 @@ +#include + +#include + +namespace llarp +{ + namespace service + { + RouterLookupJob::RouterLookupJob(Endpoint* p, RouterLookupHandler h) + : handler(h), txid(p->GenTXID()), started(p->Now()) + { + } + + } // namespace service +} // namespace llarp diff --git a/llarp/service/router_lookup_job.hpp b/llarp/service/router_lookup_job.hpp new file mode 100644 index 000000000..504ad44a0 --- /dev/null +++ b/llarp/service/router_lookup_job.hpp @@ -0,0 +1,37 @@ +#ifndef LLARP_SERVICE_ROUTER_LOOKUP_JOB_HPP +#define LLARP_SERVICE_ROUTER_LOOKUP_JOB_HPP + +#include + +namespace llarp +{ + namespace service + { + struct Endpoint; + + struct RouterLookupJob + { + RouterLookupJob(Endpoint* p, RouterLookupHandler h); + + RouterLookupHandler handler; + uint64_t txid; + llarp_time_t started; + + bool + IsExpired(llarp_time_t now) const + { + if(now < started) + return false; + return now - started > 30000; + } + + void + InformResult(std::vector< RouterContact > result) + { + if(handler) + handler(result); + } + }; + } // namespace service +} // namespace llarp +#endif diff --git a/vendor/libtuntap-master/tuntap-unix.c b/vendor/libtuntap-master/tuntap-unix.c index 87a2a0641..cf8354af0 100644 --- a/vendor/libtuntap-master/tuntap-unix.c +++ b/vendor/libtuntap-master/tuntap-unix.c @@ -278,18 +278,11 @@ tuntap_read(struct device *dev, void *buf, size_t size) return 0; } #ifdef Darwin - unsigned int pktinfo = 0; + unsigned int pktinfo = 0; const struct iovec vecs[2] = { - { - .iov_base = &pktinfo, - .iov_len = sizeof(unsigned int) - }, - { - .iov_base = buf, - .iov_len = size - } - }; - n = readv(dev->tun_fd, vecs, 2); + {.iov_base = &pktinfo, .iov_len = sizeof(unsigned int)}, + {.iov_base = buf, .iov_len = size}}; + n = readv(dev->tun_fd, vecs, 2); if(n >= (int)(sizeof(unsigned int))) n -= sizeof(unsigned int); #else @@ -320,17 +313,13 @@ tuntap_write(struct device *dev, void *buf, size_t size) static unsigned int af6 = htonl(AF_INET6); const struct iovec vecs[2] = { - { - .iov_base = (((unsigned char*)buf)[0] & 0x60) == 0x60 ? &af6 : &af4, .iov_len = sizeof(unsigned int) - }, - { - .iov_base = buf, - .iov_len = size - } - }; + {.iov_base = (((unsigned char *)buf)[0] & 0x60) == 0x60 ? &af6 : &af4, + .iov_len = sizeof(unsigned int)}, + {.iov_base = buf, .iov_len = size}}; - n = writev(dev->tun_fd, &vecs, 2); - if (n >= sizeof(unsigned int)) n -= sizeof(unsigned int); + n = writev(dev->tun_fd, vecs, 2); + if(n >= (int)sizeof(unsigned int)) + n -= sizeof(unsigned int); #else n = write(dev->tun_fd, buf, size);