lokinet/llarp/dht/context.cpp

717 lines
21 KiB
C++
Raw Normal View History

2018-12-12 00:48:54 +00:00
#include <dht/context.hpp>
2019-01-22 01:14:02 +00:00
#include <dht/explorenetworkjob.hpp>
#include <dht/localrouterlookup.hpp>
#include <dht/localserviceaddresslookup.hpp>
#include <dht/localtaglookup.hpp>
2019-01-16 00:24:16 +00:00
#include <dht/messages/findrouter.hpp>
#include <dht/messages/gotintro.hpp>
2018-12-12 00:48:54 +00:00
#include <dht/messages/gotrouter.hpp>
2019-01-16 00:24:16 +00:00
#include <dht/messages/pubintro.hpp>
2019-01-19 18:16:40 +00:00
#include <dht/node.hpp>
2019-01-22 01:14:02 +00:00
#include <dht/publishservicejob.hpp>
#include <dht/recursiverouterlookup.hpp>
#include <dht/serviceaddresslookup.hpp>
#include <dht/taglookup.hpp>
#include <messages/dht.hpp>
#include <messages/dht_immediate.hpp>
#include <path/path.hpp>
#include <router/abstractrouter.hpp>
#include <util/logic.hpp>
2019-03-27 12:36:27 +00:00
#include <nodedb.hpp>
#include <vector>
namespace llarp
{
namespace dht
{
2019-02-05 00:41:33 +00:00
AbstractContext::~AbstractContext()
{
}
struct Context final : public AbstractContext
{
Context();
~Context()
{
}
util::StatusObject
ExtractStatus() const override;
llarp::Crypto*
Crypto() const override;
/// on behalf of whoasked request introset for target from dht router with
/// key askpeer
void
LookupIntroSetRecursive(
const service::Address& target, const Key_t& whoasked,
uint64_t whoaskedTX, const Key_t& askpeer, uint64_t R,
service::IntroSetLookupHandler result = nullptr) override;
void
LookupIntroSetIterative(
const service::Address& target, const Key_t& whoasked,
uint64_t whoaskedTX, const Key_t& askpeer,
service::IntroSetLookupHandler result = nullptr) override;
/// on behalf of whoasked request router with public key target from dht
/// router with key askpeer
void
LookupRouterRecursive(const RouterID& target, const Key_t& whoasked,
uint64_t whoaskedTX, const Key_t& askpeer,
RouterLookupHandler result = nullptr);
bool
LookupRouter(const RouterID& target, RouterLookupHandler result) override
{
Key_t askpeer;
if(!nodes->FindClosest(Key_t(target), askpeer))
{
return false;
}
LookupRouterRecursive(target, OurKey(), 0, askpeer, result);
return true;
}
bool
HasRouterLookup(const RouterID& target) const override
{
return pendingRouterLookups().HasLookupFor(target);
}
/// on behalf of whoasked request introsets with tag from dht router with
/// key askpeer with Recursion depth R
void
LookupTagRecursive(const service::Tag& tag, const Key_t& whoasked,
uint64_t whoaskedTX, const Key_t& askpeer,
uint64_t R) override;
/// issue dht lookup for tag via askpeer and send reply to local path
void
LookupTagForPath(const service::Tag& tag, uint64_t txid,
const llarp::PathID_t& path,
const Key_t& askpeer) override;
/// issue dht lookup for router via askpeer and send reply to local path
void
LookupRouterForPath(const RouterID& target, uint64_t txid,
const PathID_t& path, const Key_t& askpeer) override;
/// issue dht lookup for introset for addr via askpeer and send reply to
/// local path
void
LookupIntroSetForPath(const service::Address& addr, uint64_t txid,
const llarp::PathID_t& path,
const Key_t& askpeer) override;
/// send a dht message to peer, if keepalive is true then keep the session
/// with that peer alive for 10 seconds
void
DHTSendTo(const RouterID& peer, IMessage* msg,
bool keepalive = true) override;
/// get routers closest to target excluding requester
bool
HandleExploritoryRouterLookup(
const Key_t& requester, uint64_t txid, const RouterID& target,
std::vector< std::unique_ptr< IMessage > >& reply) override;
std::set< service::IntroSet >
FindRandomIntroSetsWithTagExcluding(
const service::Tag& tag, size_t max = 2,
const std::set< service::IntroSet >& excludes = {}) override;
/// handle rc lookup from requester for target
void
LookupRouterRelayed(
const Key_t& requester, uint64_t txid, const Key_t& target,
bool recursive,
std::vector< std::unique_ptr< IMessage > >& replies) override;
/// relay a dht message from a local path to the main network
bool
RelayRequestForPath(const llarp::PathID_t& localPath,
const IMessage* msg) override;
/// send introset to peer from source with S counter and excluding peers
void
PropagateIntroSetTo(const Key_t& source, uint64_t sourceTX,
const service::IntroSet& introset, const Key_t& peer,
uint64_t S,
const std::set< Key_t >& exclude) override;
/// initialize dht context and explore every exploreInterval milliseconds
void
Init(const Key_t& us, AbstractRouter* router,
llarp_time_t exploreInterval) override;
/// get localally stored introset by service address
const llarp::service::IntroSet*
GetIntroSetByServiceAddress(
const llarp::service::Address& addr) const override;
static void
handle_cleaner_timer(void* user, uint64_t orig, uint64_t left);
static void
handle_explore_timer(void* user, uint64_t orig, uint64_t left);
/// explore dht for new routers
void
Explore(size_t N = 3);
llarp::AbstractRouter* router;
// for router contacts
std::unique_ptr< Bucket< RCNode > > nodes;
// for introduction sets
std::unique_ptr< Bucket< ISNode > > _services;
Bucket< ISNode >*
services() override
{
return _services.get();
}
bool allowTransit;
bool&
AllowTransit() override
{
return allowTransit;
}
const bool&
AllowTransit() const override
{
return allowTransit;
}
Bucket< RCNode >*
Nodes() const override
{
return nodes.get();
}
const Key_t&
OurKey() const override
{
return ourKey;
}
llarp::AbstractRouter*
GetRouter() const override
{
return router;
}
2019-03-27 12:36:27 +00:00
bool
GetRCFromNodeDB(const Key_t& k, llarp::RouterContact& rc) const override
{
return router->nodedb()->Get(k.as_array(), rc);
}
PendingIntrosetLookups _pendingIntrosetLookups;
PendingTagLookups _pendingTagLookups;
PendingRouterLookups _pendingRouterLookups;
PendingExploreLookups _pendingExploreLookups;
PendingIntrosetLookups&
pendingIntrosetLookups() override
{
return _pendingIntrosetLookups;
}
const PendingIntrosetLookups&
pendingIntrosetLookups() const override
{
return _pendingIntrosetLookups;
}
PendingTagLookups&
pendingTagLookups() override
{
return _pendingTagLookups;
}
const PendingTagLookups&
pendingTagLookups() const override
{
return _pendingTagLookups;
}
PendingRouterLookups&
pendingRouterLookups() override
{
return _pendingRouterLookups;
}
const PendingRouterLookups&
pendingRouterLookups() const override
{
return _pendingRouterLookups;
}
PendingExploreLookups&
pendingExploreLookups() override
{
return _pendingExploreLookups;
}
const PendingExploreLookups&
pendingExploreLookups() const override
{
return _pendingExploreLookups;
}
uint64_t
NextID()
{
return ++ids;
}
llarp_time_t
Now() const override;
void
ExploreNetworkVia(const Key_t& peer) override;
private:
void
ScheduleCleanupTimer();
void
CleanupTX();
uint64_t ids;
Key_t ourKey;
};
2019-01-22 01:14:02 +00:00
Context::Context() : router(nullptr), allowTransit(false)
{
randombytes((byte_t*)&ids, sizeof(uint64_t));
}
void
2018-09-06 20:31:58 +00:00
Context::Explore(size_t N)
{
2018-08-29 20:40:26 +00:00
// ask N random peers for new routers
llarp::LogInfo("Exploring network via ", N, " peers");
2018-08-29 20:40:26 +00:00
std::set< Key_t > peers;
2018-09-06 20:31:58 +00:00
if(nodes->GetManyRandom(peers, N))
{
for(const auto& peer : peers)
2018-08-29 20:40:26 +00:00
ExploreNetworkVia(peer);
}
2018-08-29 20:40:26 +00:00
else
llarp::LogError("failed to select random nodes for exploration");
}
2018-08-29 20:40:26 +00:00
void
Context::ExploreNetworkVia(const Key_t& askpeer)
2018-08-29 20:40:26 +00:00
{
2018-11-28 15:27:36 +00:00
uint64_t txid = ++ids;
TXOwner peer(askpeer, txid);
TXOwner whoasked(OurKey(), txid);
pendingExploreLookups().NewTX(
peer, whoasked, askpeer.as_array(),
new ExploreNetworkJob(askpeer.as_array(), this));
}
void
Context::handle_explore_timer(void* u, uint64_t orig, uint64_t left)
{
if(left)
return;
Context* ctx = static_cast< Context* >(u);
2018-09-08 08:27:05 +00:00
ctx->Explore(1);
ctx->router->logic()->call_later({orig, ctx, &handle_explore_timer});
}
void
Context::handle_cleaner_timer(void* u,
__attribute__((unused)) uint64_t orig,
uint64_t left)
{
if(left)
return;
Context* ctx = static_cast< Context* >(u);
// clean up transactions
ctx->CleanupTX();
if(ctx->_services)
2018-07-18 00:25:24 +00:00
{
// expire intro sets
2018-10-29 16:48:36 +00:00
auto now = ctx->Now();
auto& nodes = ctx->_services->nodes;
2018-07-18 00:25:24 +00:00
auto itr = nodes.begin();
while(itr != nodes.end())
{
if(itr->second.introset.IsExpired(now))
2018-08-04 02:59:32 +00:00
{
llarp::LogDebug("introset expired ", itr->second.introset.A.Addr());
2018-07-18 00:25:24 +00:00
itr = nodes.erase(itr);
2018-08-04 02:59:32 +00:00
}
2018-07-18 00:25:24 +00:00
else
++itr;
}
}
ctx->ScheduleCleanupTimer();
}
std::set< service::IntroSet >
2018-08-29 20:40:26 +00:00
Context::FindRandomIntroSetsWithTagExcluding(
const service::Tag& tag, size_t max,
const std::set< service::IntroSet >& exclude)
{
std::set< service::IntroSet > found;
auto& nodes = _services->nodes;
2018-07-18 22:50:05 +00:00
if(nodes.size() == 0)
2018-08-10 03:51:38 +00:00
{
2018-07-18 22:50:05 +00:00
return found;
2018-08-10 03:51:38 +00:00
}
2018-07-18 22:50:05 +00:00
auto itr = nodes.begin();
// start at random middle point
auto start = llarp::randint() % nodes.size();
2018-07-18 22:50:05 +00:00
std::advance(itr, start);
2018-08-04 02:59:32 +00:00
auto end = itr;
std::string tagname = tag.ToString();
2018-07-18 22:50:05 +00:00
while(itr != nodes.end())
{
2018-08-04 02:59:32 +00:00
if(itr->second.introset.topic.ToString() == tagname)
2018-07-18 22:50:05 +00:00
{
2018-08-29 20:40:26 +00:00
if(exclude.count(itr->second.introset) == 0)
{
found.insert(itr->second.introset);
if(found.size() == max)
return found;
}
2018-07-18 22:50:05 +00:00
}
++itr;
}
itr = nodes.begin();
while(itr != end)
{
2018-08-04 02:59:32 +00:00
if(itr->second.introset.topic.ToString() == tagname)
2018-07-18 22:50:05 +00:00
{
2018-08-29 20:40:26 +00:00
if(exclude.count(itr->second.introset) == 0)
{
found.insert(itr->second.introset);
if(found.size() == max)
return found;
}
2018-07-18 22:50:05 +00:00
}
++itr;
}
return found;
2018-07-18 03:10:21 +00:00
}
void
Context::LookupRouterRelayed(
const Key_t& requester, uint64_t txid, const Key_t& target,
bool recursive, std::vector< std::unique_ptr< IMessage > >& replies)
{
if(target == ourKey)
{
// we are the target, give them our RC
replies.emplace_back(
new GotRouterMessage(requester, txid, {router->rc()}, false));
return;
}
Key_t next;
std::set< Key_t > excluding = {requester, ourKey};
if(nodes->FindCloseExcluding(target, next, excluding))
{
if(next == target)
{
// we know it
replies.emplace_back(new GotRouterMessage(
2018-08-30 18:48:43 +00:00
requester, txid, {nodes->nodes[target].rc}, false));
}
else if(recursive) // are we doing a recursive lookup?
{
2018-08-29 20:40:26 +00:00
// is the next peer we ask closer to the target than us?
if((next ^ target) < (ourKey ^ target))
{
// yes it is closer, ask neighbour recursively
LookupRouterRecursive(target.as_array(), requester, txid, next);
}
else
{
2018-08-29 20:40:26 +00:00
// no we are closer to the target so tell requester it's not there
// so they switch to iterative lookup
replies.emplace_back(
new GotRouterMessage(requester, txid, {}, false));
}
}
2018-11-08 15:15:02 +00:00
else
{
2018-11-08 15:15:02 +00:00
// iterative lookup and we don't have it tell them who is closer
replies.emplace_back(
2018-11-08 15:15:02 +00:00
new GotRouterMessage(requester, next, txid, false));
}
}
else
{
2018-08-29 20:40:26 +00:00
// we don't know it and have no closer peers to ask
replies.emplace_back(new GotRouterMessage(requester, txid, {}, false));
}
}
const llarp::service::IntroSet*
Context::GetIntroSetByServiceAddress(
const llarp::service::Address& addr) const
{
auto key = addr.ToKey();
auto itr = _services->nodes.find(key);
if(itr == _services->nodes.end())
return nullptr;
return &itr->second.introset;
}
void
Context::CleanupTX()
{
2018-10-29 16:48:36 +00:00
auto now = Now();
llarp::LogDebug("DHT tick");
pendingRouterLookups().Expire(now);
_pendingIntrosetLookups.Expire(now);
pendingTagLookups().Expire(now);
pendingExploreLookups().Expire(now);
}
2019-02-11 17:14:43 +00:00
util::StatusObject
Context::ExtractStatus() const
2019-02-08 19:43:25 +00:00
{
2019-02-11 17:14:43 +00:00
util::StatusObject obj{
{"pendingRouterLookups", pendingRouterLookups().ExtractStatus()},
{"pendingIntrosetLookups", _pendingIntrosetLookups.ExtractStatus()},
{"pendingTagLookups", pendingTagLookups().ExtractStatus()},
{"pendingExploreLookups", pendingExploreLookups().ExtractStatus()},
2019-02-11 17:14:43 +00:00
{"nodes", nodes->ExtractStatus()},
{"services", _services->ExtractStatus()},
2019-02-11 17:14:43 +00:00
{"ourKey", ourKey.ToHex()}};
return obj;
2019-02-08 19:43:25 +00:00
}
void
Context::Init(const Key_t& us, AbstractRouter* r,
llarp_time_t exploreInterval)
{
router = r;
ourKey = us;
nodes = std::make_unique< Bucket< RCNode > >(ourKey, llarp::randint);
_services = std::make_unique< Bucket< ISNode > >(ourKey, llarp::randint);
2019-01-16 00:24:16 +00:00
llarp::LogDebug("initialize dht with key ", ourKey);
// start exploring
2018-12-10 14:14:55 +00:00
r->logic()->call_later(
{exploreInterval, this, &llarp::dht::Context::handle_explore_timer});
2018-10-27 20:02:24 +00:00
// start cleanup timer
ScheduleCleanupTimer();
}
void
Context::ScheduleCleanupTimer()
{
router->logic()->call_later({1000, this, &handle_cleaner_timer});
}
2018-08-10 21:34:11 +00:00
void
Context::DHTSendTo(const RouterID& peer, IMessage* msg, bool keepalive)
2018-08-10 21:34:11 +00:00
{
2019-01-19 13:54:50 +00:00
llarp::DHTImmediateMessage m;
m.msgs.emplace_back(msg);
router->SendToOrQueue(peer, &m);
2018-08-29 20:40:26 +00:00
if(keepalive)
{
2018-10-29 16:48:36 +00:00
auto now = Now();
2018-08-29 20:40:26 +00:00
router->PersistSessionUntil(peer, now + 10000);
}
2018-08-10 21:34:11 +00:00
}
bool
Context::RelayRequestForPath(const llarp::PathID_t& id, const IMessage* msg)
{
llarp::routing::DHTMessage reply;
if(!msg->HandleMessage(router->dht(), reply.M))
return false;
2019-01-22 01:14:02 +00:00
if(!reply.M.empty())
2018-08-10 03:51:38 +00:00
{
auto path = router->pathContext().GetByUpstream(router->pubkey(), id);
2018-08-10 03:51:38 +00:00
return path && path->SendRoutingMessage(&reply, router);
}
return true;
}
2018-08-29 20:40:26 +00:00
void
Context::LookupIntroSetForPath(const service::Address& addr, uint64_t txid,
const llarp::PathID_t& path,
const Key_t& askpeer)
2018-08-29 20:40:26 +00:00
{
TXOwner asker(OurKey(), txid);
TXOwner peer(askpeer, ++ids);
_pendingIntrosetLookups.NewTX(
2018-11-08 15:15:02 +00:00
peer, asker, addr,
2018-08-29 20:40:26 +00:00
new LocalServiceAddressLookup(path, txid, addr, this, askpeer));
}
void
Context::PropagateIntroSetTo(const Key_t& from, uint64_t txid,
const service::IntroSet& introset,
const Key_t& tellpeer, uint64_t S,
const std::set< Key_t >& exclude)
2018-08-29 20:40:26 +00:00
{
TXOwner asker(from, txid);
TXOwner peer(tellpeer, ++ids);
service::Address addr = introset.A.Addr();
_pendingIntrosetLookups.NewTX(
2018-11-08 15:15:02 +00:00
peer, asker, addr,
new PublishServiceJob(asker, introset, this, S, exclude));
2018-08-29 20:40:26 +00:00
}
void
Context::LookupIntroSetRecursive(const service::Address& addr,
const Key_t& whoasked, uint64_t txid,
const Key_t& askpeer, uint64_t R,
2019-01-22 01:14:02 +00:00
service::IntroSetLookupHandler handler)
2018-08-29 20:40:26 +00:00
{
TXOwner asker(whoasked, txid);
TXOwner peer(askpeer, ++ids);
_pendingIntrosetLookups.NewTX(
2018-11-08 15:15:02 +00:00
peer, asker, addr,
new ServiceAddressLookup(asker, addr, this, R, handler));
2018-08-29 20:40:26 +00:00
}
void
Context::LookupIntroSetIterative(const service::Address& addr,
const Key_t& whoasked, uint64_t txid,
const Key_t& askpeer,
2019-01-22 01:14:02 +00:00
service::IntroSetLookupHandler handler)
2018-08-29 20:40:26 +00:00
{
TXOwner asker(whoasked, txid);
TXOwner peer(askpeer, ++ids);
_pendingIntrosetLookups.NewTX(
2018-11-08 15:15:02 +00:00
peer, asker, addr,
new ServiceAddressLookup(asker, addr, this, 0, handler));
2018-08-29 20:40:26 +00:00
}
void
Context::LookupTagRecursive(const service::Tag& tag, const Key_t& whoasked,
uint64_t whoaskedTX, const Key_t& askpeer,
2018-08-29 20:40:26 +00:00
uint64_t R)
2018-07-17 04:37:50 +00:00
{
2018-08-29 20:40:26 +00:00
TXOwner asker(whoasked, whoaskedTX);
TXOwner peer(askpeer, ++ids);
_pendingTagLookups.NewTX(peer, asker, tag,
new TagLookup(asker, tag, this, R));
llarp::LogInfo("ask ", askpeer, " for ", tag, " on behalf of ", whoasked,
" R=", R);
}
void
Context::LookupTagForPath(const service::Tag& tag, uint64_t txid,
const llarp::PathID_t& path, const Key_t& askpeer)
{
TXOwner peer(askpeer, ++ids);
2018-11-08 19:25:04 +00:00
TXOwner whoasked(OurKey(), txid);
_pendingTagLookups.NewTX(peer, whoasked, tag,
new LocalTagLookup(path, txid, tag, this));
2018-07-17 04:37:50 +00:00
}
bool
Context::HandleExploritoryRouterLookup(
const Key_t& requester, uint64_t txid, const RouterID& target,
std::vector< std::unique_ptr< IMessage > >& reply)
{
std::vector< RouterID > closer;
Key_t t(target.as_array());
std::set< Key_t > found;
2018-11-19 17:14:35 +00:00
if(!nodes)
return false;
2018-12-18 18:36:19 +00:00
size_t nodeCount = nodes->size();
if(nodeCount == 0)
{
llarp::LogError(
"cannot handle exploritory router lookup, no dht peers");
return false;
}
llarp::LogDebug("We have ", nodes->size(),
2018-12-23 13:29:11 +00:00
" connected nodes into the DHT");
// ourKey should never be in the connected list
// requester is likely in the connected list
// 4 or connection nodes (minus a potential requestor), whatever is less
size_t want = std::min(size_t(4), nodeCount - 1);
llarp::LogDebug("We want ", want, " connected nodes in the DHT");
if(!nodes->GetManyNearExcluding(t, found, want,
std::set< Key_t >{ourKey, requester}))
{
llarp::LogError(
"not enough dht nodes to handle exploritory router lookup, "
"need a minimum of ",
want, " dht peers");
return false;
}
for(const auto& f : found)
closer.emplace_back(f.as_array());
reply.emplace_back(new GotRouterMessage(txid, closer, false));
return true;
}
2018-08-31 12:46:54 +00:00
void
Context::LookupRouterForPath(const RouterID& target, uint64_t txid,
const llarp::PathID_t& path,
const Key_t& askpeer)
2018-08-31 12:46:54 +00:00
{
TXOwner peer(askpeer, ++ids);
2018-11-08 15:15:02 +00:00
TXOwner whoasked(OurKey(), txid);
_pendingRouterLookups.NewTX(
2018-11-08 15:15:02 +00:00
peer, whoasked, target,
new LocalRouterLookup(path, txid, target, this));
2018-08-31 12:46:54 +00:00
}
2018-08-29 20:40:26 +00:00
void
Context::LookupRouterRecursive(const RouterID& target,
const Key_t& whoasked, uint64_t txid,
const Key_t& askpeer,
2018-08-30 18:48:43 +00:00
RouterLookupHandler handler)
2018-08-29 20:40:26 +00:00
{
TXOwner asker(whoasked, txid);
2018-11-28 16:38:20 +00:00
TXOwner peer(askpeer, ++ids);
2018-09-15 11:54:08 +00:00
if(target != askpeer)
{
_pendingRouterLookups.NewTX(
2018-11-08 15:15:02 +00:00
peer, asker, target,
2018-09-15 11:54:08 +00:00
new RecursiveRouterLookup(asker, target, this, handler));
}
}
llarp::Crypto*
Context::Crypto() const
{
return router->crypto();
}
2018-10-29 16:48:36 +00:00
llarp_time_t
Context::Now() const
2018-10-29 16:48:36 +00:00
{
return router->Now();
2018-10-29 16:48:36 +00:00
}
2019-02-22 19:04:47 +00:00
std::unique_ptr< AbstractContext >
makeContext()
{
return std::make_unique< Context >();
}
} // namespace dht
2018-07-13 09:27:57 +00:00
} // namespace llarp