Split classes out of dht::Context

pull/247/head
Michael 5 years ago
parent 5b5ea74c40
commit 40449df0f1
No known key found for this signature in database
GPG Key ID: 2D51757B47E2434C

@ -510,8 +510,12 @@ set(LIB_SRC
llarp/dht/bucket.cpp
llarp/dht/context.cpp
llarp/dht/dht.cpp
llarp/dht/explorenetworkjob.cpp
llarp/dht/kademlia.cpp
llarp/dht/key.cpp
llarp/dht/localtaglookup.cpp
llarp/dht/localrouterlookup.cpp
llarp/dht/localserviceaddresslookup.cpp
llarp/dht/message.cpp
llarp/dht/messages/findintro.cpp
llarp/dht/messages/findrouter.cpp
@ -519,6 +523,12 @@ set(LIB_SRC
llarp/dht/messages/gotrouter.cpp
llarp/dht/messages/pubintro.cpp
llarp/dht/node.cpp
llarp/dht/publishservicejob.cpp
llarp/dht/recursiverouterlookup.cpp
llarp/dht/serviceaddresslookup.cpp
llarp/dht/taglookup.cpp
llarp/dht/tx.cpp
llarp/dht/txholder.cpp
llarp/dht/txowner.cpp
llarp/dns.cpp
llarp/dnsc.cpp

@ -1,10 +1,18 @@
#include <dht/context.hpp>
#include <dht/explorenetworkjob.hpp>
#include <dht/localrouterlookup.hpp>
#include <dht/localserviceaddresslookup.hpp>
#include <dht/localtaglookup.hpp>
#include <dht/messages/findrouter.hpp>
#include <dht/messages/gotintro.hpp>
#include <dht/messages/gotrouter.hpp>
#include <dht/messages/pubintro.hpp>
#include <dht/node.hpp>
#include <dht/publishservicejob.hpp>
#include <dht/recursiverouterlookup.hpp>
#include <dht/serviceaddresslookup.hpp>
#include <dht/taglookup.hpp>
#include <messages/dht.hpp>
#include <messages/dht_immediate.hpp>
#include <router/router.hpp>
@ -15,18 +23,9 @@ namespace llarp
{
namespace dht
{
Context::Context()
Context::Context() : router(nullptr), allowTransit(false)
{
randombytes((byte_t *)&ids, sizeof(uint64_t));
allowTransit = false;
}
Context::~Context()
{
if(nodes)
delete nodes;
if(services)
delete services;
}
void
@ -45,53 +44,6 @@ namespace llarp
llarp::LogError("failed to select random nodes for exploration");
}
struct ExploreNetworkJob : public TX< RouterID, RouterID >
{
ExploreNetworkJob(const RouterID &peer, Context *ctx)
: TX< RouterID, RouterID >(TXOwner{}, peer, ctx)
{
}
bool
Validate(const RouterID &) const override
{
// TODO: check with lokid
return true;
}
void
Start(const TXOwner &peer) override
{
parent->DHTSendTo(peer.node.as_array(),
new FindRouterMessage(peer.txid));
}
bool
GetNextPeer(Key_t &, const std::set< Key_t > &) override
{
return false;
}
void
DoNextRequest(const Key_t &) override
{
}
void
SendReply() override
{
llarp::LogInfo("got ", valuesFound.size(), " routers from exploration");
for(const auto &pk : valuesFound)
{
// lookup router
parent->LookupRouter(
pk,
std::bind(&llarp::Router::HandleDHTLookupForExplore,
parent->router, pk, std::placeholders::_1));
}
}
};
void
Context::ExploreNetworkVia(const Key_t &askpeer)
{
@ -272,8 +224,8 @@ namespace llarp
{
router = r;
ourKey = us;
nodes = new Bucket< RCNode >(ourKey, llarp::randint);
services = new Bucket< ISNode >(ourKey, llarp::randint);
nodes = std::make_unique< Bucket< RCNode > >(ourKey, llarp::randint);
services = std::make_unique< Bucket< ISNode > >(ourKey, llarp::randint);
llarp::LogDebug("initialize dht with key ", ourKey);
// start exploring
@ -308,7 +260,7 @@ namespace llarp
llarp::routing::DHTMessage reply;
if(!msg->HandleMessage(router->dht, reply.M))
return false;
if(reply.M.size())
if(!reply.M.empty())
{
auto path = router->paths.GetByUpstream(router->pubkey(), id);
return path && path->SendRoutingMessage(&reply, router);
@ -316,112 +268,6 @@ namespace llarp
return true;
}
struct ServiceAddressLookup
: public TX< service::Address, service::IntroSet >
{
IntroSetLookupHandler handleResult;
uint64_t R;
ServiceAddressLookup(const TXOwner &asker, const service::Address &addr,
Context *ctx, uint64_t r,
IntroSetLookupHandler handler)
: TX< service::Address, service::IntroSet >(asker, addr, ctx)
, handleResult(handler)
, R(r)
{
peersAsked.insert(ctx->OurKey());
}
bool
Validate(const service::IntroSet &value) const override
{
if(!value.Verify(parent->Crypto(), parent->Now()))
{
llarp::LogWarn("Got invalid introset from service lookup");
return false;
}
if(value.A.Addr() != target)
{
llarp::LogWarn("got introset with wrong target from service lookup");
return false;
}
return true;
}
bool
GetNextPeer(Key_t &next, const std::set< Key_t > &exclude) override
{
Key_t k = target.ToKey();
return parent->nodes->FindCloseExcluding(k, next, exclude);
}
void
Start(const TXOwner &peer) override
{
parent->DHTSendTo(peer.node.as_array(),
new FindIntroMessage(peer.txid, target, R));
}
void
DoNextRequest(const Key_t &ask) override
{
if(R)
parent->LookupIntroSetRecursive(target, whoasked.node, whoasked.txid,
ask, R - 1);
else
parent->LookupIntroSetIterative(target, whoasked.node, whoasked.txid,
ask);
}
virtual void
SendReply() override
{
if(handleResult)
handleResult(valuesFound);
parent->DHTSendTo(whoasked.node.as_array(),
new GotIntroMessage(valuesFound, whoasked.txid));
}
};
struct LocalServiceAddressLookup : public ServiceAddressLookup
{
PathID_t localPath;
LocalServiceAddressLookup(const PathID_t &pathid, uint64_t txid,
const service::Address &addr, Context *ctx,
__attribute__((unused)) const Key_t &askpeer)
: ServiceAddressLookup(TXOwner{ctx->OurKey(), txid}, addr, ctx, 5,
nullptr)
, localPath(pathid)
{
}
void
SendReply() override
{
auto path = parent->router->paths.GetByUpstream(
parent->OurKey().as_array(), localPath);
if(!path)
{
llarp::LogWarn(
"did not send reply for relayed dht request, no such local path "
"for pathid=",
localPath);
return;
}
routing::DHTMessage msg;
msg.M.emplace_back(new GotIntroMessage(valuesFound, whoasked.txid));
if(!path->SendRoutingMessage(&msg, parent->router))
{
llarp::LogWarn(
"failed to send routing message when informing result of dht "
"request, pathid=",
localPath);
}
}
};
void
Context::LookupIntroSetForPath(const service::Address &addr, uint64_t txid,
const llarp::PathID_t &path,
@ -434,62 +280,6 @@ namespace llarp
new LocalServiceAddressLookup(path, txid, addr, this, askpeer));
}
struct PublishServiceJob : public TX< service::Address, service::IntroSet >
{
uint64_t S;
std::set< Key_t > dontTell;
service::IntroSet I;
PublishServiceJob(const TXOwner &asker, const service::IntroSet &introset,
Context *ctx, uint64_t s,
const std::set< Key_t > &exclude)
: TX< service::Address, service::IntroSet >(asker, introset.A.Addr(),
ctx)
, S(s)
, dontTell(exclude)
, I(introset)
{
}
bool
Validate(const service::IntroSet &introset) const override
{
if(I.A != introset.A)
{
llarp::LogWarn(
"publish introset acknowledgement acked a different service");
return false;
}
return true;
}
void
Start(const TXOwner &peer) override
{
std::vector< Key_t > exclude;
for(const auto &router : dontTell)
exclude.push_back(router);
parent->DHTSendTo(peer.node.as_array(),
new PublishIntroMessage(I, peer.txid, S, exclude));
}
bool
GetNextPeer(Key_t &, const std::set< Key_t > &) override
{
return false;
}
void
DoNextRequest(const Key_t &) override
{
}
void
SendReply() override
{
// don't need this
}
};
void
Context::PropagateIntroSetTo(const Key_t &from, uint64_t txid,
const service::IntroSet &introset,
@ -508,7 +298,7 @@ namespace llarp
Context::LookupIntroSetRecursive(const service::Address &addr,
const Key_t &whoasked, uint64_t txid,
const Key_t &askpeer, uint64_t R,
IntroSetLookupHandler handler)
service::IntroSetLookupHandler handler)
{
TXOwner asker(whoasked, txid);
TXOwner peer(askpeer, ++ids);
@ -521,7 +311,7 @@ namespace llarp
Context::LookupIntroSetIterative(const service::Address &addr,
const Key_t &whoasked, uint64_t txid,
const Key_t &askpeer,
IntroSetLookupHandler handler)
service::IntroSetLookupHandler handler)
{
TXOwner asker(whoasked, txid);
TXOwner peer(askpeer, ++ids);
@ -530,76 +320,6 @@ namespace llarp
new ServiceAddressLookup(asker, addr, this, 0, handler));
}
struct TagLookup : public TX< service::Tag, service::IntroSet >
{
uint64_t R;
TagLookup(const TXOwner &asker, const service::Tag &tag, Context *ctx,
uint64_t r)
: TX< service::Tag, service::IntroSet >(asker, tag, ctx), R(r)
{
}
bool
Validate(const service::IntroSet &introset) const override
{
if(!introset.Verify(parent->Crypto(), parent->Now()))
{
llarp::LogWarn("got invalid introset from tag lookup");
return false;
}
if(introset.topic != target)
{
llarp::LogWarn("got introset with missmatched topic in tag lookup");
return false;
}
return true;
}
void
Start(const TXOwner &peer) override
{
parent->DHTSendTo(peer.node.as_array(),
new FindIntroMessage(target, peer.txid, R));
}
bool
GetNextPeer(Key_t &, const std::set< Key_t > &) override
{
return false;
}
void
DoNextRequest(const Key_t &) override
{
}
void
SendReply() override
{
std::set< service::IntroSet > found;
for(const auto &remoteTag : valuesFound)
{
found.insert(remoteTag);
}
// collect our local values if we haven't hit a limit
if(found.size() < 2)
{
for(const auto &localTag :
parent->FindRandomIntroSetsWithTagExcluding(target, 1, found))
{
found.insert(localTag);
}
}
std::vector< service::IntroSet > values;
for(const auto &introset : found)
{
values.push_back(introset);
}
parent->DHTSendTo(whoasked.node.as_array(),
new GotIntroMessage(values, whoasked.txid));
}
};
void
Context::LookupTagRecursive(const service::Tag &tag, const Key_t &whoasked,
uint64_t whoaskedTX, const Key_t &askpeer,
@ -613,42 +333,6 @@ namespace llarp
" R=", R);
}
struct LocalTagLookup : public TagLookup
{
PathID_t localPath;
LocalTagLookup(const PathID_t &path, uint64_t txid,
const service::Tag &target, Context *ctx)
: TagLookup(TXOwner{ctx->OurKey(), txid}, target, ctx, 0)
, localPath(path)
{
}
void
SendReply() override
{
auto path = parent->router->paths.GetByUpstream(
parent->OurKey().as_array(), localPath);
if(!path)
{
llarp::LogWarn(
"did not send reply for relayed dht request, no such local path "
"for pathid=",
localPath);
return;
}
routing::DHTMessage msg;
msg.M.emplace_back(new GotIntroMessage(valuesFound, whoasked.txid));
if(!path->SendRoutingMessage(&msg, parent->router))
{
llarp::LogWarn(
"failed to send routing message when informing result of dht "
"request, pathid=",
localPath);
}
}
};
void
Context::LookupTagForPath(const service::Tag &tag, uint64_t txid,
const llarp::PathID_t &path, const Key_t &askpeer)
@ -699,101 +383,6 @@ namespace llarp
return true;
}
struct RecursiveRouterLookup : public TX< RouterID, RouterContact >
{
RouterLookupHandler resultHandler;
RecursiveRouterLookup(const TXOwner &whoasked, const RouterID &target,
Context *ctx, RouterLookupHandler result)
: TX< RouterID, RouterContact >(whoasked, target, ctx)
, resultHandler(result)
{
peersAsked.insert(ctx->OurKey());
}
bool
Validate(const RouterContact &rc) const override
{
if(!rc.Verify(parent->Crypto(), parent->Now()))
{
llarp::LogWarn("rc from lookup result is invalid");
return false;
}
return true;
}
bool
GetNextPeer(Key_t &, const std::set< Key_t > &) override
{
return false;
}
void
DoNextRequest(const Key_t &) override
{
}
void
Start(const TXOwner &peer) override
{
parent->DHTSendTo(peer.node.as_array(),
new FindRouterMessage(peer.txid, target));
}
virtual void
SendReply() override
{
if(resultHandler)
{
resultHandler(valuesFound);
}
else
{
parent->DHTSendTo(
whoasked.node.as_array(),
new GotRouterMessage({}, whoasked.txid, valuesFound, false));
}
}
};
struct LocalRouterLookup : public RecursiveRouterLookup
{
PathID_t localPath;
LocalRouterLookup(const PathID_t &path, uint64_t txid,
const RouterID &target, Context *ctx)
: RecursiveRouterLookup(TXOwner{ctx->OurKey(), txid}, target, ctx,
nullptr)
, localPath(path)
{
}
void
SendReply() override
{
auto path = parent->router->paths.GetByUpstream(
parent->OurKey().as_array(), localPath);
if(!path)
{
llarp::LogWarn(
"did not send reply for relayed dht request, no such local path "
"for pathid=",
localPath);
return;
}
routing::DHTMessage msg;
msg.M.emplace_back(new GotRouterMessage(parent->OurKey(), whoasked.txid,
valuesFound, true));
if(!path->SendRoutingMessage(&msg, parent->router))
{
llarp::LogWarn(
"failed to send routing message when informing result of dht "
"request, pathid=",
localPath);
}
}
};
void
Context::LookupRouterForPath(const RouterID &target, uint64_t txid,
const llarp::PathID_t &path,

@ -1,5 +1,5 @@
#ifndef LLARP_DHT_CONTEXT_HPP
#define LLARP_DHT_CONTEXT_HPP
#ifndef LLARP_DHT_CONTEXT
#define LLARP_DHT_CONTEXT
#include <dht/bucket.hpp>
#include <dht/dht.h>
@ -7,6 +7,8 @@
#include <dht/message.hpp>
#include <dht/messages/findintro.hpp>
#include <dht/node.hpp>
#include <dht/tx.hpp>
#include <dht/txholder.hpp>
#include <dht/txowner.hpp>
#include <service/IntroSet.hpp>
#include <util/time.hpp>
@ -19,93 +21,9 @@ namespace llarp
namespace dht
{
struct Context;
template < typename K, typename V >
struct TX
{
TX(const TXOwner& asker, const K& k, Context* p)
: target(k), whoasked(asker)
{
parent = p;
}
virtual ~TX(){};
K target;
Context* parent;
std::set< Key_t > peersAsked;
std::vector< V > valuesFound;
TXOwner whoasked;
virtual bool
Validate(const V& value) const = 0;
void
OnFound(const Key_t askedPeer, const V& value)
{
peersAsked.insert(askedPeer);
if(Validate(value))
valuesFound.push_back(value);
}
virtual void
Start(const TXOwner& peer) = 0;
virtual bool
GetNextPeer(Key_t& next, const std::set< Key_t >& exclude) = 0;
virtual void
DoNextRequest(const Key_t& peer) = 0;
/// return true if we want to persist this tx
bool
AskNextPeer(const Key_t& prevPeer, const std::unique_ptr< Key_t >& next)
{
peersAsked.insert(prevPeer);
Key_t peer;
if(next)
{
// explicit next peer provided
peer = *next;
}
else if(!GetNextPeer(peer, peersAsked))
{
// no more peers
llarp::LogInfo("no more peers for request asking for", target);
return false;
}
const Key_t targetKey{target};
if((prevPeer ^ targetKey) < (peer ^ targetKey))
{
// next peer is not closer
llarp::LogInfo("next peer ", peer, " is not closer to ", target,
" than ", prevPeer);
return false;
}
else
{
peersAsked.insert(peer);
}
DoNextRequest(peer);
return true;
}
virtual void
SendReply() = 0;
};
using IntroSetLookupHandler =
std::function< void(const std::vector< service::IntroSet >&) >;
using RouterLookupHandler =
std::function< void(const std::vector< RouterContact >&) >;
struct Context
{
Context();
~Context();
llarp::Crypto*
Crypto();
@ -116,13 +34,13 @@ namespace llarp
LookupIntroSetRecursive(const service::Address& target,
const Key_t& whoasked, uint64_t whoaskedTX,
const Key_t& askpeer, uint64_t R,
IntroSetLookupHandler result = nullptr);
service::IntroSetLookupHandler result = nullptr);
void
LookupIntroSetIterative(const service::Address& target,
const Key_t& whoasked, uint64_t whoaskedTX,
const Key_t& askpeer,
IntroSetLookupHandler result = nullptr);
service::IntroSetLookupHandler result = nullptr);
/// on behalf of whoasked request router with public key target from dht
/// router with key askpeer
@ -136,7 +54,9 @@ namespace llarp
{
Key_t askpeer;
if(!nodes->FindClosest(Key_t(target), askpeer))
{
return false;
}
LookupRouterRecursive(target, OurKey(), 0, askpeer, result);
return true;
}
@ -221,13 +141,13 @@ namespace llarp
void
Explore(size_t N = 3);
llarp::Router* router = nullptr;
llarp::Router* router;
// for router contacts
Bucket< RCNode >* nodes = nullptr;
std::unique_ptr< Bucket< RCNode > > nodes;
// for introduction sets
Bucket< ISNode >* services = nullptr;
bool allowTransit = false;
std::unique_ptr< Bucket< ISNode > > services;
bool allowTransit;
const Key_t&
OurKey() const
@ -235,130 +155,6 @@ namespace llarp
return ourKey;
}
template < typename K, typename V, typename K_Hash,
llarp_time_t requestTimeoutMS = 5000UL >
struct TXHolder
{
// tx who are waiting for a reply for each key
std::unordered_multimap< K, TXOwner, K_Hash > waiting;
// tx timesouts by key
std::unordered_map< K, llarp_time_t, K_Hash > timeouts;
// maps remote peer with tx to handle reply from them
std::unordered_map< TXOwner, std::unique_ptr< TX< K, V > >,
TXOwner::Hash >
tx;
const TX< K, V >*
GetPendingLookupFrom(const TXOwner& owner) const
{
auto itr = tx.find(owner);
if(itr == tx.end())
return nullptr;
else
return itr->second.get();
}
bool
HasLookupFor(const K& target) const
{
return timeouts.find(target) != timeouts.end();
}
bool
HasPendingLookupFrom(const TXOwner& owner) const
{
return GetPendingLookupFrom(owner) != nullptr;
}
void
NewTX(const TXOwner& askpeer, const TXOwner& whoasked, const K& k,
TX< K, V >* t)
{
(void)whoasked;
tx.emplace(askpeer, std::unique_ptr< TX< K, V > >(t));
auto count = waiting.count(k);
waiting.insert(std::make_pair(k, askpeer));
auto itr = timeouts.find(k);
if(itr == timeouts.end())
{
timeouts.insert(
std::make_pair(k, time_now_ms() + requestTimeoutMS));
}
if(count == 0)
t->Start(askpeer);
}
/// mark tx as not fond
void
NotFound(const TXOwner& from, const std::unique_ptr< Key_t >& next)
{
bool sendReply = true;
auto txitr = tx.find(from);
if(txitr == tx.end())
return;
// ask for next peer
if(txitr->second->AskNextPeer(from.node, next))
sendReply = false;
llarp::LogWarn("Target key ", txitr->second->target);
Inform(from, txitr->second->target, {}, sendReply, sendReply);
}
void
Found(const TXOwner& from, const K& k, const std::vector< V >& values)
{
Inform(from, k, values, true);
}
/// inform all watches for key of values found
void
Inform(TXOwner from, K key, std::vector< V > values,
bool sendreply = false, bool removeTimeouts = true)
{
auto range = waiting.equal_range(key);
auto itr = range.first;
while(itr != range.second)
{
auto txitr = tx.find(itr->second);
if(txitr != tx.end())
{
for(const auto& value : values)
txitr->second->OnFound(from.node, value);
if(sendreply)
{
txitr->second->SendReply();
tx.erase(txitr);
}
}
++itr;
}
if(sendreply)
waiting.erase(key);
if(removeTimeouts)
timeouts.erase(key);
}
void
Expire(llarp_time_t now)
{
auto itr = timeouts.begin();
while(itr != timeouts.end())
{
if(now > itr->second && now - itr->second >= requestTimeoutMS)
{
Inform(TXOwner{}, itr->first, {}, true, false);
itr = timeouts.erase(itr);
}
else
++itr;
}
}
};
TXHolder< service::Address, service::IntroSet, service::Address::Hash >
pendingIntrosetLookups;

@ -0,0 +1,31 @@
#include <dht/explorenetworkjob.hpp>
#include <dht/context.hpp>
#include <dht/messages/findrouter.hpp>
#include <router/router.hpp>
namespace llarp
{
namespace dht
{
void
ExploreNetworkJob::Start(const TXOwner &peer)
{
parent->DHTSendTo(peer.node.as_array(), new FindRouterMessage(peer.txid));
}
void
ExploreNetworkJob::SendReply()
{
llarp::LogInfo("got ", valuesFound.size(), " routers from exploration");
for(const auto &pk : valuesFound)
{
// lookup router
parent->LookupRouter(
pk,
std::bind(&Router::HandleDHTLookupForExplore, parent->router, pk,
std::placeholders::_1));
}
}
} // namespace dht
} // namespace llarp

@ -0,0 +1,45 @@
#ifndef LLARP_DHT_EXPLORENETWORKJOB
#define LLARP_DHT_EXPLORENETWORKJOB
#include <dht/tx.hpp>
#include <router_id.hpp>
namespace llarp
{
namespace dht
{
struct ExploreNetworkJob : public TX< RouterID, RouterID >
{
ExploreNetworkJob(const RouterID &peer, Context *ctx)
: TX< RouterID, RouterID >(TXOwner{}, peer, ctx)
{
}
bool
Validate(const RouterID &) const override
{
// TODO: check with lokid
return true;
}
void
Start(const TXOwner &peer) override;
bool
GetNextPeer(Key_t &, const std::set< Key_t > &) override
{
return false;
}
void
DoNextRequest(const Key_t &) override
{
}
void
SendReply() override;
};
} // namespace dht
} // namespace llarp
#endif

@ -0,0 +1,46 @@
#include <dht/localrouterlookup.hpp>
#include <dht/context.hpp>
#include <dht/messages/gotrouter.hpp>
#include <messages/dht.hpp>
#include <router/router.hpp>
#include <util/logger.hpp>
namespace llarp
{
namespace dht
{
LocalRouterLookup::LocalRouterLookup(const PathID_t &path, uint64_t txid,
const RouterID &target, Context *ctx)
: RecursiveRouterLookup(TXOwner{ctx->OurKey(), txid}, target, ctx,
nullptr)
, localPath(path)
{
}
void
LocalRouterLookup::SendReply()
{
auto path = parent->router->paths.GetByUpstream(
parent->OurKey().as_array(), localPath);
if(!path)
{
llarp::LogWarn(
"did not send reply for relayed dht request, no such local path "
"for pathid=",
localPath);
return;
}
routing::DHTMessage msg;
msg.M.emplace_back(new GotRouterMessage(parent->OurKey(), whoasked.txid,
valuesFound, true));
if(!path->SendRoutingMessage(&msg, parent->router))
{
llarp::LogWarn(
"failed to send routing message when informing result of dht "
"request, pathid=",
localPath);
}
}
} // namespace dht
} // namespace llarp

@ -0,0 +1,27 @@
#ifndef LLARP_DHT_LOCALROUTERLOOKUP
#define LLARP_DHT_LOCALROUTERLOOKUP
#include <dht/recursiverouterlookup.hpp>
#include <path/path_types.hpp>
#include <router_contact.hpp>
#include <router_id.hpp>
namespace llarp
{
namespace dht
{
struct LocalRouterLookup : public RecursiveRouterLookup
{
PathID_t localPath;
LocalRouterLookup(const PathID_t &path, uint64_t txid,
const RouterID &target, Context *ctx);
void
SendReply() override;
};
} // namespace dht
} // namespace llarp
#endif

@ -0,0 +1,45 @@
#include <dht/localserviceaddresslookup.hpp>
#include <dht/context.hpp>
#include <dht/messages/gotintro.hpp>
#include <router/router.hpp>
#include <util/logger.hpp>
namespace llarp
{
namespace dht
{
LocalServiceAddressLookup::LocalServiceAddressLookup(
const PathID_t &pathid, uint64_t txid, const service::Address &addr,
Context *ctx, __attribute__((unused)) const Key_t &askpeer)
: ServiceAddressLookup(TXOwner{ctx->OurKey(), txid}, addr, ctx, 5,
nullptr)
, localPath(pathid)
{
}
void
LocalServiceAddressLookup::SendReply()
{
auto path = parent->router->paths.GetByUpstream(
parent->OurKey().as_array(), localPath);
if(!path)
{
llarp::LogWarn(
"did not send reply for relayed dht request, no such local path "
"for pathid=",
localPath);
return;
}
routing::DHTMessage msg;
msg.M.emplace_back(new GotIntroMessage(valuesFound, whoasked.txid));
if(!path->SendRoutingMessage(&msg, parent->router))
{
llarp::LogWarn(
"failed to send routing message when informing result of dht "
"request, pathid=",
localPath);
}
}
} // namespace dht
} // namespace llarp

@ -0,0 +1,27 @@
#ifndef LLARP_DHT_LOCALSERVICEADDRESSLOOKUP
#define LLARP_DHT_LOCALSERVICEADDRESSLOOKUP
#include <dht/serviceaddresslookup.hpp>
#include <path/path_types.hpp>
namespace llarp
{
namespace dht
{
struct LocalServiceAddressLookup : public ServiceAddressLookup
{
PathID_t localPath;
LocalServiceAddressLookup(const PathID_t &pathid, uint64_t txid,
const service::Address &addr, Context *ctx,
__attribute__((unused)) const Key_t &askpeer);
void
SendReply() override;
};
} // namespace dht
} // namespace llarp
#endif

@ -0,0 +1,43 @@
#include <dht/localtaglookup.hpp>
#include <dht/context.hpp>
#include <dht/messages/gotintro.hpp>
#include <messages/dht.hpp>
#include <router/router.hpp>
namespace llarp
{
namespace dht
{
LocalTagLookup::LocalTagLookup(const PathID_t &path, uint64_t txid,
const service::Tag &target, Context *ctx)
: TagLookup(TXOwner{ctx->OurKey(), txid}, target, ctx, 0)
, localPath(path)
{
}
void
LocalTagLookup::SendReply()
{
auto path = parent->router->paths.GetByUpstream(
parent->OurKey().as_array(), localPath);
if(!path)
{
llarp::LogWarn(
"did not send reply for relayed dht request, no such local path "
"for pathid=",
localPath);
return;
}
routing::DHTMessage msg;
msg.M.emplace_back(new GotIntroMessage(valuesFound, whoasked.txid));
if(!path->SendRoutingMessage(&msg, parent->router))
{
llarp::LogWarn(
"failed to send routing message when informing result of dht "
"request, pathid=",
localPath);
}
}
} // namespace dht
} // namespace llarp

@ -0,0 +1,23 @@
#ifndef LLARP_DHT_LOOKUPTAGLOOKUP
#define LLARP_DHT_LOOKUPTAGLOOKUP
#include <dht/taglookup.hpp>
namespace llarp
{
namespace dht
{
struct LocalTagLookup : public TagLookup
{
PathID_t localPath;
LocalTagLookup(const PathID_t &path, uint64_t txid,
const service::Tag &target, Context *ctx);
void
SendReply() override;
};
} // namespace dht
} // namespace llarp
#endif

@ -41,9 +41,8 @@ namespace llarp
ID.Zero();
}
ISNode(const service::IntroSet& other)
ISNode(const service::IntroSet& other) : introset(other)
{
introset = other;
introset.A.CalculateAddress(ID.as_array());
}

