diff --git a/include/llarp/dht.h b/include/llarp/dht.h index 18417e8f2..eefeb81a9 100644 --- a/include/llarp/dht.h +++ b/include/llarp/dht.h @@ -2,6 +2,7 @@ #define LLARP_DHT_H_ #include +#include /** * dht.h @@ -17,7 +18,7 @@ struct llarp_dht_context; /// allocator struct llarp_dht_context* -llarp_dht_context_new(); +llarp_dht_context_new(struct llarp_router* parent); /// deallocator void @@ -31,14 +32,40 @@ struct llarp_dht_msg; typedef bool (*llarp_dht_msg_handler)(struct llarp_dht_msg*, struct llarp_dht_msg*); +/// start dht context with our location in keyspace void -llarp_dht_context_set_our_key(struct llarp_dht_context* ctx, const byte_t* key); +llarp_dht_context_start(struct llarp_dht_context* ctx, const byte_t* key); // override dht message handler with custom handler void llarp_dht_set_msg_handler(struct llarp_dht_context* ctx, llarp_dht_msg_handler func); +struct llarp_router_lookup_job; + +typedef void (*llarp_rotuer_lookup_handler)(struct llarp_router_lookup_job*); + +struct llarp_router_lookup_job +{ + void* user; + llarp_rotuer_lookup_handler hook; + struct llarp_dht_context* dht; + llarp_pubkey_t target; + bool found; + llarp_rc result; +}; + +// shallow copy +void +llarp_dht_put_local_router(struct llarp_dht_context* ctx, struct llarp_rc* rc); + +void +llarp_dht_remove_local_router(struct llarp_dht_context* ctx, const byte_t* id); + +void +llarp_dht_lookup_router(struct llarp_dht_context* ctx, + struct llarp_router_lookup_job* job); + #ifdef __cplusplus } #endif diff --git a/include/llarp/dht.hpp b/include/llarp/dht.hpp index 07701eb89..157e09e3d 100644 --- a/include/llarp/dht.hpp +++ b/include/llarp/dht.hpp @@ -4,9 +4,11 @@ #include #include #include +#include #include #include +#include #include #include @@ -16,17 +18,20 @@ namespace llarp { const size_t MAX_MSG_SIZE = 2048; - struct SearchJob; - struct Node { - llarp_rc rc; + llarp_rc* rc; const byte_t* ID() const; - Node(); - ~Node(); + Node() : rc(nullptr) + { + } + + Node(llarp_rc* other) : rc(other) + { + } }; struct Key_t : public llarp::AlignedBuffer< 32 > @@ -55,6 +60,28 @@ namespace llarp } }; + struct SearchJob + { + const static uint64_t JobTimeout = 30000; + + SearchJob(); + + SearchJob(const Key_t& requestor, const Key_t& target, + llarp_router_lookup_job* job); + + void + Completed(const llarp_rc* router, bool timeout = false) const; + + bool + IsExpired(llarp_time_t now) const; + + private: + llarp_time_t started; + Key_t requestor; + Key_t target; + llarp_router_lookup_job* job; + }; + struct XorMetric { const Key_t& us; @@ -105,6 +132,10 @@ namespace llarp bool FindClosest(const Key_t& target, Key_t& result) const; + bool + FindCloseExcluding(const Key_t& target, Key_t& result, + const Key_t& exclude) const; + BucketStorage_t nodes; }; @@ -115,11 +146,78 @@ namespace llarp llarp_dht_msg_handler custom_handler = nullptr; + SearchJob* + FindPendingTX(const Key_t& owner, uint64_t txid); + + void + RemovePendingLookup(const Key_t& owner, uint64_t txid); + void - Init(const Key_t& us); + LookupRouter(const Key_t& target, const Key_t& whoasked, + const Key_t& askpeer, + llarp_router_lookup_job* job = nullptr); + + void + LookupRouterViaJob(llarp_router_lookup_job* job); + + void + LookupRouterRelayed(const Key_t& requester, uint64_t txid, + const Key_t& target, + std::vector< IMessage* >& replies); + + void + Init(const Key_t& us, llarp_router* router); + + void + QueueRouterLookup(llarp_router_lookup_job* job); + + static void + handle_cleaner_timer(void* user, uint64_t orig, uint64_t left); + + static void + queue_router_lookup(void* user); + + llarp_router* router = nullptr; + Bucket* nodes = nullptr; private: - Bucket* nodes = nullptr; + void + ScheduleCleanupTimer(); + + void + CleanupTX(); + + uint64_t ids; + + struct TXOwner + { + Key_t requester = {0}; + uint64_t txid = 0; + + bool + operator==(const TXOwner& other) const + { + return txid == other.txid && requester == other.requester; + } + bool + operator<(const TXOwner& other) const + { + return txid < other.txid && requester < other.requester; + } + }; + + struct TXOwnerHash + { + std::size_t + operator()(TXOwner const& o) const noexcept + { + std::size_t sz2; + memcpy(&sz2, &o.requester[0], sizeof(std::size_t)); + return o.txid ^ (sz2 << 1); + } + }; + + std::unordered_map< TXOwner, SearchJob, TXOwnerHash > pendingTX; Key_t ourKey; }; @@ -128,6 +226,16 @@ namespace llarp GotRouterMessage(const Key_t& from) : IMessage(from) { } + GotRouterMessage(const Key_t& from, uint64_t id, const llarp_rc* result) + : IMessage(from), txid(id) + { + if(result) + { + R.emplace_back(); + llarp_rc_clear(&R.back()); + llarp_rc_copy(&R.back(), result); + } + } ~GotRouterMessage(); @@ -152,6 +260,11 @@ namespace llarp { } + FindRouterMessage(const Key_t& from, const Key_t& target, uint64_t id) + : IMessage(from), K(target), txid(id) + { + } + ~FindRouterMessage(); bool @@ -174,6 +287,8 @@ namespace llarp struct llarp_dht_context { llarp::dht::Context impl; + llarp_router* parent; + llarp_dht_context(llarp_router* router); }; #endif diff --git a/include/llarp/router_contact.h b/include/llarp/router_contact.h index 038565fa1..95f0b06de 100644 --- a/include/llarp/router_contact.h +++ b/include/llarp/router_contact.h @@ -30,7 +30,7 @@ bool llarp_rc_verify_sig(struct llarp_crypto *crypto, struct llarp_rc *rc); void -llarp_rc_copy(struct llarp_rc *dst, struct llarp_rc *src); +llarp_rc_copy(struct llarp_rc *dst, const struct llarp_rc *src); void llarp_rc_set_addrs(struct llarp_rc *rc, struct llarp_alloc *mem, diff --git a/llarp/dht.cpp b/llarp/dht.cpp index 879a1df1c..ab57c37e1 100644 --- a/llarp/dht.cpp +++ b/llarp/dht.cpp @@ -3,6 +3,10 @@ #include #include "router.hpp" +#include + +#include + namespace llarp { DHTImmeidateMessage::~DHTImmeidateMessage() @@ -33,6 +37,7 @@ namespace llarp if(!bencode_start_dict(buf)) return false; + // message type if(!bencode_write_bytestring(buf, "a", 1)) return false; if(!bencode_write_bytestring(buf, "m", 1)) @@ -53,6 +58,7 @@ namespace llarp if(!bencode_end(buf)) return false; + // protocol version if(!bencode_write_version_entry(buf)) return false; @@ -96,6 +102,20 @@ namespace llarp GotRouterMessage::HandleMessage(llarp_router *router, std::vector< IMessage * > &replies) const { + auto &dht = router->dht->impl; + auto pending = dht.FindPendingTX(From, txid); + if(pending) + { + if(R.size()) + pending->Completed(&R[0]); + else + pending->Completed(nullptr); + + dht.RemovePendingLookup(From, txid); + return true; + } + llarp::Warn("Got response for DHT transaction we are not tracking, txid=", + txid); return false; } @@ -106,7 +126,34 @@ namespace llarp bool FindRouterMessage::BEncode(llarp_buffer_t *buf) const { - return false; + if(!bencode_start_dict(buf)) + return false; + + // message type + if(!bencode_write_bytestring(buf, "A", 1)) + return false; + if(!bencode_write_bytestring(buf, "R", 1)) + return false; + + // key + if(!bencode_write_bytestring(buf, "K", 1)) + return false; + if(!bencode_write_bytestring(buf, K.data(), K.size())) + return false; + + // txid + if(!bencode_write_bytestring(buf, "T", 1)) + return false; + if(!bencode_write_uint64(buf, txid)) + return false; + + // version + if(!bencode_write_bytestring(buf, "V", 1)) + return false; + if(!bencode_write_uint64(buf, version)) + return false; + + return bencode_end(buf); } bool @@ -129,9 +176,7 @@ namespace llarp } if(llarp_buffer_eq(key, "V")) { - if(!bencode_read_integer(val, &version)) - return false; - return version == LLARP_PROTO_VERSION; + return bencode_read_integer(val, &version); } return false; } @@ -140,12 +185,20 @@ namespace llarp FindRouterMessage::HandleMessage(llarp_router *router, std::vector< IMessage * > &replies) const { - return false; + auto &dht = router->dht->impl; + auto pending = dht.FindPendingTX(From, txid); + if(pending) + { + llarp::Warn("Got duplicate DHT lookup from ", From, " txid=", txid); + return false; + } + dht.LookupRouterRelayed(From, txid, K, replies); + return true; } struct MessageDecoder { - Key_t From; + const Key_t &From; bool firstKey = true; IMessage *msg = nullptr; @@ -181,6 +234,7 @@ namespace llarp dec->msg = new GotRouterMessage(dec->From); break; default: + llarp::Warn("unknown dht message type: ", (char)*strbuf.base); // bad msg type return false; } @@ -213,7 +267,8 @@ namespace llarp { ListDecoder(const Key_t &from, std::vector< IMessage * > &list) : From(from), l(list){}; - Key_t From; + + const Key_t &From; std::vector< IMessage * > &l; static bool @@ -245,18 +300,77 @@ namespace llarp return bencode_read_list(buf, &r); } - Node::Node() + const byte_t * + Node::ID() const + { + if(rc) + return rc->pubkey; + else + return nullptr; + } + + SearchJob::SearchJob() + { + started = 0; + requestor.Zero(); + target.Zero(); + } + + SearchJob::SearchJob(const Key_t &asker, const Key_t &key, + llarp_router_lookup_job *j) + : started(llarp_time_now_ms()), requestor(asker), target(key), job(j) + { + } + + void + SearchJob::Completed(const llarp_rc *router, bool timeout) const + { + if(job && job->hook) + { + if(router) + { + job->found = true; + llarp_rc_copy(&job->result, router); + } + job->hook(job); + } + } + + bool + SearchJob::IsExpired(llarp_time_t now) const + { + return now - started >= JobTimeout; + } + + bool + Bucket::FindClosest(const Key_t &target, Key_t &result) const { - llarp_rc_clear(&rc); + auto itr = nodes.lower_bound(target); + if(itr == nodes.end()) + return false; + + result = itr->second.ID(); + return true; } - Node::~Node() + bool + Bucket::FindCloseExcluding(const Key_t &target, Key_t &result, + const Key_t &exclude) const { - llarp_rc_free(&rc); + auto itr = nodes.lower_bound(target); + if(itr == nodes.end()) + return false; + if(itr->second.ID() == exclude) + ++itr; + if(itr == nodes.end()) + return false; + result = itr->second.ID(); + return true; } Context::Context() { + randombytes((byte_t *)&ids, sizeof(uint64_t)); } Context::~Context() @@ -266,20 +380,155 @@ namespace llarp } void - Context::Init(const Key_t &us) + Context::handle_cleaner_timer(void *u, uint64_t orig, uint64_t left) + { + if(left) + return; + Context *ctx = static_cast< Context * >(u); + + ctx->CleanupTX(); + } + + void + Context::LookupRouterRelayed(const Key_t &requester, uint64_t txid, + const Key_t &target, + std::vector< IMessage * > &replies) { + if(target == ourKey) + { + // we are the target, give them our RC + replies.push_back(new GotRouterMessage(requester, txid, &router->rc)); + return; + } + Key_t next = ourKey; + nodes->FindClosest(target, next); + if(next == ourKey) + { + // we are closest and don't have a match + replies.push_back(new GotRouterMessage(requester, txid, nullptr)); + return; + } + if(next == target) + { + // we know it + replies.push_back( + new GotRouterMessage(requester, txid, nodes->nodes[target].rc)); + return; + } + + // ask neighbor + LookupRouter(target, requester, next); + } + + void + Context::RemovePendingLookup(const Key_t &owner, uint64_t id) + { + auto itr = pendingTX.find({owner, id}); + if(itr == pendingTX.end()) + return; + pendingTX.erase(itr); + } + + SearchJob * + Context::FindPendingTX(const Key_t &owner, uint64_t id) + { + auto itr = pendingTX.find({owner, id}); + if(itr == pendingTX.end()) + return nullptr; + else + return &itr->second; + } + + void + Context::CleanupTX() + { + auto now = llarp_time_now_ms(); + std::set< TXOwner > expired; + + for(auto &item : pendingTX) + if(item.second.IsExpired(now)) + expired.insert(item.first); + + for(const auto &e : expired) + { + pendingTX[e].Completed(nullptr, true); + RemovePendingLookup(e.requester, e.txid); + if(e.requester != ourKey) + { + // inform not found + auto msg = new llarp::DHTImmeidateMessage(e.requester); + msg->msgs.push_back( + new GotRouterMessage(e.requester, e.txid, nullptr)); + router->SendToOrQueue(e.requester, {msg}); + } + } + + ScheduleCleanupTimer(); + } + + void + Context::Init(const Key_t &us, llarp_router *r) + { + router = r; ourKey = us; nodes = new Bucket(ourKey); } + + void + Context::ScheduleCleanupTimer() + { + llarp_logic_call_later(router->logic, + {1000, this, &handle_cleaner_timer}); + } + + void + Context::LookupRouter(const Key_t &target, const Key_t &whoasked, + const Key_t &askpeer, llarp_router_lookup_job *job) + { + auto id = ++ids; + pendingTX[{whoasked, id}] = SearchJob(whoasked, target, job); + + llarp::Info("Asking ", askpeer, " for router ", target, " for ", + whoasked); + auto msg = new llarp::DHTImmeidateMessage(askpeer); + msg->msgs.push_back(new FindRouterMessage(askpeer, target, id)); + router->SendToOrQueue(askpeer, {msg}); + } + + void + Context::LookupRouterViaJob(llarp_router_lookup_job *job) + { + Key_t peer; + if(nodes->FindCloseExcluding(job->target, peer, ourKey)) + LookupRouter(job->target, ourKey, peer, job); + else if(job->hook) + { + job->found = false; + job->hook(job); + } + } + + void + Context::queue_router_lookup(void *user) + { + llarp_router_lookup_job *job = + static_cast< llarp_router_lookup_job * >(user); + job->dht->impl.LookupRouterViaJob(job); + } } } +llarp_dht_context::llarp_dht_context(llarp_router *router) +{ + parent = router; +} + extern "C" { struct llarp_dht_context * -llarp_dht_context_new() +llarp_dht_context_new(struct llarp_router *router) { - return new llarp_dht_context; + return new llarp_dht_context(router); } void @@ -288,6 +537,23 @@ llarp_dht_context_free(struct llarp_dht_context *ctx) delete ctx; } +void +llarp_dht_put_local_router(struct llarp_dht_context *ctx, struct llarp_rc *rc) + +{ + ctx->impl.nodes->nodes[rc->pubkey] = rc; +} + +void +llarp_dht_remove_local_router(struct llarp_dht_context *ctx, const byte_t *id) +{ + auto &nodes = ctx->impl.nodes->nodes; + auto itr = nodes.find(id); + if(itr == nodes.end()) + return; + nodes.erase(itr); +} + void llarp_dht_set_msg_handler(struct llarp_dht_context *ctx, llarp_dht_msg_handler handler) @@ -296,8 +562,17 @@ llarp_dht_set_msg_handler(struct llarp_dht_context *ctx, } void -llarp_dht_context_set_our_key(struct llarp_dht_context *ctx, const byte_t *key) +llarp_dht_context_start(struct llarp_dht_context *ctx, const byte_t *key) +{ + ctx->impl.Init(key, ctx->parent); +} + +void +llarp_dh_lookup_router(struct llarp_dht_context *ctx, + struct llarp_router_lookup_job *job) { - ctx->impl.Init(key); + job->dht = ctx; + llarp_logic_queue_job(ctx->parent->logic, + {job, &llarp::dht::Context::queue_router_lookup}); } } diff --git a/llarp/iwp_link.cpp b/llarp/iwp_link.cpp index 2fe63f561..86e1f1b95 100644 --- a/llarp/iwp_link.cpp +++ b/llarp/iwp_link.cpp @@ -1232,6 +1232,10 @@ namespace iwp [src](const auto &item) -> bool { return src == item.second; }); if(itr == std::end(m_Connected)) return; + + // remove from dht tracking + llarp_dht_remove_local_router(router->dht, itr->first); + m_Connected.erase(itr); } diff --git a/llarp/router.cpp b/llarp/router.cpp index 27c30e68c..8b0a25402 100644 --- a/llarp/router.cpp +++ b/llarp/router.cpp @@ -28,7 +28,7 @@ namespace llarp } // namespace llarp llarp_router::llarp_router() - : ready(false), dht(llarp_dht_context_new()), inbound_msg_parser(this) + : ready(false), dht(llarp_dht_context_new(this)), inbound_msg_parser(this) { llarp_rc_clear(&rc); } @@ -310,6 +310,8 @@ llarp_router::on_try_connect_result(llarp_link_establish_job *job) if(job->session) { auto session = job->session; + llarp_dht_put_local_router(router->dht, + session->get_remote_router(session)); router->async_verify_RC(session, false, job); return; } diff --git a/llarp/router_contact.cpp b/llarp/router_contact.cpp index 41559b975..deb6b3dd1 100644 --- a/llarp/router_contact.cpp +++ b/llarp/router_contact.cpp @@ -93,7 +93,7 @@ llarp_rc_decode_dict(struct dict_reader *r, llarp_buffer_t *key) } void -llarp_rc_copy(struct llarp_rc *dst, struct llarp_rc *src) +llarp_rc_copy(struct llarp_rc *dst, const struct llarp_rc *src) { llarp_rc_free(dst); llarp_rc_clear(dst);