lokinet/llarp/dht/context.hpp

439 lines
12 KiB
C++
Raw Normal View History

#ifndef LLARP_DHT_CONTEXT_HPP
#define LLARP_DHT_CONTEXT_HPP
2018-12-12 00:48:54 +00:00
#include <dht.h>
#include <dht/bucket.hpp>
#include <dht/key.hpp>
#include <dht/message.hpp>
#include <dht/messages/findintro.hpp>
#include <dht/node.hpp>
2018-12-12 02:15:08 +00:00
#include <service/IntroSet.hpp>
#include <time.hpp>
#include <set>
namespace llarp
{
struct Router;
namespace dht
{
2018-08-29 20:40:26 +00:00
/// Identifies a transaction by the key of the node involved and a
/// transaction id.
struct TXOwner
{
  Key_t node;
  uint64_t txid = 0;

  TXOwner() = default;

  TXOwner(const Key_t& k, uint64_t id) : node(k), txid(id)
  {
  }

  /// two owners are equal when both the txid and the node key match
  bool
  operator==(const TXOwner& other) const
  {
    return txid == other.txid && node == other.node;
  }

  /// lexicographic order: by txid first, ties broken by node key.
  /// The comparison has to be a strict weak ordering so that TXOwner can be
  /// used as a key in ordered containers; comparing the two fields
  /// independently with || would allow a < b and b < a to both hold.
  bool
  operator<(const TXOwner& other) const
  {
    if(txid != other.txid)
      return txid < other.txid;
    return node < other.node;
  }