@ -0,0 +1,46 @@
#include <dht/publishservicejob.hpp>
#include <dht/context.hpp>
#include <dht/messages/pubintro.hpp>
namespace llarp
{
namespace dht
{
PublishServiceJob::PublishServiceJob(const TXOwner &asker,
const service::IntroSet &introset,
Context *ctx, uint64_t s,
const std::set< Key_t > &exclude)
: TX< service::Address, service::IntroSet >(asker, introset.A.Addr(),
ctx)
, S(s)
, dontTell(exclude)
, I(introset)
{
}
bool
PublishServiceJob::Validate(const service::IntroSet &introset) const
{
if(I.A != introset.A)
{
llarp::LogWarn(
"publish introset acknowledgement acked a different service");
return false;
}
return true;
}
void
PublishServiceJob::Start(const TXOwner &peer)
{
std::vector< Key_t > exclude;
for(const auto &router : dontTell)
{
exclude.push_back(router);
}
parent->DHTSendTo(peer.node.as_array(),
new PublishIntroMessage(I, peer.txid, S, exclude));
}
} // namespace dht
} // namespace llarp

@ -0,0 +1,51 @@
#ifndef LLARP_DHT_PUBLISHSERVICEJOB
#define LLARP_DHT_PUBLISHSERVICEJOB
#include <dht/tx.hpp>
#include <dht/txowner.hpp>
#include <service/address.hpp>
#include <service/IntroSet.hpp>
#include <set>
namespace llarp
{
namespace dht
{
struct PublishServiceJob : public TX< service::Address, service::IntroSet >
{
uint64_t S;
std::set< Key_t > dontTell;
service::IntroSet I;
PublishServiceJob(const TXOwner &asker, const service::IntroSet &introset,
Context *ctx, uint64_t s,
const std::set< Key_t > &exclude);
bool
Validate(const service::IntroSet &introset) const override;
void
Start(const TXOwner &peer) override;
bool
GetNextPeer(Key_t &, const std::set< Key_t > &) override
{
return false;
}
void
DoNextRequest(const Key_t &) override
{
}
void
SendReply() override
{
// don't need this
}
};
} // namespace dht
} // namespace llarp
#endif

