From 40449df0f1cea7681ff4c06d5a0e97e2a7eb99a3 Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 22 Jan 2019 01:14:02 +0000 Subject: [PATCH] Split classes out of dht::Context --- CMakeLists.txt | 10 + llarp/dht/context.cpp | 439 +----------------------- llarp/dht/context.hpp | 228 +----------- llarp/dht/explorenetworkjob.cpp | 31 ++ llarp/dht/explorenetworkjob.hpp | 45 +++ llarp/dht/localrouterlookup.cpp | 46 +++ llarp/dht/localrouterlookup.hpp | 27 ++ llarp/dht/localserviceaddresslookup.cpp | 45 +++ llarp/dht/localserviceaddresslookup.hpp | 27 ++ llarp/dht/localtaglookup.cpp | 43 +++ llarp/dht/localtaglookup.hpp | 23 ++ llarp/dht/node.hpp | 3 +- llarp/dht/publishservicejob.cpp | 46 +++ llarp/dht/publishservicejob.hpp | 51 +++ llarp/dht/recursiverouterlookup.cpp | 55 +++ llarp/dht/recursiverouterlookup.hpp | 42 +++ llarp/dht/serviceaddresslookup.cpp | 79 +++++ llarp/dht/serviceaddresslookup.hpp | 44 +++ llarp/dht/taglookup.cpp | 59 ++++ llarp/dht/taglookup.hpp | 44 +++ llarp/dht/tx.cpp | 1 + llarp/dht/tx.hpp | 107 ++++++ llarp/dht/txholder.cpp | 1 + llarp/dht/txholder.hpp | 190 ++++++++++ llarp/router_contact.hpp | 4 + llarp/service/IntroSet.hpp | 5 + 26 files changed, 1052 insertions(+), 643 deletions(-) create mode 100644 llarp/dht/explorenetworkjob.cpp create mode 100644 llarp/dht/explorenetworkjob.hpp create mode 100644 llarp/dht/localrouterlookup.cpp create mode 100644 llarp/dht/localrouterlookup.hpp create mode 100644 llarp/dht/localserviceaddresslookup.cpp create mode 100644 llarp/dht/localserviceaddresslookup.hpp create mode 100644 llarp/dht/localtaglookup.cpp create mode 100644 llarp/dht/localtaglookup.hpp create mode 100644 llarp/dht/publishservicejob.cpp create mode 100644 llarp/dht/publishservicejob.hpp create mode 100644 llarp/dht/recursiverouterlookup.cpp create mode 100644 llarp/dht/recursiverouterlookup.hpp create mode 100644 llarp/dht/serviceaddresslookup.cpp create mode 100644 llarp/dht/serviceaddresslookup.hpp create mode 100644 llarp/dht/taglookup.cpp create mode 100644 llarp/dht/taglookup.hpp create mode 100644 llarp/dht/tx.cpp create mode 100644 llarp/dht/tx.hpp create mode 100644 llarp/dht/txholder.cpp create mode 100644 llarp/dht/txholder.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 8cf5e5d44..950a75a9a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -510,8 +510,12 @@ set(LIB_SRC llarp/dht/bucket.cpp llarp/dht/context.cpp llarp/dht/dht.cpp + llarp/dht/explorenetworkjob.cpp llarp/dht/kademlia.cpp llarp/dht/key.cpp + llarp/dht/localtaglookup.cpp + llarp/dht/localrouterlookup.cpp + llarp/dht/localserviceaddresslookup.cpp llarp/dht/message.cpp llarp/dht/messages/findintro.cpp llarp/dht/messages/findrouter.cpp @@ -519,6 +523,12 @@ set(LIB_SRC llarp/dht/messages/gotrouter.cpp llarp/dht/messages/pubintro.cpp llarp/dht/node.cpp + llarp/dht/publishservicejob.cpp + llarp/dht/recursiverouterlookup.cpp + llarp/dht/serviceaddresslookup.cpp + llarp/dht/taglookup.cpp + llarp/dht/tx.cpp + llarp/dht/txholder.cpp llarp/dht/txowner.cpp llarp/dns.cpp llarp/dnsc.cpp diff --git a/llarp/dht/context.cpp b/llarp/dht/context.cpp index 5b488372d..cff3089f9 100644 --- a/llarp/dht/context.cpp +++ b/llarp/dht/context.cpp @@ -1,10 +1,18 @@ #include +#include +#include +#include +#include #include #include #include #include #include +#include +#include +#include +#include #include #include #include @@ -15,18 +23,9 @@ namespace llarp { namespace dht { - Context::Context() + Context::Context() : router(nullptr), allowTransit(false) { randombytes((byte_t *)&ids, sizeof(uint64_t)); - allowTransit = false; - } - - Context::~Context() - { - if(nodes) - delete nodes; - if(services) - delete services; } void @@ -45,53 +44,6 @@ namespace llarp llarp::LogError("failed to select random nodes for exploration"); } - struct ExploreNetworkJob : public TX< RouterID, RouterID > - { - ExploreNetworkJob(const RouterID &peer, Context *ctx) - : TX< RouterID, RouterID >(TXOwner{}, peer, ctx) - { - } - - bool - Validate(const RouterID &) const override - { - // TODO: check with lokid - return true; - } - - void - Start(const TXOwner &peer) override - { - parent->DHTSendTo(peer.node.as_array(), - new FindRouterMessage(peer.txid)); - } - - bool - GetNextPeer(Key_t &, const std::set< Key_t > &) override - { - return false; - } - - void - DoNextRequest(const Key_t &) override - { - } - - void - SendReply() override - { - llarp::LogInfo("got ", valuesFound.size(), " routers from exploration"); - for(const auto &pk : valuesFound) - { - // lookup router - parent->LookupRouter( - pk, - std::bind(&llarp::Router::HandleDHTLookupForExplore, - parent->router, pk, std::placeholders::_1)); - } - } - }; - void Context::ExploreNetworkVia(const Key_t &askpeer) { @@ -272,8 +224,8 @@ namespace llarp { router = r; ourKey = us; - nodes = new Bucket< RCNode >(ourKey, llarp::randint); - services = new Bucket< ISNode >(ourKey, llarp::randint); + nodes = std::make_unique< Bucket< RCNode > >(ourKey, llarp::randint); + services = std::make_unique< Bucket< ISNode > >(ourKey, llarp::randint); llarp::LogDebug("initialize dht with key ", ourKey); // start exploring @@ -308,7 +260,7 @@ namespace llarp llarp::routing::DHTMessage reply; if(!msg->HandleMessage(router->dht, reply.M)) return false; - if(reply.M.size()) + if(!reply.M.empty()) { auto path = router->paths.GetByUpstream(router->pubkey(), id); return path && path->SendRoutingMessage(&reply, router); @@ -316,112 +268,6 @@ namespace llarp return true; } - struct ServiceAddressLookup - : public TX< service::Address, service::IntroSet > - { - IntroSetLookupHandler handleResult; - uint64_t R; - - ServiceAddressLookup(const TXOwner &asker, const service::Address &addr, - Context *ctx, uint64_t r, - IntroSetLookupHandler handler) - : TX< service::Address, service::IntroSet >(asker, addr, ctx) - , handleResult(handler) - , R(r) - { - peersAsked.insert(ctx->OurKey()); - } - - bool - Validate(const service::IntroSet &value) const override - { - if(!value.Verify(parent->Crypto(), parent->Now())) - { - llarp::LogWarn("Got invalid introset from service lookup"); - return false; - } - if(value.A.Addr() != target) - { - llarp::LogWarn("got introset with wrong target from service lookup"); - return false; - } - return true; - } - - bool - GetNextPeer(Key_t &next, const std::set< Key_t > &exclude) override - { - Key_t k = target.ToKey(); - return parent->nodes->FindCloseExcluding(k, next, exclude); - } - - void - Start(const TXOwner &peer) override - { - parent->DHTSendTo(peer.node.as_array(), - new FindIntroMessage(peer.txid, target, R)); - } - - void - DoNextRequest(const Key_t &ask) override - { - if(R) - parent->LookupIntroSetRecursive(target, whoasked.node, whoasked.txid, - ask, R - 1); - else - parent->LookupIntroSetIterative(target, whoasked.node, whoasked.txid, - ask); - } - - virtual void - SendReply() override - { - if(handleResult) - handleResult(valuesFound); - - parent->DHTSendTo(whoasked.node.as_array(), - new GotIntroMessage(valuesFound, whoasked.txid)); - } - }; - - struct LocalServiceAddressLookup : public ServiceAddressLookup - { - PathID_t localPath; - - LocalServiceAddressLookup(const PathID_t &pathid, uint64_t txid, - const service::Address &addr, Context *ctx, - __attribute__((unused)) const Key_t &askpeer) - : ServiceAddressLookup(TXOwner{ctx->OurKey(), txid}, addr, ctx, 5, - nullptr) - , localPath(pathid) - { - } - - void - SendReply() override - { - auto path = parent->router->paths.GetByUpstream( - parent->OurKey().as_array(), localPath); - if(!path) - { - llarp::LogWarn( - "did not send reply for relayed dht request, no such local path " - "for pathid=", - localPath); - return; - } - routing::DHTMessage msg; - msg.M.emplace_back(new GotIntroMessage(valuesFound, whoasked.txid)); - if(!path->SendRoutingMessage(&msg, parent->router)) - { - llarp::LogWarn( - "failed to send routing message when informing result of dht " - "request, pathid=", - localPath); - } - } - }; - void Context::LookupIntroSetForPath(const service::Address &addr, uint64_t txid, const llarp::PathID_t &path, @@ -434,62 +280,6 @@ namespace llarp new LocalServiceAddressLookup(path, txid, addr, this, askpeer)); } - struct PublishServiceJob : public TX< service::Address, service::IntroSet > - { - uint64_t S; - std::set< Key_t > dontTell; - service::IntroSet I; - PublishServiceJob(const TXOwner &asker, const service::IntroSet &introset, - Context *ctx, uint64_t s, - const std::set< Key_t > &exclude) - : TX< service::Address, service::IntroSet >(asker, introset.A.Addr(), - ctx) - , S(s) - , dontTell(exclude) - , I(introset) - { - } - - bool - Validate(const service::IntroSet &introset) const override - { - if(I.A != introset.A) - { - llarp::LogWarn( - "publish introset acknowledgement acked a different service"); - return false; - } - return true; - } - - void - Start(const TXOwner &peer) override - { - std::vector< Key_t > exclude; - for(const auto &router : dontTell) - exclude.push_back(router); - parent->DHTSendTo(peer.node.as_array(), - new PublishIntroMessage(I, peer.txid, S, exclude)); - } - - bool - GetNextPeer(Key_t &, const std::set< Key_t > &) override - { - return false; - } - - void - DoNextRequest(const Key_t &) override - { - } - - void - SendReply() override - { - // don't need this - } - }; - void Context::PropagateIntroSetTo(const Key_t &from, uint64_t txid, const service::IntroSet &introset, @@ -508,7 +298,7 @@ namespace llarp Context::LookupIntroSetRecursive(const service::Address &addr, const Key_t &whoasked, uint64_t txid, const Key_t &askpeer, uint64_t R, - IntroSetLookupHandler handler) + service::IntroSetLookupHandler handler) { TXOwner asker(whoasked, txid); TXOwner peer(askpeer, ++ids); @@ -521,7 +311,7 @@ namespace llarp Context::LookupIntroSetIterative(const service::Address &addr, const Key_t &whoasked, uint64_t txid, const Key_t &askpeer, - IntroSetLookupHandler handler) + service::IntroSetLookupHandler handler) { TXOwner asker(whoasked, txid); TXOwner peer(askpeer, ++ids); @@ -530,76 +320,6 @@ namespace llarp new ServiceAddressLookup(asker, addr, this, 0, handler)); } - struct TagLookup : public TX< service::Tag, service::IntroSet > - { - uint64_t R; - TagLookup(const TXOwner &asker, const service::Tag &tag, Context *ctx, - uint64_t r) - : TX< service::Tag, service::IntroSet >(asker, tag, ctx), R(r) - { - } - - bool - Validate(const service::IntroSet &introset) const override - { - if(!introset.Verify(parent->Crypto(), parent->Now())) - { - llarp::LogWarn("got invalid introset from tag lookup"); - return false; - } - if(introset.topic != target) - { - llarp::LogWarn("got introset with missmatched topic in tag lookup"); - return false; - } - return true; - } - - void - Start(const TXOwner &peer) override - { - parent->DHTSendTo(peer.node.as_array(), - new FindIntroMessage(target, peer.txid, R)); - } - - bool - GetNextPeer(Key_t &, const std::set< Key_t > &) override - { - return false; - } - - void - DoNextRequest(const Key_t &) override - { - } - - void - SendReply() override - { - std::set< service::IntroSet > found; - for(const auto &remoteTag : valuesFound) - { - found.insert(remoteTag); - } - // collect our local values if we haven't hit a limit - if(found.size() < 2) - { - for(const auto &localTag : - parent->FindRandomIntroSetsWithTagExcluding(target, 1, found)) - { - found.insert(localTag); - } - } - std::vector< service::IntroSet > values; - for(const auto &introset : found) - { - values.push_back(introset); - } - parent->DHTSendTo(whoasked.node.as_array(), - new GotIntroMessage(values, whoasked.txid)); - } - }; - void Context::LookupTagRecursive(const service::Tag &tag, const Key_t &whoasked, uint64_t whoaskedTX, const Key_t &askpeer, @@ -613,42 +333,6 @@ namespace llarp " R=", R); } - struct LocalTagLookup : public TagLookup - { - PathID_t localPath; - - LocalTagLookup(const PathID_t &path, uint64_t txid, - const service::Tag &target, Context *ctx) - : TagLookup(TXOwner{ctx->OurKey(), txid}, target, ctx, 0) - , localPath(path) - { - } - - void - SendReply() override - { - auto path = parent->router->paths.GetByUpstream( - parent->OurKey().as_array(), localPath); - if(!path) - { - llarp::LogWarn( - "did not send reply for relayed dht request, no such local path " - "for pathid=", - localPath); - return; - } - routing::DHTMessage msg; - msg.M.emplace_back(new GotIntroMessage(valuesFound, whoasked.txid)); - if(!path->SendRoutingMessage(&msg, parent->router)) - { - llarp::LogWarn( - "failed to send routing message when informing result of dht " - "request, pathid=", - localPath); - } - } - }; - void Context::LookupTagForPath(const service::Tag &tag, uint64_t txid, const llarp::PathID_t &path, const Key_t &askpeer) @@ -699,101 +383,6 @@ namespace llarp return true; } - struct RecursiveRouterLookup : public TX< RouterID, RouterContact > - { - RouterLookupHandler resultHandler; - RecursiveRouterLookup(const TXOwner &whoasked, const RouterID &target, - Context *ctx, RouterLookupHandler result) - : TX< RouterID, RouterContact >(whoasked, target, ctx) - , resultHandler(result) - - { - peersAsked.insert(ctx->OurKey()); - } - - bool - Validate(const RouterContact &rc) const override - { - if(!rc.Verify(parent->Crypto(), parent->Now())) - { - llarp::LogWarn("rc from lookup result is invalid"); - return false; - } - return true; - } - - bool - GetNextPeer(Key_t &, const std::set< Key_t > &) override - { - return false; - } - - void - DoNextRequest(const Key_t &) override - { - } - - void - Start(const TXOwner &peer) override - { - parent->DHTSendTo(peer.node.as_array(), - new FindRouterMessage(peer.txid, target)); - } - - virtual void - SendReply() override - { - if(resultHandler) - { - resultHandler(valuesFound); - } - else - { - parent->DHTSendTo( - whoasked.node.as_array(), - new GotRouterMessage({}, whoasked.txid, valuesFound, false)); - } - } - }; - - struct LocalRouterLookup : public RecursiveRouterLookup - { - PathID_t localPath; - - LocalRouterLookup(const PathID_t &path, uint64_t txid, - const RouterID &target, Context *ctx) - : RecursiveRouterLookup(TXOwner{ctx->OurKey(), txid}, target, ctx, - nullptr) - , localPath(path) - { - } - - void - SendReply() override - { - auto path = parent->router->paths.GetByUpstream( - parent->OurKey().as_array(), localPath); - if(!path) - { - llarp::LogWarn( - "did not send reply for relayed dht request, no such local path " - "for pathid=", - localPath); - return; - } - routing::DHTMessage msg; - msg.M.emplace_back(new GotRouterMessage(parent->OurKey(), whoasked.txid, - valuesFound, true)); - if(!path->SendRoutingMessage(&msg, parent->router)) - { - llarp::LogWarn( - "failed to send routing message when informing result of dht " - "request, pathid=", - localPath); - } - } - }; - void Context::LookupRouterForPath(const RouterID &target, uint64_t txid, const llarp::PathID_t &path, diff --git a/llarp/dht/context.hpp b/llarp/dht/context.hpp index 1f1e84dbb..ce1b2ddfd 100644 --- a/llarp/dht/context.hpp +++ b/llarp/dht/context.hpp @@ -1,5 +1,5 @@ -#ifndef LLARP_DHT_CONTEXT_HPP -#define LLARP_DHT_CONTEXT_HPP +#ifndef LLARP_DHT_CONTEXT +#define LLARP_DHT_CONTEXT #include #include @@ -7,6 +7,8 @@ #include #include #include +#include +#include #include #include #include @@ -19,93 +21,9 @@ namespace llarp namespace dht { - struct Context; - - template < typename K, typename V > - struct TX - { - TX(const TXOwner& asker, const K& k, Context* p) - : target(k), whoasked(asker) - { - parent = p; - } - - virtual ~TX(){}; - - K target; - Context* parent; - std::set< Key_t > peersAsked; - std::vector< V > valuesFound; - TXOwner whoasked; - - virtual bool - Validate(const V& value) const = 0; - - void - OnFound(const Key_t askedPeer, const V& value) - { - peersAsked.insert(askedPeer); - if(Validate(value)) - valuesFound.push_back(value); - } - - virtual void - Start(const TXOwner& peer) = 0; - - virtual bool - GetNextPeer(Key_t& next, const std::set< Key_t >& exclude) = 0; - - virtual void - DoNextRequest(const Key_t& peer) = 0; - - /// return true if we want to persist this tx - bool - AskNextPeer(const Key_t& prevPeer, const std::unique_ptr< Key_t >& next) - { - peersAsked.insert(prevPeer); - Key_t peer; - if(next) - { - // explicit next peer provided - peer = *next; - } - else if(!GetNextPeer(peer, peersAsked)) - { - // no more peers - llarp::LogInfo("no more peers for request asking for", target); - return false; - } - - const Key_t targetKey{target}; - if((prevPeer ^ targetKey) < (peer ^ targetKey)) - { - // next peer is not closer - llarp::LogInfo("next peer ", peer, " is not closer to ", target, - " than ", prevPeer); - return false; - } - else - { - peersAsked.insert(peer); - } - DoNextRequest(peer); - return true; - } - - virtual void - SendReply() = 0; - }; - - using IntroSetLookupHandler = - std::function< void(const std::vector< service::IntroSet >&) >; - - using RouterLookupHandler = - std::function< void(const std::vector< RouterContact >&) >; - struct Context { Context(); - ~Context(); llarp::Crypto* Crypto(); @@ -116,13 +34,13 @@ namespace llarp LookupIntroSetRecursive(const service::Address& target, const Key_t& whoasked, uint64_t whoaskedTX, const Key_t& askpeer, uint64_t R, - IntroSetLookupHandler result = nullptr); + service::IntroSetLookupHandler result = nullptr); void LookupIntroSetIterative(const service::Address& target, const Key_t& whoasked, uint64_t whoaskedTX, const Key_t& askpeer, - IntroSetLookupHandler result = nullptr); + service::IntroSetLookupHandler result = nullptr); /// on behalf of whoasked request router with public key target from dht /// router with key askpeer @@ -136,7 +54,9 @@ namespace llarp { Key_t askpeer; if(!nodes->FindClosest(Key_t(target), askpeer)) + { return false; + } LookupRouterRecursive(target, OurKey(), 0, askpeer, result); return true; } @@ -221,13 +141,13 @@ namespace llarp void Explore(size_t N = 3); - llarp::Router* router = nullptr; + llarp::Router* router; // for router contacts - Bucket< RCNode >* nodes = nullptr; + std::unique_ptr< Bucket< RCNode > > nodes; // for introduction sets - Bucket< ISNode >* services = nullptr; - bool allowTransit = false; + std::unique_ptr< Bucket< ISNode > > services; + bool allowTransit; const Key_t& OurKey() const @@ -235,130 +155,6 @@ namespace llarp return ourKey; } - template < typename K, typename V, typename K_Hash, - llarp_time_t requestTimeoutMS = 5000UL > - struct TXHolder - { - // tx who are waiting for a reply for each key - std::unordered_multimap< K, TXOwner, K_Hash > waiting; - // tx timesouts by key - std::unordered_map< K, llarp_time_t, K_Hash > timeouts; - // maps remote peer with tx to handle reply from them - std::unordered_map< TXOwner, std::unique_ptr< TX< K, V > >, - TXOwner::Hash > - tx; - - const TX< K, V >* - GetPendingLookupFrom(const TXOwner& owner) const - { - auto itr = tx.find(owner); - if(itr == tx.end()) - return nullptr; - else - return itr->second.get(); - } - - bool - HasLookupFor(const K& target) const - { - return timeouts.find(target) != timeouts.end(); - } - - bool - HasPendingLookupFrom(const TXOwner& owner) const - { - return GetPendingLookupFrom(owner) != nullptr; - } - - void - NewTX(const TXOwner& askpeer, const TXOwner& whoasked, const K& k, - TX< K, V >* t) - { - (void)whoasked; - tx.emplace(askpeer, std::unique_ptr< TX< K, V > >(t)); - auto count = waiting.count(k); - waiting.insert(std::make_pair(k, askpeer)); - - auto itr = timeouts.find(k); - if(itr == timeouts.end()) - { - timeouts.insert( - std::make_pair(k, time_now_ms() + requestTimeoutMS)); - } - if(count == 0) - t->Start(askpeer); - } - - /// mark tx as not fond - void - NotFound(const TXOwner& from, const std::unique_ptr< Key_t >& next) - { - bool sendReply = true; - auto txitr = tx.find(from); - if(txitr == tx.end()) - return; - - // ask for next peer - if(txitr->second->AskNextPeer(from.node, next)) - sendReply = false; - - llarp::LogWarn("Target key ", txitr->second->target); - Inform(from, txitr->second->target, {}, sendReply, sendReply); - } - - void - Found(const TXOwner& from, const K& k, const std::vector< V >& values) - { - Inform(from, k, values, true); - } - - /// inform all watches for key of values found - void - Inform(TXOwner from, K key, std::vector< V > values, - bool sendreply = false, bool removeTimeouts = true) - { - auto range = waiting.equal_range(key); - auto itr = range.first; - while(itr != range.second) - { - auto txitr = tx.find(itr->second); - if(txitr != tx.end()) - { - for(const auto& value : values) - txitr->second->OnFound(from.node, value); - if(sendreply) - { - txitr->second->SendReply(); - tx.erase(txitr); - } - } - ++itr; - } - - if(sendreply) - waiting.erase(key); - - if(removeTimeouts) - timeouts.erase(key); - } - - void - Expire(llarp_time_t now) - { - auto itr = timeouts.begin(); - while(itr != timeouts.end()) - { - if(now > itr->second && now - itr->second >= requestTimeoutMS) - { - Inform(TXOwner{}, itr->first, {}, true, false); - itr = timeouts.erase(itr); - } - else - ++itr; - } - } - }; - TXHolder< service::Address, service::IntroSet, service::Address::Hash > pendingIntrosetLookups; diff --git a/llarp/dht/explorenetworkjob.cpp b/llarp/dht/explorenetworkjob.cpp new file mode 100644 index 000000000..0b22b0caf --- /dev/null +++ b/llarp/dht/explorenetworkjob.cpp @@ -0,0 +1,31 @@ +#include + +#include +#include +#include + +namespace llarp +{ + namespace dht + { + void + ExploreNetworkJob::Start(const TXOwner &peer) + { + parent->DHTSendTo(peer.node.as_array(), new FindRouterMessage(peer.txid)); + } + + void + ExploreNetworkJob::SendReply() + { + llarp::LogInfo("got ", valuesFound.size(), " routers from exploration"); + for(const auto &pk : valuesFound) + { + // lookup router + parent->LookupRouter( + pk, + std::bind(&Router::HandleDHTLookupForExplore, parent->router, pk, + std::placeholders::_1)); + } + } + } // namespace dht +} // namespace llarp diff --git a/llarp/dht/explorenetworkjob.hpp b/llarp/dht/explorenetworkjob.hpp new file mode 100644 index 000000000..0ab627f85 --- /dev/null +++ b/llarp/dht/explorenetworkjob.hpp @@ -0,0 +1,45 @@ +#ifndef LLARP_DHT_EXPLORENETWORKJOB +#define LLARP_DHT_EXPLORENETWORKJOB + +#include +#include + +namespace llarp +{ + namespace dht + { + struct ExploreNetworkJob : public TX< RouterID, RouterID > + { + ExploreNetworkJob(const RouterID &peer, Context *ctx) + : TX< RouterID, RouterID >(TXOwner{}, peer, ctx) + { + } + + bool + Validate(const RouterID &) const override + { + // TODO: check with lokid + return true; + } + + void + Start(const TXOwner &peer) override; + + bool + GetNextPeer(Key_t &, const std::set< Key_t > &) override + { + return false; + } + + void + DoNextRequest(const Key_t &) override + { + } + + void + SendReply() override; + }; + } // namespace dht +} // namespace llarp + +#endif diff --git a/llarp/dht/localrouterlookup.cpp b/llarp/dht/localrouterlookup.cpp new file mode 100644 index 000000000..55245a936 --- /dev/null +++ b/llarp/dht/localrouterlookup.cpp @@ -0,0 +1,46 @@ +#include + +#include +#include +#include +#include +#include + +namespace llarp +{ + namespace dht + { + LocalRouterLookup::LocalRouterLookup(const PathID_t &path, uint64_t txid, + const RouterID &target, Context *ctx) + : RecursiveRouterLookup(TXOwner{ctx->OurKey(), txid}, target, ctx, + nullptr) + , localPath(path) + { + } + + void + LocalRouterLookup::SendReply() + { + auto path = parent->router->paths.GetByUpstream( + parent->OurKey().as_array(), localPath); + if(!path) + { + llarp::LogWarn( + "did not send reply for relayed dht request, no such local path " + "for pathid=", + localPath); + return; + } + routing::DHTMessage msg; + msg.M.emplace_back(new GotRouterMessage(parent->OurKey(), whoasked.txid, + valuesFound, true)); + if(!path->SendRoutingMessage(&msg, parent->router)) + { + llarp::LogWarn( + "failed to send routing message when informing result of dht " + "request, pathid=", + localPath); + } + } + } // namespace dht +} // namespace llarp diff --git a/llarp/dht/localrouterlookup.hpp b/llarp/dht/localrouterlookup.hpp new file mode 100644 index 000000000..de4c6a087 --- /dev/null +++ b/llarp/dht/localrouterlookup.hpp @@ -0,0 +1,27 @@ +#ifndef LLARP_DHT_LOCALROUTERLOOKUP +#define LLARP_DHT_LOCALROUTERLOOKUP + +#include + +#include +#include +#include + +namespace llarp +{ + namespace dht + { + struct LocalRouterLookup : public RecursiveRouterLookup + { + PathID_t localPath; + + LocalRouterLookup(const PathID_t &path, uint64_t txid, + const RouterID &target, Context *ctx); + + void + SendReply() override; + }; + } // namespace dht +} // namespace llarp + +#endif diff --git a/llarp/dht/localserviceaddresslookup.cpp b/llarp/dht/localserviceaddresslookup.cpp new file mode 100644 index 000000000..0958f6b35 --- /dev/null +++ b/llarp/dht/localserviceaddresslookup.cpp @@ -0,0 +1,45 @@ +#include + +#include +#include +#include +#include + +namespace llarp +{ + namespace dht + { + LocalServiceAddressLookup::LocalServiceAddressLookup( + const PathID_t &pathid, uint64_t txid, const service::Address &addr, + Context *ctx, __attribute__((unused)) const Key_t &askpeer) + : ServiceAddressLookup(TXOwner{ctx->OurKey(), txid}, addr, ctx, 5, + nullptr) + , localPath(pathid) + { + } + + void + LocalServiceAddressLookup::SendReply() + { + auto path = parent->router->paths.GetByUpstream( + parent->OurKey().as_array(), localPath); + if(!path) + { + llarp::LogWarn( + "did not send reply for relayed dht request, no such local path " + "for pathid=", + localPath); + return; + } + routing::DHTMessage msg; + msg.M.emplace_back(new GotIntroMessage(valuesFound, whoasked.txid)); + if(!path->SendRoutingMessage(&msg, parent->router)) + { + llarp::LogWarn( + "failed to send routing message when informing result of dht " + "request, pathid=", + localPath); + } + } + } // namespace dht +} // namespace llarp diff --git a/llarp/dht/localserviceaddresslookup.hpp b/llarp/dht/localserviceaddresslookup.hpp new file mode 100644 index 000000000..078006c16 --- /dev/null +++ b/llarp/dht/localserviceaddresslookup.hpp @@ -0,0 +1,27 @@ +#ifndef LLARP_DHT_LOCALSERVICEADDRESSLOOKUP +#define LLARP_DHT_LOCALSERVICEADDRESSLOOKUP + +#include + +#include + +namespace llarp +{ + namespace dht + { + struct LocalServiceAddressLookup : public ServiceAddressLookup + { + PathID_t localPath; + + LocalServiceAddressLookup(const PathID_t &pathid, uint64_t txid, + const service::Address &addr, Context *ctx, + __attribute__((unused)) const Key_t &askpeer); + + void + SendReply() override; + }; + + } // namespace dht +} // namespace llarp + +#endif diff --git a/llarp/dht/localtaglookup.cpp b/llarp/dht/localtaglookup.cpp new file mode 100644 index 000000000..2c7c9a072 --- /dev/null +++ b/llarp/dht/localtaglookup.cpp @@ -0,0 +1,43 @@ +#include + +#include +#include +#include +#include + +namespace llarp +{ + namespace dht + { + LocalTagLookup::LocalTagLookup(const PathID_t &path, uint64_t txid, + const service::Tag &target, Context *ctx) + : TagLookup(TXOwner{ctx->OurKey(), txid}, target, ctx, 0) + , localPath(path) + { + } + + void + LocalTagLookup::SendReply() + { + auto path = parent->router->paths.GetByUpstream( + parent->OurKey().as_array(), localPath); + if(!path) + { + llarp::LogWarn( + "did not send reply for relayed dht request, no such local path " + "for pathid=", + localPath); + return; + } + routing::DHTMessage msg; + msg.M.emplace_back(new GotIntroMessage(valuesFound, whoasked.txid)); + if(!path->SendRoutingMessage(&msg, parent->router)) + { + llarp::LogWarn( + "failed to send routing message when informing result of dht " + "request, pathid=", + localPath); + } + } + } // namespace dht +} // namespace llarp diff --git a/llarp/dht/localtaglookup.hpp b/llarp/dht/localtaglookup.hpp new file mode 100644 index 000000000..cc8832236 --- /dev/null +++ b/llarp/dht/localtaglookup.hpp @@ -0,0 +1,23 @@ +#ifndef LLARP_DHT_LOOKUPTAGLOOKUP +#define LLARP_DHT_LOOKUPTAGLOOKUP + +#include + +namespace llarp +{ + namespace dht + { + struct LocalTagLookup : public TagLookup + { + PathID_t localPath; + + LocalTagLookup(const PathID_t &path, uint64_t txid, + const service::Tag &target, Context *ctx); + + void + SendReply() override; + }; + } // namespace dht +} // namespace llarp + +#endif diff --git a/llarp/dht/node.hpp b/llarp/dht/node.hpp index 5ad58eaea..3ae1ee0a7 100644 --- a/llarp/dht/node.hpp +++ b/llarp/dht/node.hpp @@ -41,9 +41,8 @@ namespace llarp ID.Zero(); } - ISNode(const service::IntroSet& other) + ISNode(const service::IntroSet& other) : introset(other) { - introset = other; introset.A.CalculateAddress(ID.as_array()); } diff --git a/llarp/dht/publishservicejob.cpp b/llarp/dht/publishservicejob.cpp new file mode 100644 index 000000000..9481c26fa --- /dev/null +++ b/llarp/dht/publishservicejob.cpp @@ -0,0 +1,46 @@ +#include + +#include +#include + +namespace llarp +{ + namespace dht + { + PublishServiceJob::PublishServiceJob(const TXOwner &asker, + const service::IntroSet &introset, + Context *ctx, uint64_t s, + const std::set< Key_t > &exclude) + : TX< service::Address, service::IntroSet >(asker, introset.A.Addr(), + ctx) + , S(s) + , dontTell(exclude) + , I(introset) + { + } + + bool + PublishServiceJob::Validate(const service::IntroSet &introset) const + { + if(I.A != introset.A) + { + llarp::LogWarn( + "publish introset acknowledgement acked a different service"); + return false; + } + return true; + } + + void + PublishServiceJob::Start(const TXOwner &peer) + { + std::vector< Key_t > exclude; + for(const auto &router : dontTell) + { + exclude.push_back(router); + } + parent->DHTSendTo(peer.node.as_array(), + new PublishIntroMessage(I, peer.txid, S, exclude)); + } + } // namespace dht +} // namespace llarp diff --git a/llarp/dht/publishservicejob.hpp b/llarp/dht/publishservicejob.hpp new file mode 100644 index 000000000..2ad41e461 --- /dev/null +++ b/llarp/dht/publishservicejob.hpp @@ -0,0 +1,51 @@ +#ifndef LLARP_DHT_PUBLISHSERVICEJOB +#define LLARP_DHT_PUBLISHSERVICEJOB + +#include +#include +#include +#include + +#include + +namespace llarp +{ + namespace dht + { + struct PublishServiceJob : public TX< service::Address, service::IntroSet > + { + uint64_t S; + std::set< Key_t > dontTell; + service::IntroSet I; + + PublishServiceJob(const TXOwner &asker, const service::IntroSet &introset, + Context *ctx, uint64_t s, + const std::set< Key_t > &exclude); + + bool + Validate(const service::IntroSet &introset) const override; + + void + Start(const TXOwner &peer) override; + + bool + GetNextPeer(Key_t &, const std::set< Key_t > &) override + { + return false; + } + + void + DoNextRequest(const Key_t &) override + { + } + + void + SendReply() override + { + // don't need this + } + }; + } // namespace dht +} // namespace llarp + +#endif diff --git a/llarp/dht/recursiverouterlookup.cpp b/llarp/dht/recursiverouterlookup.cpp new file mode 100644 index 000000000..9672a35b3 --- /dev/null +++ b/llarp/dht/recursiverouterlookup.cpp @@ -0,0 +1,55 @@ +#include + +#include +#include +#include + +namespace llarp +{ + namespace dht + { + RecursiveRouterLookup::RecursiveRouterLookup(const TXOwner &whoasked, + const RouterID &target, + Context *ctx, + RouterLookupHandler result) + : TX< RouterID, RouterContact >(whoasked, target, ctx) + , resultHandler(result) + + { + peersAsked.insert(ctx->OurKey()); + } + + bool + RecursiveRouterLookup::Validate(const RouterContact &rc) const + { + if(!rc.Verify(parent->Crypto(), parent->Now())) + { + llarp::LogWarn("rc from lookup result is invalid"); + return false; + } + return true; + } + + void + RecursiveRouterLookup::Start(const TXOwner &peer) + { + parent->DHTSendTo(peer.node.as_array(), + new FindRouterMessage(peer.txid, target)); + } + + void + RecursiveRouterLookup::SendReply() + { + if(resultHandler) + { + resultHandler(valuesFound); + } + else + { + parent->DHTSendTo( + whoasked.node.as_array(), + new GotRouterMessage({}, whoasked.txid, valuesFound, false)); + } + } + } // namespace dht +} // namespace llarp diff --git a/llarp/dht/recursiverouterlookup.hpp b/llarp/dht/recursiverouterlookup.hpp new file mode 100644 index 000000000..f4b0e4a5d --- /dev/null +++ b/llarp/dht/recursiverouterlookup.hpp @@ -0,0 +1,42 @@ +#ifndef LLARP_DHT_RECURSIVEROUTERLOOKUP +#define LLARP_DHT_RECURSIVEROUTERLOOKUP + +#include + +#include +#include + +namespace llarp +{ + namespace dht + { + struct RecursiveRouterLookup : public TX< RouterID, RouterContact > + { + RouterLookupHandler resultHandler; + RecursiveRouterLookup(const TXOwner &whoasked, const RouterID &target, + Context *ctx, RouterLookupHandler result); + + bool + Validate(const RouterContact &rc) const override; + + bool + GetNextPeer(Key_t &, const std::set< Key_t > &) override + { + return false; + } + + void + DoNextRequest(const Key_t &) override + { + } + + void + Start(const TXOwner &peer) override; + + virtual void + SendReply() override; + }; + } // namespace dht +} // namespace llarp + +#endif diff --git a/llarp/dht/serviceaddresslookup.cpp b/llarp/dht/serviceaddresslookup.cpp new file mode 100644 index 000000000..d9395e7c5 --- /dev/null +++ b/llarp/dht/serviceaddresslookup.cpp @@ -0,0 +1,79 @@ +#include + +#include +#include +#include + +namespace llarp +{ + namespace dht + { + ServiceAddressLookup::ServiceAddressLookup( + const TXOwner &asker, const service::Address &addr, Context *ctx, + uint64_t r, service::IntroSetLookupHandler handler) + : TX< service::Address, service::IntroSet >(asker, addr, ctx) + , handleResult(handler) + , R(r) + { + peersAsked.insert(ctx->OurKey()); + } + + bool + ServiceAddressLookup::Validate(const service::IntroSet &value) const + { + if(!value.Verify(parent->Crypto(), parent->Now())) + { + llarp::LogWarn("Got invalid introset from service lookup"); + return false; + } + if(value.A.Addr() != target) + { + llarp::LogWarn("got introset with wrong target from service lookup"); + return false; + } + return true; + } + + bool + ServiceAddressLookup::GetNextPeer(Key_t &next, + const std::set< Key_t > &exclude) + { + Key_t k = target.ToKey(); + return parent->nodes->FindCloseExcluding(k, next, exclude); + } + + void + ServiceAddressLookup::Start(const TXOwner &peer) + { + parent->DHTSendTo(peer.node.as_array(), + new FindIntroMessage(peer.txid, target, R)); + } + + void + ServiceAddressLookup::DoNextRequest(const Key_t &ask) + { + if(R) + { + parent->LookupIntroSetRecursive(target, whoasked.node, whoasked.txid, + ask, R - 1); + } + else + { + parent->LookupIntroSetIterative(target, whoasked.node, whoasked.txid, + ask); + } + } + + void + ServiceAddressLookup::SendReply() + { + if(handleResult) + { + handleResult(valuesFound); + } + + parent->DHTSendTo(whoasked.node.as_array(), + new GotIntroMessage(valuesFound, whoasked.txid)); + } + } // namespace dht +} // namespace llarp diff --git a/llarp/dht/serviceaddresslookup.hpp b/llarp/dht/serviceaddresslookup.hpp new file mode 100644 index 000000000..81425b752 --- /dev/null +++ b/llarp/dht/serviceaddresslookup.hpp @@ -0,0 +1,44 @@ +#ifndef LLARP_DHT_SERVICEADDRESSLOOKUP +#define LLARP_DHT_SERVICEADDRESSLOOKUP + +#include +#include +#include +#include + +namespace llarp +{ + namespace dht + { + struct TXOwner; + + struct ServiceAddressLookup + : public TX< service::Address, service::IntroSet > + { + service::IntroSetLookupHandler handleResult; + uint64_t R; + + ServiceAddressLookup(const TXOwner &asker, const service::Address &addr, + Context *ctx, uint64_t r, + service::IntroSetLookupHandler handler); + + bool + Validate(const service::IntroSet &value) const override; + + bool + GetNextPeer(Key_t &next, const std::set< Key_t > &exclude) override; + + void + Start(const TXOwner &peer) override; + + void + DoNextRequest(const Key_t &ask) override; + + virtual void + SendReply() override; + }; + } // namespace dht + +} // namespace llarp + +#endif diff --git a/llarp/dht/taglookup.cpp b/llarp/dht/taglookup.cpp new file mode 100644 index 000000000..8c84792b0 --- /dev/null +++ b/llarp/dht/taglookup.cpp @@ -0,0 +1,59 @@ +#include + +#include +#include + +namespace llarp +{ + namespace dht + { + bool + TagLookup::Validate(const service::IntroSet &introset) const + { + if(!introset.Verify(parent->Crypto(), parent->Now())) + { + llarp::LogWarn("got invalid introset from tag lookup"); + return false; + } + if(introset.topic != target) + { + llarp::LogWarn("got introset with missmatched topic in tag lookup"); + return false; + } + return true; + } + + void + TagLookup::Start(const TXOwner &peer) + { + parent->DHTSendTo(peer.node.as_array(), + new FindIntroMessage(target, peer.txid, R)); + } + + void + TagLookup::SendReply() + { + std::set< service::IntroSet > found; + for(const auto &remoteTag : valuesFound) + { + found.insert(remoteTag); + } + // collect our local values if we haven't hit a limit + if(found.size() < 2) + { + for(const auto &localTag : + parent->FindRandomIntroSetsWithTagExcluding(target, 1, found)) + { + found.insert(localTag); + } + } + std::vector< service::IntroSet > values; + for(const auto &introset : found) + { + values.push_back(introset); + } + parent->DHTSendTo(whoasked.node.as_array(), + new GotIntroMessage(values, whoasked.txid)); + } + } // namespace dht +} // namespace llarp diff --git a/llarp/dht/taglookup.hpp b/llarp/dht/taglookup.hpp new file mode 100644 index 000000000..2c2e6e63b --- /dev/null +++ b/llarp/dht/taglookup.hpp @@ -0,0 +1,44 @@ +#ifndef LLARP_DHT_TAGLOOKUP +#define LLARP_DHT_TAGLOOKUP + +#include +#include +#include + +namespace llarp +{ + namespace dht + { + struct TagLookup : public TX< service::Tag, service::IntroSet > + { + uint64_t R; + TagLookup(const TXOwner &asker, const service::Tag &tag, Context *ctx, + uint64_t r) + : TX< service::Tag, service::IntroSet >(asker, tag, ctx), R(r) + { + } + + bool + Validate(const service::IntroSet &introset) const override; + + void + Start(const TXOwner &peer) override; + + bool + GetNextPeer(Key_t &, const std::set< Key_t > &) override + { + return false; + } + + void + DoNextRequest(const Key_t &) override + { + } + + void + SendReply() override; + }; + } // namespace dht +} // namespace llarp + +#endif diff --git a/llarp/dht/tx.cpp b/llarp/dht/tx.cpp new file mode 100644 index 000000000..4e60e79f0 --- /dev/null +++ b/llarp/dht/tx.cpp @@ -0,0 +1 @@ +#include diff --git a/llarp/dht/tx.hpp b/llarp/dht/tx.hpp new file mode 100644 index 000000000..9bd42103a --- /dev/null +++ b/llarp/dht/tx.hpp @@ -0,0 +1,107 @@ +#ifndef LLARP_DHT_TX +#define LLARP_DHT_TX + +#include +#include +#include + +#include +#include + +namespace llarp +{ + struct Router; + + namespace dht + { + struct Context; + + template < typename K, typename V > + struct TX + { + K target; + Context* parent; + std::set< Key_t > peersAsked; + std::vector< V > valuesFound; + TXOwner whoasked; + + TX(const TXOwner& asker, const K& k, Context* p) + : target(k), whoasked(asker) + { + parent = p; + } + + virtual ~TX(){}; + + virtual bool + Validate(const V& value) const = 0; + + void + OnFound(const Key_t askedPeer, const V& value); + + virtual void + Start(const TXOwner& peer) = 0; + + virtual bool + GetNextPeer(Key_t& next, const std::set< Key_t >& exclude) = 0; + + virtual void + DoNextRequest(const Key_t& peer) = 0; + + /// return true if we want to persist this tx + bool + AskNextPeer(const Key_t& prevPeer, const std::unique_ptr< Key_t >& next); + + virtual void + SendReply() = 0; + }; + + template < typename K, typename V > + inline void + TX< K, V >::OnFound(const Key_t askedPeer, const V& value) + { + peersAsked.insert(askedPeer); + if(Validate(value)) + { + valuesFound.push_back(value); + } + } + + template < typename K, typename V > + inline bool + TX< K, V >::AskNextPeer(const Key_t& prevPeer, + const std::unique_ptr< Key_t >& next) + { + peersAsked.insert(prevPeer); + Key_t peer; + if(next) + { + // explicit next peer provided + peer = *next; + } + else if(!GetNextPeer(peer, peersAsked)) + { + // no more peers + llarp::LogInfo("no more peers for request asking for", target); + return false; + } + + const Key_t targetKey{target}; + if((prevPeer ^ targetKey) < (peer ^ targetKey)) + { + // next peer is not closer + llarp::LogInfo("next peer ", peer, " is not closer to ", target, + " than ", prevPeer); + return false; + } + else + { + peersAsked.insert(peer); + } + DoNextRequest(peer); + return true; + } + } // namespace dht +} // namespace llarp + +#endif diff --git a/llarp/dht/txholder.cpp b/llarp/dht/txholder.cpp new file mode 100644 index 000000000..2f7bf76a7 --- /dev/null +++ b/llarp/dht/txholder.cpp @@ -0,0 +1 @@ +#include diff --git a/llarp/dht/txholder.hpp b/llarp/dht/txholder.hpp new file mode 100644 index 000000000..41107748f --- /dev/null +++ b/llarp/dht/txholder.hpp @@ -0,0 +1,190 @@ +#ifndef LLARP_DHT_TXHOLDER +#define LLARP_DHT_TXHOLDER + +#include +#include +#include +#include + +#include +#include + +namespace llarp +{ + namespace dht + { + template < typename K, typename V, typename K_Hash, + llarp_time_t requestTimeoutMS = 5000UL > + struct TXHolder + { + using TXPtr = std::unique_ptr< TX< K, V > >; + // tx who are waiting for a reply for each key + std::unordered_multimap< K, TXOwner, K_Hash > waiting; + // tx timesouts by key + std::unordered_map< K, llarp_time_t, K_Hash > timeouts; + // maps remote peer with tx to handle reply from them + std::unordered_map< TXOwner, TXPtr, TXOwner::Hash > tx; + + const TX< K, V >* + GetPendingLookupFrom(const TXOwner& owner) const; + + bool + HasLookupFor(const K& target) const + { + return timeouts.find(target) != timeouts.end(); + } + + bool + HasPendingLookupFrom(const TXOwner& owner) const + { + return GetPendingLookupFrom(owner) != nullptr; + } + + void + NewTX(const TXOwner& askpeer, const TXOwner& whoasked, const K& k, + TX< K, V >* t); + + /// mark tx as not fond + void + NotFound(const TXOwner& from, const std::unique_ptr< Key_t >& next); + + void + Found(const TXOwner& from, const K& k, const std::vector< V >& values) + { + Inform(from, k, values, true); + } + + /// inform all watches for key of values found + void + Inform(TXOwner from, K key, std::vector< V > values, + bool sendreply = false, bool removeTimeouts = true); + + void + Expire(llarp_time_t now); + }; + + template < typename K, typename V, typename K_Hash, + llarp_time_t requestTimeoutMS > + const TX< K, V >* + TXHolder< K, V, K_Hash, requestTimeoutMS >::GetPendingLookupFrom( + const TXOwner& owner) const + { + auto itr = tx.find(owner); + if(itr == tx.end()) + { + return nullptr; + } + else + { + return itr->second.get(); + } + } + + template < typename K, typename V, typename K_Hash, + llarp_time_t requestTimeoutMS > + void + TXHolder< K, V, K_Hash, requestTimeoutMS >::NewTX(const TXOwner& askpeer, + const TXOwner& whoasked, + const K& k, TX< K, V >* t) + { + (void)whoasked; + tx.emplace(askpeer, std::unique_ptr< TX< K, V > >(t)); + auto count = waiting.count(k); + waiting.emplace(k, askpeer); + + auto itr = timeouts.find(k); + if(itr == timeouts.end()) + { + timeouts.emplace(k, time_now_ms() + requestTimeoutMS); + } + if(count == 0) + { + t->Start(askpeer); + } + } + + template < typename K, typename V, typename K_Hash, + llarp_time_t requestTimeoutMS > + void + TXHolder< K, V, K_Hash, requestTimeoutMS >::NotFound( + const TXOwner& from, const std::unique_ptr< Key_t >& next) + { + bool sendReply = true; + auto txitr = tx.find(from); + if(txitr == tx.end()) + { + return; + } + + // ask for next peer + if(txitr->second->AskNextPeer(from.node, next)) + { + sendReply = false; + } + + llarp::LogWarn("Target key ", txitr->second->target); + Inform(from, txitr->second->target, {}, sendReply, sendReply); + } + + template < typename K, typename V, typename K_Hash, + llarp_time_t requestTimeoutMS > + void + TXHolder< K, V, K_Hash, requestTimeoutMS >::Inform(TXOwner from, K key, + std::vector< V > values, + bool sendreply, + bool removeTimeouts) + { + auto range = waiting.equal_range(key); + auto itr = range.first; + while(itr != range.second) + { + auto txitr = tx.find(itr->second); + if(txitr != tx.end()) + { + for(const auto& value : values) + { + txitr->second->OnFound(from.node, value); + } + if(sendreply) + { + txitr->second->SendReply(); + tx.erase(txitr); + } + } + ++itr; + } + + if(sendreply) + { + waiting.erase(key); + } + + if(removeTimeouts) + { + timeouts.erase(key); + } + } + + template < typename K, typename V, typename K_Hash, + llarp_time_t requestTimeoutMS > + void + TXHolder< K, V, K_Hash, requestTimeoutMS >::Expire(llarp_time_t now) + { + auto itr = timeouts.begin(); + while(itr != timeouts.end()) + { + if(now > itr->second && now - itr->second >= requestTimeoutMS) + { + Inform(TXOwner{}, itr->first, {}, true, false); + itr = timeouts.erase(itr); + } + else + { + ++itr; + } + } + } + } // namespace dht +} // namespace llarp + +#endif diff --git a/llarp/router_contact.hpp b/llarp/router_contact.hpp index 5e01875f0..3d3b1c077 100644 --- a/llarp/router_contact.hpp +++ b/llarp/router_contact.hpp @@ -8,6 +8,7 @@ #include #include +#include #include #define MAX_RC_SIZE (1024) @@ -188,6 +189,9 @@ namespace llarp bool VerifySignature(llarp::Crypto *crypto) const; }; + + using RouterLookupHandler = + std::function< void(const std::vector< RouterContact >&) >; } // namespace llarp #endif diff --git a/llarp/service/IntroSet.hpp b/llarp/service/IntroSet.hpp index 17c9fc9b7..46577f085 100644 --- a/llarp/service/IntroSet.hpp +++ b/llarp/service/IntroSet.hpp @@ -10,6 +10,7 @@ #include #include +#include #include #include @@ -153,6 +154,10 @@ namespace llarp bool Verify(llarp::Crypto* crypto, llarp_time_t now) const; }; + + using IntroSetLookupHandler = + std::function< void(const std::vector< IntroSet >&) >; + } // namespace service } // namespace llarp