liblokinet additions:

* add lokinet_add_bootstrap_rc function for adding an rc from memory
* prevent stack overflow on error closing connection in quic
* add in memory nodedb
* refactor how convotags are set as active
* add initial stubs for endpoint statistics
* refactor time stuff to be a bit cleaner
* update lnproxy script with more arguments
pull/1576/head
Jeff Becker 3 years ago
parent 6306876904
commit 95cd275cdd
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -3,6 +3,13 @@
from http.server import ThreadingHTTPServer as Server from http.server import ThreadingHTTPServer as Server
from http.server import BaseHTTPRequestHandler as BaseHandler from http.server import BaseHTTPRequestHandler as BaseHandler
bootstrapFromURL = True
try:
import requests
except ImportError:
bootstrapFromURL = False
import ctypes import ctypes
from ctypes.util import find_library from ctypes.util import find_library
@ -26,7 +33,7 @@ class ResultStruct(ctypes.Structure):
class LNContext(ctypes.Structure): class LNContext(ctypes.Structure):
pass pass
class Context: class Context:
""" """
wrapper around liblokinet wrapper around liblokinet
@ -39,44 +46,59 @@ class Context:
self._ln.lokinet_address.restype = ctypes.c_char_p self._ln.lokinet_address.restype = ctypes.c_char_p
self._ln.lokinet_address.argtypes = (ctypes.POINTER(LNContext), ) self._ln.lokinet_address.argtypes = (ctypes.POINTER(LNContext), )
self._ln.lokinet_outbound_stream.restype = ctypes.POINTER(ResultStruct) self._ln.lokinet_outbound_stream.restype = ctypes.POINTER(ResultStruct)
self._ln.lokinet_outbound_stream.argtypes = (ctypes.POINTER(ResultStruct), ctypes.c_char_p, ctypes.c_char_p, ctypes.POINTER(LNContext)) self._ln.lokinet_outbound_stream.argtypes = (ctypes.POINTER(ResultStruct), ctypes.c_char_p, ctypes.c_char_p, ctypes.POINTER(LNContext))
self._ctx = self._ln.lokinet_context_new() self._ctx = self._ln.lokinet_context_new()
self._addrmap = dict()
def free(self, ptr): def free(self, ptr):
self._c.free(ptr) self._c.free(ptr)
def add_bootstrap(self, data):
ptr = ctypes.create_string_buffer(data)
ptrlen = ctypes.c_size_t(len(data))
return self.ln_call("lokinet_add_bootstrap_rc", ptr, ptrlen)
def addr(self): def addr(self):
return self._ln.lokinet_address(self._ctx).decode('ascii') return self._ln.lokinet_address(self._ctx).decode('ascii')
def expose(self, port): def expose(self, port):
return self.ln_call('lokinet_inbound_stream', port) return self.ln_call('lokinet_inbound_stream', port)
def ln_call(self, funcname, *args): def ln_call(self, funcname, *args):
args += (self._ctx,) args += (self._ctx,)
print("call {}{}".format(funcname, args)) print("call {}{}".format(funcname, args))
return self._ln[funcname](*args) return self._ln[funcname](*args)
def start(self): def start(self):
self.ln_call("lokinet_context_start") return self.ln_call("lokinet_context_start")
def stop(self): def stop(self):
self.ln_call("lokinet_context_stop") self.ln_call("lokinet_context_stop")
def hasAddr(self, addr):
return addr in self._addrmap
def putAddr(self, addr, val):
self._addrmap[addr] = val
def getAddr(self, addr):
if addr in self._addrmap:
return self._addrmap[addr]
def __del__(self): def __del__(self):
self.stop() self.stop()
self._ln_call("lokinet_context_free") self._ln_call("lokinet_context_free")
class Stream: class Stream:
def __init__(self, ctx): def __init__(self, ctx):
self._ctx = ctx self._ctx = ctx
self._id = None self._id = None
def connect(self, remote): def connect(self, remote):
result = ResultStruct() result = ResultStruct()
self._ctx.ln_call("lokinet_outbound_stream", ctypes.cast(ctypes.addressof(result), ctypes.POINTER(ResultStruct)), ctypes.create_string_buffer(remote.encode()), ctypes.c_char_p(0)) self._ctx.ln_call("lokinet_outbound_stream", ctypes.cast(ctypes.addressof(result), ctypes.POINTER(ResultStruct)), ctypes.create_string_buffer(remote.encode()), ctypes.c_char_p(0))
if result.err: if result.err:
print(result.err) print(result.err)
return return
@ -94,36 +116,40 @@ class Stream:
def read_and_forward_or_close(readfd, writefd): def read_and_forward_or_close(readfd, writefd):
read = 0 read = 0
while True: while True:
data = os.read(readfd, 128) data = os.read(readfd, 1024)
read += len(data) read += len(data)
if data and len(data) > 0: if data and len(data) > 0:
writefd.write(data) writefd.write(data)
writefd.flush() writefd.flush()
return True
else: else:
return read > 0 return read > 0
ctx = Context() ctx = Context()
class Handler(BaseHandler): class Handler(BaseHandler):
def do_CONNECT(self): def do_CONNECT(self):
self.connect(self.path) self.connect(self.path)
def connect(self, host): def connect(self, host):
global ctx global ctx
stream = Stream(ctx) if not ctx.hasAddr(host):
stream = Stream(ctx)
result = stream.connect(host)
if not result:
self.send_error(503)
return
ctx.putAddr(host, result)
result = stream.connect(host)
if not result:
self.send_error(503)
return
sock = socket.socket() sock = socket.socket()
sock.connect(result) sock.connect(ctx.getAddr(host))
if not sock: if not sock:
self.send_error(504) self.send_error(504)
return return
self.send_response_only(200) self.send_response_only(200)
self.end_headers() self.end_headers()
sel = selectors.DefaultSelector() sel = selectors.DefaultSelector()
@ -142,16 +168,46 @@ class Handler(BaseHandler):
sel.unregister(self.rfile) sel.unregister(self.rfile)
sel.unregister(sock) sel.unregister(sock)
sel.close() sel.close()
stream.close()
return return
import os
import sys
from argparse import ArgumentParser as AP
ap = AP()
ap.add_argument("--ip", type=str, help="ip to bind to", default="127.0.0.1")
ap.add_argument("--port", type=int, help="port to bind to", default=3000)
ap.add_argument("--bootstrap", type=str, help="bootstrap file", default="bootstrap.signed")
if bootstrapFromURL:
ap.add_argument("--bootstrap-url", type=str, help="bootstrap from remote url", default="https://seed.lokinet.org/lokinet.signed")
args = ap.parse_args()
addr = (args.ip, args.port)
server = Server(addr, Handler)
if os.path.exists(args.bootstrap):
with open(args.bootstrap, 'rb') as f:
if ctx.add_bootstrap(f.read()) == 0:
print("loaded {}".format(args.bootstrap))
if args.bootstrap_url is not None:
print('getting bootstrap info from {}'.format(args.bootstrap_url))
resp = requests.get(args.bootstrap_url)
if resp.status_code == 200 and ctx.add_bootstrap(resp.content) == 0:
pass
else:
print("failed")
if ctx.start() != 0:
print("failed to start")
ctx.stop()
sys.exit(-1)
server = Server(('127.0.0.1', 3000), Handler)
ctx.start()
id = ctx.expose(80) id = ctx.expose(80)
print("we are {}".format(ctx.addr())) print("we are {}".format(ctx.addr()))
try: try:
print("serving on {}:{}".format(*addr))
server.serve_forever() server.serve_forever()
except: finally:
ctx.ln_call("lokinet_close_stream", id) ctx.ln_call("lokinet_close_stream", id)
ctx.stop() ctx.stop()

