From dff9aeb250ca6b2f34bc83b7da6ea00b81b16868 Mon Sep 17 00:00:00 2001 From: Stephen Shelton Date: Wed, 19 Feb 2020 17:07:46 -0700 Subject: [PATCH] Propagate Introset publishing redundantly --- llarp/dht/context.cpp | 14 ++--- llarp/dht/context.hpp | 3 +- llarp/dht/messages/pubintro.cpp | 108 +++++++++++++++++++++++--------- llarp/dht/messages/pubintro.hpp | 15 +++-- llarp/dht/publishservicejob.cpp | 20 +++--- llarp/dht/publishservicejob.hpp | 7 +-- llarp/service/endpoint.cpp | 12 ++-- llarp/service/endpoint.hpp | 2 +- test/dht/mock_context.hpp | 3 +- 9 files changed, 110 insertions(+), 74 deletions(-) diff --git a/llarp/dht/context.cpp b/llarp/dht/context.cpp index 6538ba962..29699ff93 100644 --- a/llarp/dht/context.cpp +++ b/llarp/dht/context.cpp @@ -123,10 +123,9 @@ namespace llarp /// send introset to peer from source with S counter and excluding peers void - PropagateIntroSetTo(const Key_t& source, uint64_t sourceTX, - const service::EncryptedIntroSet& introset, - const Key_t& peer, uint64_t S, - const std::set< Key_t >& exclude) override; + PropagateIntroSetTo(const Key_t& from, uint64_t txid, + const service::EncryptedIntroSet& introset, + const Key_t& tellpeer, bool relayed, uint64_t relayOrder); /// initialize dht context and explore every exploreInterval milliseconds void @@ -556,16 +555,15 @@ namespace llarp void Context::PropagateIntroSetTo(const Key_t& from, uint64_t txid, - const service::EncryptedIntroSet& introset, - const Key_t& tellpeer, uint64_t S, - const std::set< Key_t >& exclude) + const service::EncryptedIntroSet& introset, + const Key_t& tellpeer, bool relayed, uint64_t relayOrder) { TXOwner asker(from, txid); TXOwner peer(tellpeer, ++ids); const Key_t addr(introset.derivedSigningKey); _pendingIntrosetLookups.NewTX( peer, asker, addr, - new PublishServiceJob(asker, introset, this, S, exclude)); + new PublishServiceJob(asker, introset, this, relayed, relayOrder)); } void diff --git a/llarp/dht/context.hpp b/llarp/dht/context.hpp index e79b8e354..c7da9828f 100644 --- a/llarp/dht/context.hpp +++ b/llarp/dht/context.hpp @@ -94,8 +94,7 @@ namespace llarp virtual void PropagateIntroSetTo(const Key_t& source, uint64_t sourceTX, const service::EncryptedIntroSet& introset, - const Key_t& peer, uint64_t S, - const std::set< Key_t >& exclude) = 0; + const Key_t& peer, bool relayed, uint64_t relayOrder) = 0; virtual void Init(const Key_t& us, AbstractRouter* router, diff --git a/llarp/dht/messages/pubintro.cpp b/llarp/dht/messages/pubintro.cpp index eade44117..52c93e039 100644 --- a/llarp/dht/messages/pubintro.cpp +++ b/llarp/dht/messages/pubintro.cpp @@ -5,6 +5,7 @@ #include #include #include +#include namespace llarp { @@ -18,13 +19,12 @@ namespace llarp llarp_buffer_t *val) { bool read = false; - if(key == "E") - { - return BEncodeReadList(exclude, val); - } if(!BEncodeMaybeReadDictEntry("I", introset, read, key, val)) return false; - if(!BEncodeMaybeReadDictInt("S", depth, read, key, val)) + if(!BEncodeMaybeReadDictInt("O", relayOrder, read, key, val)) + return false; + uint64_t relayedInt = (relayed ? 1 : 0); + if(!BEncodeMaybeReadDictInt("R", relayedInt, read, key, val)) return false; if(!BEncodeMaybeReadDictInt("T", txID, read, key, val)) return false; @@ -40,44 +40,90 @@ namespace llarp { auto now = ctx->impl->Now(); - if(depth > MaxPropagationDepth) - { - llarp::LogWarn("invalid propgagation depth value ", depth, " > ", - MaxPropagationDepth); - return false; - } auto &dht = *ctx->impl; if(!introset.Verify(now)) { - llarp::LogWarn("invalid introset: ", introset); + llarp::LogWarn("Received PublishIntroMessage with invalid introset: ", + introset); // don't propogate or store replies.emplace_back(new GotIntroMessage({}, txID)); return true; } - const llarp::dht::Key_t addr(introset.derivedSigningKey); - - now += llarp::service::MAX_INTROSET_TIME_DELTA; - if(introset.IsExpired(now)) + if(introset.IsExpired(now + llarp::service::MAX_INTROSET_TIME_DELTA)) { // don't propogate or store + llarp::LogWarn("Received PublishIntroMessage with expired Introset: ", + introset); replies.emplace_back(new GotIntroMessage({}, txID)); return true; } - dht.services()->PutNode(introset); - replies.emplace_back(new GotIntroMessage({introset}, txID)); - Key_t peer; - std::set< Key_t > exclude_propagate; - for(const auto &e : exclude) - exclude_propagate.insert(e); - exclude_propagate.insert(From); - exclude_propagate.insert(dht.OurKey()); - if(depth > 0 - && dht.Nodes()->FindCloseExcluding(addr, peer, exclude_propagate)) + + const llarp::dht::Key_t addr(introset.derivedSigningKey); + + // identify closest 4 routers + auto closestRCs = dht.GetRouter()->nodedb()->FindClosestTo(addr, 4); + if (closestRCs.size() != 4) { - dht.PropagateIntroSetTo(From, txID, introset, peer, depth - 1, - exclude_propagate); + llarp::LogWarn("Received PublishIntroMessage but only know ", + closestRCs.size(), " nodes"); + replies.emplace_back(new GotIntroMessage({}, txID)); + return true; } + + // function to identify the closest 4 routers we know of for this introset + auto propagateToClosestFour = [&, this]() { + + // grab 1st & 2nd if we are relayOrder == 0, 3rd & 4th otherwise + auto rc0 = (relayOrder == 0 ? closestRCs[0] : closestRCs[2]); + auto rc1 = (relayOrder == 0 ? closestRCs[1] : closestRCs[3]); + + Key_t peer0{rc0.pubkey}; + Key_t peer1{rc1.pubkey}; + + // TODO: handle case where we are peer0 or peer1 + + dht.PropagateIntroSetTo(From, txID, introset, peer0, false, 0); + dht.PropagateIntroSetTo(From, txID, introset, peer1, false, 0); + }; + + if (relayed) + { + if (relayOrder > 1) + { + llarp::LogWarn("Received PublishIntroMessage with invalid relayOrder: ", + relayOrder); + replies.emplace_back(new GotIntroMessage({}, txID)); + return true; + } + + propagateToClosestFour(); + + } + else + { + bool found = false; + for (const auto& rc : closestRCs) + { + if (rc.pubkey == dht.OurKey()) + { + found = true; + break; + } + } + + if (found) + { + dht.services()->PutNode(introset); + replies.emplace_back(new GotIntroMessage({introset}, txID)); + } + else + { + // TODO: ensure this can't create a loop (reintroduce depth?) + propagateToClosestFour(); + } + } + return true; } @@ -88,11 +134,11 @@ namespace llarp return false; if(!BEncodeWriteDictMsgType(buf, "A", "I")) return false; - if(!BEncodeWriteDictList("E", exclude, buf)) - return false; if(!BEncodeWriteDictEntry("I", introset, buf)) return false; - if(!BEncodeWriteDictInt("S", depth, buf)) + if(!BEncodeWriteDictInt("O", relayOrder, buf)) + return false; + if(!BEncodeWriteDictInt("R", relayed, buf)) return false; if(!BEncodeWriteDictInt("T", txID, buf)) return false; diff --git a/llarp/dht/messages/pubintro.hpp b/llarp/dht/messages/pubintro.hpp index ac100ab22..7c9737b6f 100644 --- a/llarp/dht/messages/pubintro.hpp +++ b/llarp/dht/messages/pubintro.hpp @@ -14,20 +14,19 @@ namespace llarp { static const uint64_t MaxPropagationDepth; llarp::service::EncryptedIntroSet introset; - std::vector< Key_t > exclude; - uint64_t depth = 0; + bool relayed = false; + uint64_t relayOrder = 0; uint64_t txID = 0; PublishIntroMessage() : IMessage({}) { } - PublishIntroMessage(const llarp::service::EncryptedIntroSet& i, - uint64_t tx, uint64_t s, - std::vector< Key_t > _exclude = {}) + PublishIntroMessage(const llarp::service::EncryptedIntroSet& introset_, + uint64_t tx, bool relayed_, uint64_t relayOrder_) : IMessage({}) - , introset(i) - , exclude(std::move(_exclude)) - , depth(s) + , introset(introset_) + , relayed(relayed_) + , relayOrder(relayOrder_) , txID(tx) { } diff --git a/llarp/dht/publishservicejob.cpp b/llarp/dht/publishservicejob.cpp index 174d5fb2b..045c56eaa 100644 --- a/llarp/dht/publishservicejob.cpp +++ b/llarp/dht/publishservicejob.cpp @@ -9,14 +9,13 @@ namespace llarp namespace dht { PublishServiceJob::PublishServiceJob(const TXOwner &asker, - const service::EncryptedIntroSet &I, - AbstractContext *ctx, uint64_t s, - std::set< Key_t > exclude) + const service::EncryptedIntroSet &introset_, + AbstractContext *ctx, bool relayed_, uint64_t relayOrder_) : TX< Key_t, service::EncryptedIntroSet >( - asker, Key_t{I.derivedSigningKey}, ctx) - , S(s) - , dontTell(std::move(exclude)) - , introset(I) + asker, Key_t{introset_.derivedSigningKey}, ctx) + , relayed(relayed_) + , relayOrder(relayOrder_) + , introset(introset_) { } @@ -36,14 +35,9 @@ namespace llarp void PublishServiceJob::Start(const TXOwner &peer) { - std::vector< Key_t > exclude; - for(const auto &router : dontTell) - { - exclude.push_back(router); - } parent->DHTSendTo( peer.node.as_array(), - new PublishIntroMessage(introset, peer.txid, S, exclude)); + new PublishIntroMessage(introset, peer.txid, relayed, relayOrder)); } } // namespace dht } // namespace llarp diff --git a/llarp/dht/publishservicejob.hpp b/llarp/dht/publishservicejob.hpp index b53d1ebb7..7c4bfd3ba 100644 --- a/llarp/dht/publishservicejob.hpp +++ b/llarp/dht/publishservicejob.hpp @@ -14,14 +14,13 @@ namespace llarp { struct PublishServiceJob : public TX< Key_t, service::EncryptedIntroSet > { - uint64_t S; - std::set< Key_t > dontTell; + bool relayed; + uint64_t relayOrder; service::EncryptedIntroSet introset; PublishServiceJob(const TXOwner &asker, const service::EncryptedIntroSet &introset, - AbstractContext *ctx, uint64_t s, - std::set< Key_t > exclude); + AbstractContext *ctx, bool relayed, uint64_t relayOrder); bool Validate(const service::EncryptedIntroSet &introset) const override; diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index c8b51bd15..466c39e70 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -468,7 +468,7 @@ namespace llarp size_t published = 0; for(const auto& path : paths) { - if(PublishIntroSetVia(i, r, path)) + if(PublishIntroSetVia(i, r, path, published)) { published++; } @@ -480,11 +480,13 @@ namespace llarp { EncryptedIntroSet m_IntroSet; Endpoint* m_Endpoint; + uint64_t m_relayOrder; PublishIntroSetJob(Endpoint* parent, uint64_t id, - EncryptedIntroSet introset) + EncryptedIntroSet introset, uint64_t relayOrder) : IServiceLookup(parent, id, "PublishIntroSet") , m_IntroSet(std::move(introset)) , m_Endpoint(parent) + , m_relayOrder(relayOrder) { } @@ -493,7 +495,7 @@ namespace llarp { auto msg = std::make_shared< routing::DHTMessage >(); msg->M.emplace_back( - std::make_unique< dht::PublishIntroMessage >(m_IntroSet, txid, 5)); + std::make_unique< dht::PublishIntroMessage >(m_IntroSet, txid, true, m_relayOrder)); return msg; } @@ -526,9 +528,9 @@ namespace llarp bool Endpoint::PublishIntroSetVia(const EncryptedIntroSet& i, AbstractRouter* r, - path::Path_ptr path) + path::Path_ptr path, uint64_t relayOrder) { - auto job = new PublishIntroSetJob(this, GenTXID(), i); + auto job = new PublishIntroSetJob(this, GenTXID(), i, relayOrder); if(job->SendRequestViaPath(path, r)) { m_state->m_LastPublishAttempt = Now(); diff --git a/llarp/service/endpoint.hpp b/llarp/service/endpoint.hpp index 505da80b3..37efec7f2 100644 --- a/llarp/service/endpoint.hpp +++ b/llarp/service/endpoint.hpp @@ -172,7 +172,7 @@ namespace llarp bool PublishIntroSetVia(const EncryptedIntroSet& i, AbstractRouter* r, - path::Path_ptr p); + path::Path_ptr p, uint64_t relayOrder); bool HandleGotIntroMessage( diff --git a/test/dht/mock_context.hpp b/test/dht/mock_context.hpp index 76f364051..c39830dac 100644 --- a/test/dht/mock_context.hpp +++ b/test/dht/mock_context.hpp @@ -62,8 +62,7 @@ namespace llarp MOCK_METHOD6(PropagateIntroSetTo, void(const dht::Key_t& source, uint64_t sourceTX, const service::EncryptedIntroSet& introset, - const dht::Key_t& peer, uint64_t S, - const std::set< dht::Key_t >& exclude)); + const dht::Key_t& peer, bool relayed, uint64_t relayOrder)); MOCK_METHOD3(Init, void(const dht::Key_t&, AbstractRouter*, llarp_time_t));