dht fixups

pull/6/head^2
Jeff Becker 6 years ago
parent 7c518bc4da
commit 994a5fe928

@ -58,7 +58,7 @@ namespace llarp
const llarp::PathID_t& path, const Key_t& askpeer); const llarp::PathID_t& path, const Key_t& askpeer);
std::set< service::IntroSet > std::set< service::IntroSet >
FindIntroSetsWithTag(const service::Tag& tag); FindRandomIntroSetsWithTag(const service::Tag& tag, size_t max = 2);
void void
LookupRouterRelayed(const Key_t& requester, uint64_t txid, LookupRouterRelayed(const Key_t& requester, uint64_t txid,
@ -68,6 +68,9 @@ namespace llarp
bool bool
RelayRequestForPath(const llarp::PathID_t& localPath, RelayRequestForPath(const llarp::PathID_t& localPath,
const IMessage* msg); const IMessage* msg);
void
PropagateIntroSetTo(const service::IntroSet & introset, const Key_t & peer, uint64_t S);
void void
Init(const Key_t& us, llarp_router* router); Init(const Key_t& us, llarp_router* router);

@ -17,10 +17,11 @@ namespace llarp
{ {
} }
PublishIntroMessage(const llarp::service::IntroSet& i, uint64_t tx) PublishIntroMessage(const llarp::service::IntroSet& i, uint64_t tx, uint64_t s)
: IMessage({}), txID(tx) : IMessage({}), txID(tx)
{ {
I = i; I = i;
S = s;
} }
~PublishIntroMessage(); ~PublishIntroMessage();

@ -17,7 +17,8 @@ struct transit_message
std::unordered_map< byte_t, fragment_t > frags; std::unordered_map< byte_t, fragment_t > frags;
fragment_t lastfrag; fragment_t lastfrag;
llarp_time_t lastAck = 0; llarp_time_t lastAck = 0;
llarp_time_t lastRetransmit = 0;
llarp_time_t started; llarp_time_t started;
void void

@ -65,8 +65,9 @@ namespace llarp
/// return true if we should publish a new hidden service descriptor /// return true if we should publish a new hidden service descriptor
virtual bool virtual bool
ShouldPublishDescriptors() const ShouldPublishDescriptors(llarp_time_t now) const
{ {
(void)now;
return false; return false;
} }

@ -73,7 +73,7 @@ namespace llarp
} }
bool bool
HasExpiredIntros() const; HasExpiredIntros(llarp_time_t now) const;
bool bool
IsExpired(llarp_time_t now) const; IsExpired(llarp_time_t now) const;

@ -23,7 +23,7 @@ namespace llarp
SetOption(const std::string& k, const std::string& v); SetOption(const std::string& k, const std::string& v);
void void
Tick(); Tick(llarp_time_t now);
bool bool
Start(); Start();
@ -32,7 +32,7 @@ namespace llarp
Name() const; Name() const;
bool bool
ShouldPublishDescriptors() const; ShouldPublishDescriptors(llarp_time_t now) const;
bool bool
PublishIntroSet(llarp_router* r); PublishIntroSet(llarp_router* r);
@ -147,11 +147,14 @@ namespace llarp
~CachedTagResult(); ~CachedTagResult();
void
Expire(llarp_time_t now);
bool bool
ShouldRefresh(llarp_time_t now) const ShouldRefresh(llarp_time_t now) const
{ {
return result.size() == 0 return result.size() == 0
&& (now - lastModified >= TTL && pendingTX == 0); || (now - lastModified >= TTL && pendingTX == 0);
} }
llarp::routing::IMessage* llarp::routing::IMessage*