@ -92,6 +92,10 @@ namespace llarp
virtual std::shared_ptr<AbstractRouter> virtual std::shared_ptr<AbstractRouter>
makeRouter(const std::shared_ptr<EventLoop>& loop); makeRouter(const std::shared_ptr<EventLoop>& loop);
/// create the nodedb given our current configs
virtual std::shared_ptr<NodeDB>
makeNodeDB();
/// create the vpn platform for use in creating network interfaces /// create the vpn platform for use in creating network interfaces
virtual std::shared_ptr<llarp::vpn::Platform> virtual std::shared_ptr<llarp::vpn::Platform>
makeVPNPlatform(); makeVPNPlatform();

@ -3,6 +3,7 @@
#include <stdbool.h> #include <stdbool.h>
#include <stdint.h> #include <stdint.h>
#include <unistd.h>
#ifdef __cplusplus #ifdef __cplusplus
extern "C" extern "C"
@ -15,12 +16,20 @@ extern "C"
struct lokinet_context* struct lokinet_context*
lokinet_context_new(); lokinet_context_new();
/// load a bootstrap RC from memory
/// return 0 on success
/// return non zero on fail
int
lokinet_add_bootstrap_rc(const char*, size_t, struct lokinet_context*);
/// free a context allocated by lokinet_context_new /// free a context allocated by lokinet_context_new
void void
lokinet_context_free(struct lokinet_context*); lokinet_context_free(struct lokinet_context*);
/// spawn all the threads needed for operation and start running /// spawn all the threads needed for operation and start running
void /// return 0 on success
/// return non zero on fail
int
lokinet_context_start(struct lokinet_context*); lokinet_context_start(struct lokinet_context*);
/// stop all operations on this lokinet context /// stop all operations on this lokinet context

@ -937,8 +937,8 @@ namespace llarp
{ {
throw std::invalid_argument("cannot use empty filename as bootstrap"); throw std::invalid_argument("cannot use empty filename as bootstrap");
} }
routers.emplace_back(std::move(arg)); files.emplace_back(std::move(arg));
if (not fs::exists(routers.back())) if (not fs::exists(files.back()))
{ {
throw std::invalid_argument("file does not exist: " + arg); throw std::invalid_argument("file does not exist: " + arg);
} }
@ -1356,11 +1356,12 @@ namespace llarp
std::shared_ptr<Config> std::shared_ptr<Config>
Config::EmbeddedConfig() Config::EmbeddedConfig()
{ {
auto config = std::make_shared<Config>(fs::current_path()); auto config = std::make_shared<Config>(fs::path{});
config->Load(); config->Load();
config->logging.m_logLevel = eLogNone; config->logging.m_logLevel = eLogNone;
config->api.m_enableRPCServer = false; config->api.m_enableRPCServer = false;
config->network.m_endpointType = "null"; config->network.m_endpointType = "null";
config->bootstrap.files.clear();
return config; return config;
} }

