lokinet/llarp/dht/context.cpp

834 lines
24 KiB
C++
Raw Normal View History

2018-12-12 00:48:54 +00:00
#include <dht/context.hpp>
#include <dht/messages/gotrouter.hpp>
#include <messages/dht.hpp>
#include <messages/dht_immediate.hpp>
#include <router.hpp>
#include <vector>
namespace llarp
{
namespace dht
{
Context::Context()
{
randombytes((byte_t *)&ids, sizeof(uint64_t));
2018-08-29 20:40:26 +00:00
allowTransit = false;
}
Context::~Context()
{
if(nodes)
delete nodes;
if(services)
delete services;
}
void
2018-09-06 20:31:58 +00:00
Context::Explore(size_t N)
{
2018-08-29 20:40:26 +00:00
// ask N random peers for new routers
llarp::LogInfo("Exploring network via ", N, " peers");
2018-08-29 20:40:26 +00:00
std::set< Key_t > peers;
2018-09-06 20:31:58 +00:00
if(nodes->GetManyRandom(peers, N))
{
2018-08-29 20:40:26 +00:00
for(const auto &peer : peers)
ExploreNetworkVia(peer);
}
2018-08-29 20:40:26 +00:00
else
llarp::LogError("failed to select random nodes for exploration");
}
2018-08-29 20:40:26 +00:00
struct ExploreNetworkJob : public TX< RouterID, RouterID >
{
2018-08-29 20:40:26 +00:00
ExploreNetworkJob(const RouterID &peer, Context *ctx)
: TX< RouterID, RouterID >(TXOwner{}, peer, ctx)
{
}
bool
Validate(const RouterID &) const override
{
// TODO: check with lokid
return true;
}
2018-08-29 20:40:26 +00:00
void
Start(const TXOwner &peer) override
2018-08-29 20:40:26 +00:00
{
parent->DHTSendTo(peer.node.as_array(),
new FindRouterMessage(peer.txid));
2018-08-29 20:40:26 +00:00
}
bool
GetNextPeer(Key_t &, const std::set< Key_t > &) override
2018-08-29 20:40:26 +00:00
{
return false;
}
void
DoNextRequest(const Key_t &) override
2018-08-29 20:40:26 +00:00
{
}
void
SendReply() override
{
2018-08-29 20:40:26 +00:00
llarp::LogInfo("got ", valuesFound.size(), " routers from exploration");
for(const auto &pk : valuesFound)
{
2018-11-28 15:18:18 +00:00
// lookup router
parent->LookupRouter(
pk,
std::bind(&llarp::Router::HandleDHTLookupForExplore,
parent->router, pk, std::placeholders::_1));
}
}
2018-08-29 20:40:26 +00:00
};
void
Context::ExploreNetworkVia(const Key_t &askpeer)
{
2018-11-28 15:27:36 +00:00
uint64_t txid = ++ids;
TXOwner peer(askpeer, txid);
TXOwner whoasked(OurKey(), txid);
pendingExploreLookups.NewTX(
peer, whoasked, askpeer.as_array(),
new ExploreNetworkJob(askpeer.as_array(), this));
}
void
Context::handle_explore_timer(void *u, uint64_t orig, uint64_t left)
{
if(left)
return;
Context *ctx = static_cast< Context * >(u);
2018-09-08 08:27:05 +00:00
ctx->Explore(1);
2018-12-10 14:14:55 +00:00
ctx->router->logic->call_later({orig, ctx, &handle_explore_timer});
}
void
Context::handle_cleaner_timer(void *u,
__attribute__((unused)) uint64_t orig,
uint64_t left)
{
if(left)
return;
Context *ctx = static_cast< Context * >(u);
// clean up transactions
ctx->CleanupTX();
2018-07-18 00:25:24 +00:00
if(ctx->services)
{
// expire intro sets
2018-10-29 16:48:36 +00:00
auto now = ctx->Now();
2018-07-18 00:25:24 +00:00
auto &nodes = ctx->services->nodes;
auto itr = nodes.begin();
while(itr != nodes.end())
{
if(itr->second.introset.IsExpired(now))
2018-08-04 02:59:32 +00:00
{
llarp::LogDebug("introset expired ", itr->second.introset.A.Addr());
2018-07-18 00:25:24 +00:00
itr = nodes.erase(itr);
2018-08-04 02:59:32 +00:00
}
2018-07-18 00:25:24 +00:00
else
++itr;
}
}
ctx->ScheduleCleanupTimer();
}
std::set< service::IntroSet >
2018-08-29 20:40:26 +00:00
Context::FindRandomIntroSetsWithTagExcluding(
const service::Tag &tag, size_t max,
const std::set< service::IntroSet > &exclude)
{
std::set< service::IntroSet > found;
2018-07-18 22:50:05 +00:00
auto &nodes = services->nodes;
if(nodes.size() == 0)
2018-08-10 03:51:38 +00:00
{
2018-07-18 22:50:05 +00:00
return found;
2018-08-10 03:51:38 +00:00
}
2018-07-18 22:50:05 +00:00
auto itr = nodes.begin();
// start at random middle point
auto start = llarp::randint() % nodes.size();
2018-07-18 22:50:05 +00:00
std::advance(itr, start);
2018-08-04 02:59:32 +00:00
auto end = itr;
std::string tagname = tag.ToString();
2018-07-18 22:50:05 +00:00
while(itr != nodes.end())
{
2018-08-04 02:59:32 +00:00
if(itr->second.introset.topic.ToString() == tagname)
2018-07-18 22:50:05 +00:00
{
2018-08-29 20:40:26 +00:00
if(exclude.count(itr->second.introset) == 0)
{
found.insert(itr->second.introset);
if(found.size() == max)
return found;
}
2018-07-18 22:50:05 +00:00
}
++itr;
}
itr = nodes.begin();
while(itr != end)
{
2018-08-04 02:59:32 +00:00
if(itr->second.introset.topic.ToString() == tagname)
2018-07-18 22:50:05 +00:00
{
2018-08-29 20:40:26 +00:00
if(exclude.count(itr->second.introset) == 0)
{
found.insert(itr->second.introset);
if(found.size() == max)
return found;
}
2018-07-18 22:50:05 +00:00
}
++itr;
}
return found;
2018-07-18 03:10:21 +00:00
}
void
Context::LookupRouterRelayed(
const Key_t &requester, uint64_t txid, const Key_t &target,
bool recursive, std::vector< std::unique_ptr< IMessage > > &replies)
{
if(target == ourKey)
{
// we are the target, give them our RC
replies.emplace_back(
new GotRouterMessage(requester, txid, {router->rc()}, false));
return;
}
Key_t next;
std::set< Key_t > excluding = {requester, ourKey};
if(nodes->FindCloseExcluding(target, next, excluding))
{
if(next == target)
{
// we know it
replies.emplace_back(new GotRouterMessage(
2018-08-30 18:48:43 +00:00
requester, txid, {nodes->nodes[target].rc}, false));
}
else if(recursive) // are we doing a recursive lookup?
{
2018-08-29 20:40:26 +00:00
// is the next peer we ask closer to the target than us?
if((next ^ target) < (ourKey ^ target))
{
// yes it is closer, ask neighbour recursively
LookupRouterRecursive(target.as_array(), requester, txid, next);
}
else
{
2018-08-29 20:40:26 +00:00
// no we are closer to the target so tell requester it's not there
// so they switch to iterative lookup
replies.emplace_back(
new GotRouterMessage(requester, txid, {}, false));
}
}
2018-11-08 15:15:02 +00:00
else
{
2018-11-08 15:15:02 +00:00
// iterative lookup and we don't have it tell them who is closer
replies.emplace_back(
2018-11-08 15:15:02 +00:00
new GotRouterMessage(requester, next, txid, false));
}
}
else
{
2018-08-29 20:40:26 +00:00
// we don't know it and have no closer peers to ask
replies.emplace_back(new GotRouterMessage(requester, txid, {}, false));
}
}
const llarp::service::IntroSet *
Context::GetIntroSetByServiceAddress(
const llarp::service::Address &addr) const
{
auto key = addr.ToKey();
auto itr = services->nodes.find(key);
if(itr == services->nodes.end())
return nullptr;
return &itr->second.introset;
}
void
Context::CleanupTX()
{
2018-10-29 16:48:36 +00:00
auto now = Now();
llarp::LogDebug("DHT tick");
2018-08-29 20:40:26 +00:00
pendingRouterLookups.Expire(now);
pendingIntrosetLookups.Expire(now);
pendingTagLookups.Expire(now);
pendingExploreLookups.Expire(now);
}
void
Context::Init(const Key_t &us, llarp::Router *r,
llarp_time_t exploreInterval)
{
router = r;
ourKey = us;
nodes = new Bucket< RCNode >(ourKey);
services = new Bucket< ISNode >(ourKey);
llarp::LogDebug("intialize dht with key ", ourKey);
// start exploring
2018-12-10 14:14:55 +00:00
r->logic->call_later(
{exploreInterval, this, &llarp::dht::Context::handle_explore_timer});
2018-10-27 20:02:24 +00:00
// start cleanup timer
ScheduleCleanupTimer();
}
void
Context::ScheduleCleanupTimer()
{
2018-12-10 14:14:55 +00:00
router->logic->call_later({1000, this, &handle_cleaner_timer});
}
2018-08-10 21:34:11 +00:00
void
Context::DHTSendTo(const RouterID &peer, IMessage *msg, bool keepalive)
2018-08-10 21:34:11 +00:00
{
llarp::DHTImmeidateMessage m;
m.msgs.emplace_back(msg);
router->SendToOrQueue(peer, &m);
2018-08-29 20:40:26 +00:00
if(keepalive)
{
2018-10-29 16:48:36 +00:00
auto now = Now();
2018-08-29 20:40:26 +00:00
router->PersistSessionUntil(peer, now + 10000);
}
2018-08-10 21:34:11 +00:00
}
bool
Context::RelayRequestForPath(const llarp::PathID_t &id, const IMessage *msg)
{
llarp::routing::DHTMessage reply;
if(!msg->HandleMessage(router->dht, reply.M))
return false;
2018-08-10 03:51:38 +00:00
if(reply.M.size())
{
auto path = router->paths.GetByUpstream(router->pubkey(), id);
return path && path->SendRoutingMessage(&reply, router);
}
return true;
}
2018-08-29 20:40:26 +00:00
struct ServiceAddressLookup
: public TX< service::Address, service::IntroSet >
2018-07-18 03:10:21 +00:00
{
2018-08-29 20:40:26 +00:00
IntroSetLookupHandler handleResult;
uint64_t R;
ServiceAddressLookup(const TXOwner &asker, const service::Address &addr,
Context *ctx, uint64_t r,
IntroSetLookupHandler handler)
: TX< service::Address, service::IntroSet >(asker, addr, ctx)
, handleResult(handler)
, R(r)
2018-08-10 21:34:11 +00:00
{
2018-08-29 20:40:26 +00:00
peersAsked.insert(ctx->OurKey());
2018-08-10 21:34:11 +00:00
}
bool
Validate(const service::IntroSet &value) const override
{
2018-10-29 16:48:36 +00:00
if(!value.Verify(parent->Crypto(), parent->Now()))
{
2018-09-20 11:27:18 +00:00
llarp::LogWarn("Got invalid introset from service lookup");
return false;
}
if(value.A.Addr() != target)
{
llarp::LogWarn("got introset with wrong target from service lookup");
return false;
}
return true;
}
2018-08-14 21:17:18 +00:00
bool
GetNextPeer(Key_t &next, const std::set< Key_t > &exclude) override
2018-08-29 20:40:26 +00:00
{
Key_t k = target.ToKey();
2018-08-29 20:40:26 +00:00
return parent->nodes->FindCloseExcluding(k, next, exclude);
}
void
Start(const TXOwner &peer) override
2018-08-10 21:34:11 +00:00
{
parent->DHTSendTo(peer.node.as_array(),
2018-08-29 20:40:26 +00:00
new FindIntroMessage(peer.txid, target, R));
}
void
DoNextRequest(const Key_t &ask) override
{
if(R)
parent->LookupIntroSetRecursive(target, whoasked.node, whoasked.txid,
ask, R - 1);
else
parent->LookupIntroSetIterative(target, whoasked.node, whoasked.txid,
ask);
}
2018-08-29 20:40:26 +00:00
virtual void
SendReply() override
2018-08-29 20:40:26 +00:00
{
if(handleResult)
handleResult(valuesFound);
2018-11-01 12:47:14 +00:00
parent->DHTSendTo(whoasked.node.as_array(),
2018-11-01 12:47:14 +00:00
new GotIntroMessage(valuesFound, whoasked.txid));
2018-08-29 20:40:26 +00:00
}
};
struct LocalServiceAddressLookup : public ServiceAddressLookup
{
PathID_t localPath;
LocalServiceAddressLookup(const PathID_t &pathid, uint64_t txid,
const service::Address &addr, Context *ctx,
__attribute__((unused)) const Key_t &askpeer)
2018-11-01 12:47:14 +00:00
: ServiceAddressLookup(TXOwner{ctx->OurKey(), txid}, addr, ctx, 5,
2018-08-29 20:40:26 +00:00
nullptr)
, localPath(pathid)
{
}
void
SendReply() override
2018-08-29 20:40:26 +00:00
{
auto path = parent->router->paths.GetByUpstream(
parent->OurKey().as_array(), localPath);
2018-08-29 20:40:26 +00:00
if(!path)
{
llarp::LogWarn(
"did not send reply for relayed dht request, no such local path "
"for pathid=",
localPath);
return;
}
routing::DHTMessage msg;
msg.M.emplace_back(new GotIntroMessage(valuesFound, whoasked.txid));
2018-08-29 20:40:26 +00:00
if(!path->SendRoutingMessage(&msg, parent->router))
{
llarp::LogWarn(
"failed to send routing message when informing result of dht "
"request, pathid=",
localPath);
}
}
};
void
Context::LookupIntroSetForPath(const service::Address &addr, uint64_t txid,
const llarp::PathID_t &path,
const Key_t &askpeer)
{
TXOwner asker(OurKey(), txid);
TXOwner peer(askpeer, ++ids);
pendingIntrosetLookups.NewTX(
2018-11-08 15:15:02 +00:00
peer, asker, addr,
2018-08-29 20:40:26 +00:00
new LocalServiceAddressLookup(path, txid, addr, this, askpeer));
}
struct PublishServiceJob : public TX< service::Address, service::IntroSet >
{
uint64_t S;
std::set< Key_t > dontTell;
service::IntroSet I;
PublishServiceJob(const TXOwner &asker, const service::IntroSet &introset,
Context *ctx, uint64_t s,
const std::set< Key_t > &exclude)
: TX< service::Address, service::IntroSet >(asker, introset.A.Addr(),
ctx)
, S(s)
, dontTell(exclude)
, I(introset)
{
}
bool
Validate(const service::IntroSet &introset) const override
{
if(I.A != introset.A)
{
llarp::LogWarn(
"publish introset acknoledgement acked a different service");
return false;
}
return true;
}
2018-08-29 20:40:26 +00:00
void
Start(const TXOwner &peer) override
2018-08-29 20:40:26 +00:00
{
std::vector< Key_t > exclude;
for(const auto &router : dontTell)
exclude.push_back(router);
parent->DHTSendTo(peer.node.as_array(),
2018-08-29 20:40:26 +00:00
new PublishIntroMessage(I, peer.txid, S, exclude));
2018-08-10 21:34:11 +00:00
}
bool
GetNextPeer(Key_t &, const std::set< Key_t > &) override
2018-07-18 03:10:21 +00:00
{
2018-08-29 20:40:26 +00:00
return false;
}
void
DoNextRequest(const Key_t &) override
2018-08-29 20:40:26 +00:00
{
}
void
SendReply() override
2018-08-29 20:40:26 +00:00
{
// don't need this
}
};
void
Context::PropagateIntroSetTo(const Key_t &from, uint64_t txid,
const service::IntroSet &introset,
const Key_t &tellpeer, uint64_t S,
const std::set< Key_t > &exclude)
{
TXOwner asker(from, txid);
TXOwner peer(tellpeer, ++ids);
service::Address addr = introset.A.Addr();
pendingIntrosetLookups.NewTX(
2018-11-08 15:15:02 +00:00
peer, asker, addr,
new PublishServiceJob(asker, introset, this, S, exclude));
2018-08-29 20:40:26 +00:00
}
void
Context::LookupIntroSetRecursive(const service::Address &addr,
const Key_t &whoasked, uint64_t txid,
const Key_t &askpeer, uint64_t R,
IntroSetLookupHandler handler)
{
TXOwner asker(whoasked, txid);
TXOwner peer(askpeer, ++ids);
pendingIntrosetLookups.NewTX(
2018-11-08 15:15:02 +00:00
peer, asker, addr,
new ServiceAddressLookup(asker, addr, this, R, handler));
2018-08-29 20:40:26 +00:00
}
void
Context::LookupIntroSetIterative(const service::Address &addr,
const Key_t &whoasked, uint64_t txid,
const Key_t &askpeer,
IntroSetLookupHandler handler)
{
TXOwner asker(whoasked, txid);
TXOwner peer(askpeer, ++ids);
pendingIntrosetLookups.NewTX(
2018-11-08 15:15:02 +00:00
peer, asker, addr,
new ServiceAddressLookup(asker, addr, this, 0, handler));
2018-08-29 20:40:26 +00:00
}
struct TagLookup : public TX< service::Tag, service::IntroSet >
{
uint64_t R;
TagLookup(const TXOwner &asker, const service::Tag &tag, Context *ctx,
uint64_t r)
: TX< service::Tag, service::IntroSet >(asker, tag, ctx), R(r)
{
}
bool
Validate(const service::IntroSet &introset) const override
{
2018-10-29 16:48:36 +00:00
if(!introset.Verify(parent->Crypto(), parent->Now()))
{
2018-09-20 11:27:18 +00:00
llarp::LogWarn("got invalid introset from tag lookup");
return false;
}
if(introset.topic != target)
{
llarp::LogWarn("got introset with missmatched topic in tag lookup");
return false;
}
return true;
}
2018-08-29 20:40:26 +00:00
void
Start(const TXOwner &peer) override
2018-08-29 20:40:26 +00:00
{
parent->DHTSendTo(peer.node.as_array(),
new FindIntroMessage(target, peer.txid, R));
2018-08-29 20:40:26 +00:00
}
bool
GetNextPeer(Key_t &, const std::set< Key_t > &) override
2018-08-29 20:40:26 +00:00
{
return false;
}
void
DoNextRequest(const Key_t &) override
2018-08-29 20:40:26 +00:00
{
}
void
SendReply() override
2018-08-29 20:40:26 +00:00
{
std::set< service::IntroSet > found;
for(const auto &remoteTag : valuesFound)
2018-08-02 01:41:40 +00:00
{
2018-08-29 20:40:26 +00:00
found.insert(remoteTag);
2018-08-02 01:41:40 +00:00
}
2018-08-29 20:40:26 +00:00
// collect our local values if we haven't hit a limit
2018-11-08 19:25:04 +00:00
if(found.size() < 2)
2018-07-18 03:10:21 +00:00
{
2018-08-29 20:40:26 +00:00
for(const auto &localTag :
parent->FindRandomIntroSetsWithTagExcluding(target, 1, found))
2018-08-10 21:34:11 +00:00
{
2018-08-29 20:40:26 +00:00
found.insert(localTag);
2018-08-10 21:34:11 +00:00
}
}
2018-08-29 20:40:26 +00:00
std::vector< service::IntroSet > values;
for(const auto &introset : found)
2018-08-10 21:34:11 +00:00
{
2018-08-29 20:40:26 +00:00
values.push_back(introset);
2018-07-18 03:10:21 +00:00
}
parent->DHTSendTo(whoasked.node.as_array(),
2018-08-29 20:40:26 +00:00
new GotIntroMessage(values, whoasked.txid));
2018-07-18 03:10:21 +00:00
}
};
void
2018-08-29 20:40:26 +00:00
Context::LookupTagRecursive(const service::Tag &tag, const Key_t &whoasked,
uint64_t whoaskedTX, const Key_t &askpeer,
uint64_t R)
2018-07-17 04:37:50 +00:00
{
2018-08-29 20:40:26 +00:00
TXOwner asker(whoasked, whoaskedTX);
TXOwner peer(askpeer, ++ids);
2018-11-08 15:15:02 +00:00
pendingTagLookups.NewTX(peer, asker, tag,
new TagLookup(asker, tag, this, R));
llarp::LogInfo("ask ", askpeer, " for ", tag, " on behalf of ", whoasked,
" R=", R);
}
struct LocalTagLookup : public TagLookup
{
PathID_t localPath;
LocalTagLookup(const PathID_t &path, uint64_t txid,
const service::Tag &target, Context *ctx)
2018-11-08 19:25:04 +00:00
: TagLookup(TXOwner{ctx->OurKey(), txid}, target, ctx, 0)
, localPath(path)
{
}
void
SendReply() override
{
auto path = parent->router->paths.GetByUpstream(
parent->OurKey().as_array(), localPath);
if(!path)
{
llarp::LogWarn(
"did not send reply for relayed dht request, no such local path "
"for pathid=",
localPath);
return;
}
routing::DHTMessage msg;
msg.M.emplace_back(new GotIntroMessage(valuesFound, whoasked.txid));
if(!path->SendRoutingMessage(&msg, parent->router))
{
llarp::LogWarn(
"failed to send routing message when informing result of dht "
"request, pathid=",
localPath);
}
}
};
void
Context::LookupTagForPath(const service::Tag &tag, uint64_t txid,
const llarp::PathID_t &path, const Key_t &askpeer)
{
TXOwner peer(askpeer, ++ids);
2018-11-08 19:25:04 +00:00
TXOwner whoasked(OurKey(), txid);
2018-11-08 15:15:02 +00:00
pendingTagLookups.NewTX(peer, whoasked, tag,
2018-11-08 19:25:04 +00:00
new LocalTagLookup(path, txid, tag, this));
2018-07-17 04:37:50 +00:00
}
bool
Context::HandleExploritoryRouterLookup(
const Key_t &requester, uint64_t txid, const RouterID &target,
std::vector< std::unique_ptr< IMessage > > &reply)
{
std::vector< RouterID > closer;
Key_t t(target.as_array());
std::set< Key_t > found;
2018-11-19 17:14:35 +00:00
if(!nodes)
return false;
2018-12-18 18:36:19 +00:00
size_t nodeCount = nodes->Size();
if(nodeCount == 0)
{
llarp::LogError(
"cannot handle exploritory router lookup, no dht peers");
return false;
}
2018-12-23 13:29:11 +00:00
llarp::LogDebug("We have ", nodes->Size(),
" connected nodes into the DHT");
// ourKey should never be in the connected list
// requester is likely in the connected list
// 4 or connection nodes (minus a potential requestor), whatever is less
size_t want = std::min(size_t(4), nodeCount - 1);
llarp::LogDebug("We want ", want, " connected nodes in the DHT");
if(!nodes->GetManyNearExcluding(t, found, want,
std::set< Key_t >{ourKey, requester}))
{
llarp::LogError(
"not enough dht nodes to handle exploritory router lookup, "
"need a minimum of ",
want, " dht peers");
return false;
}
for(const auto &f : found)
closer.emplace_back(f.as_array());
reply.emplace_back(new GotRouterMessage(txid, closer, false));
return true;
}
2018-08-30 18:48:43 +00:00
struct RecursiveRouterLookup : public TX< RouterID, RouterContact >
{
2018-08-30 18:48:43 +00:00
RouterLookupHandler resultHandler;
2018-08-29 20:40:26 +00:00
RecursiveRouterLookup(const TXOwner &whoasked, const RouterID &target,
2018-08-30 18:48:43 +00:00
Context *ctx, RouterLookupHandler result)
: TX< RouterID, RouterContact >(whoasked, target, ctx)
, resultHandler(result)
2018-08-29 20:40:26 +00:00
{
2018-08-29 20:40:26 +00:00
peersAsked.insert(ctx->OurKey());
}
2018-08-29 20:40:26 +00:00
bool
Validate(const RouterContact &rc) const override
{
if(!rc.Verify(parent->Crypto(), parent->Now()))
{
2018-10-15 12:02:32 +00:00
llarp::LogWarn("rc from lookup result is invalid");
return false;
}
return true;
}
2018-08-29 20:40:26 +00:00
bool
GetNextPeer(Key_t &, const std::set< Key_t > &) override
2018-08-29 20:40:26 +00:00
{
return false;
}
void
DoNextRequest(const Key_t &) override
2018-08-29 20:40:26 +00:00
{
}
void
Start(const TXOwner &peer) override
2018-08-29 20:40:26 +00:00
{
parent->DHTSendTo(peer.node.as_array(), new FindRouterMessage(peer.txid, target));
2018-08-29 20:40:26 +00:00
}
2018-08-31 12:46:54 +00:00
virtual void
SendReply() override
2018-08-29 20:40:26 +00:00
{
2018-08-30 18:48:43 +00:00
if(resultHandler)
2018-08-29 20:40:26 +00:00
{
2018-08-30 18:48:43 +00:00
resultHandler(valuesFound);
2018-08-29 20:40:26 +00:00
}
else
{
2018-09-06 20:31:58 +00:00
parent->DHTSendTo(
whoasked.node.as_array(),
2018-09-06 20:31:58 +00:00
new GotRouterMessage({}, whoasked.txid, valuesFound, false));
2018-08-29 20:40:26 +00:00
}
}
};
2018-08-31 12:46:54 +00:00
struct LocalRouterLookup : public RecursiveRouterLookup
{
PathID_t localPath;
LocalRouterLookup(const PathID_t &path, uint64_t txid,
const RouterID &target, Context *ctx)
: RecursiveRouterLookup(TXOwner{ctx->OurKey(), txid}, target, ctx,
nullptr)
, localPath(path)
{
}
void
SendReply() override
2018-08-31 12:46:54 +00:00
{
auto path = parent->router->paths.GetByUpstream(
parent->OurKey().as_array(), localPath);
2018-08-31 12:46:54 +00:00
if(!path)
{
llarp::LogWarn(
"did not send reply for relayed dht request, no such local path "
"for pathid=",
localPath);
return;
}
routing::DHTMessage msg;
msg.M.emplace_back(new GotRouterMessage(parent->OurKey(), whoasked.txid,
valuesFound, true));
2018-08-31 12:46:54 +00:00
if(!path->SendRoutingMessage(&msg, parent->router))
{
llarp::LogWarn(
"failed to send routing message when informing result of dht "
"request, pathid=",
localPath);
}
}
};
void
Context::LookupRouterForPath(const RouterID &target, uint64_t txid,
const llarp::PathID_t &path,
const Key_t &askpeer)
{
TXOwner peer(askpeer, ++ids);
2018-11-08 15:15:02 +00:00
TXOwner whoasked(OurKey(), txid);
pendingRouterLookups.NewTX(
2018-11-08 15:15:02 +00:00
peer, whoasked, target,
new LocalRouterLookup(path, txid, target, this));
2018-08-31 12:46:54 +00:00
}
2018-08-29 20:40:26 +00:00
void
Context::LookupRouterRecursive(const RouterID &target,
const Key_t &whoasked, uint64_t txid,
const Key_t &askpeer,
2018-08-30 18:48:43 +00:00
RouterLookupHandler handler)
2018-08-29 20:40:26 +00:00
{
TXOwner asker(whoasked, txid);
2018-11-28 16:38:20 +00:00
TXOwner peer(askpeer, ++ids);
2018-09-15 11:54:08 +00:00
if(target != askpeer)
{
pendingRouterLookups.NewTX(
2018-11-08 15:15:02 +00:00
peer, asker, target,
2018-09-15 11:54:08 +00:00
new RecursiveRouterLookup(asker, target, this, handler));
}
}
llarp::Crypto *
Context::Crypto()
{
return &router->crypto;
}
2018-10-29 16:48:36 +00:00
llarp_time_t
Context::Now()
{
return llarp_ev_loop_time_now_ms(router->netloop);
}
} // namespace dht
2018-07-13 09:27:57 +00:00
} // namespace llarp