@ -50,6 +50,7 @@ namespace llarp
uint64_t txid; uint64_t txid;
PathID_t pathID; PathID_t pathID;
llarp_router *m_router; llarp_router *m_router;
std::set< service::IntroSet > localtags;
PathTagLookupJob(llarp_router *r, const PathID_t &localpath, uint64_t tx) PathTagLookupJob(llarp_router *r, const PathID_t &localpath, uint64_t tx)
: txid(tx), pathID(localpath), m_router(r) : txid(tx), pathID(localpath), m_router(r)
{ {
@ -62,8 +63,12 @@ namespace llarp
m_router->paths.GetByUpstream(m_router->dht->impl.OurKey(), pathID); m_router->paths.GetByUpstream(m_router->dht->impl.OurKey(), pathID);
if(path) if(path)
{ {
for(const auto &introset : results)
{
localtags.insert(introset);
}
llarp::routing::DHTMessage msg; llarp::routing::DHTMessage msg;
msg.M.push_back(new llarp::dht::GotIntroMessage(results, txid)); msg.M.push_back(new llarp::dht::GotIntroMessage(localtags, txid));
path->SendRoutingMessage(&msg, m_router); path->SendRoutingMessage(&msg, m_router);
} }
else else
@ -74,6 +79,17 @@ namespace llarp
delete this; delete this;
} }
}; };
void
Context::PropagateIntroSetTo(const service::IntroSet & introset, const Key_t & peer, uint64_t S)
{
llarp::LogInfo("Propagate Introset for ", introset.A, " to ", peer);
auto id = ++ids;
auto msg = new llarp::DHTImmeidateMessage(peer);
msg->msgs.push_back(new PublishIntroMessage(introset, id, S));
router->SendToOrQueue(peer, msg);
}
void void
Context::LookupTagForPath(const service::Tag &tag, uint64_t txid, Context::LookupTagForPath(const service::Tag &tag, uint64_t txid,
@ -84,26 +100,51 @@ namespace llarp
ownerKey.node = askpeer; ownerKey.node = askpeer;
ownerKey.txid = id; ownerKey.txid = id;
PathTagLookupJob *j = new PathTagLookupJob(router, path, txid); PathTagLookupJob *j = new PathTagLookupJob(router, path, txid);
j->localtags = FindRandomIntroSetsWithTag(tag);
SearchJob job( SearchJob job(
OurKey(), txid, OurKey(), txid,
std::bind(&PathTagLookupJob::OnResult, j, std::placeholders::_1)); std::bind(&PathTagLookupJob::OnResult, j, std::placeholders::_1));
pendingTX[ownerKey] = job; pendingTX[ownerKey] = job;
auto msg = new llarp::DHTImmeidateMessage(askpeer); auto msg = new llarp::DHTImmeidateMessage(askpeer);
auto dhtmsg = new FindIntroMessage(tag, id); auto dhtmsg = new FindIntroMessage(tag, id);
dhtmsg->iterative = true;
msg->msgs.push_back(dhtmsg); msg->msgs.push_back(dhtmsg);
router->SendToOrQueue(askpeer, msg); router->SendToOrQueue(askpeer, msg);
} }
std::set< service::IntroSet > std::set< service::IntroSet >
Context::FindIntroSetsWithTag(const service::Tag &tag) Context::FindRandomIntroSetsWithTag(const service::Tag &tag, size_t max)
{ {
std::set< service::IntroSet > found; std::set< service::IntroSet > found;
for(const auto &itr : services->nodes) auto &nodes = services->nodes;
if(nodes.size() == 0)
return found;
auto itr = nodes.begin();
// start at random middle point
auto start = rand() % nodes.size();
std::advance(itr, start);
auto end = itr;
while(itr != nodes.end())
{ {
if(itr.second.introset.topic == tag) if(itr->second.introset.topic == tag)
found.insert(itr.second.introset); {
found.insert(itr->second.introset);
if(found.size() == max)
return found;
}
++itr;
}
itr = nodes.begin();
while(itr != end)
{
if(itr->second.introset.topic == tag)
{
found.insert(itr->second.introset);
if(found.size() == max)
return found;
}
++itr;
} }
return found; return found;
} }
@ -278,18 +319,18 @@ namespace llarp
auto id = ++ids; auto id = ++ids;
if(txid == 0) if(txid == 0)
txid = id; txid = id;
TXOwner ownerKey; TXOwner ownerKey;
ownerKey.node = askpeer; ownerKey.node = askpeer;
ownerKey.txid = id; ownerKey.txid = id;
IntroSetInformJob *j = new IntroSetInformJob(router, askpeer, id); IntroSetInformJob *j = new IntroSetInformJob(router, whoasked, txid);
SearchJob job( SearchJob job(
whoasked, txid, whoasked, txid,
std::bind(&IntroSetInformJob::OnResult, j, std::placeholders::_1)); std::bind(&IntroSetInformJob::OnResult, j, std::placeholders::_1));
pendingTX[ownerKey] = job; pendingTX[ownerKey] = job;
auto msg = new llarp::DHTImmeidateMessage(askpeer); auto msg = new llarp::DHTImmeidateMessage(askpeer);
auto dhtmsg = new FindIntroMessage(tag, id); auto dhtmsg = new FindIntroMessage(tag, id);
dhtmsg->iterative = iterative;
msg->msgs.push_back(dhtmsg); msg->msgs.push_back(dhtmsg);
router->SendToOrQueue(askpeer, msg); router->SendToOrQueue(askpeer, msg);
} }
@ -306,7 +347,7 @@ namespace llarp
TXOwner ownerKey; TXOwner ownerKey;
ownerKey.node = askpeer; ownerKey.node = askpeer;
ownerKey.txid = id; ownerKey.txid = id;
IntroSetInformJob *j = new IntroSetInformJob(router, askpeer, id); IntroSetInformJob *j = new IntroSetInformJob(router, askpeer, txid);
SearchJob job( SearchJob job(
whoasked, txid, addr, excludes, whoasked, txid, addr, excludes,
std::bind(&IntroSetInformJob::OnResult, j, std::placeholders::_1)); std::bind(&IntroSetInformJob::OnResult, j, std::placeholders::_1));