@ -188,7 +188,8 @@ namespace llarp
struct BootstrapConfig struct BootstrapConfig
{ {
std::vector<fs::path> routers; std::vector<fs::path> files;
std::set<RouterContact> routers;
bool seednode; bool seednode;
void void
defineConfigOptions(ConfigDefinition& conf, const ConfigGenParameters& params); defineConfigOptions(ConfigDefinition& conf, const ConfigGenParameters& params);

@ -17,6 +17,14 @@ namespace llarp
if (m_initialized) if (m_initialized)
return false; return false;
if (not isSNode)
{
CryptoManager::instance()->identity_keygen(identityKey);
CryptoManager::instance()->encryption_keygen(encryptionKey);
CryptoManager::instance()->encryption_keygen(transportKey);
return true;
}
const fs::path root = config.router.m_dataDir; const fs::path root = config.router.m_dataDir;
// utility function to assign a path, using the specified config parameter if present and // utility function to assign a path, using the specified config parameter if present and

@ -74,13 +74,19 @@ namespace llarp
router = makeRouter(loop); router = makeRouter(loop);
nodedb = std::make_shared<NodeDB>( nodedb = makeNodeDB();
nodedb_dir, [r = router.get()](auto call) { r->QueueDiskIO(std::move(call)); });
if (!router->Configure(config, opts.isSNode, nodedb)) if (!router->Configure(config, opts.isSNode, nodedb))
throw std::runtime_error("Failed to configure router"); throw std::runtime_error("Failed to configure router");
} }
std::shared_ptr<NodeDB>
Context::makeNodeDB()
{
return std::make_shared<NodeDB>(
nodedb_dir, [r = router.get()](auto call) { r->QueueDiskIO(std::move(call)); });
}
std::shared_ptr<AbstractRouter> std::shared_ptr<AbstractRouter>
Context::makeRouter(const EventLoop_ptr& loop) Context::makeRouter(const EventLoop_ptr& loop)
{ {

@ -27,7 +27,29 @@ namespace llarp
using AddressVariant_t = std::variant<service::Address, RouterID>; using AddressVariant_t = std::variant<service::Address, RouterID>;
virtual std::string struct SendStat
{
/// how many routing messages we sent to them
uint64_t messagesSend;
/// how many routing messages we got from them
uint64_t messagesRecv;
/// how many convos have we had to this guy total?
size_t numTotalConvos;
/// current estimated rtt
Duration_t estimatedRTT;
/// last time point when we sent a message to them
Duration_t lastSendAt;
/// last time point when we got a message from them
Duration_t lastRecvAt;
};
/// get statistics about how much traffic we sent and recv'd via remote endpoints we are talking
/// to
virtual std::unordered_map<AddressVariant_t, SendStat>
GetStatistics() const = 0;
/// get our local address
virtual AddressVariant_t
LocalAddress() const = 0; LocalAddress() const = 0;
virtual quic::TunnelManager* virtual quic::TunnelManager*

@ -557,11 +557,16 @@ namespace llarp
return found; return found;
} }
std::string EndpointBase::AddressVariant_t
ExitEndpoint::LocalAddress() const ExitEndpoint::LocalAddress() const
{ {
const RouterID r{m_Router->pubkey()}; return RouterID{m_Router->pubkey()};
return r.ToString(); }
std::unordered_map<EndpointBase::AddressVariant_t, EndpointBase::SendStat>
ExitEndpoint::GetStatistics() const
{
return {};
} }
bool bool

@ -109,9 +109,12 @@ namespace llarp
bool bool
QueueOutboundTraffic(net::IPPacket pkt); QueueOutboundTraffic(net::IPPacket pkt);
std::string AddressVariant_t
LocalAddress() const override; LocalAddress() const override;
std::unordered_map<AddressVariant_t, SendStat>
GetStatistics() const override;
/// sets up networking and starts traffic /// sets up networking and starts traffic
bool bool
Start(); Start();

@ -29,7 +29,6 @@ namespace llarp
LogTrace("Inbound ", t, " packet (", buf.sz, "B) on convo ", tag); LogTrace("Inbound ", t, " packet (", buf.sz, "B) on convo ", tag);
if (t == service::ProtocolType::Control) if (t == service::ProtocolType::Control)
{ {
MarkConvoTagActive(tag);
return true; return true;
} }
if (t != service::ProtocolType::QUIC) if (t != service::ProtocolType::QUIC)
@ -46,7 +45,6 @@ namespace llarp
LogWarn("invalid incoming quic packet, dropping"); LogWarn("invalid incoming quic packet, dropping");
return false; return false;
} }
MarkConvoTagActive(tag);
quic->receive_packet(tag, buf); quic->receive_packet(tag, buf);
return true; return true;
} }

@ -999,7 +999,6 @@ namespace llarp
return false; return false;
} }
LogInfo("tag active T=", tag); LogInfo("tag active T=", tag);
MarkConvoTagActive(tag);
quic->receive_packet(tag, buf); quic->receive_packet(tag, buf);
return true; return true;
} }

