diff --git a/include/llarp/dht/context.hpp b/include/llarp/dht/context.hpp index 26b3e97c3..ae646e824 100644 --- a/include/llarp/dht/context.hpp +++ b/include/llarp/dht/context.hpp @@ -58,7 +58,7 @@ namespace llarp const llarp::PathID_t& path, const Key_t& askpeer); std::set< service::IntroSet > - FindIntroSetsWithTag(const service::Tag& tag); + FindRandomIntroSetsWithTag(const service::Tag& tag, size_t max = 2); void LookupRouterRelayed(const Key_t& requester, uint64_t txid, @@ -68,6 +68,9 @@ namespace llarp bool RelayRequestForPath(const llarp::PathID_t& localPath, const IMessage* msg); + + void + PropagateIntroSetTo(const service::IntroSet & introset, const Key_t & peer, uint64_t S); void Init(const Key_t& us, llarp_router* router); diff --git a/include/llarp/dht/messages/pubintro.hpp b/include/llarp/dht/messages/pubintro.hpp index 9844ae710..5bea3e673 100644 --- a/include/llarp/dht/messages/pubintro.hpp +++ b/include/llarp/dht/messages/pubintro.hpp @@ -17,10 +17,11 @@ namespace llarp { } - PublishIntroMessage(const llarp::service::IntroSet& i, uint64_t tx) + PublishIntroMessage(const llarp::service::IntroSet& i, uint64_t tx, uint64_t s) : IMessage({}), txID(tx) { I = i; + S = s; } ~PublishIntroMessage(); diff --git a/include/llarp/iwp/transit_message.hpp b/include/llarp/iwp/transit_message.hpp index 6537c410d..c945ed8b8 100644 --- a/include/llarp/iwp/transit_message.hpp +++ b/include/llarp/iwp/transit_message.hpp @@ -17,7 +17,8 @@ struct transit_message std::unordered_map< byte_t, fragment_t > frags; fragment_t lastfrag; - llarp_time_t lastAck = 0; + llarp_time_t lastAck = 0; + llarp_time_t lastRetransmit = 0; llarp_time_t started; void diff --git a/include/llarp/pathset.hpp b/include/llarp/pathset.hpp index 33097b00d..bac19abc1 100644 --- a/include/llarp/pathset.hpp +++ b/include/llarp/pathset.hpp @@ -65,8 +65,9 @@ namespace llarp /// return true if we should publish a new hidden service descriptor virtual bool - ShouldPublishDescriptors() const + ShouldPublishDescriptors(llarp_time_t now) const { + (void)now; return false; } diff --git a/include/llarp/service/IntroSet.hpp b/include/llarp/service/IntroSet.hpp index 4da530de9..9ddb7fc28 100644 --- a/include/llarp/service/IntroSet.hpp +++ b/include/llarp/service/IntroSet.hpp @@ -73,7 +73,7 @@ namespace llarp } bool - HasExpiredIntros() const; + HasExpiredIntros(llarp_time_t now) const; bool IsExpired(llarp_time_t now) const; diff --git a/include/llarp/service/endpoint.hpp b/include/llarp/service/endpoint.hpp index e59806d56..ce30bf169 100644 --- a/include/llarp/service/endpoint.hpp +++ b/include/llarp/service/endpoint.hpp @@ -23,7 +23,7 @@ namespace llarp SetOption(const std::string& k, const std::string& v); void - Tick(); + Tick(llarp_time_t now); bool Start(); @@ -32,7 +32,7 @@ namespace llarp Name() const; bool - ShouldPublishDescriptors() const; + ShouldPublishDescriptors(llarp_time_t now) const; bool PublishIntroSet(llarp_router* r); @@ -147,11 +147,14 @@ namespace llarp ~CachedTagResult(); + void + Expire(llarp_time_t now); + bool ShouldRefresh(llarp_time_t now) const { return result.size() == 0 - && (now - lastModified >= TTL && pendingTX == 0); + || (now - lastModified >= TTL && pendingTX == 0); } llarp::routing::IMessage* diff --git a/llarp/dht/context.cpp b/llarp/dht/context.cpp index 373a2cca9..cdeb93b65 100644 --- a/llarp/dht/context.cpp +++ b/llarp/dht/context.cpp @@ -50,6 +50,7 @@ namespace llarp uint64_t txid; PathID_t pathID; llarp_router *m_router; + std::set< service::IntroSet > localtags; PathTagLookupJob(llarp_router *r, const PathID_t &localpath, uint64_t tx) : txid(tx), pathID(localpath), m_router(r) { @@ -62,8 +63,12 @@ namespace llarp m_router->paths.GetByUpstream(m_router->dht->impl.OurKey(), pathID); if(path) { + for(const auto &introset : results) + { + localtags.insert(introset); + } llarp::routing::DHTMessage msg; - msg.M.push_back(new llarp::dht::GotIntroMessage(results, txid)); + msg.M.push_back(new llarp::dht::GotIntroMessage(localtags, txid)); path->SendRoutingMessage(&msg, m_router); } else @@ -74,6 +79,17 @@ namespace llarp delete this; } }; + + void + Context::PropagateIntroSetTo(const service::IntroSet & introset, const Key_t & peer, uint64_t S) + { + llarp::LogInfo("Propagate Introset for ", introset.A, " to ", peer); + auto id = ++ids; + auto msg = new llarp::DHTImmeidateMessage(peer); + msg->msgs.push_back(new PublishIntroMessage(introset, id, S)); + router->SendToOrQueue(peer, msg); + } + void Context::LookupTagForPath(const service::Tag &tag, uint64_t txid, @@ -84,26 +100,51 @@ namespace llarp ownerKey.node = askpeer; ownerKey.txid = id; PathTagLookupJob *j = new PathTagLookupJob(router, path, txid); + j->localtags = FindRandomIntroSetsWithTag(tag); SearchJob job( OurKey(), txid, std::bind(&PathTagLookupJob::OnResult, j, std::placeholders::_1)); pendingTX[ownerKey] = job; - auto msg = new llarp::DHTImmeidateMessage(askpeer); - auto dhtmsg = new FindIntroMessage(tag, id); - dhtmsg->iterative = true; + auto msg = new llarp::DHTImmeidateMessage(askpeer); + auto dhtmsg = new FindIntroMessage(tag, id); msg->msgs.push_back(dhtmsg); router->SendToOrQueue(askpeer, msg); } std::set< service::IntroSet > - Context::FindIntroSetsWithTag(const service::Tag &tag) + Context::FindRandomIntroSetsWithTag(const service::Tag &tag, size_t max) { std::set< service::IntroSet > found; - for(const auto &itr : services->nodes) + auto &nodes = services->nodes; + if(nodes.size() == 0) + return found; + + auto itr = nodes.begin(); + // start at random middle point + auto start = rand() % nodes.size(); + std::advance(itr, start); + auto end = itr; + while(itr != nodes.end()) { - if(itr.second.introset.topic == tag) - found.insert(itr.second.introset); + if(itr->second.introset.topic == tag) + { + found.insert(itr->second.introset); + if(found.size() == max) + return found; + } + ++itr; + } + itr = nodes.begin(); + while(itr != end) + { + if(itr->second.introset.topic == tag) + { + found.insert(itr->second.introset); + if(found.size() == max) + return found; + } + ++itr; } return found; } @@ -278,18 +319,18 @@ namespace llarp auto id = ++ids; if(txid == 0) txid = id; - TXOwner ownerKey; ownerKey.node = askpeer; ownerKey.txid = id; - IntroSetInformJob *j = new IntroSetInformJob(router, askpeer, id); + IntroSetInformJob *j = new IntroSetInformJob(router, whoasked, txid); SearchJob job( whoasked, txid, std::bind(&IntroSetInformJob::OnResult, j, std::placeholders::_1)); pendingTX[ownerKey] = job; - auto msg = new llarp::DHTImmeidateMessage(askpeer); - auto dhtmsg = new FindIntroMessage(tag, id); + auto msg = new llarp::DHTImmeidateMessage(askpeer); + auto dhtmsg = new FindIntroMessage(tag, id); + dhtmsg->iterative = iterative; msg->msgs.push_back(dhtmsg); router->SendToOrQueue(askpeer, msg); } @@ -306,7 +347,7 @@ namespace llarp TXOwner ownerKey; ownerKey.node = askpeer; ownerKey.txid = id; - IntroSetInformJob *j = new IntroSetInformJob(router, askpeer, id); + IntroSetInformJob *j = new IntroSetInformJob(router, askpeer, txid); SearchJob job( whoasked, txid, addr, excludes, std::bind(&IntroSetInformJob::OnResult, j, std::placeholders::_1)); diff --git a/llarp/dht/find_intro.cpp b/llarp/dht/find_intro.cpp index 2c939bdcb..f29e4b2ec 100644 --- a/llarp/dht/find_intro.cpp +++ b/llarp/dht/find_intro.cpp @@ -146,29 +146,18 @@ namespace llarp } else { - auto introsets = dht.FindIntroSetsWithTag(N); - if(introsets.size()) + if(iterative) { + auto introsets = dht.FindRandomIntroSetsWithTag(N, 8); + // we are iterative and don't have it, reply with a direct reply replies.push_back(new GotIntroMessage(introsets, T)); } else { - if(iterative) + // tag lookup + if(dht.nodes->FindCloseExcluding(N.Key(), peer, exclude)) { - // we are iterative and don't have it, reply with a direct reply - replies.push_back(new GotIntroMessage({}, T)); - } - else - { - // tag lookup - if(dht.nodes->FindCloseExcluding(N.Key(), peer, exclude)) - { - dht.LookupTag(N, From, T, peer); - } - else - { - llarp::LogWarn("no closer peers for tag ", N.ToString()); - } + dht.LookupTag(N, From, T, peer, true); } } } diff --git a/llarp/dht/got_intro.cpp b/llarp/dht/got_intro.cpp index 7fbe62fa6..f5cc830fc 100644 --- a/llarp/dht/got_intro.cpp +++ b/llarp/dht/got_intro.cpp @@ -61,6 +61,8 @@ namespace llarp } else { + llarp::LogWarn("got GIM from ", From, + " with no previous pending transaction, txid=", T); return false; } } diff --git a/llarp/dht/publish_intro.cpp b/llarp/dht/publish_intro.cpp index 92a700863..17557881c 100644 --- a/llarp/dht/publish_intro.cpp +++ b/llarp/dht/publish_intro.cpp @@ -39,6 +39,11 @@ namespace llarp PublishIntroMessage::HandleMessage(llarp_dht_context *ctx, std::vector< IMessage * > &replies) const { + if(S > 5) + { + llarp::LogWarn("invalid S value ", S, " > 5"); + return false; + } auto &dht = ctx->impl; if(!I.VerifySignature(&dht.router->crypto)) { @@ -60,16 +65,10 @@ namespace llarp dht.services->PutNode(I); replies.push_back(new GotIntroMessage({I}, txID)); Key_t peer; - std::set< Key_t > exclude; - exclude.insert(From); - if(txID && dht.nodes->FindCloseExcluding(addr, peer, exclude)) + std::set< Key_t > exclude = {dht.OurKey(), From}; + if(S && dht.nodes->FindCloseExcluding(addr, peer, exclude)) { - // we are not the closest to this address, send it to the closest node - llarp::LogInfo("telling closer peer ", peer, " we got an IntroSet for ", - addr); - auto msg = new llarp::DHTImmeidateMessage(peer); - msg->msgs.push_back(new PublishIntroMessage(I, 0)); - dht.router->SendToOrQueue(peer, msg); + dht.PropagateIntroSetTo(I, peer, S - 1); } return true; } @@ -85,11 +84,8 @@ namespace llarp return false; if(!BEncodeWriteDictInt(buf, "R", R)) return false; - if(hasS) - { - if(!BEncodeWriteDictInt(buf, "S", S)) - return false; - } + if(!BEncodeWriteDictInt(buf, "S", S)) + return false; if(!BEncodeWriteDictInt(buf, "T", txID)) return false; if(!BEncodeWriteDictInt(buf, "V", LLARP_PROTO_VERSION)) diff --git a/llarp/iwp/transit_message.cpp b/llarp/iwp/transit_message.cpp index a5eae879d..dd30911f8 100644 --- a/llarp/iwp/transit_message.cpp +++ b/llarp/iwp/transit_message.cpp @@ -67,7 +67,7 @@ transit_message::should_send_ack(llarp_time_t now) const { if(msginfo.numfrags() == 0) return true; - return now - lastAck > 250; + return now - lastRetransmit > 250; } bool @@ -123,6 +123,7 @@ transit_message::retransmit_frags(sendqueue_t &queue, byte_t flags) body_ptr[8] = frag.first; memcpy(body_ptr + 9, frag.second.data(), fragsize); } + lastRetransmit = llarp_time_now_ms(); } bool diff --git a/llarp/service.cpp b/llarp/service.cpp index cd12755a7..370d04ce7 100644 --- a/llarp/service.cpp +++ b/llarp/service.cpp @@ -80,9 +80,8 @@ namespace llarp } bool - IntroSet::HasExpiredIntros() const + IntroSet::HasExpiredIntros(llarp_time_t now) const { - auto now = llarp_time_now_ms(); for(const auto& i : I) if(now >= i.expiresAt) return true; diff --git a/llarp/service/context.cpp b/llarp/service/context.cpp index bfb1303b2..4ea7a299c 100644 --- a/llarp/service/context.cpp +++ b/llarp/service/context.cpp @@ -20,14 +20,16 @@ namespace llarp void Context::Tick() { + auto now = llarp_time_now_ms(); auto itr = m_Endpoints.begin(); while(itr != m_Endpoints.end()) { - itr->second->Tick(); + itr->second->Tick(now); ++itr; } } + bool Context::AddEndpoint(const Config::section_t &conf) { diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 711ceef59..22aec639a 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -10,6 +10,7 @@ namespace llarp Endpoint::Endpoint(const std::string& name, llarp_router* r) : llarp_pathbuilder_context(r, r->dht, 2), m_Router(r), m_Name(name) { + m_Tag.Zero(); } bool @@ -32,9 +33,9 @@ namespace llarp } void - Endpoint::Tick() + Endpoint::Tick(llarp_time_t now) { - if(ShouldPublishDescriptors()) + if(ShouldPublishDescriptors(now)) { std::list< Introduction > I; if(!GetCurrentIntroductions(I)) @@ -43,9 +44,8 @@ namespace llarp " because we couldn't get any introductions"); return; } - m_IntroSet.I = I; - if(!m_Tag.IsZero()) - m_IntroSet.topic = m_Tag; + m_IntroSet.I = I; + m_IntroSet.topic = m_Tag; if(!m_Identity.SignIntroSet(m_IntroSet, &m_Router->crypto)) { llarp::LogWarn("failed to sign introset for endpoint ", Name()); @@ -60,25 +60,22 @@ namespace llarp llarp::LogWarn("failed to publish intro set for endpoint ", Name()); } } - auto now = llarp_time_now_ms(); for(const auto& tag : m_PrefetchTags) { auto itr = m_PrefetchedTags.find(tag); if(itr == m_PrefetchedTags.end()) { - // put cached result will try next iteration - m_PrefetchedTags.emplace(tag, tag); + itr = m_PrefetchedTags.emplace(tag, tag).first; } - else if(itr->second.ShouldRefresh(now)) + itr->second.Expire(now); + if(itr->second.ShouldRefresh(now)) { auto path = PickRandomEstablishedPath(); if(path) { - itr->second.pendingTX = GenTXID(); - if(itr->second.SendRequestViaPath(path, m_Router)) - { - m_PendingLookups[itr->second.pendingTX] = &itr->second; - } + itr->second.pendingTX = GenTXID(); + m_PendingLookups[itr->second.pendingTX] = &itr->second; + itr->second.SendRequestViaPath(path, m_Router); } } } @@ -106,17 +103,18 @@ namespace llarp std::set< IntroSet > remote; for(const auto& introset : msg->I) { - if(m_Identity.pub == introset.A) + if(!introset.VerifySignature(crypto)) { - if(!introset.VerifySignature(crypto)) + llarp::LogInfo("invalid introset signature for ", introset, + " on endpoint ", Name()); + if(m_Identity.pub == introset.A) { - llarp::LogWarn( - "invalid signature in got intro message for service endpoint ", - Name()); - IntroSetPublishFail(); - return false; } + return false; + } + if(m_Identity.pub == introset.A) + { llarp::LogInfo( "got introset publish confirmation for hidden service endpoint ", Name()); @@ -167,13 +165,34 @@ namespace llarp bool Endpoint::CachedTagResult::HandleResponse( - const std::set< IntroSet >& results) + const std::set< IntroSet >& introsets) { - llarp::LogInfo("Tag result for ", tag.ToString(), " got ", results.size(), - " results"); + llarp::LogInfo("Tag result for ", tag.ToString(), " got ", + introsets.size(), " results"); + lastModified = llarp_time_now_ms(); + pendingTX = 0; + for(const auto& introset : introsets) + result.insert(introset); return true; } + void + Endpoint::CachedTagResult::Expire(llarp_time_t now) + { + auto itr = result.begin(); + while(itr != result.end()) + { + if(itr->HasExpiredIntros(now)) + { + itr = result.erase(itr); + } + else + { + ++itr; + } + } + } + llarp::routing::IMessage* Endpoint::CachedTagResult::BuildRequestMessage() { @@ -191,7 +210,7 @@ namespace llarp m_CurrentPublishTX = rand(); llarp::routing::DHTMessage msg; msg.M.push_back(new llarp::dht::PublishIntroMessage( - m_IntroSet, m_CurrentPublishTX)); + m_IntroSet, m_CurrentPublishTX, 3)); if(path->SendRoutingMessage(&msg, r)) { m_LastPublishAttempt = llarp_time_now_ms(); @@ -211,10 +230,9 @@ namespace llarp } bool - Endpoint::ShouldPublishDescriptors() const + Endpoint::ShouldPublishDescriptors(llarp_time_t now) const { - auto now = llarp_time_now_ms(); - if(m_IntroSet.HasExpiredIntros()) + if(m_IntroSet.HasExpiredIntros(now)) return m_CurrentPublishTX == 0 && now - m_LastPublishAttempt >= INTROSET_PUBLISH_RETRY_INTERVAL; return m_CurrentPublishTX == 0 @@ -244,6 +262,7 @@ namespace llarp const llarp::dht::GotIntroMessage* msg) { // TODO: implement me + return false; } } // namespace service