From fe01c38d8edafe4973f426e0bf28018c9af3c9ca Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 12 Jul 2018 09:43:37 -0400 Subject: [PATCH] * fix dht feedback loop * start on dht lookups for hidden services * make debug logging confurable on runtime with env var LLARP_DEBUG=1 * make eventloop tick only when we get traffic * make testnet parameters configurable on runtime --- Makefile | 8 +++-- include/llarp/dht/context.hpp | 4 +-- include/llarp/dht/messages/gotintro.hpp | 17 ++++++++-- include/llarp/dht/search_job.hpp | 30 ++++++++++++----- include/llarp/iwp/transit_message.hpp | 3 ++ include/llarp/service/Info.hpp | 2 +- include/llarp/service/Intro.hpp | 6 ++-- include/llarp/service/address.hpp | 21 +++++++++--- llarp/context.cpp | 7 +++- llarp/dht/context.cpp | 7 ++-- llarp/dht/decode.cpp | 13 ++++---- llarp/dht/dht_immediate.cpp | 10 +++++- llarp/dht/got_intro.cpp | 35 ++++++++++++++++++-- llarp/dht/got_router.cpp | 8 ++--- llarp/dht/publish_intro.cpp | 25 ++++++++++++-- llarp/dht/search_job.cpp | 43 ++++++++++++++++++++++--- llarp/ev_epoll.hpp | 13 ++++---- llarp/ev_kqueue.hpp | 13 ++++---- llarp/iwp/frame_state.cpp | 3 +- llarp/iwp/session.cpp | 4 ++- llarp/iwp/transit_message.cpp | 6 ++++ llarp/service.cpp | 6 ++-- llarp/service/context.cpp | 3 +- llarp/service/endpoint.cpp | 3 +- 24 files changed, 220 insertions(+), 70 deletions(-) diff --git a/Makefile b/Makefile index 5e685fda8..f6b9ebca4 100644 --- a/Makefile +++ b/Makefile @@ -21,6 +21,10 @@ TESTNET_ROOT=$(REPO)/testnet_tmp TESTNET_CONF=$(TESTNET_ROOT)/supervisor.conf TESTNET_LOG=$(TESTNET_ROOT)/testnet.log +TESTNET_CLIENTS ?= 50 +TESTNET_SERVERS ?= 50 +TESTNET_DEBUG ?= 0 + clean: rm -f build.ninja rules.ninja cmake_install.cmake CMakeCache.txt rm -rf CMakeFiles @@ -70,8 +74,8 @@ testnet-build: testnet-configure testnet: testnet-build mkdir -p $(TESTNET_ROOT) - python3 contrib/testnet/genconf.py --bin=$(REPO)/llarpd --svc=30 --clients=1 --dir=$(TESTNET_ROOT) --out $(TESTNET_CONF) - supervisord -n -d $(TESTNET_ROOT) -l $(TESTNET_LOG) -c $(TESTNET_CONF) + python3 contrib/testnet/genconf.py --bin=$(REPO)/llarpd --svc=$(TESTNET_SERVERS) --clients=$(TESTNET_CLIENTS) --dir=$(TESTNET_ROOT) --out $(TESTNET_CONF) + LLARP_DEBUG=$(TESTNET_DEBUG) supervisord -n -d $(TESTNET_ROOT) -l $(TESTNET_LOG) -c $(TESTNET_CONF) test: debug-configure ninja diff --git a/include/llarp/dht/context.hpp b/include/llarp/dht/context.hpp index a529a6930..475337ee7 100644 --- a/include/llarp/dht/context.hpp +++ b/include/llarp/dht/context.hpp @@ -112,8 +112,8 @@ namespace llarp std::unordered_map< TXOwner, SearchJob, TXOwnerHash > pendingTX; Key_t ourKey; }; - } -} + } // namespace dht +} // namespace llarp struct llarp_dht_context { diff --git a/include/llarp/dht/messages/gotintro.hpp b/include/llarp/dht/messages/gotintro.hpp index ca75d75dc..0ad0ed402 100644 --- a/include/llarp/dht/messages/gotintro.hpp +++ b/include/llarp/dht/messages/gotintro.hpp @@ -13,7 +13,7 @@ namespace llarp std::list< llarp::service::IntroSet > I; uint64_t T = 0; - GotIntroMessage() : IMessage({}) + GotIntroMessage(const Key_t& from) : IMessage(from) { } @@ -27,10 +27,21 @@ namespace llarp bool DecodeKey(llarp_buffer_t key, llarp_buffer_t* val); + virtual bool + HandleMessage(llarp_dht_context* ctx, + std::vector< IMessage* >& replies) const; + }; + + struct RelayedGotIntroMessage : public GotIntroMessage + { + RelayedGotIntroMessage() : GotIntroMessage({}) + { + } + bool HandleMessage(llarp_dht_context* ctx, std::vector< IMessage* >& replies) const; }; - } -} + } // namespace dht +} // namespace llarp #endif \ No newline at end of file diff --git a/include/llarp/dht/search_job.hpp b/include/llarp/dht/search_job.hpp index bf0a41b3a..d5063f468 100644 --- a/include/llarp/dht/search_job.hpp +++ b/include/llarp/dht/search_job.hpp @@ -3,8 +3,9 @@ #define LLARP_DHT_SEARCH_JOB_HPP #include #include +#include #include - +#include #include namespace llarp @@ -14,26 +15,39 @@ namespace llarp struct SearchJob { const static uint64_t JobTimeout = 30000; - + typedef std::function< void(const llarp::service::IntroSet*) > + IntroSetHookFunc; SearchJob(); - + /// for routers + SearchJob(const Key_t& requester, uint64_t requesterTX, + const Key_t& target, const std::set< Key_t >& excludes, + llarp_router_lookup_job* job); + /// for introsets SearchJob(const Key_t& requester, uint64_t requesterTX, - const Key_t& target, llarp_router_lookup_job* job, - const std::set< Key_t >& excludes); + const Key_t& target, const std::set< Key_t >& excludes, + IntroSetHookFunc found); + + void + FoundRouter(const llarp_rc* router) const; + + void + FoundIntro(const llarp::service::IntroSet* introset) const; void - Completed(const llarp_rc* router, bool timeout = false) const; + Timeout() const; bool IsExpired(llarp_time_t now) const; + // only set if looking up router llarp_router_lookup_job* job = nullptr; + IntroSetHookFunc foundIntroHook; llarp_time_t started; Key_t requester; uint64_t requesterTX; Key_t target; std::set< Key_t > exclude; }; - } -} + } // namespace dht +} // namespace llarp #endif \ No newline at end of file diff --git a/include/llarp/iwp/transit_message.hpp b/include/llarp/iwp/transit_message.hpp index 8938b492c..6537c410d 100644 --- a/include/llarp/iwp/transit_message.hpp +++ b/include/llarp/iwp/transit_message.hpp @@ -41,6 +41,9 @@ struct transit_message bool should_send_ack(llarp_time_t now) const; + bool + should_resend_frags(llarp_time_t now) const; + bool should_resend_xmit(llarp_time_t now) const; bool diff --git a/include/llarp/service/Info.hpp b/include/llarp/service/Info.hpp index cf15bbd22..05959d29a 100644 --- a/include/llarp/service/Info.hpp +++ b/include/llarp/service/Info.hpp @@ -45,7 +45,7 @@ namespace llarp /// calculate our address bool - CalculateAddress(Address& addr) const; + CalculateAddress(byte_t* buf) const; bool BEncode(llarp_buffer_t* buf) const; diff --git a/include/llarp/service/Intro.hpp b/include/llarp/service/Intro.hpp index 1c22f8d00..4b2b41ef3 100644 --- a/include/llarp/service/Intro.hpp +++ b/include/llarp/service/Intro.hpp @@ -14,9 +14,9 @@ namespace llarp { llarp::PubKey router; llarp::PathID_t pathID; - uint64_t latency = 0; - uint64_t version = 0; - uint64_t expiresAt; + uint64_t latency = 0; + uint64_t version = 0; + uint64_t expiresAt = 0; Introduction() = default; Introduction(const Introduction& other) diff --git a/include/llarp/service/address.hpp b/include/llarp/service/address.hpp index 6a6ba4b5a..4113f3667 100644 --- a/include/llarp/service/address.hpp +++ b/include/llarp/service/address.hpp @@ -1,17 +1,28 @@ #ifndef LLARP_SERVICE_ADDRESS_HPP #define LLARP_SERVICE_ADDRESS_HPP #include +#include #include namespace llarp { namespace service { - typedef llarp::AlignedBuffer< 32 > Address; + struct Address : public llarp::AlignedBuffer< 32 > + { + std::string + ToString() const; - std::string - AddressToString(const Address& addr); - } -} + Address() : llarp::AlignedBuffer< 32 >() + { + } + + Address(const byte_t* data) : llarp::AlignedBuffer< 32 >(data) + { + } + }; + + } // namespace service +} // namespace llarp #endif \ No newline at end of file diff --git a/llarp/context.cpp b/llarp/context.cpp index 7f2106b78..3edefc2fe 100644 --- a/llarp/context.cpp +++ b/llarp/context.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include "logger.hpp" @@ -308,7 +309,11 @@ llarp_main_init(const char *fname, bool multiProcess) { if(!fname) fname = "daemon.ini"; - + char *var = getenv("LLARP_DEBUG"); + if(var && *var == '1') + { + cSetLogLevel(eLogDebug); + } llarp_main *m = new llarp_main; m->ctx.reset(new llarp::Context(std::cout, !multiProcess)); if(!m->ctx->LoadConfig(fname)) diff --git a/llarp/dht/context.cpp b/llarp/dht/context.cpp index 5f37bcb2b..8b83cb27d 100644 --- a/llarp/dht/context.cpp +++ b/llarp/dht/context.cpp @@ -123,7 +123,7 @@ namespace llarp { if(itr->second.IsExpired(now)) { - itr->second.Completed(nullptr, true); + itr->second.Timeout(); itr = pendingTX.erase(itr); } else @@ -174,9 +174,8 @@ namespace llarp ownerKey.txid = id; if(txid == 0) txid = id; - - pendingTX[ownerKey] = SearchJob(whoasked, txid, target, job, excludes); - + SearchJob j(whoasked, txid, target, excludes, job); + pendingTX[ownerKey] = j; llarp::LogInfo("Asking ", askpeer, " for router ", target, " for ", whoasked); auto msg = new llarp::DHTImmeidateMessage(askpeer); diff --git a/llarp/dht/decode.cpp b/llarp/dht/decode.cpp index ad99dd249..95eb369c6 100644 --- a/llarp/dht/decode.cpp +++ b/llarp/dht/decode.cpp @@ -60,14 +60,13 @@ namespace llarp case 'G': if(dec->relayed) { - dec->msg = new GotIntroMessage(); + dec->msg = new RelayedGotIntroMessage(); break; } else { - llarp::LogWarn( - "GotIntroMessage found when parsing direct DHT message"); - return false; + dec->msg = new GotIntroMessage(dec->From); + break; } default: llarp::LogWarn("unknown dht message type: ", (char)*strbuf.base); @@ -91,7 +90,9 @@ namespace llarp r.user = &dec; r.on_key = &MessageDecoder::on_key; if(bencode_read_dict(buf, &r)) + { return dec.msg; + } else { if(dec.msg) @@ -137,5 +138,5 @@ namespace llarp r.on_item = &ListDecoder::on_item; return bencode_read_list(buf, &r); } - } -} \ No newline at end of file + } // namespace dht +} // namespace llarp \ No newline at end of file diff --git a/llarp/dht/dht_immediate.cpp b/llarp/dht/dht_immediate.cpp index 92a67f315..1fbc09337 100644 --- a/llarp/dht/dht_immediate.cpp +++ b/llarp/dht/dht_immediate.cpp @@ -68,6 +68,14 @@ namespace llarp { result &= msg->HandleMessage(router->dht, reply->msgs); } - return result && router->SendToOrQueue(remote.data(), reply); + if(reply->msgs.size()) + { + return result && router->SendToOrQueue(remote.data(), reply); + } + else + { + delete reply; + return result; + } } } \ No newline at end of file diff --git a/llarp/dht/got_intro.cpp b/llarp/dht/got_intro.cpp index ce6bc105c..8d1008f3e 100644 --- a/llarp/dht/got_intro.cpp +++ b/llarp/dht/got_intro.cpp @@ -25,6 +25,37 @@ namespace llarp bool GotIntroMessage::HandleMessage(llarp_dht_context *ctx, std::vector< IMessage * > &replies) const + { + auto &dht = ctx->impl; + auto crypto = &dht.router->crypto; + if(I.size() == 1) + { + const auto &introset = I.front(); + if(!introset.VerifySignature(crypto)) + { + llarp::LogWarn( + "Invalid introset signature while handling direct GotIntro from ", + From); + return false; + } + llarp::dht::Key_t addr; + if(!introset.A.CalculateAddress(addr)) + { + llarp::LogWarn( + "failed to calculate hidden service address for direct GotIntro " + "message from ", + From); + return false; + } + // TODO: inform any pending tx + return true; + } + return false; + } + + bool + RelayedGotIntroMessage::HandleMessage( + llarp_dht_context *ctx, std::vector< IMessage * > &replies) const { // TODO: implement me better? auto pathset = ctx->impl.router->paths.GetLocalPathSet(pathID); @@ -66,5 +97,5 @@ namespace llarp return false; return bencode_end(buf); } - } -} \ No newline at end of file + } // namespace dht +} // namespace llarp \ No newline at end of file diff --git a/llarp/dht/got_router.cpp b/llarp/dht/got_router.cpp index c468b22fe..d50e880a6 100644 --- a/llarp/dht/got_router.cpp +++ b/llarp/dht/got_router.cpp @@ -67,7 +67,7 @@ namespace llarp { if(R.size()) { - pending->Completed(&R[0]); + pending->FoundRouter(&R[0]); if(pending->requester != dht.OurKey()) { replies.push_back(new GotRouterMessage( @@ -95,7 +95,7 @@ namespace llarp { llarp::LogInfo(pending->target, " was not found via ", From, " and we won't look it up"); - pending->Completed(nullptr); + pending->FoundRouter(nullptr); if(pending->requester != dht.OurKey()) { replies.push_back(new GotRouterMessage( @@ -110,5 +110,5 @@ namespace llarp "Got response for DHT transaction we are not tracking, txid=", txid); return false; } - } -} \ No newline at end of file + } // namespace dht +} // namespace llarp \ No newline at end of file diff --git a/llarp/dht/publish_intro.cpp b/llarp/dht/publish_intro.cpp index 7aa9f3427..602302b92 100644 --- a/llarp/dht/publish_intro.cpp +++ b/llarp/dht/publish_intro.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include "router.hpp" namespace llarp @@ -49,9 +50,27 @@ namespace llarp llarp::LogWarn("proof of work not good enough for IntroSet"); return false; } - // TODO: make this smarter (?) + llarp::dht::Key_t addr; + if(!I.A.CalculateAddress(addr)) + { + llarp::LogWarn( + "failed to calculate hidden service address for PubIntro message"); + return false; + } dht.services->PutNode(I); replies.push_back(new GotIntroMessage(txID, &I)); + Key_t peer; + std::set< Key_t > exclude; + exclude.insert(From); + if(txID && dht.nodes->FindCloseExcluding(addr, peer, exclude)) + { + // we are not the closest to this address, send it to the closest node + llarp::LogInfo("telling closer peer ", peer, " we got an IntroSet for ", + addr); + auto msg = new llarp::DHTImmeidateMessage(peer); + msg->msgs.push_back(new PublishIntroMessage(I, 0)); + dht.router->SendToOrQueue(peer, msg); + } return true; } @@ -77,5 +96,5 @@ namespace llarp return false; return bencode_end(buf); } - } -} \ No newline at end of file + } // namespace dht +} // namespace llarp \ No newline at end of file diff --git a/llarp/dht/search_job.cpp b/llarp/dht/search_job.cpp index e2d524cc5..72d93b53f 100644 --- a/llarp/dht/search_job.cpp +++ b/llarp/dht/search_job.cpp @@ -11,8 +11,8 @@ namespace llarp } SearchJob::SearchJob(const Key_t &asker, uint64_t tx, const Key_t &key, - llarp_router_lookup_job *j, - const std::set< Key_t > &excludes) + const std::set< Key_t > &excludes, + llarp_router_lookup_job *j) : job(j) , started(llarp_time_now_ms()) , requester(asker) @@ -22,8 +22,27 @@ namespace llarp { } + SearchJob::SearchJob(const Key_t &asker, uint64_t tx, const Key_t &key, + const std::set< Key_t > &excludes, + IntroSetHookFunc foundIntroset) + : foundIntroHook(foundIntroset) + , started(llarp_time_now_ms()) + , requester(asker) + , requesterTX(tx) + , target(key) + , exclude(excludes) + { + } + + void + SearchJob::FoundIntro(const llarp::service::IntroSet *introset) const + { + if(foundIntroHook) + foundIntroHook(introset); + } + void - SearchJob::Completed(const llarp_rc *router, bool timeout) const + SearchJob::FoundRouter(const llarp_rc *router) const { if(job && job->hook) { @@ -41,5 +60,19 @@ namespace llarp { return now - started >= JobTimeout; } - } -} \ No newline at end of file + + void + SearchJob::Timeout() const + { + if(job) + { + job->found = false; + job->hook(job); + } + else if(foundIntroHook) + { + foundIntroHook(nullptr); + } + } + } // namespace dht +} // namespace llarp \ No newline at end of file diff --git a/llarp/ev_epoll.hpp b/llarp/ev_epoll.hpp index 5eddce18f..6dd647417 100644 --- a/llarp/ev_epoll.hpp +++ b/llarp/ev_epoll.hpp @@ -130,10 +130,10 @@ struct llarp_epoll_loop : public llarp_ev_loop } ++idx; } + for(auto& l : udp_listeners) + if(l->tick) + l->tick(l); } - for(auto& l : udp_listeners) - if(l->tick) - l->tick(l); return result; } @@ -168,10 +168,11 @@ struct llarp_epoll_loop : public llarp_ev_loop } ++idx; } + for(auto& l : udp_listeners) + if(l->tick) + l->tick(l); } - for(auto& l : udp_listeners) - if(l->tick) - l->tick(l); + } while(epollfd != -1); return result; } diff --git a/llarp/ev_kqueue.hpp b/llarp/ev_kqueue.hpp index 74abd0355..39c501d6f 100644 --- a/llarp/ev_kqueue.hpp +++ b/llarp/ev_kqueue.hpp @@ -122,10 +122,11 @@ struct llarp_kqueue_loop : public llarp_ev_loop } ++idx; } + + for(auto& l : udp_listeners) + if(l->tick) + l->tick(l); } - for(auto& l : udp_listeners) - if(l->tick) - l->tick(l); return result; } @@ -153,10 +154,10 @@ struct llarp_kqueue_loop : public llarp_ev_loop } ++idx; } + for(auto& l : udp_listeners) + if(l->tick) + l->tick(l); } - for(auto& l : udp_listeners) - if(l->tick) - l->tick(l); } while(result != -1); return result; } diff --git a/llarp/iwp/frame_state.cpp b/llarp/iwp/frame_state.cpp index c7bfaed7a..5e8b4ac28 100644 --- a/llarp/iwp/frame_state.cpp +++ b/llarp/iwp/frame_state.cpp @@ -276,7 +276,7 @@ frame_state::got_acks(frame_header hdr, size_t sz) tx.erase(msgid); delete msg; } - else + else if(msg->should_resend_frags(llarp_time_now_ms())) { llarp::LogDebug("message ", msgid, " retransmit fragments"); msg->retransmit_frags(sendqueue, txflags); @@ -348,6 +348,7 @@ frame_state::queue_tx(uint64_t id, transit_message *msg) { tx.insert(std::make_pair(id, msg)); msg->generate_xmit(sendqueue, txflags); + msg->retransmit_frags(sendqueue, txflags); } void diff --git a/llarp/iwp/session.cpp b/llarp/iwp/session.cpp index c0208e9e2..d20d149a3 100644 --- a/llarp/iwp/session.cpp +++ b/llarp/iwp/session.cpp @@ -417,7 +417,6 @@ llarp_link_session::TickLogic() q.pop(); } frame.process_inbound_queue(); - frame.retransmit(llarp_time_now_ms()); pump(); } @@ -605,7 +604,10 @@ llarp_link_session::decrypt_frame(const void *buf, size_t sz) decryptedFrames.Put(f); } else + { llarp::LogWarn("decrypt frame fail"); + delete f; + } // f->hook = &handle_frame_decrypt; // iwp_call_async_frame_decrypt(iwp, f); } diff --git a/llarp/iwp/transit_message.cpp b/llarp/iwp/transit_message.cpp index b15215671..a5eae879d 100644 --- a/llarp/iwp/transit_message.cpp +++ b/llarp/iwp/transit_message.cpp @@ -76,6 +76,12 @@ transit_message::should_resend_xmit(llarp_time_t now) const return lastAck == 0 && now - started > 1000; } +bool +transit_message::should_resend_frags(llarp_time_t now) const +{ + return now - lastAck > 1000 && !completed(); +} + bool transit_message::completed() const { diff --git a/llarp/service.cpp b/llarp/service.cpp index 455d65c51..f3dfc50b6 100644 --- a/llarp/service.cpp +++ b/llarp/service.cpp @@ -59,14 +59,14 @@ namespace llarp } bool - ServiceInfo::CalculateAddress(Address& addr) const + ServiceInfo::CalculateAddress(byte_t* addr) const { byte_t tmp[128]; auto buf = llarp::StackBuffer< decltype(tmp) >(tmp); if(!BEncode(&buf)) return false; - return crypto_generichash(addr, addr.size(), buf.base, buf.cur - buf.base, - nullptr, 0) + return crypto_generichash(addr, 32, buf.base, buf.cur - buf.base, nullptr, + 0) != -1; } diff --git a/llarp/service/context.cpp b/llarp/service/context.cpp index 95abc5071..e7c98698c 100644 --- a/llarp/service/context.cpp +++ b/llarp/service/context.cpp @@ -13,7 +13,6 @@ namespace llarp auto itr = m_Endpoints.begin(); while(itr != m_Endpoints.end()) { - delete itr->second; itr = m_Endpoints.erase(itr); } } @@ -46,6 +45,8 @@ namespace llarp auto &v = option.second; if(!service->SetOption(k, v)) { + llarp::LogError("failed to set ", k, "=", v, + " for hidden service endpoint ", conf.first); delete service; return false; } diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 42e318741..b0453f161 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -16,9 +16,8 @@ namespace llarp if(k == "keyfile") { m_Keyfile = v; - return true; } - return false; + return true; } void