@ -2,23 +2,40 @@
#include "lokinet.h" #include "lokinet.h"
#include "llarp.hpp" #include "llarp.hpp"
#include "config/config.hpp" #include <llarp/config/config.hpp>
#include <llarp/crypto/crypto_libsodium.hpp>
#include <llarp/router/abstractrouter.hpp> #include <llarp/router/abstractrouter.hpp>
#include <llarp/service/context.hpp> #include <llarp/service/context.hpp>
#include <llarp/quic/tunnel.hpp> #include <llarp/quic/tunnel.hpp>
#include <llarp/nodedb.hpp>
#include <mutex> #include <mutex>
namespace
{
struct Context : public llarp::Context
{
using llarp::Context::Context;
std::shared_ptr<llarp::NodeDB>
makeNodeDB() override
{
return std::make_shared<llarp::NodeDB>();
}
};
} // namespace
struct lokinet_context struct lokinet_context
{ {
std::mutex m_access; std::mutex m_access;
std::shared_ptr<llarp::Context> impl; std::shared_ptr<llarp::Context> impl;
std::shared_ptr<llarp::Config> config;
std::unique_ptr<std::thread> runner; std::unique_ptr<std::thread> runner;
lokinet_context() : impl{std::make_shared<llarp::Context>()} lokinet_context() : impl{std::make_shared<Context>()}, config{llarp::Config::EmbeddedConfig()}
{} {}
~lokinet_context() ~lokinet_context()
@ -56,6 +73,7 @@ namespace
void void
stream_error(lokinet_stream_result* result, int err) stream_error(lokinet_stream_result* result, int err)
{ {
std::memset(result, 0, sizeof(lokinet_stream_result));
result->error = err; result->error = err;
} }
@ -126,6 +144,24 @@ extern "C"
return strdup(addrStr.c_str()); return strdup(addrStr.c_str());
} }
int
lokinet_add_bootstrap_rc(const char* data, size_t datalen, struct lokinet_context* ctx)
{
llarp_buffer_t buf{data, datalen};
llarp::RouterContact rc{};
if (ctx == nullptr)
return -3;
auto lock = ctx->acquire();
// add a temp cryptography implementation here so rc.Verify works
llarp::CryptoManager instance{new llarp::sodium::CryptoLibSodium{}};
if (not rc.BDecode(&buf))
return -1;
if (not rc.Verify(llarp::time_now_ms()))
return -2;
ctx->config->bootstrap.routers.insert(std::move(rc));
return 0;
}
struct lokinet_context* struct lokinet_context*
lokinet_context_new() lokinet_context_new()
{ {
@ -139,19 +175,22 @@ extern "C"
delete ctx; delete ctx;
} }
void int
lokinet_context_start(struct lokinet_context* ctx) lokinet_context_start(struct lokinet_context* ctx)
{ {
if (not ctx) if (not ctx)
return; return -1;
auto lock = ctx->acquire(); auto lock = ctx->acquire();
ctx->runner = std::make_unique<std::thread>([ctx]() { ctx->runner = std::make_unique<std::thread>([ctx]() {
llarp::util::SetThreadName("llarp-mainloop"); llarp::util::SetThreadName("llarp-mainloop");
ctx->impl->Configure(llarp::Config::EmbeddedConfig()); ctx->impl->Configure(ctx->config);
const llarp::RuntimeOptions opts{}; const llarp::RuntimeOptions opts{};
try try
{ {
ctx->impl->Setup(opts); ctx->impl->Setup(opts);
#ifdef SIG_PIPE
signal(SIG_PIPE, SIGIGN);
#endif
ctx->impl->Run(opts); ctx->impl->Run(opts);
} }
catch (std::exception& ex) catch (std::exception& ex)
@ -163,9 +202,10 @@ extern "C"
while (not ctx->impl->IsUp()) while (not ctx->impl->IsUp())
{ {
if (ctx->impl->IsStopping()) if (ctx->impl->IsStopping())
return; return -1;
std::this_thread::sleep_for(5ms); std::this_thread::sleep_for(5ms);
} }
return 0;
} }
void void

@ -63,10 +63,15 @@ namespace llarp
{ {
EnsureSkiplist(m_Root); EnsureSkiplist(m_Root);
} }
NodeDB::NodeDB() : m_Root{}, disk{[](auto) {}}, m_NextFlushAt{0s}
{}
void void
NodeDB::Tick(llarp_time_t now) NodeDB::Tick(llarp_time_t now)
{ {
if (m_NextFlushAt == 0s)
return;
if (now > m_NextFlushAt) if (now > m_NextFlushAt)
{ {
m_NextFlushAt += FlushInterval; m_NextFlushAt += FlushInterval;
@ -102,6 +107,9 @@ namespace llarp
void void
NodeDB::LoadFromDisk() NodeDB::LoadFromDisk()
{ {
if (m_Root.empty())
return;
for (const char& ch : skiplist_subdirs) for (const char& ch : skiplist_subdirs)
{ {
if (!ch) if (!ch)
@ -125,6 +133,9 @@ namespace llarp
void void
NodeDB::SaveToDisk() const NodeDB::SaveToDisk() const
{ {
if (m_Root.empty())
return;
for (const auto& item : m_Entries) for (const auto& item : m_Entries)
{ {
item.second.rc.Write(GetPathForPubkey(item.first)); item.second.rc.Write(GetPathForPubkey(item.first));
@ -209,6 +220,8 @@ namespace llarp
void void
NodeDB::AsyncRemoveManyFromDisk(std::unordered_set<RouterID> remove) const NodeDB::AsyncRemoveManyFromDisk(std::unordered_set<RouterID> remove) const
{ {
if (m_Root.empty())
return;
// build file list // build file list
std::set<fs::path> files; std::set<fs::path> files;
for (auto id : remove) for (auto id : remove)

@ -50,6 +50,9 @@ namespace llarp
public: public:
explicit NodeDB(fs::path rootdir, std::function<void(std::function<void()>)> diskCaller); explicit NodeDB(fs::path rootdir, std::function<void(std::function<void()>)> diskCaller);
/// in memory nodedb
NodeDB();
/// load all entries from disk syncrhonously /// load all entries from disk syncrhonously
void void
LoadFromDisk(); LoadFromDisk();

@ -857,7 +857,8 @@ namespace llarp::quic
if (!was_closing && stream.close_callback) if (!was_closing && stream.close_callback)
{ {
LogDebug("Invoke stream close callback"); LogDebug("Invoke stream close callback");
stream.close_callback(stream, app_code == 0 ? std::nullopt : std::optional<uint64_t>{app_code}); stream.close_callback(
stream, app_code == 0 ? std::nullopt : std::optional<uint64_t>{app_code});
} }
LogDebug("Erasing stream ", id, " from ", (void*)it->second.get()); LogDebug("Erasing stream ", id, " from ", (void*)it->second.get());
@ -866,10 +867,9 @@ namespace llarp::quic
if (!ngtcp2_conn_is_local_stream(*this, id.id)) if (!ngtcp2_conn_is_local_stream(*this, id.id))
ngtcp2_conn_extend_max_streams_bidi(*this, 1); ngtcp2_conn_extend_max_streams_bidi(*this, 1);
io_ready(); // Probably superfluous but sometimes we might need to send a FIN or something. io_ready(); // Probably superfluous but sometimes we might need to send a FIN or something.
} }
int int
Connection::stream_ack(StreamID id, size_t size) Connection::stream_ack(StreamID id, size_t size)
{ {

@ -131,7 +131,7 @@ namespace llarp::quic
stream->data(nullptr); stream->data(nullptr);
tcp.data(nullptr); tcp.data(nullptr);
} }
tcp.closeReset(); // tcp.closeReset();
}); });
tcp.on<uvw::DataEvent>(on_outgoing_data); tcp.on<uvw::DataEvent>(on_outgoing_data);
stream.data_callback = on_incoming_data; stream.data_callback = on_incoming_data;

@ -497,11 +497,11 @@ namespace llarp
std::vector<fs::path> configRouters = conf.connect.routers; std::vector<fs::path> configRouters = conf.connect.routers;
configRouters.insert( configRouters.insert(
configRouters.end(), conf.bootstrap.routers.begin(), conf.bootstrap.routers.end()); configRouters.end(), conf.bootstrap.files.begin(), conf.bootstrap.files.end());
// if our conf had no bootstrap files specified, try the default location of // if our conf had no bootstrap files specified, try the default location of
// <DATA_DIR>/bootstrap.signed. If this isn't present, leave a useful error message // <DATA_DIR>/bootstrap.signed. If this isn't present, leave a useful error message
if (configRouters.empty()) if (configRouters.empty() and conf.bootstrap.routers.empty())
{ {
// TODO: use constant // TODO: use constant
fs::path defaultBootstrapFile = conf.router.m_dataDir / "bootstrap.signed"; fs::path defaultBootstrapFile = conf.router.m_dataDir / "bootstrap.signed";
@ -549,6 +549,11 @@ namespace llarp
} }
} }
for (const auto& rc : conf.bootstrap.routers)
{
b_list.emplace(rc);
}
for (auto& rc : b_list) for (auto& rc : b_list)
{ {
if (not rc.Verify(Now())) if (not rc.Verify(Now()))
@ -559,7 +564,17 @@ namespace llarp
bootstrapRCList.emplace(std::move(rc)); bootstrapRCList.emplace(std::move(rc));
} }
LogInfo("Loaded ", bootstrapRCList.size(), " bootstrap routers"); if (bootstrapRCList.empty() and not conf.bootstrap.seednode)
{
throw std::runtime_error{"we have no bootstrap nodes"};
}
if (conf.bootstrap.seednode)
{
LogInfo("we are a seed node");
}
else
LogInfo("Loaded ", bootstrapRCList.size(), " bootstrap routers");
// Init components after relevant config settings loaded // Init components after relevant config settings loaded
_outboundMessageHandler.Init(&_linkManager, &_rcLookupHandler, _loop); _outboundMessageHandler.Init(&_linkManager, &_rcLookupHandler, _loop);

@ -259,7 +259,10 @@ namespace llarp::rpc
} }
util::StatusObject result; util::StatusObject result;
result["id"] = id; result["id"] = id;
result["addr"] = ep->LocalAddress() + ":" + std::to_string(port); std::string localAddress;
var::visit(
[&](auto&& addr) { localAddress = addr.ToString(); }, ep->LocalAddress());
result["addr"] = localAddress + ":" + std::to_string(port);
reply(CreateJSONResponse(result)); reply(CreateJSONResponse(result));
} }
else if (closeID) else if (closeID)

@ -346,7 +346,6 @@ namespace llarp
itr->second.inbound = inbound; itr->second.inbound = inbound;
itr->second.remote = info; itr->second.remote = info;
} }
itr->second.lastUsed = Now();
} }
size_t size_t
@ -383,7 +382,6 @@ namespace llarp
{ {
auto& s = Sessions()[tag]; auto& s = Sessions()[tag];
s.intro = intro; s.intro = intro;
s.lastUsed = Now();
} }
bool bool
@ -405,7 +403,6 @@ namespace llarp
return; return;
} }
itr->second.replyIntro = intro; itr->second.replyIntro = intro;
itr->second.lastUsed = Now();
} }
bool bool
@ -443,17 +440,18 @@ namespace llarp
itr = Sessions().emplace(tag, Session{}).first; itr = Sessions().emplace(tag, Session{}).first;
} }
itr->second.sharedKey = k; itr->second.sharedKey = k;
itr->second.lastUsed = Now();
} }
void void
Endpoint::MarkConvoTagActive(const ConvoTag& tag) Endpoint::ConvoTagTX(const ConvoTag& tag)
{ {
auto itr = Sessions().find(tag); Sessions()[tag].TX();
if (itr != Sessions().end()) }
{
itr->second.lastUsed = Now(); void
} Endpoint::ConvoTagRX(const ConvoTag& tag)
{
Sessions()[tag].RX();
} }
bool bool
@ -1012,10 +1010,35 @@ namespace llarp
return false; return false;
} }
std::string EndpointBase::AddressVariant_t
Endpoint::LocalAddress() const Endpoint::LocalAddress() const
{ {
return m_Identity.pub.Addr().ToString(); return m_Identity.pub.Addr();
}
inline void
AccumulateStats(const Session& session, EndpointBase::SendStat& stats)
{}
inline void
AccumulateStats(
const std::shared_ptr<exit::BaseSession>& session, EndpointBase::SendStat& stats)
{}
std::unordered_map<EndpointBase::AddressVariant_t, EndpointBase::SendStat>
Endpoint::GetStatistics() const
{
std::unordered_map<AddressVariant_t, SendStat> stats;
for (const auto& item : Sessions())
{
Address addr = item.second.remote.Addr();
AccumulateStats(item.second, stats[addr]);
}
for (const auto& item : m_state->m_SNodeSessions)
{
AccumulateStats(item.second.first, stats[item.first]);
}
return stats;
} }
bool bool
@ -1372,13 +1395,16 @@ namespace llarp
{ {
if (*ptr == m_Identity.pub.Addr()) if (*ptr == m_Identity.pub.Addr())
{ {
ConvoTagTX(tag);
Loop()->wakeup(); Loop()->wakeup();
return HandleInboundPacket(tag, pkt, t, 0); if (not HandleInboundPacket(tag, pkt, t, 0))
return false;
ConvoTagRX(tag);
return true;
} }
} }
if (not SendToOrQueue(*maybe, pkt, t)) if (not SendToOrQueue(*maybe, pkt, t))
return false; return false;
MarkConvoTagActive(tag);
Loop()->wakeup(); Loop()->wakeup();
return true; return true;
} }
@ -1393,11 +1419,13 @@ namespace llarp
auto pkt = std::make_shared<net::IPPacket>(); auto pkt = std::make_shared<net::IPPacket>();
if (!pkt->Load(buf)) if (!pkt->Load(buf))
return false; return false;
EnsurePathToSNode( EnsurePathToSNode(addr, [=](RouterID, exit::BaseSession_ptr s, ConvoTag tag) {
addr, [pkt, t](RouterID, exit::BaseSession_ptr s, [[maybe_unused]] ConvoTag tag) { if (s)
if (s) {
s->SendPacketToRemote(pkt->ConstBuffer(), t); ConvoTagTX(tag);
}); s->SendPacketToRemote(pkt->ConstBuffer(), t);
}
});
return true; return true;
} }
@ -1428,7 +1456,7 @@ namespace llarp
msg.seqno); msg.seqno);
if (HandleInboundPacket(msg.tag, msg.payload, msg.proto, msg.seqno)) if (HandleInboundPacket(msg.tag, msg.payload, msg.proto, msg.seqno))
{ {
MarkConvoTagActive(msg.tag); ConvoTagRX(msg.tag);
} }
else else
{ {
@ -1451,7 +1479,7 @@ namespace llarp
SendEvent_t item = m_SendQueue.popFront(); SendEvent_t item = m_SendQueue.popFront();
item.first->S = item.second->NextSeqNo(); item.first->S = item.second->NextSeqNo();
if (item.second->SendRoutingMessage(*item.first, router)) if (item.second->SendRoutingMessage(*item.first, router))
MarkConvoTagActive(item.first->T.T); ConvoTagTX(item.first->T.T);
} }
UpstreamFlush(router); UpstreamFlush(router);
@ -1542,7 +1570,7 @@ namespace llarp
else else
tag.Randomize(); tag.Randomize();
PutSenderFor(tag, m_Identity.pub, true); PutSenderFor(tag, m_Identity.pub, true);
MarkConvoTagActive(tag); ConvoTagTX(tag);
Sessions()[tag].forever = true; Sessions()[tag].forever = true;
Loop()->call_soon([tag, hook]() { hook(tag); }); Loop()->call_soon([tag, hook]() { hook(tag); });
return true; return true;
@ -1648,10 +1676,14 @@ namespace llarp
m->introReply = p->intro; m->introReply = p->intro;
PutReplyIntroFor(f.T, m->introReply); PutReplyIntroFor(f.T, m->introReply);
m->sender = m_Identity.pub; m->sender = m_Identity.pub;
m->seqno = GetSeqNoForConvo(f.T); if (auto maybe = GetSeqNoForConvo(f.T))
if (m->seqno == 0) {
m->seqno = *maybe;
}
else
{ {
LogWarn(Name(), " no session T=", f.T); LogWarn(Name(), " no session T=", f.T);
return false;
} }
f.S = m->seqno; f.S = m->seqno;
f.F = m->introReply.pathID; f.F = m->introReply.pathID;
@ -1680,45 +1712,45 @@ namespace llarp
} }
// Failed to find a suitable inbound convo, look for outbound // Failed to find a suitable inbound convo, look for outbound
LogTrace("Not an inbound convo"); LogTrace("Not an inbound convo");
auto& sessions = m_state->m_RemoteSessions; auto& sessions = m_state->m_RemoteSessions;
auto range = sessions.equal_range(remote); auto range = sessions.equal_range(remote);
for (auto itr = range.first; itr != range.second; ++itr) for (auto itr = range.first; itr != range.second; ++itr)
{
if (itr->second->ReadyToSend())
{ {
if (itr->second->ReadyToSend()) LogTrace("Found an outbound session to use to reach ", remote);
{ itr->second->AsyncEncryptAndSendTo(data, t);
LogTrace("Found an outbound session to use to reach ", remote); return true;
itr->second->AsyncEncryptAndSendTo(data, t);
return true;
}
} }
// if we want to make an outbound session }
if (WantsOutboundSession(remote)) // if we want to make an outbound session
{ if (WantsOutboundSession(remote))
LogTrace("Making an outbound session and queuing the data"); {
// add pending traffic LogTrace("Making an outbound session and queuing the data");
auto& traffic = m_state->m_PendingTraffic; // add pending traffic
traffic[remote].emplace_back(data, t); auto& traffic = m_state->m_PendingTraffic;
EnsurePathToService( traffic[remote].emplace_back(data, t);
remote, EnsurePathToService(
[self = this](Address addr, OutboundContext* ctx) { remote,
if (ctx) [self = this](Address addr, OutboundContext* ctx) {
{ if (ctx)
for (auto& pending : self->m_state->m_PendingTraffic[addr]) {
{ for (auto& pending : self->m_state->m_PendingTraffic[addr])
ctx->AsyncEncryptAndSendTo(pending.Buffer(), pending.protocol);
}
}
else
{ {
LogWarn("no path made to ", addr); ctx->AsyncEncryptAndSendTo(pending.Buffer(), pending.protocol);
} }
self->m_state->m_PendingTraffic.erase(addr); }
}, else
1500ms); {
return true; LogWarn("no path made to ", addr);
} }
LogDebug("SendOrQueue failed: no inbound/outbound sessions"); self->m_state->m_PendingTraffic.erase(addr);
},
1500ms);
return true;
}
LogDebug("SendOrQueue failed: no inbound/outbound sessions");
return false; return false;
} }
@ -1735,14 +1767,13 @@ namespace llarp
return Sessions().find(t) != Sessions().end(); return Sessions().find(t) != Sessions().end();
} }
uint64_t std::optional<uint64_t>
Endpoint::GetSeqNoForConvo(const ConvoTag& tag) Endpoint::GetSeqNoForConvo(const ConvoTag& tag)
{ {
auto itr = Sessions().find(tag); auto itr = Sessions().find(tag);
if (itr == Sessions().end()) if (itr == Sessions().end())
return 0; return std::nullopt;
itr->second.seqno += 1; return itr->second.seqno++;
return itr->second.seqno;
} }
bool bool