@ -146,29 +146,18 @@ namespace llarp
} }
else else
{ {
auto introsets = dht.FindIntroSetsWithTag(N); if(iterative)
if(introsets.size())
{ {
auto introsets = dht.FindRandomIntroSetsWithTag(N, 8);
// we are iterative and don't have it, reply with a direct reply
replies.push_back(new GotIntroMessage(introsets, T)); replies.push_back(new GotIntroMessage(introsets, T));
} }
else else
{ {
if(iterative) // tag lookup
if(dht.nodes->FindCloseExcluding(N.Key(), peer, exclude))
{ {
// we are iterative and don't have it, reply with a direct reply dht.LookupTag(N, From, T, peer, true);
replies.push_back(new GotIntroMessage({}, T));
}
else
{
// tag lookup
if(dht.nodes->FindCloseExcluding(N.Key(), peer, exclude))
{
dht.LookupTag(N, From, T, peer);
}
else
{
llarp::LogWarn("no closer peers for tag ", N.ToString());
}
} }
} }
} }

@ -61,6 +61,8 @@ namespace llarp
} }
else else
{ {
llarp::LogWarn("got GIM from ", From,
" with no previous pending transaction, txid=", T);
return false; return false;
} }
} }

@ -39,6 +39,11 @@ namespace llarp
PublishIntroMessage::HandleMessage(llarp_dht_context *ctx, PublishIntroMessage::HandleMessage(llarp_dht_context *ctx,
std::vector< IMessage * > &replies) const std::vector< IMessage * > &replies) const
{ {
if(S > 5)
{
llarp::LogWarn("invalid S value ", S, " > 5");
return false;
}
auto &dht = ctx->impl; auto &dht = ctx->impl;
if(!I.VerifySignature(&dht.router->crypto)) if(!I.VerifySignature(&dht.router->crypto))
{ {
@ -60,16 +65,10 @@ namespace llarp
dht.services->PutNode(I); dht.services->PutNode(I);
replies.push_back(new GotIntroMessage({I}, txID)); replies.push_back(new GotIntroMessage({I}, txID));
Key_t peer; Key_t peer;
std::set< Key_t > exclude; std::set< Key_t > exclude = {dht.OurKey(), From};
exclude.insert(From); if(S && dht.nodes->FindCloseExcluding(addr, peer, exclude))
if(txID && dht.nodes->FindCloseExcluding(addr, peer, exclude))
{ {
// we are not the closest to this address, send it to the closest node dht.PropagateIntroSetTo(I, peer, S - 1);
llarp::LogInfo("telling closer peer ", peer, " we got an IntroSet for ",
addr);
auto msg = new llarp::DHTImmeidateMessage(peer);
msg->msgs.push_back(new PublishIntroMessage(I, 0));
dht.router->SendToOrQueue(peer, msg);
} }
return true; return true;
} }
@ -85,11 +84,8 @@ namespace llarp
return false; return false;
if(!BEncodeWriteDictInt(buf, "R", R)) if(!BEncodeWriteDictInt(buf, "R", R))
return false; return false;
if(hasS) if(!BEncodeWriteDictInt(buf, "S", S))
{ return false;
if(!BEncodeWriteDictInt(buf, "S", S))
return false;
}
if(!BEncodeWriteDictInt(buf, "T", txID)) if(!BEncodeWriteDictInt(buf, "T", txID))
return false; return false;
if(!BEncodeWriteDictInt(buf, "V", LLARP_PROTO_VERSION)) if(!BEncodeWriteDictInt(buf, "V", LLARP_PROTO_VERSION))

@ -67,7 +67,7 @@ transit_message::should_send_ack(llarp_time_t now) const
{ {
if(msginfo.numfrags() == 0) if(msginfo.numfrags() == 0)
return true; return true;
return now - lastAck > 250; return now - lastRetransmit > 250;
} }
bool bool
@ -123,6 +123,7 @@ transit_message::retransmit_frags(sendqueue_t &queue, byte_t flags)
body_ptr[8] = frag.first; body_ptr[8] = frag.first;
memcpy(body_ptr + 9, frag.second.data(), fragsize); memcpy(body_ptr + 9, frag.second.data(), fragsize);
} }
lastRetransmit = llarp_time_now_ms();
} }
bool bool