@ -0,0 +1,55 @@
#include <dht/recursiverouterlookup.hpp>
#include <dht/context.hpp>
#include <dht/messages/findrouter.hpp>
#include <dht/messages/gotrouter.hpp>
namespace llarp
{
namespace dht
{
RecursiveRouterLookup::RecursiveRouterLookup(const TXOwner &whoasked,
const RouterID &target,
Context *ctx,
RouterLookupHandler result)
: TX< RouterID, RouterContact >(whoasked, target, ctx)
, resultHandler(result)
{
peersAsked.insert(ctx->OurKey());
}
bool
RecursiveRouterLookup::Validate(const RouterContact &rc) const
{
if(!rc.Verify(parent->Crypto(), parent->Now()))
{
llarp::LogWarn("rc from lookup result is invalid");
return false;
}
return true;
}
void
RecursiveRouterLookup::Start(const TXOwner &peer)
{
parent->DHTSendTo(peer.node.as_array(),
new FindRouterMessage(peer.txid, target));
}
void
RecursiveRouterLookup::SendReply()
{
if(resultHandler)
{
resultHandler(valuesFound);
}
else
{
parent->DHTSendTo(
whoasked.node.as_array(),
new GotRouterMessage({}, whoasked.txid, valuesFound, false));
}
}
} // namespace dht
} // namespace llarp