@ -136,9 +136,12 @@ namespace llarp
std::string std::string
Name() const override; Name() const override;
std::string AddressVariant_t
LocalAddress() const override; LocalAddress() const override;
std::unordered_map<AddressVariant_t, SendStat>
GetStatistics() const override;
bool bool
ShouldPublishDescriptors(llarp_time_t now) const override; ShouldPublishDescriptors(llarp_time_t now) const override;
@ -332,7 +335,10 @@ namespace llarp
RemoveConvoTag(const ConvoTag& remote) override; RemoveConvoTag(const ConvoTag& remote) override;
void void
MarkConvoTagActive(const ConvoTag& remote) override; ConvoTagTX(const ConvoTag& remote) override;
void
ConvoTagRX(const ConvoTag& remote) override;
void void
PutReplyIntroFor(const ConvoTag& remote, const Introduction& intro) override; PutReplyIntroFor(const ConvoTag& remote, const Introduction& intro) override;
@ -346,7 +352,7 @@ namespace llarp
void void
PutNewOutboundContext(const IntroSet& introset, llarp_time_t timeLeftToAlign); PutNewOutboundContext(const IntroSet& introset, llarp_time_t timeLeftToAlign);
uint64_t std::optional<uint64_t>
GetSeqNoForConvo(const ConvoTag& tag); GetSeqNoForConvo(const ConvoTag& tag);
bool bool

