#include #include #include #include "router.hpp" #include #include #include // std::find namespace llarp { DHTImmeidateMessage::~DHTImmeidateMessage() { for(auto &msg : msgs) delete msg; msgs.clear(); } bool DHTImmeidateMessage::DecodeKey(llarp_buffer_t key, llarp_buffer_t *buf) { if(llarp_buffer_eq(key, "m")) return llarp::dht::DecodeMesssageList(remote.data(), buf, msgs); if(llarp_buffer_eq(key, "v")) { if(!bencode_read_integer(buf, &version)) return false; return version == LLARP_PROTO_VERSION; } // bad key return false; } bool DHTImmeidateMessage::BEncode(llarp_buffer_t *buf) const { if(!bencode_start_dict(buf)) return false; // message type if(!bencode_write_bytestring(buf, "a", 1)) return false; if(!bencode_write_bytestring(buf, "m", 1)) return false; // dht messages if(!bencode_write_bytestring(buf, "m", 1)) return false; // begin list if(!bencode_start_list(buf)) return false; for(const auto &msg : msgs) { if(!msg->BEncode(buf)) return false; } // end list if(!bencode_end(buf)) return false; // protocol version if(!bencode_write_version_entry(buf)) return false; return bencode_end(buf); } bool DHTImmeidateMessage::HandleMessage(llarp_router *router) const { DHTImmeidateMessage *reply = new DHTImmeidateMessage(remote); bool result = true; for(auto &msg : msgs) { result &= msg->HandleMessage(router, reply->msgs); } return result && router->SendToOrQueue(remote.data(), {reply}); } namespace dht { GotRouterMessage::~GotRouterMessage() { for(auto &rc : R) llarp_rc_free(&rc); R.clear(); } bool GotRouterMessage::BEncode(llarp_buffer_t *buf) const { return false; } bool GotRouterMessage::DecodeKey(llarp_buffer_t key, llarp_buffer_t *val) { return false; } bool GotRouterMessage::HandleMessage(llarp_router *router, std::vector< IMessage * > &replies) const { auto &dht = router->dht->impl; auto pending = dht.FindPendingTX(From, txid); if(pending) { if(R.size()) pending->Completed(&R[0]); else pending->Completed(nullptr); dht.RemovePendingLookup(From, txid); return true; } llarp::Warn("Got response for DHT transaction we are not tracking, txid=", txid); return false; } FindRouterMessage::~FindRouterMessage() { } bool FindRouterMessage::BEncode(llarp_buffer_t *buf) const { if(!bencode_start_dict(buf)) return false; // message type if(!bencode_write_bytestring(buf, "A", 1)) return false; if(!bencode_write_bytestring(buf, "R", 1)) return false; // key if(!bencode_write_bytestring(buf, "K", 1)) return false; if(!bencode_write_bytestring(buf, K.data(), K.size())) return false; // txid if(!bencode_write_bytestring(buf, "T", 1)) return false; if(!bencode_write_uint64(buf, txid)) return false; // version if(!bencode_write_bytestring(buf, "V", 1)) return false; if(!bencode_write_uint64(buf, version)) return false; return bencode_end(buf); } bool FindRouterMessage::DecodeKey(llarp_buffer_t key, llarp_buffer_t *val) { llarp_buffer_t strbuf; if(llarp_buffer_eq(key, "K")) { if(!bencode_read_string(val, &strbuf)) return false; if(strbuf.sz != K.size()) return false; memcpy(K.data(), strbuf.base, K.size()); return true; } if(llarp_buffer_eq(key, "T")) { return bencode_read_integer(val, &txid); } if(llarp_buffer_eq(key, "V")) { return bencode_read_integer(val, &version); } return false; } bool FindRouterMessage::HandleMessage(llarp_router *router, std::vector< IMessage * > &replies) const { auto &dht = router->dht->impl; auto pending = dht.FindPendingTX(From, txid); if(pending) { llarp::Warn("Got duplicate DHT lookup from ", From, " txid=", txid); return false; } dht.LookupRouterRelayed(From, txid, K, replies); return true; } struct MessageDecoder { const Key_t &From; bool firstKey = true; IMessage *msg = nullptr; MessageDecoder(const Key_t &from) : From(from) { } static bool on_key(dict_reader *r, llarp_buffer_t *key) { llarp_buffer_t strbuf; MessageDecoder *dec = static_cast< MessageDecoder * >(r->user); // check for empty dict if(!key) return !dec->firstKey; // first key if(dec->firstKey) { if(!llarp_buffer_eq(*key, "A")) return false; if(!bencode_read_string(r->buffer, &strbuf)) return false; // bad msg size? if(strbuf.sz != 1) return false; switch(*strbuf.base) { case 'R': dec->msg = new FindRouterMessage(dec->From); break; case 'S': dec->msg = new GotRouterMessage(dec->From); break; default: llarp::Warn("unknown dht message type: ", (char)*strbuf.base); // bad msg type return false; } dec->firstKey = false; return true; } else return dec->msg->DecodeKey(*key, r->buffer); } }; IMessage * DecodeMesssage(const Key_t &from, llarp_buffer_t *buf) { MessageDecoder dec(from); dict_reader r; r.user = &dec; r.on_key = &MessageDecoder::on_key; if(bencode_read_dict(buf, &r)) return dec.msg; else { if(dec.msg) delete dec.msg; return nullptr; } } struct ListDecoder { ListDecoder(const Key_t &from, std::vector< IMessage * > &list) : From(from), l(list){}; const Key_t &From; std::vector< IMessage * > &l; static bool on_item(list_reader *r, bool has) { ListDecoder *dec = static_cast< ListDecoder * >(r->user); if(!has) return true; auto msg = DecodeMesssage(dec->From, r->buffer); if(msg) { dec->l.push_back(msg); return true; } else return false; } }; bool DecodeMesssageList(const Key_t &from, llarp_buffer_t *buf, std::vector< IMessage * > &list) { ListDecoder dec(from, list); list_reader r; r.user = &dec; r.on_item = &ListDecoder::on_item; return bencode_read_list(buf, &r); } SearchJob::SearchJob() { started = 0; requestor.Zero(); target.Zero(); } SearchJob::SearchJob(const Key_t &asker, const Key_t &key, llarp_router_lookup_job *j) : started(llarp_time_now_ms()), requestor(asker), target(key), job(j) { } void SearchJob::Completed(const llarp_rc *router, bool timeout) const { if(job && job->hook) { if(router) { job->found = true; llarp_rc_copy(&job->result, router); } job->hook(job); } } bool SearchJob::IsExpired(llarp_time_t now) const { return now - started >= JobTimeout; } bool Bucket::FindClosest(const Key_t &target, Key_t &result) const { auto itr = nodes.lower_bound(target); if(itr == nodes.end()) return false; result = itr->second.ID; return true; } bool Bucket::FindCloseExcluding(const Key_t &target, Key_t &result, const Key_t &exclude) const { auto itr = nodes.lower_bound(target); if(itr == nodes.end()) return false; if(itr->second.ID == exclude) ++itr; if(itr == nodes.end()) return false; result = itr->second.ID; return true; } Context::Context() { randombytes((byte_t *)&ids, sizeof(uint64_t)); } Context::~Context() { if(nodes) delete nodes; } void Context::handle_cleaner_timer(void *u, uint64_t orig, uint64_t left) { if(left) return; Context *ctx = static_cast< Context * >(u); ctx->CleanupTX(); } void Context::LookupRouterRelayed(const Key_t &requester, uint64_t txid, const Key_t &target, std::vector< IMessage * > &replies) { if(target == ourKey) { // we are the target, give them our RC replies.push_back(new GotRouterMessage(requester, txid, &router->rc)); return; } Key_t next = ourKey; nodes->FindClosest(target, next); if(next == ourKey) { // we are closest and don't have a match replies.push_back(new GotRouterMessage(requester, txid, nullptr)); return; } if(next == target) { // we know it replies.push_back( new GotRouterMessage(requester, txid, nodes->nodes[target].rc)); return; } // ask neighbor LookupRouter(target, requester, next); } void Context::RemovePendingLookup(const Key_t &owner, uint64_t id) { TXOwner search; search.requester = owner; search.txid = id; auto itr = pendingTX.find(search); if(itr == pendingTX.end()) return; pendingTX.erase(itr); } SearchJob * Context::FindPendingTX(const Key_t &owner, uint64_t id) { TXOwner search; search.requester = owner; search.txid = id; auto itr = pendingTX.find(search); if(itr == pendingTX.end()) return nullptr; else return &itr->second; } void Context::CleanupTX() { auto now = llarp_time_now_ms(); llarp::Debug("DHT tick"); std::set< TXOwner > expired; for(auto &item : pendingTX) if(item.second.IsExpired(now)) expired.insert(item.first); for(const auto &e : expired) { pendingTX[e].Completed(nullptr, true); RemovePendingLookup(e.requester, e.txid); if(e.requester != ourKey) { // inform not found llarp::DHTImmeidateMessage msg(e.requester); msg.msgs.push_back( new GotRouterMessage(e.requester, e.txid, nullptr)); llarp::Info("DHT reply to ", e.requester); router->SendTo(e.requester, &msg); } } ScheduleCleanupTimer(); } void Context::Init(const Key_t &us, llarp_router *r) { router = r; ourKey = us; nodes = new Bucket(ourKey); llarp::Debug("intialize dht with key ", ourKey); } void Context::ScheduleCleanupTimer() { llarp_logic_call_later(router->logic, {1000, this, &handle_cleaner_timer}); } void Context::LookupRouter(const Key_t &target, const Key_t &whoasked, const Key_t &askpeer, llarp_router_lookup_job *job) { auto id = ++ids; TXOwner ownerKey; ownerKey.requester = whoasked; ownerKey.txid = id; pendingTX[ownerKey] = SearchJob(whoasked, target, job); llarp::Info("Asking ", askpeer, " for router ", target, " for ", whoasked); auto msg = new llarp::DHTImmeidateMessage(askpeer); msg->msgs.push_back(new FindRouterMessage(askpeer, target, id)); router->SendToOrQueue(askpeer, {msg}); } void Context::LookupRouterViaJob(llarp_router_lookup_job *job) { Key_t peer; if(nodes->FindCloseExcluding(job->target, peer, ourKey)) LookupRouter(job->target, ourKey, peer, job); else if(job->hook) { job->found = false; job->hook(job); } } void Context::queue_router_lookup(void *user) { llarp_router_lookup_job *job = static_cast< llarp_router_lookup_job * >(user); job->dht->impl.LookupRouterViaJob(job); } } } llarp_dht_context::llarp_dht_context(llarp_router *router) { parent = router; } extern "C" { struct llarp_dht_context * llarp_dht_context_new(struct llarp_router *router) { return new llarp_dht_context(router); } void llarp_dht_context_free(struct llarp_dht_context *ctx) { delete ctx; } void llarp_dht_put_local_router(struct llarp_dht_context *ctx, struct llarp_rc *rc) { llarp::dht::Key_t k = rc->pubkey; llarp::Debug("put router at ", k); ctx->impl.nodes->nodes[k] = rc; } void llarp_dht_remove_local_router(struct llarp_dht_context *ctx, const byte_t *id) { auto &nodes = ctx->impl.nodes->nodes; auto itr = nodes.find(id); if(itr == nodes.end()) return; nodes.erase(itr); } void llarp_dht_set_msg_handler(struct llarp_dht_context *ctx, llarp_dht_msg_handler handler) { ctx->impl.custom_handler = handler; } void llarp_dht_context_start(struct llarp_dht_context *ctx, const byte_t *key) { ctx->impl.Init(key, ctx->parent); } void llarp_dh_lookup_router(struct llarp_dht_context *ctx, struct llarp_router_lookup_job *job) { job->dht = ctx; llarp_logic_queue_job(ctx->parent->logic, {job, &llarp::dht::Context::queue_router_lookup}); } }