@ -0,0 +1,42 @@
#ifndef LLARP_DHT_RECURSIVEROUTERLOOKUP
#define LLARP_DHT_RECURSIVEROUTERLOOKUP
#include <dht/tx.hpp>
#include <router_contact.hpp>
#include <router_id.hpp>
namespace llarp
{
namespace dht
{
struct RecursiveRouterLookup : public TX< RouterID, RouterContact >
{
RouterLookupHandler resultHandler;
RecursiveRouterLookup(const TXOwner &whoasked, const RouterID &target,
Context *ctx, RouterLookupHandler result);
bool
Validate(const RouterContact &rc) const override;
bool
GetNextPeer(Key_t &, const std::set< Key_t > &) override
{
return false;
}
void
DoNextRequest(const Key_t &) override
{
}
void
Start(const TXOwner &peer) override;
virtual void
SendReply() override;
};
} // namespace dht
} // namespace llarp
#endif

@ -0,0 +1,79 @@
#include <dht/serviceaddresslookup.hpp>
#include <dht/context.hpp>
#include <dht/messages/findintro.hpp>
#include <dht/messages/gotintro.hpp>
namespace llarp
{
namespace dht
{
ServiceAddressLookup::ServiceAddressLookup(
const TXOwner &asker, const service::Address &addr, Context *ctx,
uint64_t r, service::IntroSetLookupHandler handler)
: TX< service::Address, service::IntroSet >(asker, addr, ctx)
, handleResult(handler)
, R(r)
{
peersAsked.insert(ctx->OurKey());
}
bool
ServiceAddressLookup::Validate(const service::IntroSet &value) const
{
if(!value.Verify(parent->Crypto(), parent->Now()))
{
llarp::LogWarn("Got invalid introset from service lookup");
return false;
}
if(value.A.Addr() != target)
{
llarp::LogWarn("got introset with wrong target from service lookup");
return false;
}
return true;
}
bool
ServiceAddressLookup::GetNextPeer(Key_t &next,
const std::set< Key_t > &exclude)
{
Key_t k = target.ToKey();
return parent->nodes->FindCloseExcluding(k, next, exclude);
}
void
ServiceAddressLookup::Start(const TXOwner &peer)
{
parent->DHTSendTo(peer.node.as_array(),
new FindIntroMessage(peer.txid, target, R));
}
void
ServiceAddressLookup::DoNextRequest(const Key_t &ask)
{
if(R)
{
parent->LookupIntroSetRecursive(target, whoasked.node, whoasked.txid,
ask, R - 1);
}
else
{
parent->LookupIntroSetIterative(target, whoasked.node, whoasked.txid,
ask);
}
}
void
ServiceAddressLookup::SendReply()
{
if(handleResult)
{
handleResult(valuesFound);
}
parent->DHTSendTo(whoasked.node.as_array(),
new GotIntroMessage(valuesFound, whoasked.txid));
}
} // namespace dht
} // namespace llarp