@ -80,9 +80,8 @@ namespace llarp
} }
bool bool
IntroSet::HasExpiredIntros() const IntroSet::HasExpiredIntros(llarp_time_t now) const
{ {
auto now = llarp_time_now_ms();
for(const auto& i : I) for(const auto& i : I)
if(now >= i.expiresAt) if(now >= i.expiresAt)
return true; return true;

@ -20,14 +20,16 @@ namespace llarp
void void
Context::Tick() Context::Tick()
{ {
auto now = llarp_time_now_ms();
auto itr = m_Endpoints.begin(); auto itr = m_Endpoints.begin();
while(itr != m_Endpoints.end()) while(itr != m_Endpoints.end())
{ {
itr->second->Tick(); itr->second->Tick(now);
++itr; ++itr;
} }
} }
bool bool
Context::AddEndpoint(const Config::section_t &conf) Context::AddEndpoint(const Config::section_t &conf)
{ {

@ -10,6 +10,7 @@ namespace llarp
Endpoint::Endpoint(const std::string& name, llarp_router* r) Endpoint::Endpoint(const std::string& name, llarp_router* r)
: llarp_pathbuilder_context(r, r->dht, 2), m_Router(r), m_Name(name) : llarp_pathbuilder_context(r, r->dht, 2), m_Router(r), m_Name(name)
{ {
m_Tag.Zero();
} }
bool bool
@ -32,9 +33,9 @@ namespace llarp
} }
void void
Endpoint::Tick() Endpoint::Tick(llarp_time_t now)
{ {
if(ShouldPublishDescriptors()) if(ShouldPublishDescriptors(now))
{ {
std::list< Introduction > I; std::list< Introduction > I;
if(!GetCurrentIntroductions(I)) if(!GetCurrentIntroductions(I))
@ -43,9 +44,8 @@ namespace llarp
" because we couldn't get any introductions"); " because we couldn't get any introductions");
return; return;
} }
m_IntroSet.I = I; m_IntroSet.I = I;
if(!m_Tag.IsZero()) m_IntroSet.topic = m_Tag;
m_IntroSet.topic = m_Tag;
if(!m_Identity.SignIntroSet(m_IntroSet, &m_Router->crypto)) if(!m_Identity.SignIntroSet(m_IntroSet, &m_Router->crypto))
{ {
llarp::LogWarn("failed to sign introset for endpoint ", Name()); llarp::LogWarn("failed to sign introset for endpoint ", Name());
@ -60,25 +60,22 @@ namespace llarp
llarp::LogWarn("failed to publish intro set for endpoint ", Name()); llarp::LogWarn("failed to publish intro set for endpoint ", Name());
} }
} }
auto now = llarp_time_now_ms();
for(const auto& tag : m_PrefetchTags) for(const auto& tag : m_PrefetchTags)
{ {
auto itr = m_PrefetchedTags.find(tag); auto itr = m_PrefetchedTags.find(tag);
if(itr == m_PrefetchedTags.end()) if(itr == m_PrefetchedTags.end())
{ {
// put cached result will try next iteration itr = m_PrefetchedTags.emplace(tag, tag).first;
m_PrefetchedTags.emplace(tag, tag);
} }
else if(itr->second.ShouldRefresh(now)) itr->second.Expire(now);
if(itr->second.ShouldRefresh(now))
{ {
auto path = PickRandomEstablishedPath(); auto path = PickRandomEstablishedPath();
if(path) if(path)
{ {
itr->second.pendingTX = GenTXID(); itr->second.pendingTX = GenTXID();
if(itr->second.SendRequestViaPath(path, m_Router)) m_PendingLookups[itr->second.pendingTX] = &itr->second;
{ itr->second.SendRequestViaPath(path, m_Router);
m_PendingLookups[itr->second.pendingTX] = &itr->second;
}
} }
} }
} }
@ -106,17 +103,18 @@ namespace llarp
std::set< IntroSet > remote; std::set< IntroSet > remote;
for(const auto& introset : msg->I) for(const auto& introset : msg->I)
{ {
if(m_Identity.pub == introset.A) if(!introset.VerifySignature(crypto))
{ {
if(!introset.VerifySignature(crypto)) llarp::LogInfo("invalid introset signature for ", introset,
" on endpoint ", Name());
if(m_Identity.pub == introset.A)
{ {
llarp::LogWarn(
"invalid signature in got intro message for service endpoint ",
Name());
IntroSetPublishFail(); IntroSetPublishFail();
return false;
} }
return false;
}
if(m_Identity.pub == introset.A)
{
llarp::LogInfo( llarp::LogInfo(
"got introset publish confirmation for hidden service endpoint ", "got introset publish confirmation for hidden service endpoint ",
Name()); Name());
@ -167,13 +165,34 @@ namespace llarp
bool bool
Endpoint::CachedTagResult::HandleResponse( Endpoint::CachedTagResult::HandleResponse(
const std::set< IntroSet >& results) const std::set< IntroSet >& introsets)
{ {
llarp::LogInfo("Tag result for ", tag.ToString(), " got ", results.size(), llarp::LogInfo("Tag result for ", tag.ToString(), " got ",
" results"); introsets.size(), " results");
lastModified = llarp_time_now_ms();
pendingTX = 0;
for(const auto& introset : introsets)
result.insert(introset);
return true; return true;
} }
void
Endpoint::CachedTagResult::Expire(llarp_time_t now)
{
auto itr = result.begin();
while(itr != result.end())
{
if(itr->HasExpiredIntros(now))
{
itr = result.erase(itr);
}
else
{
++itr;
}
}
}
llarp::routing::IMessage* llarp::routing::IMessage*
Endpoint::CachedTagResult::BuildRequestMessage() Endpoint::CachedTagResult::BuildRequestMessage()
{ {
@ -191,7 +210,7 @@ namespace llarp
m_CurrentPublishTX = rand(); m_CurrentPublishTX = rand();
llarp::routing::DHTMessage msg; llarp::routing::DHTMessage msg;
msg.M.push_back(new llarp::dht::PublishIntroMessage( msg.M.push_back(new llarp::dht::PublishIntroMessage(
m_IntroSet, m_CurrentPublishTX)); m_IntroSet, m_CurrentPublishTX, 3));
if(path->SendRoutingMessage(&msg, r)) if(path->SendRoutingMessage(&msg, r))
{ {
m_LastPublishAttempt = llarp_time_now_ms(); m_LastPublishAttempt = llarp_time_now_ms();
@ -211,10 +230,9 @@ namespace llarp
} }
bool bool
Endpoint::ShouldPublishDescriptors() const Endpoint::ShouldPublishDescriptors(llarp_time_t now) const
{ {
auto now = llarp_time_now_ms(); if(m_IntroSet.HasExpiredIntros(now))
if(m_IntroSet.HasExpiredIntros())
return m_CurrentPublishTX == 0 return m_CurrentPublishTX == 0
&& now - m_LastPublishAttempt >= INTROSET_PUBLISH_RETRY_INTERVAL; && now - m_LastPublishAttempt >= INTROSET_PUBLISH_RETRY_INTERVAL;
return m_CurrentPublishTX == 0 return m_CurrentPublishTX == 0
@ -244,6 +262,7 @@ namespace llarp
const llarp::dht::GotIntroMessage* msg) const llarp::dht::GotIntroMessage* msg)
{ {
// TODO: implement me // TODO: implement me
return false; return false;
} }
} // namespace service } // namespace service

Loading…
Cancel
Save