From 545021aa3d6607f4673da326cde6056731b30a98 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Mon, 12 Apr 2021 07:39:07 -0400 Subject: [PATCH] temp commit --- CMakeLists.txt | 2 +- include/lokinet.h | 2 +- include/lokinet/lokinet_addr.h | 2 +- include/lokinet/lokinet_context.h | 2 - include/lokinet/lokinet_misc.h | 1 - include/lokinet/lokinet_os.h | 26 ++++++ include/lokinet/lokinet_socket.h | 23 +++++ include/lokinet/lokinet_srv.h | 29 ++++--- include/lokinet/lokinet_stream.h | 4 - include/lokinet/lokinet_udp.h | 83 ++++++++++++++++++ llarp/config/config.cpp | 13 ++- llarp/config/config.hpp | 2 +- llarp/dht/context.cpp | 21 ++++- llarp/dns/srv_data.cpp | 3 +- llarp/endpoint_base.hpp | 7 ++ llarp/handlers/exit.cpp | 8 ++ llarp/handlers/exit.hpp | 6 ++ llarp/handlers/tun.cpp | 4 +- llarp/lokinet_shared.cpp | 95 +++++++++++++++++++++ llarp/messages/dht_immediate.cpp | 2 +- llarp/messages/relay_status.cpp | 2 +- llarp/path/path.cpp | 2 +- llarp/path/path_context.cpp | 4 +- llarp/path/pathbuilder.cpp | 12 +-- llarp/path/transit_hop.cpp | 4 +- llarp/quic/connection.cpp | 2 +- llarp/quic/stream.hpp | 2 +- llarp/router/abstractrouter.hpp | 2 +- llarp/router/i_outbound_message_handler.hpp | 2 +- llarp/router/outbound_message_handler.cpp | 18 ++-- llarp/router/outbound_message_handler.hpp | 4 +- llarp/router/router.cpp | 13 +-- llarp/router/router.hpp | 2 +- llarp/rpc/lokid_rpc_client.cpp | 51 +++++++---- llarp/rpc/lokid_rpc_client.hpp | 7 +- llarp/service/endpoint.cpp | 55 ++++++++++-- llarp/service/endpoint.hpp | 6 ++ llarp/service/intro_set.cpp | 5 +- 38 files changed, 425 insertions(+), 103 deletions(-) create mode 100644 include/lokinet/lokinet_os.h create mode 100644 include/lokinet/lokinet_socket.h create mode 100644 include/lokinet/lokinet_udp.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 4f5fec79e..5760afd59 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -254,7 +254,7 @@ endif() if(TESTNET) add_definitions(-DTESTNET=1) # 5 times slower than realtime - add_definitions(-DTESTNET_SPEED=5) + # add_definitions(-DTESTNET_SPEED=5) endif() if(SHADOW) diff --git a/include/lokinet.h b/include/lokinet.h index 6a920924f..4cda5b53f 100644 --- a/include/lokinet.h +++ b/include/lokinet.h @@ -5,4 +5,4 @@ #include "lokinet/lokinet_misc.h" #include "lokinet/lokinet_addr.h" #include "lokinet/lokinet_stream.h" - +#include "lokinet/lokinet_udp.h" diff --git a/include/lokinet/lokinet_addr.h b/include/lokinet/lokinet_addr.h index d67771f75..f3a5e3690 100644 --- a/include/lokinet/lokinet_addr.h +++ b/include/lokinet/lokinet_addr.h @@ -6,7 +6,7 @@ extern "C" { #endif -/// get a free()-able null terminated string that holds our .loki address + /// get a free()-able null terminated string that holds our .loki address /// returns NULL if we dont have one right now char* lokinet_address(struct lokinet_context*); diff --git a/include/lokinet/lokinet_context.h b/include/lokinet/lokinet_context.h index e6fd2f408..a7f4b9b58 100644 --- a/include/lokinet/lokinet_context.h +++ b/include/lokinet/lokinet_context.h @@ -4,7 +4,6 @@ #include #include - #ifdef __cplusplus extern "C" { @@ -49,7 +48,6 @@ extern "C" int lokinet_add_bootstrap_rc(const char*, size_t, struct lokinet_context*); - #ifdef __cplusplus } #endif diff --git a/include/lokinet/lokinet_misc.h b/include/lokinet/lokinet_misc.h index 7d5ad04ca..fde288997 100644 --- a/include/lokinet/lokinet_misc.h +++ b/include/lokinet/lokinet_misc.h @@ -1,6 +1,5 @@ #pragma once - #ifdef __cplusplus extern "C" { diff --git a/include/lokinet/lokinet_os.h b/include/lokinet/lokinet_os.h new file mode 100644 index 000000000..59ce14df0 --- /dev/null +++ b/include/lokinet/lokinet_os.h @@ -0,0 +1,26 @@ +#pragma once + +/// OS specific types + +#ifdef _WIN32 + +#else +#include +#include + +#endif + +#ifdef __cplusplus +extern "C" +{ +#endif + +#ifdef _WIN32 + typedef HANDLE OS_FD_t; +#else +typedef int OS_FD_t; +#endif + +#ifdef __cplusplus +} +#endif diff --git a/include/lokinet/lokinet_socket.h b/include/lokinet/lokinet_socket.h new file mode 100644 index 000000000..20d8d509a --- /dev/null +++ b/include/lokinet/lokinet_socket.h @@ -0,0 +1,23 @@ +#pragma once + +#include "lokinet_context.h" +#include "lokinet_os.h" + +#ifdef __cplusplus +extern "C" +{ +#endif + + /// poll many sockets for activity + /// each pollfd.fd should be set to the socket id + /// returns 0 on sucess + int + lokinet_poll(struct pollfd* poll, nfds_t numsockets, struct lokinet_context* ctx); + + /// close a udp socket or a stream socket by its id + void + lokinet_close_socket(int id, struct lokinet_context* ctx); + +#ifdef __cplusplus +} +#endif diff --git a/include/lokinet/lokinet_srv.h b/include/lokinet/lokinet_srv.h index 654a322f2..4827b6862 100644 --- a/include/lokinet/lokinet_srv.h +++ b/include/lokinet/lokinet_srv.h @@ -15,7 +15,7 @@ extern "C" /// the weight of this record uint16_t weight; /// null terminated string of the hostname - char * target; + char target[256]; /// the port to use int port; }; @@ -26,35 +26,38 @@ extern "C" /// the result of an srv lookup struct lokinet_srv_lookup_result { - /// pointer to internal members - /// dont touch me - struct lokinet_srv_lookup_private * internal; /// set to zero on success otherwise is the error code - /// int error; + /// pointer to internal members + /// dont touch me + struct lokinet_srv_lookup_private* internal; }; /// do a srv lookup on host for service /// caller MUST call lokinet_srv_lookup_done when they are done handling the result - void - lokinet_srv_lookup(char * host, char * service, struct lokinet_srv_lookup_result * result, struct lokinet_context * ctx); + int + lokinet_srv_lookup( + char* host, + char* service, + struct lokinet_srv_lookup_result* result, + struct lokinet_context* ctx); - /// a hook function to handle each srv record in a srv lookup result /// passes in NULL when we are at the end of iteration /// passes in void * user data /// hook should NOT free the record - typedef void (*lokinet_srv_record_iterator)(struct lokinet_srv_record *, void *); + typedef void (*lokinet_srv_record_iterator)(struct lokinet_srv_record*, void*); /// iterate over each srv record in a lookup result - /// user is passes into hook and called for each result and then with NULL as the result on the end of iteration + /// user is passes into hook and called for each result and then with NULL as the result on the + /// end of iteration void - lokinet_for_each_srv_record(struct lokinet_srv_lookup_result * result, lokinet_srv_record_iterator iter, void * user); - + lokinet_for_each_srv_record( + struct lokinet_srv_lookup_result* result, lokinet_srv_record_iterator iter, void* user); /// free internal members of a srv lookup result after use of the result void - lokinet_srv_lookup_done(struct lokinet_srv_lookup_result * result); + lokinet_srv_lookup_done(struct lokinet_srv_lookup_result* result); #ifdef __cplusplus } diff --git a/include/lokinet/lokinet_stream.h b/include/lokinet/lokinet_stream.h index 3d42344e7..0e1d9f039 100644 --- a/include/lokinet/lokinet_stream.h +++ b/include/lokinet/lokinet_stream.h @@ -53,10 +53,6 @@ extern "C" int lokinet_inbound_stream(uint16_t port, struct lokinet_context* context); - /// close a stream by id - void - lokinet_close_stream(int stream_id, struct lokinet_context* context); - #ifdef __cplusplus } #endif diff --git a/include/lokinet/lokinet_udp.h b/include/lokinet/lokinet_udp.h new file mode 100644 index 000000000..7c73e8049 --- /dev/null +++ b/include/lokinet/lokinet_udp.h @@ -0,0 +1,83 @@ +#pragma once + +#include "lokinet_context.h" +#include "lokinet_os.h" + +#ifdef __cplusplus +extern "C" +{ +#endif + + /// information about a udp flow + struct lokinet_udp_flow + { + /// the socket id for this flow used for i/o purposes and closing this socket + int socket_id; + /// remote endpoint's .loki or .snode address + char remote_addr[256]; + /// local endpoint's ip address + char local_addr[64]; + /// remote endpont's port + int remote_port; + /// local endpoint's port + int local_port; + }; + + /// establish an outbound udp flow + /// remoteHost is the remote .loki or .snode address conneting to + /// remotePort is either a string integer or an srv record name to lookup, e.g. thingservice in + /// which we do a srv lookup for _udp.thingservice.remotehost.tld and use the "best" port provided + /// localAddr is the local ip:port to bind our socket to, if localAddr is NULL then + /// lokinet_udp_sendmmsg MUST be used to send packets return 0 on success return nonzero on fail, + /// containing an errno value + int + lokinet_udp_establish( + char* remoteHost, + char* remotePort, + char* localAddr, + struct lokinet_udp_flow* flow, + struct lokinet_context* ctx); + + /// a result from a lokinet_udp_bind call + struct lokinet_udp_bind_result + { + /// a socket id used to close a lokinet udp socket + int socket_id; + }; + + /// inbound listen udp socket + /// expose udp port exposePort to the void + /// if srv is not NULL add an srv record for this port, the format being "thingservice" in which + /// will add a srv record "_udp.thingservice.ouraddress.tld" that advertises this port provide + /// localAddr to forward inbound udp packets to "ip:port" if localAddr is NULL then the resulting + /// socket MUST be drained by lokinet_udp_recvmmsg returns 0 on success returns nonzero on error + /// in which it is an errno value + int + lokinet_udp_bind( + int exposedPort, + char* srv, + char* localAddr, + struct lokinet_udp_listen_result* result, + struct lokinet_context* ctx); + + /// poll many udp sockets for activity + /// each pollfd.fd should be set to the udp socket id + /// returns 0 on sucess + int + lokinet_udp_poll(struct pollfd* poll, nfds_t numsockets, struct lokinet_context* ctx); + + struct lokinet_udp_pkt + { + char remote_addr[256]; + int remote_port; + struct iovec pkt; + }; + + /// analog to recvmmsg + ssize_t + lokinet_udp_recvmmsg( + int socket_id, struct lokinet_udp_pkt* pkts, size_t numevents, struct lokient_context* ctx); + +#ifdef __cplusplus +} +#endif diff --git a/llarp/config/config.cpp b/llarp/config/config.cpp index aaa0d1175..7ab3f6c58 100644 --- a/llarp/config/config.cpp +++ b/llarp/config/config.cpp @@ -288,9 +288,16 @@ namespace llarp "network", "strict-connect", ClientOnly, - AssignmentAcceptor(m_strictConnect), + MultiValue, + [this](std::string value) { + RouterID router; + if (not router.FromString(value)) + throw std::invalid_argument{"bad snode value: " + value}; + if (not m_strictConnect.insert(router).second) + throw std::invalid_argument{"duplicate strict connect snode: " + value}; + }, Comment{ - "Public key of a router which will act as sole first-hop. This may be used to", + "Public key of a router which will act as a pinned first-hop. This may be used to", "provide a trusted router (consider that you are not fully anonymous with your", "first hop).", }); @@ -392,7 +399,7 @@ namespace llarp "Number of paths to maintain at any given time.", }, [this](int arg) { - if (arg < 2 or arg > 8) + if (arg < 3 or arg > 8) throw std::invalid_argument("[endpoint]:paths must be >= 2 and <= 8"); m_Paths = arg; }); diff --git a/llarp/config/config.hpp b/llarp/config/config.hpp index 96d0420a1..8d6194b51 100644 --- a/llarp/config/config.hpp +++ b/llarp/config/config.hpp @@ -94,7 +94,7 @@ namespace llarp { std::optional m_enableProfiling; bool m_saveProfiles; - std::string m_strictConnect; + std::set m_strictConnect; std::string m_ifname; IPRange m_ifaddr; diff --git a/llarp/dht/context.cpp b/llarp/dht/context.cpp index 4331734f2..8d3b0b55f 100644 --- a/llarp/dht/context.cpp +++ b/llarp/dht/context.cpp @@ -335,6 +335,22 @@ namespace llarp CleanupTX(); const llarp_time_t now = Now(); + if (_nodes) + { + // expire router contacts in memory + auto& nodes = _nodes->nodes; + auto itr = nodes.begin(); + while (itr != nodes.end()) + { + if (itr->second.rc.IsExpired(now)) + { + itr = nodes.erase(itr); + } + else + ++itr; + } + } + if (_services) { // expire intro sets @@ -463,10 +479,7 @@ namespace llarp { llarp::DHTImmediateMessage m; m.msgs.emplace_back(msg); - router->SendToOrQueue(peer, &m, [](SendStatus status) { - if (status != SendStatus::Success) - LogInfo("DHTSendTo unsuccessful, status: ", (int)status); - }); + router->SendToOrQueue(peer, m); auto now = Now(); router->PersistSessionUntil(peer, now + 1min); } diff --git a/llarp/dns/srv_data.cpp b/llarp/dns/srv_data.cpp index 5e08bb91e..883d9c68c 100644 --- a/llarp/dns/srv_data.cpp +++ b/llarp/dns/srv_data.cpp @@ -117,7 +117,8 @@ namespace llarp::dns if (not bencode_discard(buf)) return false; byte_t* end = buf->cur; - std::string_view srvString{reinterpret_cast(begin), end - begin}; + std::string_view srvString{ + reinterpret_cast(begin), static_cast(end - begin)}; try { SRVTuple tuple{}; diff --git a/llarp/endpoint_base.hpp b/llarp/endpoint_base.hpp index b0ce0e465..2a33f7602 100644 --- a/llarp/endpoint_base.hpp +++ b/llarp/endpoint_base.hpp @@ -123,6 +123,13 @@ namespace llarp virtual bool SendToOrQueue( service::ConvoTag tag, const llarp_buffer_t& payload, service::ProtocolType t) = 0; + + /// lookup srv records async + virtual void + LookupServiceAsync( + std::string name, + std::string service, + std::function)> resultHandler) = 0; }; } // namespace llarp diff --git a/llarp/handlers/exit.cpp b/llarp/handlers/exit.cpp index b94683936..382595acd 100644 --- a/llarp/handlers/exit.cpp +++ b/llarp/handlers/exit.cpp @@ -40,6 +40,14 @@ namespace llarp resultHandler(std::nullopt); } + void + ExitEndpoint::LookupServiceAsync( + std::string, std::string, std::function)> resultHandler) + { + // TODO: implement me + resultHandler({}); + } + std::optional ExitEndpoint::GetEndpointWithConvoTag(service::ConvoTag tag) const { diff --git a/llarp/handlers/exit.hpp b/llarp/handlers/exit.hpp index 4a8e87c3f..82e5c786b 100644 --- a/llarp/handlers/exit.hpp +++ b/llarp/handlers/exit.hpp @@ -69,6 +69,12 @@ namespace llarp bool HandleHookedDNSMessage(dns::Message msg, std::function) override; + void + LookupServiceAsync( + std::string name, + std::string service, + std::function)> handler) override; + bool AllocateNewExit(const PubKey pk, const PathID_t& path, bool permitInternet); diff --git a/llarp/handlers/tun.cpp b/llarp/handlers/tun.cpp index 6cb923842..66c5b59ba 100644 --- a/llarp/handlers/tun.cpp +++ b/llarp/handlers/tun.cpp @@ -79,8 +79,8 @@ namespace llarp const SockAddr laddr{src, nuint16_t{*reinterpret_cast(ptr)}}; const SockAddr raddr{dst, nuint16_t{*reinterpret_cast(ptr + 2)}}; - OwnedBuffer buf{pkt.sz - (udp_header_size + ip_header_size)}; - std::copy_n(ptr + udp_header_size, buf.sz, buf.buf.get()); + OwnedBuffer buf{pkt.sz - (8 + ip_header_size)}; + std::copy_n(ptr + 8, buf.sz, buf.buf.get()); if (m_Resolver->ShouldHandlePacket(laddr, raddr, buf)) m_Resolver->HandlePacket(laddr, raddr, buf); else diff --git a/llarp/lokinet_shared.cpp b/llarp/lokinet_shared.cpp index 7cfe254f3..03244e4b5 100644 --- a/llarp/lokinet_shared.cpp +++ b/llarp/lokinet_shared.cpp @@ -126,8 +126,64 @@ namespace return -1; } + lokinet_srv_record + SRVFromData(const llarp::dns::SRVData& data, std::string name) + { + // TODO: implement me + (void)data; + (void)name; + return {}; + } + } // namespace +struct lokinet_srv_lookup_private +{ + std::vector results; + + int + LookupSRV(std::string host, std::string service, lokinet_context* ctx) + { + std::promise promise; + { + auto lock = ctx->acquire(); + if (ctx->impl and ctx->impl->IsUp()) + { + ctx->impl->CallSafe([host, service, &promise, ctx, self = this]() { + auto ep = ctx->endpoint(); + if (ep == nullptr) + { + promise.set_value(ENOTSUP); + return; + } + ep->LookupServiceAsync(host, service, [self, &promise, host](auto results) { + // for (const auto& result : results) + // { + // self->results.emplace_back(SRVFromData(result, host)); + // } + promise.set_value(0); + }); + }); + } + else + { + promise.set_value(EHOSTDOWN); + } + } + auto future = promise.get_future(); + return future.get(); + } + + void + IterateAll(std::function visit) + { + for (size_t idx = 0; idx < results.size(); ++idx) + visit(&results[idx]); + // null terminator + visit(nullptr); + } +}; + extern "C" { struct lokinet_context* @@ -502,4 +558,43 @@ extern "C" catch (...) {} } + + int + lokinet_srv_lookup( + char* host, + char* service, + struct lokinet_srv_lookup_result* result, + struct lokinet_context* ctx) + { + if (result == nullptr or ctx == nullptr or host == nullptr or service == nullptr) + return -1; + // sanity check, if the caller has not free()'d internals yet free them + if (result->internal) + delete result->internal; + result->internal = new lokinet_srv_lookup_private{}; + return result->internal->LookupSRV(host, service, ctx); + } + + void + lokinet_for_each_srv_record( + struct lokinet_srv_lookup_result* result, lokinet_srv_record_iterator iter, void* user) + { + if (result and result->internal) + { + result->internal->IterateAll([iter, user](auto* result) { iter(result, user); }); + } + else + { + iter(nullptr, user); + } + } + + void + lokinet_srv_lookup_done(struct lokinet_srv_lookup_result* result) + { + if (result == nullptr or result->internal == nullptr) + return; + delete result->internal; + result->internal = nullptr; + } } diff --git a/llarp/messages/dht_immediate.cpp b/llarp/messages/dht_immediate.cpp index 50ea7d0b1..1af4a9c47 100644 --- a/llarp/messages/dht_immediate.cpp +++ b/llarp/messages/dht_immediate.cpp @@ -74,7 +74,7 @@ namespace llarp { if (result) { - result = router->SendToOrQueue(session->GetPubKey(), &reply); + result = router->SendToOrQueue(session->GetPubKey(), reply); } } return true; diff --git a/llarp/messages/relay_status.cpp b/llarp/messages/relay_status.cpp index 86753fee4..9a72a07c8 100644 --- a/llarp/messages/relay_status.cpp +++ b/llarp/messages/relay_status.cpp @@ -232,7 +232,7 @@ namespace llarp AbstractRouter* router, const RouterID nextHop, std::shared_ptr msg) { llarp::LogDebug("Attempting to send LR_Status message to (", nextHop, ")"); - if (not router->SendToOrQueue(nextHop, msg.get())) + if (not router->SendToOrQueue(nextHop, *msg)) { llarp::LogError("Sending LR_Status message, SendToOrQueue to ", nextHop, " failed"); } diff --git a/llarp/path/path.cpp b/llarp/path/path.cpp index fc149f673..79ee51c68 100644 --- a/llarp/path/path.cpp +++ b/llarp/path/path.cpp @@ -436,7 +436,7 @@ namespace llarp { for (const auto& msg : msgs) { - if (r->SendToOrQueue(Upstream(), &msg)) + if (r->SendToOrQueue(Upstream(), msg)) { m_TXRate += msg.X.size(); } diff --git a/llarp/path/path_context.cpp b/llarp/path/path_context.cpp index 2b1382fee..accecefdd 100644 --- a/llarp/path/path_context.cpp +++ b/llarp/path/path_context.cpp @@ -83,11 +83,11 @@ namespace llarp return false; } - auto msg = std::make_shared(frames); + const LR_CommitMessage msg{frames}; LogDebug("forwarding LRCM to ", nextHop); - return m_Router->SendToOrQueue(nextHop, msg.get(), handler); + return m_Router->SendToOrQueue(nextHop, msg, handler); } template < diff --git a/llarp/path/pathbuilder.cpp b/llarp/path/pathbuilder.cpp index f733d5747..3fdb4d9ba 100644 --- a/llarp/path/pathbuilder.cpp +++ b/llarp/path/pathbuilder.cpp @@ -130,7 +130,6 @@ namespace llarp ctx->router->NotifyRouterEvent(ctx->router->pubkey(), ctx->path); const RouterID remote = ctx->path->Upstream(); - const ILinkMessage* msg = &ctx->LRCM; auto sentHandler = [ctx](auto status) { if (status == SendStatus::Success) { @@ -145,7 +144,7 @@ namespace llarp ctx->path = nullptr; ctx->pathset = nullptr; }; - if (ctx->router->SendToOrQueue(remote, msg, sentHandler)) + if (ctx->router->SendToOrQueue(remote, ctx->LRCM, sentHandler)) { // persist session with router until this path is done if (ctx->path) @@ -300,7 +299,7 @@ namespace llarp void Builder::BuildOne(PathRole roles) { - if (const auto maybe = GetHopsForBuild(); maybe.has_value()) + if (const auto maybe = GetHopsForBuild()) Build(*maybe, roles); } @@ -318,7 +317,10 @@ namespace llarp { const auto maybe = SelectFirstHop(exclude); if (not maybe.has_value()) + { + LogWarn(Name(), " has no first hop candidate"); return std::nullopt; + } hops.emplace_back(*maybe); }; @@ -395,15 +397,13 @@ namespace llarp { if (IsStopped()) return; - + lastBuild = Now(); const RouterID edge{hops[0].pubkey}; if (not m_EdgeLimiter.Insert(edge)) { LogWarn(Name(), " building too fast to edge router ", edge); return; } - - lastBuild = Now(); // async generate keys auto ctx = std::make_shared(); ctx->router = m_router; diff --git a/llarp/path/transit_hop.cpp b/llarp/path/transit_hop.cpp index 7513b689a..470400694 100644 --- a/llarp/path/transit_hop.cpp +++ b/llarp/path/transit_hop.cpp @@ -214,7 +214,7 @@ namespace llarp info.downstream, " to ", info.upstream); - r->SendToOrQueue(info.upstream, &msg); + r->SendToOrQueue(info.upstream, msg); } r->linkManager().PumpLinks(); } @@ -232,7 +232,7 @@ namespace llarp info.upstream, " to ", info.downstream); - r->SendToOrQueue(info.downstream, &msg); + r->SendToOrQueue(info.downstream, msg); } r->linkManager().PumpLinks(); } diff --git a/llarp/quic/connection.cpp b/llarp/quic/connection.cpp index fddf8b56d..54b674e24 100644 --- a/llarp/quic/connection.cpp +++ b/llarp/quic/connection.cpp @@ -1088,7 +1088,7 @@ namespace llarp::quic std::string_view lokinet_metadata{ reinterpret_cast( data.substr(lokinet_metadata_code.size() + meta_len_bytes).data()), - meta_len}; + static_cast(meta_len)}; LogDebug("Received bencoded lokinet metadata: ", buffer_printer{lokinet_metadata}); uint16_t port; diff --git a/llarp/quic/stream.hpp b/llarp/quic/stream.hpp index cd731c583..829ee09de 100644 --- a/llarp/quic/stream.hpp +++ b/llarp/quic/stream.hpp @@ -9,7 +9,7 @@ #include #include #include - +#include #include namespace llarp::quic diff --git a/llarp/router/abstractrouter.hpp b/llarp/router/abstractrouter.hpp index 388be3825..dff705206 100644 --- a/llarp/router/abstractrouter.hpp +++ b/llarp/router/abstractrouter.hpp @@ -243,7 +243,7 @@ namespace llarp virtual bool SendToOrQueue( - const RouterID& remote, const ILinkMessage* msg, SendStatusHandler handler = nullptr) = 0; + const RouterID& remote, const ILinkMessage& msg, SendStatusHandler handler = nullptr) = 0; virtual void PersistSessionUntil(const RouterID& remote, llarp_time_t until) = 0; diff --git a/llarp/router/i_outbound_message_handler.hpp b/llarp/router/i_outbound_message_handler.hpp index 46d7dcddd..83f02d25c 100644 --- a/llarp/router/i_outbound_message_handler.hpp +++ b/llarp/router/i_outbound_message_handler.hpp @@ -32,7 +32,7 @@ namespace llarp virtual ~IOutboundMessageHandler() = default; virtual bool - QueueMessage(const RouterID& remote, const ILinkMessage* msg, SendStatusHandler callback) = 0; + QueueMessage(const RouterID& remote, const ILinkMessage& msg, SendStatusHandler callback) = 0; virtual void Tick() = 0; diff --git a/llarp/router/outbound_message_handler.cpp b/llarp/router/outbound_message_handler.cpp index f3354b2c6..1e3180723 100644 --- a/llarp/router/outbound_message_handler.cpp +++ b/llarp/router/outbound_message_handler.cpp @@ -21,7 +21,7 @@ namespace llarp bool OutboundMessageHandler::QueueMessage( - const RouterID& remote, const ILinkMessage* msg, SendStatusHandler callback) + const RouterID& remote, const ILinkMessage& msg, SendStatusHandler callback) { if (not _linkManager->SessionIsClient(remote) and not _lookupHandler->RemoteIsAllowed(remote)) { @@ -29,7 +29,7 @@ namespace llarp return true; } - const uint16_t priority = msg->Priority(); + const uint16_t priority = msg.Priority(); std::array linkmsg_buffer; llarp_buffer_t buf(linkmsg_buffer); @@ -46,7 +46,7 @@ namespace llarp if (_linkManager->HasSessionTo(remote)) { - QueueOutboundMessage(remote, std::move(message), msg->pathid, priority); + QueueOutboundMessage(remote, std::move(message), msg.pathid, priority); return true; } @@ -195,9 +195,9 @@ namespace llarp } bool - OutboundMessageHandler::EncodeBuffer(const ILinkMessage* msg, llarp_buffer_t& buf) + OutboundMessageHandler::EncodeBuffer(const ILinkMessage& msg, llarp_buffer_t& buf) { - if (!msg->BEncode(&buf)) + if (!msg.BEncode(&buf)) { LogWarn("failed to encode outbound message, buffer size left: ", buf.size_left()); return false; @@ -248,10 +248,6 @@ namespace llarp if (outboundQueue.tryPushBack(std::move(entry)) != llarp::thread::QueueReturn::Success) { m_queueStats.dropped++; - LogWarn( - "QueueOutboundMessage outbound message handler dropped message on " - "pathid=", - pathid); DoCallback(callback_copy, SendStatus::Congestion); } else @@ -288,10 +284,6 @@ namespace llarp } else { - LogWarn( - "ProcessOutboundQueue outbound message handler dropped message on " - "pathid=", - entry.pathid); DoCallback(entry.message.second, SendStatus::Congestion); m_queueStats.dropped++; } diff --git a/llarp/router/outbound_message_handler.hpp b/llarp/router/outbound_message_handler.hpp index 8769b80d6..a3cfa4bad 100644 --- a/llarp/router/outbound_message_handler.hpp +++ b/llarp/router/outbound_message_handler.hpp @@ -28,7 +28,7 @@ namespace llarp OutboundMessageHandler(size_t maxQueueSize = MAX_OUTBOUND_QUEUE_SIZE); bool - QueueMessage(const RouterID& remote, const ILinkMessage* msg, SendStatusHandler callback) + QueueMessage(const RouterID& remote, const ILinkMessage& msg, SendStatusHandler callback) override EXCLUDES(_mutex); void @@ -98,7 +98,7 @@ namespace llarp QueueSessionCreation(const RouterID& remote); bool - EncodeBuffer(const ILinkMessage* msg, llarp_buffer_t& buf); + EncodeBuffer(const ILinkMessage& msg, llarp_buffer_t& buf); bool Send(const RouterID& remote, const Message& msg); diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 063138cf0..58a823a8b 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -193,7 +193,7 @@ namespace llarp } bool - Router::SendToOrQueue(const RouterID& remote, const ILinkMessage* msg, SendStatusHandler handler) + Router::SendToOrQueue(const RouterID& remote, const ILinkMessage& msg, SendStatusHandler handler) { return _outboundMessageHandler.QueueMessage(remote, msg, handler); } @@ -483,16 +483,7 @@ namespace llarp const auto& val = networkConfig.m_strictConnect; if (IsServiceNode()) throw std::runtime_error("cannot use strict-connect option as service node"); - - // try as a RouterID and as a PubKey, convert to RouterID if needed - llarp::RouterID snode; - llarp::PubKey pk; - if (pk.FromString(val)) - strictConnectPubkeys.emplace(pk); - else if (snode.FromString(val)) - strictConnectPubkeys.insert(snode); - else - throw std::invalid_argument(stringify("invalid key for strict-connect: ", val)); + strictConnectPubkeys.insert(val.begin(), val.end()); } std::vector configRouters = conf.connect.routers; diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp index 662b9bab7..b0dd03575 100644 --- a/llarp/router/router.hpp +++ b/llarp/router/router.hpp @@ -411,7 +411,7 @@ namespace llarp /// MUST be called in the logic thread bool SendToOrQueue( - const RouterID& remote, const ILinkMessage* msg, SendStatusHandler handler) override; + const RouterID& remote, const ILinkMessage& msg, SendStatusHandler handler) override; void ForEachPeer(std::function visit, bool randomize = false) diff --git a/llarp/rpc/lokid_rpc_client.cpp b/llarp/rpc/lokid_rpc_client.cpp index 25aeae61b..9f4c450b9 100644 --- a/llarp/rpc/lokid_rpc_client.cpp +++ b/llarp/rpc/lokid_rpc_client.cpp @@ -37,6 +37,10 @@ namespace llarp { // m_lokiMQ->log_level(toLokiMQLogLevel(LogLevel::Instance().curLevel)); + // new block handler + m_lokiMQ->add_category("notify", oxenmq::Access{oxenmq::AuthLevel::none}) + .add_request_command("block", [this](oxenmq::Message& m) { HandleNewBlock(m); }); + // TODO: proper auth here auto lokidCategory = m_lokiMQ->add_category("lokid", oxenmq::Access{oxenmq::AuthLevel::none}); lokidCategory.add_request_command( @@ -68,14 +72,29 @@ namespace llarp } void - LokidRpcClient::UpdateServiceNodeList() + LokidRpcClient::HandleNewBlock(oxenmq::Message& msg) + { + if (msg.data.size() != 2) + { + LogError( + "we got an invalid new block notification with ", + msg.data.size(), + " parts instead of 2 parts so we will not update the list of service nodes"); + return; // bail + } + LogDebug("new block at hieght ", msg.data[0]); + UpdateServiceNodeList(std::string{msg.data[1]}); + } + + void + LokidRpcClient::UpdateServiceNodeList(std::string topblock) { nlohmann::json request, fields; fields["pubkey_ed25519"] = true; request["fields"] = fields; request["active_only"] = true; - if (not m_CurrentBlockHash.empty()) - request["poll_block_hash"] = m_CurrentBlockHash; + if (not topblock.empty()) + request["poll_block_hash"] = topblock; Request( "rpc.get_service_nodes", [self = shared_from_this()](bool success, std::vector data) { @@ -105,9 +124,8 @@ namespace llarp LokidRpcClient::Connected() { constexpr auto PingInterval = 30s; - constexpr auto NodeListUpdateInterval = 30s; - auto makePingRequest = [self = shared_from_this()]() { + // send a ping nlohmann::json payload = {{"version", {VERSION[0], VERSION[1], VERSION[2]}}}; self->Request( "admin.lokinet_ping", @@ -116,24 +134,25 @@ namespace llarp LogDebug("Received response for ping. Successful: ", success); }, payload.dump()); + // subscribe to block updates + self->Request("sub.block", [](bool success, std::vector data) { + if (data.empty() or not success) + { + LogError("failed to subscribe to new blocks"); + return; + } + LogDebug("subscribed to new blocks: ", data[0]); + }); }; m_lokiMQ->add_timer(makePingRequest, PingInterval); - m_lokiMQ->add_timer( - [self = shared_from_this()]() { self->UpdateServiceNodeList(); }, NodeListUpdateInterval); - UpdateServiceNodeList(); + // initial fetch of service node list + UpdateServiceNodeList(""); } void LokidRpcClient::HandleGotServiceNodeList(std::string data) { auto j = nlohmann::json::parse(std::move(data)); - { - const auto itr = j.find("block_hash"); - if (itr != j.end()) - { - m_CurrentBlockHash = itr->get(); - } - } { const auto itr = j.find("unchanged"); if (itr != j.end()) @@ -165,7 +184,7 @@ namespace llarp if (nodeList.empty()) { - LogWarn("got empty service node list from lokid"); + LogWarn("got empty service node list, ignoring."); return; } // inform router about the new list diff --git a/llarp/rpc/lokid_rpc_client.hpp b/llarp/rpc/lokid_rpc_client.hpp index afcca6785..08c84e9f1 100644 --- a/llarp/rpc/lokid_rpc_client.hpp +++ b/llarp/rpc/lokid_rpc_client.hpp @@ -45,7 +45,7 @@ namespace llarp Command(std::string_view cmd); void - UpdateServiceNodeList(); + UpdateServiceNodeList(std::string topblock); template void @@ -68,9 +68,12 @@ namespace llarp void HandleGetPeerStats(oxenmq::Message& msg); + // Handles notification of a new block + void + HandleNewBlock(oxenmq::Message& msg); + std::optional m_Connection; LMQ_ptr m_lokiMQ; - std::string m_CurrentBlockHash; AbstractRouter* const m_Router; }; diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index bb812ec12..e57d28006 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -110,7 +110,7 @@ namespace llarp "could not publish descriptors for endpoint ", Name(), " because we couldn't get enough valid introductions"); - ManualRebuild(1); + BuildOne(); return; } introSet().I.clear(); @@ -179,6 +179,44 @@ namespace llarp return std::nullopt; } + void + Endpoint::LookupServiceAsync( + std::string name, + std::string service, + std::function)> resultHandler) + { + auto fail = [resultHandler]() { resultHandler({}); }; + auto lookupByAddress = [resultHandler](auto address) { + if (auto* ptr = std::get_if(&address)) + {} + else if (auto* ptr = std::get_if
(&address)) + {} + else + { + resultHandler({}); + } + }; + if (auto maybe = ParseAddress(name)) + { + lookupByAddress(*maybe); + } + else if (NameIsValid(name)) + { + LookupNameAsync(name, [lookupByAddress, fail](auto maybe) { + if (maybe) + { + lookupByAddress(*maybe); + } + else + { + fail(); + } + }); + } + else + fail(); + } + bool Endpoint::IntrosetIsStale() const { @@ -835,14 +873,21 @@ namespace llarp LogInfo(Name(), " looking up LNS name: ", name); path::Path::UniqueEndpointSet_t paths; ForEachPath([&](auto path) { - if (path->IsReady()) - { + if (path and path->IsReady()) paths.insert(path); - } }); + + constexpr size_t min_unique_lns_endpoints = 3; + // not enough paths - if (paths.size() < 3) + if (paths.size() < min_unique_lns_endpoints) { + LogWarn( + Name(), + " not enough paths for lns lookup, have ", + paths.size(), + " need ", + min_unique_lns_endpoints); handler(std::nullopt); return; } diff --git a/llarp/service/endpoint.hpp b/llarp/service/endpoint.hpp index d3cdb58f0..53ef31c65 100644 --- a/llarp/service/endpoint.hpp +++ b/llarp/service/endpoint.hpp @@ -224,6 +224,12 @@ namespace llarp std::function>)> resultHandler) override; + void + LookupServiceAsync( + std::string name, + std::string service, + std::function)> resultHandler) override; + /// called on event loop pump virtual void Pump(llarp_time_t now); diff --git a/llarp/service/intro_set.cpp b/llarp/service/intro_set.cpp index fded044b2..426125efb 100644 --- a/llarp/service/intro_set.cpp +++ b/llarp/service/intro_set.cpp @@ -90,7 +90,7 @@ namespace llarp::service llarp_buffer_t buf(payload); CryptoManager::instance()->xchacha20(buf, k, nounce); if (not i.BDecode(&buf)) - return {}; + return std::nullopt; return i; } @@ -177,7 +177,8 @@ namespace llarp::service byte_t* end = buf->cur; - std::string_view srvString{reinterpret_cast(begin), end - begin}; + std::string_view srvString{ + reinterpret_cast(begin), static_cast(end - begin)}; try {