@ -0,0 +1,44 @@
#ifndef LLARP_DHT_SERVICEADDRESSLOOKUP
#define LLARP_DHT_SERVICEADDRESSLOOKUP
#include <dht/key.hpp>
#include <dht/tx.hpp>
#include <service/address.hpp>
#include <service/IntroSet.hpp>
namespace llarp
{
namespace dht
{
struct TXOwner;
struct ServiceAddressLookup
: public TX< service::Address, service::IntroSet >
{
service::IntroSetLookupHandler handleResult;
uint64_t R;
ServiceAddressLookup(const TXOwner &asker, const service::Address &addr,
Context *ctx, uint64_t r,
service::IntroSetLookupHandler handler);
bool
Validate(const service::IntroSet &value) const override;
bool
GetNextPeer(Key_t &next, const std::set< Key_t > &exclude) override;
void
Start(const TXOwner &peer) override;
void
DoNextRequest(const Key_t &ask) override;
virtual void
SendReply() override;
};
} // namespace dht
} // namespace llarp
#endif

@ -0,0 +1,59 @@
#include <dht/taglookup.hpp>
#include <dht/context.hpp>
#include <dht/messages/gotintro.hpp>
namespace llarp
{
namespace dht
{
bool
TagLookup::Validate(const service::IntroSet &introset) const
{
if(!introset.Verify(parent->Crypto(), parent->Now()))
{
llarp::LogWarn("got invalid introset from tag lookup");
return false;
}
if(introset.topic != target)
{
llarp::LogWarn("got introset with missmatched topic in tag lookup");
return false;
}
return true;
}
void
TagLookup::Start(const TXOwner &peer)
{
parent->DHTSendTo(peer.node.as_array(),
new FindIntroMessage(target, peer.txid, R));
}
void
TagLookup::SendReply()
{
std::set< service::IntroSet > found;
for(const auto &remoteTag : valuesFound)
{
found.insert(remoteTag);
}
// collect our local values if we haven't hit a limit
if(found.size() < 2)
{
for(const auto &localTag :
parent->FindRandomIntroSetsWithTagExcluding(target, 1, found))
{
found.insert(localTag);
}
}
std::vector< service::IntroSet > values;
for(const auto &introset : found)
{
values.push_back(introset);
}
parent->DHTSendTo(whoasked.node.as_array(),
new GotIntroMessage(values, whoasked.txid));
}
} // namespace dht
} // namespace llarp