@ -33,8 +33,13 @@ namespace llarp
virtual void virtual void
PutCachedSessionKeyFor(const ConvoTag& remote, const SharedSecret& secret) = 0; PutCachedSessionKeyFor(const ConvoTag& remote, const SharedSecret& secret) = 0;
/// called when we send data to remote on a convotag
virtual void virtual void
MarkConvoTagActive(const ConvoTag& tag) = 0; ConvoTagTX(const ConvoTag& tag) = 0;
/// called when we got data from remote on a convotag
virtual void
ConvoTagRX(const ConvoTag& tag) = 0;
virtual void virtual void
RemoveConvoTag(const ConvoTag& remote) = 0; RemoveConvoTag(const ConvoTag& remote) = 0;

@ -12,15 +12,17 @@ namespace llarp
{ {
namespace service namespace service
{ {
std::ostream& operator<<(std::ostream& o, ProtocolType t) { std::ostream&
return o << operator<<(std::ostream& o, ProtocolType t)
(t == ProtocolType::Control ? "Control" : {
t == ProtocolType::TrafficV4 ? "TrafficV4" : return o
t == ProtocolType::TrafficV6 ? "TrafficV6" : << (t == ProtocolType::Control ? "Control"
t == ProtocolType::Exit ? "Exit" : : t == ProtocolType::TrafficV4 ? "TrafficV4"
t == ProtocolType::Auth ? "Auth" : : t == ProtocolType::TrafficV6 ? "TrafficV6"
t == ProtocolType::QUIC ? "QUIC" : : t == ProtocolType::Exit ? "Exit"
"(unknown-protocol-type)"); : t == ProtocolType::Auth ? "Auth"
: t == ProtocolType::QUIC ? "QUIC"
: "(unknown-protocol-type)");
} }
ProtocolMessage::ProtocolMessage() ProtocolMessage::ProtocolMessage()

@ -31,7 +31,8 @@ namespace llarp
constexpr std::size_t MAX_PROTOCOL_MESSAGE_SIZE = 2048 * 2; constexpr std::size_t MAX_PROTOCOL_MESSAGE_SIZE = 2048 * 2;
std::ostream& operator<<(std::ostream& o, ProtocolType t); std::ostream&
operator<<(std::ostream& o, ProtocolType t);
/// inner message /// inner message
struct ProtocolMessage struct ProtocolMessage

@ -53,7 +53,7 @@ namespace llarp
{ {
lastGoodSend = r->Now(); lastGoodSend = r->Now();
flushpaths.emplace(item.second); flushpaths.emplace(item.second);
m_Endpoint->MarkConvoTagActive(item.first->T.T); m_Endpoint->ConvoTagTX(item.first->T.T);
const auto rtt = (item.second->intro.latency + remoteIntro.latency) * 2; const auto rtt = (item.second->intro.latency + remoteIntro.latency) * 2;
rttRMS += rtt * rtt.count(); rttRMS += rtt * rtt.count();
} }
@ -98,7 +98,15 @@ namespace llarp
m_DataHandler->PutIntroFor(f->T, remoteIntro); m_DataHandler->PutIntroFor(f->T, remoteIntro);
m_DataHandler->PutReplyIntroFor(f->T, path->intro); m_DataHandler->PutReplyIntroFor(f->T, path->intro);
m->proto = t; m->proto = t;
m->seqno = m_Endpoint->GetSeqNoForConvo(f->T); if (auto maybe = m_Endpoint->GetSeqNoForConvo(f->T))
{
m->seqno = *maybe;
}
else
{
LogWarn(m_Endpoint->Name(), " no session T=", f->T);
return;
}
m->introReply = path->intro; m->introReply = path->intro;
f->F = m->introReply.pathID; f->F = m->introReply.pathID;
m->sender = m_Endpoint->GetIdentity().pub; m->sender = m_Endpoint->GetIdentity().pub;

@ -8,10 +8,13 @@ namespace llarp
Session::ExtractStatus() const Session::ExtractStatus() const
{ {
util::StatusObject obj{ util::StatusObject obj{
{"lastUsed", to_json(lastUsed)}, {"lastSend", to_json(lastSend)},
{"lastRecv", to_json(lastRecv)},
{"replyIntro", replyIntro.ExtractStatus()}, {"replyIntro", replyIntro.ExtractStatus()},
{"remote", remote.Addr().ToString()}, {"remote", remote.Addr().ToString()},
{"seqno", seqno}, {"seqno", seqno},
{"tx", messagesSend},
{"rx", messagesRecv},
{"intro", intro.ExtractStatus()}}; {"intro", intro.ExtractStatus()}};
return obj; return obj;
} }
@ -21,8 +24,23 @@ namespace llarp
{ {
if (forever) if (forever)
return false; return false;
const auto lastUsed = std::max(lastSend, lastRecv);
return now > lastUsed && (now - lastUsed > lifetime || intro.IsExpired(now)); return now > lastUsed && (now - lastUsed > lifetime || intro.IsExpired(now));
} }
void
Session::TX()
{
messagesSend++;
lastSend = time_now_ms();
}
void
Session::RX()
{
messagesRecv++;
lastRecv = time_now_ms();
}
} // namespace service } // namespace service
} // namespace llarp } // namespace llarp

@ -21,15 +21,32 @@ namespace llarp
ServiceInfo remote; ServiceInfo remote;
/// the intro they have /// the intro they have
Introduction intro; Introduction intro;
/// the intro remoet last sent on
Introduction lastInboundIntro; /// the sequence number we are to use for the next message
llarp_time_t lastUsed = 0s;
uint64_t seqno = 0; uint64_t seqno = 0;
/// number of remote messages we sent to them
uint64_t messagesSend = 0;
/// number of remote messages we got from them
uint64_t messagesRecv = 0;
bool inbound = false; bool inbound = false;
bool forever = false; bool forever = false;
Duration_t lastSend{};
Duration_t lastRecv{};
util::StatusObject util::StatusObject
ExtractStatus() const; ExtractStatus() const;
/// called to indicate we recieved on this session
void
RX();
/// called to indicate we transimitted on this session
void
TX();
bool bool
IsExpired(llarp_time_t now, llarp_time_t lifetime = SessionLifetime) const; IsExpired(llarp_time_t now, llarp_time_t lifetime = SessionLifetime) const;
}; };