  /// hash functor so TXOwner can be used as an unordered container key
  struct Hash
  {
    std::size_t
    operator()(TXOwner const& o) const noexcept
    {
      // fold the leading sizeof(size_t) bytes of the node key into the txid
      std::size_t sz2;
      memcpy(&sz2, &o.node[0], sizeof(std::size_t));
      return o.txid ^ (sz2 << 1);
    }
  };
};
struct Context;
template < typename K, typename V >
struct TX
{
TX(const TXOwner& asker, const K& k, Context* p)
: target(k), whoasked(asker)
{
parent = p;
}
2018-09-06 11:46:19 +00:00
virtual ~TX(){};
2018-08-29 20:40:26 +00:00
K target;
Context* parent;
std::set< Key_t > peersAsked;
std::vector< V > valuesFound;
TXOwner whoasked;
virtual bool
Validate(const V& value) const = 0;
2018-08-29 20:40:26 +00:00
void
OnFound(const Key_t askedPeer, const V& value)
2018-08-29 20:40:26 +00:00
{
peersAsked.insert(askedPeer);
if(Validate(value))
valuesFound.push_back(value);
2018-08-29 20:40:26 +00:00
}
virtual void
Start(const TXOwner& peer) = 0;
virtual bool
GetNextPeer(Key_t& next, const std::set< Key_t >& exclude) = 0;
virtual void
DoNextRequest(const Key_t& peer) = 0;
/// return true if we want to persist this tx
bool
2018-11-08 15:15:02 +00:00
AskNextPeer(const Key_t& prevPeer, const std::unique_ptr< Key_t >& next)
2018-08-29 20:40:26 +00:00
{
peersAsked.insert(prevPeer);
Key_t peer;
2018-11-08 15:15:02 +00:00
if(next)
2018-08-29 20:40:26 +00:00
{
// explicit next peer provided
peer = *next;
2018-11-08 15:15:02 +00:00
}
else if(!GetNextPeer(peer, peersAsked))
2018-11-08 15:15:02 +00:00
{
// no more peers
2018-11-22 15:52:04 +00:00
llarp::LogInfo("no more peers for request asking for", target);
return false;
2018-08-29 20:40:26 +00:00
}
2018-11-08 15:15:02 +00:00
const Key_t targetKey{target};
if((prevPeer ^ targetKey) < (peer ^ targetKey))
{
// next peer is not closer
2018-11-22 15:53:11 +00:00
llarp::LogInfo("next peer ", peer, " is not closer to ", target,
" than ", prevPeer);
return false;
}
else
{
peersAsked.insert(peer);
}
2018-08-29 20:40:26 +00:00
DoNextRequest(peer);
return true;
}
virtual void
SendReply() = 0;
};
/// callback type that receives a list of introsets; used as the `result`
/// argument of the LookupIntroSet* methods of Context
using IntroSetLookupHandler =
std::function< void(const std::vector< service::IntroSet >&) >;
2018-08-29 20:40:26 +00:00
/// callback type that receives a list of router contacts; used as the
/// `result` argument of the LookupRouter* methods of Context
using RouterLookupHandler =
std::function< void(const std::vector< RouterContact >&) >;
2018-08-30 18:48:43 +00:00
struct Context
{
Context();
~Context();
llarp::Crypto*
Crypto();
2018-08-29 20:40:26 +00:00
/// on behalf of whoasked request introset for target from dht router with
/// key askpeer
void
2018-08-29 20:40:26 +00:00
LookupIntroSetRecursive(const service::Address& target,
const Key_t& whoasked, uint64_t whoaskedTX,
const Key_t& askpeer, uint64_t R,
IntroSetLookupHandler result = nullptr);
2018-07-12 18:21:44 +00:00
void
2018-08-29 20:40:26 +00:00
LookupIntroSetIterative(const service::Address& target,
const Key_t& whoasked, uint64_t whoaskedTX,
const Key_t& askpeer,
IntroSetLookupHandler result = nullptr);
2018-07-12 18:21:44 +00:00
2018-08-29 20:40:26 +00:00
/// on behalf of whoasked request router with public key target from dht
/// router with key askpeer
void
2018-08-29 20:40:26 +00:00
LookupRouterRecursive(const RouterID& target, const Key_t& whoasked,
uint64_t whoaskedTX, const Key_t& askpeer,
2018-08-30 18:48:43 +00:00
RouterLookupHandler result = nullptr);
bool
LookupRouter(const RouterID& target, RouterLookupHandler result)
{
Key_t askpeer;
if(!nodes->FindClosest(Key_t(target), askpeer))
2018-08-30 18:48:43 +00:00
return false;
2018-11-28 16:38:20 +00:00
LookupRouterRecursive(target, OurKey(), 0, askpeer, result);
2018-08-30 18:48:43 +00:00
return true;
}
/// return true if pendingRouterLookups has a lookup registered for target
bool
HasRouterLookup(const RouterID& target) const
{
  const bool pending = pendingRouterLookups.HasLookupFor(target);
  return pending;
}
2018-08-29 20:40:26 +00:00
/// on behalf of whoasked request introsets with tag from dht router with
/// key askpeer with Recursion depth R
2018-07-17 04:37:50 +00:00
void
2018-08-29 20:40:26 +00:00
LookupTagRecursive(const service::Tag& tag, const Key_t& whoasked,
uint64_t whoaskedTX, const Key_t& askpeer, uint64_t R);
2018-07-17 04:37:50 +00:00
2018-08-29 20:40:26 +00:00
/// issue dht lookup for tag via askpeer and send reply to local path
void
LookupTagForPath(const service::Tag& tag, uint64_t txid,
const llarp::PathID_t& path, const Key_t& askpeer);
2018-08-30 18:48:43 +00:00
/// issue dht lookup for router via askpeer and send reply to local path
void
LookupRouterForPath(const RouterID& target, uint64_t txid,
const llarp::PathID_t& path, const Key_t& askpeer);
2018-08-29 20:40:26 +00:00
/// issue dht lookup for introset for addr via askpeer and send reply to
/// local path
2018-08-02 01:41:40 +00:00
void
LookupIntroSetForPath(const service::Address& addr, uint64_t txid,
2018-08-29 20:40:26 +00:00
const llarp::PathID_t& path, const Key_t& askpeer);
2018-08-10 21:34:11 +00:00
2018-08-29 20:40:26 +00:00
/// send a dht message to peer, if keepalive is true then keep the session
/// with that peer alive for 10 seconds
2018-08-10 21:34:11 +00:00
void
DHTSendTo(const RouterID& peer, IMessage* msg, bool keepalive = true);
2018-08-02 01:41:40 +00:00
2018-08-29 20:40:26 +00:00
/// get routers closest to target excluding requester
bool
HandleExploritoryRouterLookup(
const Key_t& requester, uint64_t txid, const RouterID& target,
std::vector< std::unique_ptr< IMessage > >& reply);
2018-08-02 04:34:46 +00:00
std::set< service::IntroSet >
2018-08-29 20:40:26 +00:00
FindRandomIntroSetsWithTagExcluding(
const service::Tag& tag, size_t max = 2,
const std::set< service::IntroSet >& excludes = {});
2018-08-29 20:40:26 +00:00
/// handle rc lookup from requester for target
void
LookupRouterRelayed(const Key_t& requester, uint64_t txid,
const Key_t& target, bool recursive,
std::vector< std::unique_ptr< IMessage > >& replies);
2018-08-29 20:40:26 +00:00
/// relay a dht message from a local path to the main network
bool
RelayRequestForPath(const llarp::PathID_t& localPath,
const IMessage* msg);
2018-07-18 22:50:16 +00:00
2018-08-29 20:40:26 +00:00
/// send introset to peer from source with S counter and excluding peers
2018-07-18 22:50:16 +00:00
void
2018-08-29 20:40:26 +00:00
PropagateIntroSetTo(const Key_t& source, uint64_t sourceTX,
2018-07-19 04:58:39 +00:00
const service::IntroSet& introset, const Key_t& peer,
2018-08-01 22:10:38 +00:00
uint64_t S, const std::set< Key_t >& exclude);
2018-08-29 20:40:26 +00:00
/// initialize dht context and explore every exploreInterval milliseconds
void
Init(const Key_t& us, llarp::Router* router,
llarp_time_t exploreInterval);
2018-08-29 20:40:26 +00:00
/// get locally stored introset by service address
const llarp::service::IntroSet*
GetIntroSetByServiceAddress(const llarp::service::Address& addr) const;
static void
handle_cleaner_timer(void* user, uint64_t orig, uint64_t left);
static void
handle_explore_timer(void* user, uint64_t orig, uint64_t left);
/// explore dht for new routers
void
2018-09-06 20:31:58 +00:00
Explore(size_t N = 3);
llarp::Router* router = nullptr;
// for router contacts
Bucket< RCNode >* nodes = nullptr;
// for introduction sets
Bucket< ISNode >* services = nullptr;
bool allowTransit = false;
/// read-only access to the private ourKey member
const Key_t&
OurKey() const
{
  return this->ourKey;
}
2018-08-29 20:40:26 +00:00
template < typename K, typename V, typename K_Hash,
llarp_time_t requestTimeoutMS = 5000UL >
struct TXHolder
{
// tx who are waiting for a reply for each key
std::unordered_multimap< K, TXOwner, K_Hash > waiting;
// tx timesouts by key
std::unordered_map< K, llarp_time_t, K_Hash > timeouts;
// maps remote peer with tx to handle reply from them
std::unordered_map< TXOwner, std::unique_ptr< TX< K, V > >,
TXOwner::Hash >
tx;
const TX< K, V >*
GetPendingLookupFrom(const TXOwner& owner) const
{
auto itr = tx.find(owner);
if(itr == tx.end())
return nullptr;
else
return itr->second.get();
}
bool
HasLookupFor(const K& target) const
{
return timeouts.find(target) != timeouts.end();
}
2018-08-29 20:40:26 +00:00
bool
HasPendingLookupFrom(const TXOwner& owner) const
{
return GetPendingLookupFrom(owner) != nullptr;
}
void
2018-11-08 15:15:02 +00:00
NewTX(const TXOwner& askpeer, const TXOwner& whoasked, const K& k,
2018-11-08 19:25:04 +00:00
TX< K, V >* t)
2018-08-29 20:40:26 +00:00
{
2018-11-08 19:25:04 +00:00
(void)whoasked;
2018-11-08 15:15:02 +00:00
tx.emplace(askpeer, std::unique_ptr< TX< K, V > >(t));
2018-11-22 15:52:04 +00:00
auto count = waiting.count(k);
2018-11-08 19:25:04 +00:00
waiting.insert(std::make_pair(k, askpeer));
2018-08-29 20:40:26 +00:00
auto itr = timeouts.find(k);
if(itr == timeouts.end())
{
2018-08-29 20:40:26 +00:00
timeouts.insert(
2018-11-19 22:45:37 +00:00
std::make_pair(k, time_now_ms() + requestTimeoutMS));
}
2018-11-22 15:52:04 +00:00
if(count == 0)
2018-11-22 15:53:11 +00:00
t->Start(askpeer);
2018-08-29 20:40:26 +00:00
}
2018-08-29 20:40:26 +00:00
/// mark tx as not fond
void
2018-11-08 15:15:02 +00:00
NotFound(const TXOwner& from, const std::unique_ptr< Key_t >& next)
2018-08-29 20:40:26 +00:00
{
bool sendReply = true;
auto txitr = tx.find(from);
if(txitr == tx.end())
return;
// ask for next peer
if(txitr->second->AskNextPeer(from.node, next))
sendReply = false;
llarp::LogWarn("Target key ", txitr->second->target);
2018-08-29 20:40:26 +00:00
Inform(from, txitr->second->target, {}, sendReply, sendReply);
}
2018-08-29 20:40:26 +00:00
void
Found(const TXOwner& from, const K& k, const std::vector< V >& values)
{
Inform(from, k, values, true);
}
2018-08-29 20:40:26 +00:00
/// inform all watches for key of values found
void
2018-10-03 10:59:06 +00:00
Inform(TXOwner from, K key, std::vector< V > values,
2018-09-27 12:47:21 +00:00
bool sendreply = false, bool removeTimeouts = true)
{
2018-08-29 20:40:26 +00:00
auto range = waiting.equal_range(key);
auto itr = range.first;
while(itr != range.second)
{
auto txitr = tx.find(itr->second);
if(txitr != tx.end())
{
for(const auto& value : values)
txitr->second->OnFound(from.node, value);
if(sendreply)
{
txitr->second->SendReply();
tx.erase(txitr);
}
}
++itr;
}
2018-10-04 13:03:48 +00:00
if(sendreply)
waiting.erase(key);
2018-08-29 20:40:26 +00:00
if(removeTimeouts)
timeouts.erase(key);
}
2018-08-29 20:40:26 +00:00
void
Expire(llarp_time_t now)
{
2018-08-29 20:40:26 +00:00
auto itr = timeouts.begin();
while(itr != timeouts.end())
{
if(now > itr->second && now - itr->second >= requestTimeoutMS)
{
Inform(TXOwner{}, itr->first, {}, true, false);
itr = timeouts.erase(itr);
}
else
++itr;
}
}
};
2018-08-29 20:40:26 +00:00
TXHolder< service::Address, service::IntroSet, service::Address::Hash >
pendingIntrosetLookups;
TXHolder< service::Tag, service::IntroSet, service::Tag::Hash >
pendingTagLookups;
2018-08-30 18:48:43 +00:00
TXHolder< RouterID, RouterContact, RouterID::Hash > pendingRouterLookups;
2018-08-29 20:40:26 +00:00
TXHolder< RouterID, RouterID, RouterID::Hash > pendingExploreLookups;
uint64_t
NextID()
{
2018-08-29 20:40:26 +00:00
return ++ids;
}
2018-10-29 16:48:36 +00:00
llarp_time_t
Now();
2018-08-29 20:40:26 +00:00
void
ExploreNetworkVia(const Key_t& peer);
2018-11-28 15:27:36 +00:00
private:
2018-08-29 20:40:26 +00:00
void
ScheduleCleanupTimer();
void
CleanupTX();
uint64_t ids;
Key_t ourKey;
2018-07-17 04:37:50 +00:00
}; // struct Context
} // namespace dht
} // namespace llarp
/// Global-scope wrapper that bundles a llarp::dht::Context with a router
/// pointer.
struct llarp_dht_context
{
// the dht context itself, held by value
llarp::dht::Context impl;
// NOTE(review): presumably the router passed to the constructor — confirm
// against the constructor definition, which is not in this header
llarp::Router* parent;
// defined out of line
llarp_dht_context(llarp::Router* router);
};
#endif