@ -0,0 +1,44 @@
#ifndef LLARP_DHT_TAGLOOKUP
#define LLARP_DHT_TAGLOOKUP
#include <dht/tx.hpp>
#include <service/IntroSet.hpp>
#include <service/tag.hpp>
namespace llarp
{
namespace dht
{
struct TagLookup : public TX< service::Tag, service::IntroSet >
{
uint64_t R;
TagLookup(const TXOwner &asker, const service::Tag &tag, Context *ctx,
uint64_t r)
: TX< service::Tag, service::IntroSet >(asker, tag, ctx), R(r)
{
}
bool
Validate(const service::IntroSet &introset) const override;
void
Start(const TXOwner &peer) override;
bool
GetNextPeer(Key_t &, const std::set< Key_t > &) override
{
return false;
}
void
DoNextRequest(const Key_t &) override
{
}
void
SendReply() override;
};
} // namespace dht
} // namespace llarp
#endif

@ -0,0 +1 @@
#include <dht/tx.hpp>

@ -0,0 +1,107 @@
#ifndef LLARP_DHT_TX
#define LLARP_DHT_TX
#include <dht/key.hpp>
#include <dht/txowner.hpp>
#include <util/logger.hpp>
#include <set>
#include <vector>
namespace llarp
{
struct Router;
namespace dht
{
struct Context;
template < typename K, typename V >
struct TX
{
K target;
Context* parent;
std::set< Key_t > peersAsked;
std::vector< V > valuesFound;
TXOwner whoasked;
TX(const TXOwner& asker, const K& k, Context* p)
: target(k), whoasked(asker)
{
parent = p;
}
virtual ~TX(){};
virtual bool
Validate(const V& value) const = 0;
void
OnFound(const Key_t askedPeer, const V& value);
virtual void
Start(const TXOwner& peer) = 0;
virtual bool
GetNextPeer(Key_t& next, const std::set< Key_t >& exclude) = 0;
virtual void
DoNextRequest(const Key_t& peer) = 0;
/// return true if we want to persist this tx
bool
AskNextPeer(const Key_t& prevPeer, const std::unique_ptr< Key_t >& next);
virtual void
SendReply() = 0;
};
template < typename K, typename V >
inline void
TX< K, V >::OnFound(const Key_t askedPeer, const V& value)
{
peersAsked.insert(askedPeer);
if(Validate(value))
{
valuesFound.push_back(value);
}
}
template < typename K, typename V >
inline bool
TX< K, V >::AskNextPeer(const Key_t& prevPeer,
const std::unique_ptr< Key_t >& next)
{
peersAsked.insert(prevPeer);
Key_t peer;
if(next)
{
// explicit next peer provided
peer = *next;
}
else if(!GetNextPeer(peer, peersAsked))
{
// no more peers
llarp::LogInfo("no more peers for request asking for", target);
return false;
}
const Key_t targetKey{target};
if((prevPeer ^ targetKey) < (peer ^ targetKey))
{
// next peer is not closer
llarp::LogInfo("next peer ", peer, " is not closer to ", target,
" than ", prevPeer);
return false;
}
else
{
peersAsked.insert(peer);
}
DoNextRequest(peer);
return true;
}
} // namespace dht
} // namespace llarp
#endif