@ -44,8 +44,7 @@ namespace llarp
return LogType::Unknown; return LogType::Unknown;
} }
LogContext::LogContext() LogContext::LogContext() : logStream{std::make_unique<Stream_t>(_LOGSTREAM_INIT)}
: logStream(std::make_unique<Stream_t>(_LOGSTREAM_INIT)), started(llarp::time_now_ms())
{} {}
LogContext& LogContext&
@ -71,9 +70,7 @@ namespace llarp
{} {}
log_timestamp::log_timestamp(const char* fmt) log_timestamp::log_timestamp(const char* fmt)
: format(fmt) : format{fmt}, now{llarp::time_now_ms()}, delta{llarp::uptime()}
, now(llarp::time_now_ms())
, delta(llarp::time_now_ms() - LogContext::Instance().started)
{} {}
void void

@ -28,8 +28,6 @@ namespace llarp
ILogStream_ptr logStream; ILogStream_ptr logStream;
std::string nodeName = "lokinet"; std::string nodeName = "lokinet";
const llarp_time_t started;
static LogContext& static LogContext&
Instance(); Instance();

@ -50,8 +50,8 @@ namespace llarp
struct log_timestamp struct log_timestamp
{ {
const char* format; const char* format;
const llarp_time_t now; const Duration_t now;
const llarp_time_t delta; const Duration_t delta;
log_timestamp(); log_timestamp();

@ -1,73 +1,55 @@
#include "time.hpp" #include "time.hpp"
#include <chrono> #include <chrono>
#include <llarp/util/logging/logger.hpp>
namespace llarp namespace llarp
{ {
using Clock_t = std::chrono::system_clock; using Clock_t = std::chrono::system_clock;
template <typename Res, typename Clock> template <typename Res, typename Clock>
static llarp_time_t static Duration_t
time_since_epoch() time_since_epoch(std::chrono::time_point<Clock> point)
{ {
return std::chrono::duration_cast<Res>(Clock::now().time_since_epoch()); return std::chrono::duration_cast<Res>(point.time_since_epoch());
} }
const static llarp_time_t started_at_system = const static auto started_at_system = Clock_t::now();
time_since_epoch<std::chrono::milliseconds, Clock_t>();
const static auto started_at_steady = std::chrono::steady_clock::now();
uint64_t
ToMS(Duration_t ms)
{
return ms.count();
}
const static llarp_time_t started_at_steady =
time_since_epoch<std::chrono::milliseconds, std::chrono::steady_clock>();
/// get our uptime in ms /// get our uptime in ms
static llarp_time_t Duration_t
time_since_started() uptime()
{ {
return time_since_epoch<std::chrono::milliseconds, std::chrono::steady_clock>() return std::chrono::duration_cast<Duration_t>(
- started_at_steady; std::chrono::steady_clock::now() - started_at_steady);
} }
llarp_time_t Duration_t
time_now_ms() time_now_ms()
{ {
static llarp_time_t lastTime = 0s; auto t = uptime();
auto t = time_since_started();
#ifdef TESTNET_SPEED #ifdef TESTNET_SPEED
t /= uint64_t(TESTNET_SPEED); t /= uint64_t{TESTNET_SPEED};
#endif #endif
t += started_at_system; return t + time_since_epoch<Duration_t, Clock_t>(started_at_system);
if (t <= lastTime)
{
return lastTime;
}
if (lastTime == 0s)
{
lastTime = t;
}
const auto dlt = t - lastTime;
if (dlt > 5s)
{
// big timeskip
t = lastTime;
lastTime = 0s;
}
else
{
lastTime = t;
}
return t;
} }
nlohmann::json nlohmann::json
to_json(const llarp_time_t& t) to_json(const Duration_t& t)
{ {
return t.count(); return ToMS(t);
} }
std::ostream& std::ostream&
operator<<(std::ostream& out, const llarp_time_t& t) operator<<(std::ostream& out, const Duration_t& t)
{ {
std::chrono::milliseconds amount = t; std::chrono::milliseconds amount{ToMS(t)};
auto h = std::chrono::duration_cast<std::chrono::hours>(amount); auto h = std::chrono::duration_cast<std::chrono::hours>(amount);
amount -= h; amount -= h;
auto m = std::chrono::duration_cast<std::chrono::minutes>(amount); auto m = std::chrono::duration_cast<std::chrono::minutes>(amount);

@ -2,19 +2,28 @@
#include "types.hpp" #include "types.hpp"
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>
#include <iostream>
using namespace std::chrono_literals; using namespace std::chrono_literals;
namespace llarp namespace llarp
{ {
/// get time right now as milliseconds, this is monotonic /// get time right now as milliseconds, this is monotonic
llarp_time_t Duration_t
time_now_ms(); time_now_ms();
/// get the uptime of the process
Duration_t
uptime();
/// convert to milliseconds
uint64_t
ToMS(Duration_t duration);
std::ostream& std::ostream&
operator<<(std::ostream& out, const llarp_time_t& t); operator<<(std::ostream& out, const Duration_t& t);
nlohmann::json nlohmann::json
to_json(const llarp_time_t& t); to_json(const Duration_t& t);
} // namespace llarp } // namespace llarp

@ -6,9 +6,14 @@
using byte_t = uint8_t; using byte_t = uint8_t;
using llarp_proto_version_t = std::uint8_t; using llarp_proto_version_t = std::uint8_t;
using llarp_time_t = std::chrono::milliseconds;
namespace llarp namespace llarp
{ {
using Duration_t = std::chrono::milliseconds;
using namespace std::literals; using namespace std::literals;
}
/// convert to milliseconds
uint64_t
ToMS(Duration_t duration);
} // namespace llarp
using llarp_time_t = llarp::Duration_t;

Loading…
Cancel
Save