diff --git a/contrib/py/lnproxy/lnproxy/__main__.py b/contrib/py/lnproxy/lnproxy/__main__.py index a3994bba5..81e42c01c 100644 --- a/contrib/py/lnproxy/lnproxy/__main__.py +++ b/contrib/py/lnproxy/lnproxy/__main__.py @@ -3,6 +3,13 @@ from http.server import ThreadingHTTPServer as Server from http.server import BaseHTTPRequestHandler as BaseHandler + +bootstrapFromURL = True +try: + import requests +except ImportError: + bootstrapFromURL = False + import ctypes from ctypes.util import find_library @@ -26,7 +33,7 @@ class ResultStruct(ctypes.Structure): class LNContext(ctypes.Structure): pass - + class Context: """ wrapper around liblokinet @@ -39,44 +46,59 @@ class Context: self._ln.lokinet_address.restype = ctypes.c_char_p self._ln.lokinet_address.argtypes = (ctypes.POINTER(LNContext), ) self._ln.lokinet_outbound_stream.restype = ctypes.POINTER(ResultStruct) - self._ln.lokinet_outbound_stream.argtypes = (ctypes.POINTER(ResultStruct), ctypes.c_char_p, ctypes.c_char_p, ctypes.POINTER(LNContext)) + self._ln.lokinet_outbound_stream.argtypes = (ctypes.POINTER(ResultStruct), ctypes.c_char_p, ctypes.c_char_p, ctypes.POINTER(LNContext)) self._ctx = self._ln.lokinet_context_new() + self._addrmap = dict() def free(self, ptr): self._c.free(ptr) + def add_bootstrap(self, data): + ptr = ctypes.create_string_buffer(data) + ptrlen = ctypes.c_size_t(len(data)) + return self.ln_call("lokinet_add_bootstrap_rc", ptr, ptrlen) def addr(self): return self._ln.lokinet_address(self._ctx).decode('ascii') def expose(self, port): return self.ln_call('lokinet_inbound_stream', port) - + def ln_call(self, funcname, *args): args += (self._ctx,) print("call {}{}".format(funcname, args)) return self._ln[funcname](*args) - + def start(self): - self.ln_call("lokinet_context_start") + return self.ln_call("lokinet_context_start") def stop(self): self.ln_call("lokinet_context_stop") + def hasAddr(self, addr): + return addr in self._addrmap + + def putAddr(self, addr, val): + self._addrmap[addr] = val + + def getAddr(self, addr): + if addr in self._addrmap: + return self._addrmap[addr] + def __del__(self): self.stop() - self._ln_call("lokinet_context_free") + self._ln_call("lokinet_context_free") class Stream: - + def __init__(self, ctx): self._ctx = ctx self._id = None - + def connect(self, remote): result = ResultStruct() self._ctx.ln_call("lokinet_outbound_stream", ctypes.cast(ctypes.addressof(result), ctypes.POINTER(ResultStruct)), ctypes.create_string_buffer(remote.encode()), ctypes.c_char_p(0)) - + if result.err: print(result.err) return @@ -94,36 +116,40 @@ class Stream: def read_and_forward_or_close(readfd, writefd): read = 0 while True: - data = os.read(readfd, 128) + data = os.read(readfd, 1024) read += len(data) if data and len(data) > 0: writefd.write(data) writefd.flush() + return True else: return read > 0 - + ctx = Context() - + class Handler(BaseHandler): def do_CONNECT(self): self.connect(self.path) - + def connect(self, host): global ctx - stream = Stream(ctx) + if not ctx.hasAddr(host): + stream = Stream(ctx) + + result = stream.connect(host) + if not result: + self.send_error(503) + return + ctx.putAddr(host, result) - result = stream.connect(host) - if not result: - self.send_error(503) - return sock = socket.socket() - sock.connect(result) + sock.connect(ctx.getAddr(host)) if not sock: self.send_error(504) return - + self.send_response_only(200) self.end_headers() sel = selectors.DefaultSelector() @@ -142,16 +168,46 @@ class Handler(BaseHandler): sel.unregister(self.rfile) sel.unregister(sock) sel.close() - stream.close() return +import os +import sys +from argparse import ArgumentParser as AP + +ap = AP() +ap.add_argument("--ip", type=str, help="ip to bind to", default="127.0.0.1") +ap.add_argument("--port", type=int, help="port to bind to", default=3000) +ap.add_argument("--bootstrap", type=str, help="bootstrap file", default="bootstrap.signed") +if bootstrapFromURL: + ap.add_argument("--bootstrap-url", type=str, help="bootstrap from remote url", default="https://seed.lokinet.org/lokinet.signed") + +args = ap.parse_args() +addr = (args.ip, args.port) +server = Server(addr, Handler) + +if os.path.exists(args.bootstrap): + with open(args.bootstrap, 'rb') as f: + if ctx.add_bootstrap(f.read()) == 0: + print("loaded {}".format(args.bootstrap)) + +if args.bootstrap_url is not None: + print('getting bootstrap info from {}'.format(args.bootstrap_url)) + resp = requests.get(args.bootstrap_url) + if resp.status_code == 200 and ctx.add_bootstrap(resp.content) == 0: + pass + else: + print("failed") + +if ctx.start() != 0: + print("failed to start") + ctx.stop() + sys.exit(-1) -server = Server(('127.0.0.1', 3000), Handler) -ctx.start() id = ctx.expose(80) print("we are {}".format(ctx.addr())) try: + print("serving on {}:{}".format(*addr)) server.serve_forever() -except: +finally: ctx.ln_call("lokinet_close_stream", id) ctx.stop() diff --git a/include/llarp.hpp b/include/llarp.hpp index 0d6e5449c..e333e2495 100644 --- a/include/llarp.hpp +++ b/include/llarp.hpp @@ -92,6 +92,10 @@ namespace llarp virtual std::shared_ptr makeRouter(const std::shared_ptr& loop); + /// create the nodedb given our current configs + virtual std::shared_ptr + makeNodeDB(); + /// create the vpn platform for use in creating network interfaces virtual std::shared_ptr makeVPNPlatform(); diff --git a/include/lokinet.h b/include/lokinet.h index 2a862ff65..0c1b6933e 100644 --- a/include/lokinet.h +++ b/include/lokinet.h @@ -3,6 +3,7 @@ #include #include +#include #ifdef __cplusplus extern "C" @@ -15,12 +16,20 @@ extern "C" struct lokinet_context* lokinet_context_new(); + /// load a bootstrap RC from memory + /// return 0 on success + /// return non zero on fail + int + lokinet_add_bootstrap_rc(const char*, size_t, struct lokinet_context*); + /// free a context allocated by lokinet_context_new void lokinet_context_free(struct lokinet_context*); /// spawn all the threads needed for operation and start running - void + /// return 0 on success + /// return non zero on fail + int lokinet_context_start(struct lokinet_context*); /// stop all operations on this lokinet context diff --git a/llarp/config/config.cpp b/llarp/config/config.cpp index 53000e974..8ffae4ba0 100644 --- a/llarp/config/config.cpp +++ b/llarp/config/config.cpp @@ -937,8 +937,8 @@ namespace llarp { throw std::invalid_argument("cannot use empty filename as bootstrap"); } - routers.emplace_back(std::move(arg)); - if (not fs::exists(routers.back())) + files.emplace_back(std::move(arg)); + if (not fs::exists(files.back())) { throw std::invalid_argument("file does not exist: " + arg); } @@ -1356,11 +1356,12 @@ namespace llarp std::shared_ptr Config::EmbeddedConfig() { - auto config = std::make_shared(fs::current_path()); + auto config = std::make_shared(fs::path{}); config->Load(); config->logging.m_logLevel = eLogNone; config->api.m_enableRPCServer = false; config->network.m_endpointType = "null"; + config->bootstrap.files.clear(); return config; } diff --git a/llarp/config/config.hpp b/llarp/config/config.hpp index 9c389001b..e9dcc2bb5 100644 --- a/llarp/config/config.hpp +++ b/llarp/config/config.hpp @@ -188,7 +188,8 @@ namespace llarp struct BootstrapConfig { - std::vector routers; + std::vector files; + std::set routers; bool seednode; void defineConfigOptions(ConfigDefinition& conf, const ConfigGenParameters& params); diff --git a/llarp/config/key_manager.cpp b/llarp/config/key_manager.cpp index 14e9298cb..4e17c0ef4 100644 --- a/llarp/config/key_manager.cpp +++ b/llarp/config/key_manager.cpp @@ -17,6 +17,14 @@ namespace llarp if (m_initialized) return false; + if (not isSNode) + { + CryptoManager::instance()->identity_keygen(identityKey); + CryptoManager::instance()->encryption_keygen(encryptionKey); + CryptoManager::instance()->encryption_keygen(transportKey); + return true; + } + const fs::path root = config.router.m_dataDir; // utility function to assign a path, using the specified config parameter if present and diff --git a/llarp/context.cpp b/llarp/context.cpp index 5cb4aa901..c62d5893d 100644 --- a/llarp/context.cpp +++ b/llarp/context.cpp @@ -74,13 +74,19 @@ namespace llarp router = makeRouter(loop); - nodedb = std::make_shared( - nodedb_dir, [r = router.get()](auto call) { r->QueueDiskIO(std::move(call)); }); + nodedb = makeNodeDB(); if (!router->Configure(config, opts.isSNode, nodedb)) throw std::runtime_error("Failed to configure router"); } + std::shared_ptr + Context::makeNodeDB() + { + return std::make_shared( + nodedb_dir, [r = router.get()](auto call) { r->QueueDiskIO(std::move(call)); }); + } + std::shared_ptr Context::makeRouter(const EventLoop_ptr& loop) { diff --git a/llarp/endpoint_base.hpp b/llarp/endpoint_base.hpp index b16581637..c20ca6cea 100644 --- a/llarp/endpoint_base.hpp +++ b/llarp/endpoint_base.hpp @@ -27,7 +27,29 @@ namespace llarp using AddressVariant_t = std::variant; - virtual std::string + struct SendStat + { + /// how many routing messages we sent to them + uint64_t messagesSend; + /// how many routing messages we got from them + uint64_t messagesRecv; + /// how many convos have we had to this guy total? + size_t numTotalConvos; + /// current estimated rtt + Duration_t estimatedRTT; + /// last time point when we sent a message to them + Duration_t lastSendAt; + /// last time point when we got a message from them + Duration_t lastRecvAt; + }; + + /// get statistics about how much traffic we sent and recv'd via remote endpoints we are talking + /// to + virtual std::unordered_map + GetStatistics() const = 0; + + /// get our local address + virtual AddressVariant_t LocalAddress() const = 0; virtual quic::TunnelManager* diff --git a/llarp/handlers/exit.cpp b/llarp/handlers/exit.cpp index d7e291100..bdbf7ab90 100644 --- a/llarp/handlers/exit.cpp +++ b/llarp/handlers/exit.cpp @@ -557,11 +557,16 @@ namespace llarp return found; } - std::string + EndpointBase::AddressVariant_t ExitEndpoint::LocalAddress() const { - const RouterID r{m_Router->pubkey()}; - return r.ToString(); + return RouterID{m_Router->pubkey()}; + } + + std::unordered_map + ExitEndpoint::GetStatistics() const + { + return {}; } bool diff --git a/llarp/handlers/exit.hpp b/llarp/handlers/exit.hpp index 59fe60cdf..9bdcdf29b 100644 --- a/llarp/handlers/exit.hpp +++ b/llarp/handlers/exit.hpp @@ -109,9 +109,12 @@ namespace llarp bool QueueOutboundTraffic(net::IPPacket pkt); - std::string + AddressVariant_t LocalAddress() const override; + std::unordered_map + GetStatistics() const override; + /// sets up networking and starts traffic bool Start(); diff --git a/llarp/handlers/null.hpp b/llarp/handlers/null.hpp index 4f52ab1cc..853332fbb 100644 --- a/llarp/handlers/null.hpp +++ b/llarp/handlers/null.hpp @@ -29,7 +29,6 @@ namespace llarp LogTrace("Inbound ", t, " packet (", buf.sz, "B) on convo ", tag); if (t == service::ProtocolType::Control) { - MarkConvoTagActive(tag); return true; } if (t != service::ProtocolType::QUIC) @@ -46,7 +45,6 @@ namespace llarp LogWarn("invalid incoming quic packet, dropping"); return false; } - MarkConvoTagActive(tag); quic->receive_packet(tag, buf); return true; } diff --git a/llarp/handlers/tun.cpp b/llarp/handlers/tun.cpp index 9bac4b927..1eea6df0e 100644 --- a/llarp/handlers/tun.cpp +++ b/llarp/handlers/tun.cpp @@ -999,7 +999,6 @@ namespace llarp return false; } LogInfo("tag active T=", tag); - MarkConvoTagActive(tag); quic->receive_packet(tag, buf); return true; } diff --git a/llarp/lokinet_shared.cpp b/llarp/lokinet_shared.cpp index b8753ccb3..ab6edc029 100644 --- a/llarp/lokinet_shared.cpp +++ b/llarp/lokinet_shared.cpp @@ -2,23 +2,40 @@ #include "lokinet.h" #include "llarp.hpp" -#include "config/config.hpp" +#include +#include #include #include #include +#include #include +namespace +{ + struct Context : public llarp::Context + { + using llarp::Context::Context; + + std::shared_ptr + makeNodeDB() override + { + return std::make_shared(); + } + }; +} // namespace + struct lokinet_context { std::mutex m_access; std::shared_ptr impl; + std::shared_ptr config; std::unique_ptr runner; - lokinet_context() : impl{std::make_shared()} + lokinet_context() : impl{std::make_shared()}, config{llarp::Config::EmbeddedConfig()} {} ~lokinet_context() @@ -56,6 +73,7 @@ namespace void stream_error(lokinet_stream_result* result, int err) { + std::memset(result, 0, sizeof(lokinet_stream_result)); result->error = err; } @@ -126,6 +144,24 @@ extern "C" return strdup(addrStr.c_str()); } + int + lokinet_add_bootstrap_rc(const char* data, size_t datalen, struct lokinet_context* ctx) + { + llarp_buffer_t buf{data, datalen}; + llarp::RouterContact rc{}; + if (ctx == nullptr) + return -3; + auto lock = ctx->acquire(); + // add a temp cryptography implementation here so rc.Verify works + llarp::CryptoManager instance{new llarp::sodium::CryptoLibSodium{}}; + if (not rc.BDecode(&buf)) + return -1; + if (not rc.Verify(llarp::time_now_ms())) + return -2; + ctx->config->bootstrap.routers.insert(std::move(rc)); + return 0; + } + struct lokinet_context* lokinet_context_new() { @@ -139,19 +175,22 @@ extern "C" delete ctx; } - void + int lokinet_context_start(struct lokinet_context* ctx) { if (not ctx) - return; + return -1; auto lock = ctx->acquire(); ctx->runner = std::make_unique([ctx]() { llarp::util::SetThreadName("llarp-mainloop"); - ctx->impl->Configure(llarp::Config::EmbeddedConfig()); + ctx->impl->Configure(ctx->config); const llarp::RuntimeOptions opts{}; try { ctx->impl->Setup(opts); +#ifdef SIG_PIPE + signal(SIG_PIPE, SIGIGN); +#endif ctx->impl->Run(opts); } catch (std::exception& ex) @@ -163,9 +202,10 @@ extern "C" while (not ctx->impl->IsUp()) { if (ctx->impl->IsStopping()) - return; + return -1; std::this_thread::sleep_for(5ms); } + return 0; } void diff --git a/llarp/nodedb.cpp b/llarp/nodedb.cpp index 397018fb3..671ad2fc3 100644 --- a/llarp/nodedb.cpp +++ b/llarp/nodedb.cpp @@ -63,10 +63,15 @@ namespace llarp { EnsureSkiplist(m_Root); } + NodeDB::NodeDB() : m_Root{}, disk{[](auto) {}}, m_NextFlushAt{0s} + {} void NodeDB::Tick(llarp_time_t now) { + if (m_NextFlushAt == 0s) + return; + if (now > m_NextFlushAt) { m_NextFlushAt += FlushInterval; @@ -102,6 +107,9 @@ namespace llarp void NodeDB::LoadFromDisk() { + if (m_Root.empty()) + return; + for (const char& ch : skiplist_subdirs) { if (!ch) @@ -125,6 +133,9 @@ namespace llarp void NodeDB::SaveToDisk() const { + if (m_Root.empty()) + return; + for (const auto& item : m_Entries) { item.second.rc.Write(GetPathForPubkey(item.first)); @@ -209,6 +220,8 @@ namespace llarp void NodeDB::AsyncRemoveManyFromDisk(std::unordered_set remove) const { + if (m_Root.empty()) + return; // build file list std::set files; for (auto id : remove) diff --git a/llarp/nodedb.hpp b/llarp/nodedb.hpp index 63cabd8f8..52330bc9a 100644 --- a/llarp/nodedb.hpp +++ b/llarp/nodedb.hpp @@ -50,6 +50,9 @@ namespace llarp public: explicit NodeDB(fs::path rootdir, std::function)> diskCaller); + /// in memory nodedb + NodeDB(); + /// load all entries from disk syncrhonously void LoadFromDisk(); diff --git a/llarp/quic/connection.cpp b/llarp/quic/connection.cpp index dfa262ec3..fddf8b56d 100644 --- a/llarp/quic/connection.cpp +++ b/llarp/quic/connection.cpp @@ -857,7 +857,8 @@ namespace llarp::quic if (!was_closing && stream.close_callback) { LogDebug("Invoke stream close callback"); - stream.close_callback(stream, app_code == 0 ? std::nullopt : std::optional{app_code}); + stream.close_callback( + stream, app_code == 0 ? std::nullopt : std::optional{app_code}); } LogDebug("Erasing stream ", id, " from ", (void*)it->second.get()); @@ -866,10 +867,9 @@ namespace llarp::quic if (!ngtcp2_conn_is_local_stream(*this, id.id)) ngtcp2_conn_extend_max_streams_bidi(*this, 1); - io_ready(); // Probably superfluous but sometimes we might need to send a FIN or something. + io_ready(); // Probably superfluous but sometimes we might need to send a FIN or something. } - int Connection::stream_ack(StreamID id, size_t size) { diff --git a/llarp/quic/tunnel.cpp b/llarp/quic/tunnel.cpp index 8ce6595ed..e150b318e 100644 --- a/llarp/quic/tunnel.cpp +++ b/llarp/quic/tunnel.cpp @@ -131,7 +131,7 @@ namespace llarp::quic stream->data(nullptr); tcp.data(nullptr); } - tcp.closeReset(); + // tcp.closeReset(); }); tcp.on(on_outgoing_data); stream.data_callback = on_incoming_data; diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index dfaf0dec9..3f5844a69 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -497,11 +497,11 @@ namespace llarp std::vector configRouters = conf.connect.routers; configRouters.insert( - configRouters.end(), conf.bootstrap.routers.begin(), conf.bootstrap.routers.end()); + configRouters.end(), conf.bootstrap.files.begin(), conf.bootstrap.files.end()); // if our conf had no bootstrap files specified, try the default location of // /bootstrap.signed. If this isn't present, leave a useful error message - if (configRouters.empty()) + if (configRouters.empty() and conf.bootstrap.routers.empty()) { // TODO: use constant fs::path defaultBootstrapFile = conf.router.m_dataDir / "bootstrap.signed"; @@ -549,6 +549,11 @@ namespace llarp } } + for (const auto& rc : conf.bootstrap.routers) + { + b_list.emplace(rc); + } + for (auto& rc : b_list) { if (not rc.Verify(Now())) @@ -559,7 +564,17 @@ namespace llarp bootstrapRCList.emplace(std::move(rc)); } - LogInfo("Loaded ", bootstrapRCList.size(), " bootstrap routers"); + if (bootstrapRCList.empty() and not conf.bootstrap.seednode) + { + throw std::runtime_error{"we have no bootstrap nodes"}; + } + + if (conf.bootstrap.seednode) + { + LogInfo("we are a seed node"); + } + else + LogInfo("Loaded ", bootstrapRCList.size(), " bootstrap routers"); // Init components after relevant config settings loaded _outboundMessageHandler.Init(&_linkManager, &_rcLookupHandler, _loop); diff --git a/llarp/rpc/rpc_server.cpp b/llarp/rpc/rpc_server.cpp index d00baebde..5a5ecf11b 100644 --- a/llarp/rpc/rpc_server.cpp +++ b/llarp/rpc/rpc_server.cpp @@ -259,7 +259,10 @@ namespace llarp::rpc } util::StatusObject result; result["id"] = id; - result["addr"] = ep->LocalAddress() + ":" + std::to_string(port); + std::string localAddress; + var::visit( + [&](auto&& addr) { localAddress = addr.ToString(); }, ep->LocalAddress()); + result["addr"] = localAddress + ":" + std::to_string(port); reply(CreateJSONResponse(result)); } else if (closeID) diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 5eb5ea3b2..89d5578b0 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -346,7 +346,6 @@ namespace llarp itr->second.inbound = inbound; itr->second.remote = info; } - itr->second.lastUsed = Now(); } size_t @@ -383,7 +382,6 @@ namespace llarp { auto& s = Sessions()[tag]; s.intro = intro; - s.lastUsed = Now(); } bool @@ -405,7 +403,6 @@ namespace llarp return; } itr->second.replyIntro = intro; - itr->second.lastUsed = Now(); } bool @@ -443,17 +440,18 @@ namespace llarp itr = Sessions().emplace(tag, Session{}).first; } itr->second.sharedKey = k; - itr->second.lastUsed = Now(); } void - Endpoint::MarkConvoTagActive(const ConvoTag& tag) + Endpoint::ConvoTagTX(const ConvoTag& tag) { - auto itr = Sessions().find(tag); - if (itr != Sessions().end()) - { - itr->second.lastUsed = Now(); - } + Sessions()[tag].TX(); + } + + void + Endpoint::ConvoTagRX(const ConvoTag& tag) + { + Sessions()[tag].RX(); } bool @@ -1012,10 +1010,35 @@ namespace llarp return false; } - std::string + EndpointBase::AddressVariant_t Endpoint::LocalAddress() const { - return m_Identity.pub.Addr().ToString(); + return m_Identity.pub.Addr(); + } + + inline void + AccumulateStats(const Session& session, EndpointBase::SendStat& stats) + {} + + inline void + AccumulateStats( + const std::shared_ptr& session, EndpointBase::SendStat& stats) + {} + + std::unordered_map + Endpoint::GetStatistics() const + { + std::unordered_map stats; + for (const auto& item : Sessions()) + { + Address addr = item.second.remote.Addr(); + AccumulateStats(item.second, stats[addr]); + } + for (const auto& item : m_state->m_SNodeSessions) + { + AccumulateStats(item.second.first, stats[item.first]); + } + return stats; } bool @@ -1372,13 +1395,16 @@ namespace llarp { if (*ptr == m_Identity.pub.Addr()) { + ConvoTagTX(tag); Loop()->wakeup(); - return HandleInboundPacket(tag, pkt, t, 0); + if (not HandleInboundPacket(tag, pkt, t, 0)) + return false; + ConvoTagRX(tag); + return true; } } if (not SendToOrQueue(*maybe, pkt, t)) return false; - MarkConvoTagActive(tag); Loop()->wakeup(); return true; } @@ -1393,11 +1419,13 @@ namespace llarp auto pkt = std::make_shared(); if (!pkt->Load(buf)) return false; - EnsurePathToSNode( - addr, [pkt, t](RouterID, exit::BaseSession_ptr s, [[maybe_unused]] ConvoTag tag) { - if (s) - s->SendPacketToRemote(pkt->ConstBuffer(), t); - }); + EnsurePathToSNode(addr, [=](RouterID, exit::BaseSession_ptr s, ConvoTag tag) { + if (s) + { + ConvoTagTX(tag); + s->SendPacketToRemote(pkt->ConstBuffer(), t); + } + }); return true; } @@ -1428,7 +1456,7 @@ namespace llarp msg.seqno); if (HandleInboundPacket(msg.tag, msg.payload, msg.proto, msg.seqno)) { - MarkConvoTagActive(msg.tag); + ConvoTagRX(msg.tag); } else { @@ -1451,7 +1479,7 @@ namespace llarp SendEvent_t item = m_SendQueue.popFront(); item.first->S = item.second->NextSeqNo(); if (item.second->SendRoutingMessage(*item.first, router)) - MarkConvoTagActive(item.first->T.T); + ConvoTagTX(item.first->T.T); } UpstreamFlush(router); @@ -1542,7 +1570,7 @@ namespace llarp else tag.Randomize(); PutSenderFor(tag, m_Identity.pub, true); - MarkConvoTagActive(tag); + ConvoTagTX(tag); Sessions()[tag].forever = true; Loop()->call_soon([tag, hook]() { hook(tag); }); return true; @@ -1648,10 +1676,14 @@ namespace llarp m->introReply = p->intro; PutReplyIntroFor(f.T, m->introReply); m->sender = m_Identity.pub; - m->seqno = GetSeqNoForConvo(f.T); - if (m->seqno == 0) + if (auto maybe = GetSeqNoForConvo(f.T)) + { + m->seqno = *maybe; + } + else { LogWarn(Name(), " no session T=", f.T); + return false; } f.S = m->seqno; f.F = m->introReply.pathID; @@ -1680,45 +1712,45 @@ namespace llarp } // Failed to find a suitable inbound convo, look for outbound - LogTrace("Not an inbound convo"); - auto& sessions = m_state->m_RemoteSessions; - auto range = sessions.equal_range(remote); - for (auto itr = range.first; itr != range.second; ++itr) + LogTrace("Not an inbound convo"); + auto& sessions = m_state->m_RemoteSessions; + auto range = sessions.equal_range(remote); + for (auto itr = range.first; itr != range.second; ++itr) + { + if (itr->second->ReadyToSend()) { - if (itr->second->ReadyToSend()) - { - LogTrace("Found an outbound session to use to reach ", remote); - itr->second->AsyncEncryptAndSendTo(data, t); - return true; - } + LogTrace("Found an outbound session to use to reach ", remote); + itr->second->AsyncEncryptAndSendTo(data, t); + return true; } - // if we want to make an outbound session - if (WantsOutboundSession(remote)) - { - LogTrace("Making an outbound session and queuing the data"); - // add pending traffic - auto& traffic = m_state->m_PendingTraffic; - traffic[remote].emplace_back(data, t); - EnsurePathToService( - remote, - [self = this](Address addr, OutboundContext* ctx) { - if (ctx) - { - for (auto& pending : self->m_state->m_PendingTraffic[addr]) - { - ctx->AsyncEncryptAndSendTo(pending.Buffer(), pending.protocol); - } - } - else + } + // if we want to make an outbound session + if (WantsOutboundSession(remote)) + { + LogTrace("Making an outbound session and queuing the data"); + // add pending traffic + auto& traffic = m_state->m_PendingTraffic; + traffic[remote].emplace_back(data, t); + EnsurePathToService( + remote, + [self = this](Address addr, OutboundContext* ctx) { + if (ctx) + { + for (auto& pending : self->m_state->m_PendingTraffic[addr]) { - LogWarn("no path made to ", addr); + ctx->AsyncEncryptAndSendTo(pending.Buffer(), pending.protocol); } - self->m_state->m_PendingTraffic.erase(addr); - }, - 1500ms); - return true; - } - LogDebug("SendOrQueue failed: no inbound/outbound sessions"); + } + else + { + LogWarn("no path made to ", addr); + } + self->m_state->m_PendingTraffic.erase(addr); + }, + 1500ms); + return true; + } + LogDebug("SendOrQueue failed: no inbound/outbound sessions"); return false; } @@ -1735,14 +1767,13 @@ namespace llarp return Sessions().find(t) != Sessions().end(); } - uint64_t + std::optional Endpoint::GetSeqNoForConvo(const ConvoTag& tag) { auto itr = Sessions().find(tag); if (itr == Sessions().end()) - return 0; - itr->second.seqno += 1; - return itr->second.seqno; + return std::nullopt; + return itr->second.seqno++; } bool diff --git a/llarp/service/endpoint.hpp b/llarp/service/endpoint.hpp index e2a174238..eee335ee7 100644 --- a/llarp/service/endpoint.hpp +++ b/llarp/service/endpoint.hpp @@ -136,9 +136,12 @@ namespace llarp std::string Name() const override; - std::string + AddressVariant_t LocalAddress() const override; + std::unordered_map + GetStatistics() const override; + bool ShouldPublishDescriptors(llarp_time_t now) const override; @@ -332,7 +335,10 @@ namespace llarp RemoveConvoTag(const ConvoTag& remote) override; void - MarkConvoTagActive(const ConvoTag& remote) override; + ConvoTagTX(const ConvoTag& remote) override; + + void + ConvoTagRX(const ConvoTag& remote) override; void PutReplyIntroFor(const ConvoTag& remote, const Introduction& intro) override; @@ -346,7 +352,7 @@ namespace llarp void PutNewOutboundContext(const IntroSet& introset, llarp_time_t timeLeftToAlign); - uint64_t + std::optional GetSeqNoForConvo(const ConvoTag& tag); bool diff --git a/llarp/service/handler.hpp b/llarp/service/handler.hpp index 0bea0d039..4ae29c8f3 100644 --- a/llarp/service/handler.hpp +++ b/llarp/service/handler.hpp @@ -33,8 +33,13 @@ namespace llarp virtual void PutCachedSessionKeyFor(const ConvoTag& remote, const SharedSecret& secret) = 0; + /// called when we send data to remote on a convotag virtual void - MarkConvoTagActive(const ConvoTag& tag) = 0; + ConvoTagTX(const ConvoTag& tag) = 0; + + /// called when we got data from remote on a convotag + virtual void + ConvoTagRX(const ConvoTag& tag) = 0; virtual void RemoveConvoTag(const ConvoTag& remote) = 0; diff --git a/llarp/service/protocol.cpp b/llarp/service/protocol.cpp index b815d7e2e..6f0275ae9 100644 --- a/llarp/service/protocol.cpp +++ b/llarp/service/protocol.cpp @@ -12,15 +12,17 @@ namespace llarp { namespace service { - std::ostream& operator<<(std::ostream& o, ProtocolType t) { - return o << - (t == ProtocolType::Control ? "Control" : - t == ProtocolType::TrafficV4 ? "TrafficV4" : - t == ProtocolType::TrafficV6 ? "TrafficV6" : - t == ProtocolType::Exit ? "Exit" : - t == ProtocolType::Auth ? "Auth" : - t == ProtocolType::QUIC ? "QUIC" : - "(unknown-protocol-type)"); + std::ostream& + operator<<(std::ostream& o, ProtocolType t) + { + return o + << (t == ProtocolType::Control ? "Control" + : t == ProtocolType::TrafficV4 ? "TrafficV4" + : t == ProtocolType::TrafficV6 ? "TrafficV6" + : t == ProtocolType::Exit ? "Exit" + : t == ProtocolType::Auth ? "Auth" + : t == ProtocolType::QUIC ? "QUIC" + : "(unknown-protocol-type)"); } ProtocolMessage::ProtocolMessage() diff --git a/llarp/service/protocol.hpp b/llarp/service/protocol.hpp index ab4dfd5f6..4093555e8 100644 --- a/llarp/service/protocol.hpp +++ b/llarp/service/protocol.hpp @@ -31,7 +31,8 @@ namespace llarp constexpr std::size_t MAX_PROTOCOL_MESSAGE_SIZE = 2048 * 2; - std::ostream& operator<<(std::ostream& o, ProtocolType t); + std::ostream& + operator<<(std::ostream& o, ProtocolType t); /// inner message struct ProtocolMessage diff --git a/llarp/service/sendcontext.cpp b/llarp/service/sendcontext.cpp index 5ad25ce3c..5425839e2 100644 --- a/llarp/service/sendcontext.cpp +++ b/llarp/service/sendcontext.cpp @@ -53,7 +53,7 @@ namespace llarp { lastGoodSend = r->Now(); flushpaths.emplace(item.second); - m_Endpoint->MarkConvoTagActive(item.first->T.T); + m_Endpoint->ConvoTagTX(item.first->T.T); const auto rtt = (item.second->intro.latency + remoteIntro.latency) * 2; rttRMS += rtt * rtt.count(); } @@ -98,7 +98,15 @@ namespace llarp m_DataHandler->PutIntroFor(f->T, remoteIntro); m_DataHandler->PutReplyIntroFor(f->T, path->intro); m->proto = t; - m->seqno = m_Endpoint->GetSeqNoForConvo(f->T); + if (auto maybe = m_Endpoint->GetSeqNoForConvo(f->T)) + { + m->seqno = *maybe; + } + else + { + LogWarn(m_Endpoint->Name(), " no session T=", f->T); + return; + } m->introReply = path->intro; f->F = m->introReply.pathID; m->sender = m_Endpoint->GetIdentity().pub; diff --git a/llarp/service/session.cpp b/llarp/service/session.cpp index 2fd3dbfdf..966b0a65b 100644 --- a/llarp/service/session.cpp +++ b/llarp/service/session.cpp @@ -8,10 +8,13 @@ namespace llarp Session::ExtractStatus() const { util::StatusObject obj{ - {"lastUsed", to_json(lastUsed)}, + {"lastSend", to_json(lastSend)}, + {"lastRecv", to_json(lastRecv)}, {"replyIntro", replyIntro.ExtractStatus()}, {"remote", remote.Addr().ToString()}, {"seqno", seqno}, + {"tx", messagesSend}, + {"rx", messagesRecv}, {"intro", intro.ExtractStatus()}}; return obj; } @@ -21,8 +24,23 @@ namespace llarp { if (forever) return false; + const auto lastUsed = std::max(lastSend, lastRecv); return now > lastUsed && (now - lastUsed > lifetime || intro.IsExpired(now)); } + void + Session::TX() + { + messagesSend++; + lastSend = time_now_ms(); + } + + void + Session::RX() + { + messagesRecv++; + lastRecv = time_now_ms(); + } + } // namespace service } // namespace llarp diff --git a/llarp/service/session.hpp b/llarp/service/session.hpp index 035d922f4..414cbea91 100644 --- a/llarp/service/session.hpp +++ b/llarp/service/session.hpp @@ -21,15 +21,32 @@ namespace llarp ServiceInfo remote; /// the intro they have Introduction intro; - /// the intro remoet last sent on - Introduction lastInboundIntro; - llarp_time_t lastUsed = 0s; + + /// the sequence number we are to use for the next message uint64_t seqno = 0; + + /// number of remote messages we sent to them + uint64_t messagesSend = 0; + /// number of remote messages we got from them + uint64_t messagesRecv = 0; + bool inbound = false; bool forever = false; + Duration_t lastSend{}; + Duration_t lastRecv{}; + util::StatusObject ExtractStatus() const; + + /// called to indicate we recieved on this session + void + RX(); + + /// called to indicate we transimitted on this session + void + TX(); + bool IsExpired(llarp_time_t now, llarp_time_t lifetime = SessionLifetime) const; }; diff --git a/llarp/util/logging/logger.cpp b/llarp/util/logging/logger.cpp index 908ffb8e0..4730e32b3 100644 --- a/llarp/util/logging/logger.cpp +++ b/llarp/util/logging/logger.cpp @@ -44,8 +44,7 @@ namespace llarp return LogType::Unknown; } - LogContext::LogContext() - : logStream(std::make_unique(_LOGSTREAM_INIT)), started(llarp::time_now_ms()) + LogContext::LogContext() : logStream{std::make_unique(_LOGSTREAM_INIT)} {} LogContext& @@ -71,9 +70,7 @@ namespace llarp {} log_timestamp::log_timestamp(const char* fmt) - : format(fmt) - , now(llarp::time_now_ms()) - , delta(llarp::time_now_ms() - LogContext::Instance().started) + : format{fmt}, now{llarp::time_now_ms()}, delta{llarp::uptime()} {} void diff --git a/llarp/util/logging/logger.hpp b/llarp/util/logging/logger.hpp index f6446625f..99c18b424 100644 --- a/llarp/util/logging/logger.hpp +++ b/llarp/util/logging/logger.hpp @@ -28,8 +28,6 @@ namespace llarp ILogStream_ptr logStream; std::string nodeName = "lokinet"; - const llarp_time_t started; - static LogContext& Instance(); diff --git a/llarp/util/logging/logger_internal.hpp b/llarp/util/logging/logger_internal.hpp index 87f156cd3..06f0ac9b9 100644 --- a/llarp/util/logging/logger_internal.hpp +++ b/llarp/util/logging/logger_internal.hpp @@ -50,8 +50,8 @@ namespace llarp struct log_timestamp { const char* format; - const llarp_time_t now; - const llarp_time_t delta; + const Duration_t now; + const Duration_t delta; log_timestamp(); diff --git a/llarp/util/time.cpp b/llarp/util/time.cpp index dbdc0858e..4c9cde1f8 100644 --- a/llarp/util/time.cpp +++ b/llarp/util/time.cpp @@ -1,73 +1,55 @@ #include "time.hpp" #include -#include namespace llarp { using Clock_t = std::chrono::system_clock; template - static llarp_time_t - time_since_epoch() + static Duration_t + time_since_epoch(std::chrono::time_point point) { - return std::chrono::duration_cast(Clock::now().time_since_epoch()); + return std::chrono::duration_cast(point.time_since_epoch()); } - const static llarp_time_t started_at_system = - time_since_epoch(); + const static auto started_at_system = Clock_t::now(); + + const static auto started_at_steady = std::chrono::steady_clock::now(); + + uint64_t + ToMS(Duration_t ms) + { + return ms.count(); + } - const static llarp_time_t started_at_steady = - time_since_epoch(); /// get our uptime in ms - static llarp_time_t - time_since_started() + Duration_t + uptime() { - return time_since_epoch() - - started_at_steady; + return std::chrono::duration_cast( + std::chrono::steady_clock::now() - started_at_steady); } - llarp_time_t + Duration_t time_now_ms() { - static llarp_time_t lastTime = 0s; - auto t = time_since_started(); + auto t = uptime(); #ifdef TESTNET_SPEED - t /= uint64_t(TESTNET_SPEED); + t /= uint64_t{TESTNET_SPEED}; #endif - t += started_at_system; - - if (t <= lastTime) - { - return lastTime; - } - if (lastTime == 0s) - { - lastTime = t; - } - const auto dlt = t - lastTime; - if (dlt > 5s) - { - // big timeskip - t = lastTime; - lastTime = 0s; - } - else - { - lastTime = t; - } - return t; + return t + time_since_epoch(started_at_system); } nlohmann::json - to_json(const llarp_time_t& t) + to_json(const Duration_t& t) { - return t.count(); + return ToMS(t); } std::ostream& - operator<<(std::ostream& out, const llarp_time_t& t) + operator<<(std::ostream& out, const Duration_t& t) { - std::chrono::milliseconds amount = t; + std::chrono::milliseconds amount{ToMS(t)}; auto h = std::chrono::duration_cast(amount); amount -= h; auto m = std::chrono::duration_cast(amount); diff --git a/llarp/util/time.hpp b/llarp/util/time.hpp index f248af948..d5f558f8c 100644 --- a/llarp/util/time.hpp +++ b/llarp/util/time.hpp @@ -2,19 +2,28 @@ #include "types.hpp" #include +#include using namespace std::chrono_literals; namespace llarp { /// get time right now as milliseconds, this is monotonic - llarp_time_t + Duration_t time_now_ms(); + /// get the uptime of the process + Duration_t + uptime(); + + /// convert to milliseconds + uint64_t + ToMS(Duration_t duration); + std::ostream& - operator<<(std::ostream& out, const llarp_time_t& t); + operator<<(std::ostream& out, const Duration_t& t); nlohmann::json - to_json(const llarp_time_t& t); + to_json(const Duration_t& t); } // namespace llarp diff --git a/llarp/util/types.hpp b/llarp/util/types.hpp index 6538ad048..cd86b4d04 100644 --- a/llarp/util/types.hpp +++ b/llarp/util/types.hpp @@ -6,9 +6,14 @@ using byte_t = uint8_t; using llarp_proto_version_t = std::uint8_t; -using llarp_time_t = std::chrono::milliseconds; - namespace llarp { + using Duration_t = std::chrono::milliseconds; using namespace std::literals; -} + + /// convert to milliseconds + uint64_t + ToMS(Duration_t duration); +} // namespace llarp + +using llarp_time_t = llarp::Duration_t;