@ -0,0 +1 @@
#include <dht/txholder.hpp>

@ -0,0 +1,190 @@
#ifndef LLARP_DHT_TXHOLDER
#define LLARP_DHT_TXHOLDER
#include <dht/tx.hpp>
#include <dht/txowner.hpp>
#include <util/logger.hpp>
#include <util/time.hpp>
#include <memory>
#include <unordered_map>
namespace llarp
{
namespace dht
{
template < typename K, typename V, typename K_Hash,
llarp_time_t requestTimeoutMS = 5000UL >
struct TXHolder
{
using TXPtr = std::unique_ptr< TX< K, V > >;
// tx who are waiting for a reply for each key
std::unordered_multimap< K, TXOwner, K_Hash > waiting;
// tx timesouts by key
std::unordered_map< K, llarp_time_t, K_Hash > timeouts;
// maps remote peer with tx to handle reply from them
std::unordered_map< TXOwner, TXPtr, TXOwner::Hash > tx;
const TX< K, V >*
GetPendingLookupFrom(const TXOwner& owner) const;
bool
HasLookupFor(const K& target) const
{
return timeouts.find(target) != timeouts.end();
}
bool
HasPendingLookupFrom(const TXOwner& owner) const
{
return GetPendingLookupFrom(owner) != nullptr;
}
void
NewTX(const TXOwner& askpeer, const TXOwner& whoasked, const K& k,
TX< K, V >* t);
/// mark tx as not fond
void
NotFound(const TXOwner& from, const std::unique_ptr< Key_t >& next);
void
Found(const TXOwner& from, const K& k, const std::vector< V >& values)
{
Inform(from, k, values, true);
}
/// inform all watches for key of values found
void
Inform(TXOwner from, K key, std::vector< V > values,
bool sendreply = false, bool removeTimeouts = true);
void
Expire(llarp_time_t now);
};
template < typename K, typename V, typename K_Hash,
llarp_time_t requestTimeoutMS >
const TX< K, V >*
TXHolder< K, V, K_Hash, requestTimeoutMS >::GetPendingLookupFrom(
const TXOwner& owner) const
{
auto itr = tx.find(owner);
if(itr == tx.end())
{
return nullptr;
}
else
{
return itr->second.get();
}
}
template < typename K, typename V, typename K_Hash,
llarp_time_t requestTimeoutMS >
void
TXHolder< K, V, K_Hash, requestTimeoutMS >::NewTX(const TXOwner& askpeer,
const TXOwner& whoasked,
const K& k, TX< K, V >* t)
{
(void)whoasked;
tx.emplace(askpeer, std::unique_ptr< TX< K, V > >(t));
auto count = waiting.count(k);
waiting.emplace(k, askpeer);
auto itr = timeouts.find(k);
if(itr == timeouts.end())
{
timeouts.emplace(k, time_now_ms() + requestTimeoutMS);
}
if(count == 0)
{
t->Start(askpeer);
}
}
template < typename K, typename V, typename K_Hash,
llarp_time_t requestTimeoutMS >
void
TXHolder< K, V, K_Hash, requestTimeoutMS >::NotFound(
const TXOwner& from, const std::unique_ptr< Key_t >& next)
{
bool sendReply = true;
auto txitr = tx.find(from);
if(txitr == tx.end())
{
return;
}
// ask for next peer
if(txitr->second->AskNextPeer(from.node, next))
{
sendReply = false;
}
llarp::LogWarn("Target key ", txitr->second->target);
Inform(from, txitr->second->target, {}, sendReply, sendReply);
}
template < typename K, typename V, typename K_Hash,
llarp_time_t requestTimeoutMS >
void
TXHolder< K, V, K_Hash, requestTimeoutMS >::Inform(TXOwner from, K key,
std::vector< V > values,
bool sendreply,
bool removeTimeouts)
{
auto range = waiting.equal_range(key);
auto itr = range.first;
while(itr != range.second)
{
auto txitr = tx.find(itr->second);
if(txitr != tx.end())
{
for(const auto& value : values)
{
txitr->second->OnFound(from.node, value);
}
if(sendreply)
{
txitr->second->SendReply();
tx.erase(txitr);
}
}
++itr;
}
if(sendreply)
{
waiting.erase(key);
}
if(removeTimeouts)
{
timeouts.erase(key);
}
}
template < typename K, typename V, typename K_Hash,
llarp_time_t requestTimeoutMS >
void
TXHolder< K, V, K_Hash, requestTimeoutMS >::Expire(llarp_time_t now)
{
auto itr = timeouts.begin();
while(itr != timeouts.end())
{
if(now > itr->second && now - itr->second >= requestTimeoutMS)
{
Inform(TXOwner{}, itr->first, {}, true, false);
itr = timeouts.erase(itr);
}
else
{
++itr;
}
}
}
} // namespace dht
} // namespace llarp
#endif

@ -8,6 +8,7 @@
#include <util/aligned.hpp>
#include <util/bencode.hpp>
#include <functional>
#include <vector>
#define MAX_RC_SIZE (1024)
@ -188,6 +189,9 @@ namespace llarp
bool
VerifySignature(llarp::Crypto *crypto) const;
};
using RouterLookupHandler =
std::function< void(const std::vector< RouterContact >&) >;
} // namespace llarp
#endif

@ -10,6 +10,7 @@
#include <util/time.hpp>
#include <algorithm>
#include <functional>
#include <iostream>
#include <vector>
@ -153,6 +154,10 @@ namespace llarp
bool
Verify(llarp::Crypto* crypto, llarp_time_t now) const;
};
using IntroSetLookupHandler =
std::function< void(const std::vector< IntroSet >&) >;
} // namespace service
} // namespace llarp

Loading…
Cancel
Save