From 8bc60a59ac9f637958615d220ba91bb350fcf066 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 1 Apr 2021 12:51:02 -0400 Subject: [PATCH] fix up liblokinet api to be more friendly to ffi add lnproxy exmaple of use of liblokinet --- contrib/py/lnproxy/lnproxy/__main__.py | 155 +++++++++++++++++++++++++ include/lokinet.h | 17 ++- llarp/config/config.cpp | 5 +- llarp/lokinet_shared.cpp | 105 ++++++++++++----- 4 files changed, 245 insertions(+), 37 deletions(-) create mode 100644 contrib/py/lnproxy/lnproxy/__main__.py diff --git a/contrib/py/lnproxy/lnproxy/__main__.py b/contrib/py/lnproxy/lnproxy/__main__.py new file mode 100644 index 000000000..a57a1057a --- /dev/null +++ b/contrib/py/lnproxy/lnproxy/__main__.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python3 + +from http.server import ThreadingHTTPServer as Server +from http.server import BaseHTTPRequestHandler as BaseHandler + +import ctypes +from ctypes.util import find_library + +import selectors +import socket + +import os + +class ResultStruct(ctypes.Structure): + _pack_ = 1 + _fields_ = [ + ("err", ctypes.c_int), + ("local_address", ctypes.c_char * 256), + ("local_port", ctypes.c_int), + ("stream_id", ctypes.c_int) + ] + + def __repr__(self): + return "".format(self.err, self.local_address, self.local_port, self.stream_id) + + +class LNContext(ctypes.Structure): + pass + +class Context: + """ + wrapper around liblokinet + """ + + def __init__(self): + self._ln = ctypes.CDLL(find_library("lokinet")) + self._c = ctypes.CDLL(find_library("c")) + self._ln.lokinet_context_new.restype = ctypes.POINTER(LNContext) + self._ln.lokinet_address.restype = ctypes.c_char_p + self._ln.lokinet_address.argtypes = (ctypes.POINTER(LNContext), ) + self._ln.lokinet_outbound_stream.restype = ctypes.POINTER(ResultStruct) + self._ln.lokinet_outbound_stream.argtypes = (ctypes.POINTER(ResultStruct), ctypes.c_char_p, ctypes.c_char_p, ctypes.POINTER(LNContext)) + self._ctx = self._ln.lokinet_context_new() + + def free(self, ptr): + self._c.free(ptr) + + + def addr(self): + return self._ln.lokinet_address(self._ctx).decode('ascii') + + def expose(self, port): + return self.ln_call('lokinet_inbound_stream', port) + + def ln_call(self, funcname, *args): + args += (self._ctx,) + print("call {}{}".format(funcname, args)) + return self._ln[funcname](*args) + + def start(self): + self.ln_call("lokinet_context_start") + + def stop(self): + self.ln_call("lokinet_context_stop") + + def __del__(self): + self.stop() + self._ln_call("lokinet_context_free") + +class Stream: + + def __init__(self, ctx): + self._ctx = ctx + self._id = None + + def connect(self, remote): + result = ResultStruct() + self._ctx.ln_call("lokinet_outbound_stream", ctypes.cast(ctypes.addressof(result), ctypes.POINTER(ResultStruct)), ctypes.create_string_buffer(remote.encode()), ctypes.c_char_p(0)) + + if result.err: + print(result.err) + return + addr = result.local_address.decode('ascii') + port = result.local_port + self._id = result.stream_id + print("connect to {} made via {}:{} via {}".format(remote, addr, port, self._id)) + return addr, port + + + def close(self): + if self._id is not None: + self._ctx.ln_call("lokinet_close_stream", self._id) + +def read_and_forward_or_close(readfd, writefd): + read = 0 + while True: + data = os.read(readfd, 128) + read += len(data) + if data and len(data) > 0: + writefd.write(data) + writefd.flush() + else: + return read > 0 + +ctx = Context() + + +class Handler(BaseHandler): + + def do_CONNECT(self): + self.connect(self.path) + + def connect(self, host): + global ctx + stream = Stream(ctx) + + result = stream.connect(host) + if not result: + self.send_error(503) + return + sock = socket.socket() + sock.connect(result) + if not sock: + self.send_error(504) + return + + self.send_response_only(200) + self.end_headers() + sel = selectors.DefaultSelector() + sock.setblocking(False) + sockfd = sock.makefile('rwb') + sel.register(self.rfile.fileno(), selectors.EVENT_READ, lambda x : read_and_forward_or_close(x, sockfd)) + sel.register(sock.fileno(), selectors.EVENT_READ, lambda x : read_and_forward_or_close(x, self.wfile)) + + print("running") + while True: + events = sel.select(1) + if not events: + continue + for key, mask in events: + if not key.data(key.fileobj): + sel.unregister(self.rfile) + sel.unregister(sock) + sel.close() + stream.close() + return + + +server = Server(('127.0.0.1', 3000), Handler) +ctx.start() +id = ctx.expose(80) +print("we are {}".format(ctx.addr())) +server.serve_forever() +ctx.ln_call("lokinet_close_stream", id) +ctx.stop() diff --git a/include/lokinet.h b/include/lokinet.h index 9135c3f97..2a862ff65 100644 --- a/include/lokinet.h +++ b/include/lokinet.h @@ -32,12 +32,18 @@ extern "C" struct lokinet_context* lokinet_default(); + /// get a free()-able null terminated string that holds our .loki address + /// returns NULL if we dont have one right now + char* + lokinet_address(struct lokinet_context*); + /// the result of a lokinet stream mapping attempt +#pragma pack(1) struct lokinet_stream_result { /// set to zero on success otherwise the error that happened /// use strerror(3) to get printable string of this error - int errno; + int error; /// the local ip address we mapped the remote endpoint to /// null terminated @@ -47,14 +53,17 @@ extern "C" /// the id of the stream we created int stream_id; }; +#pragma pack() /// connect out to a remote endpoint /// remoteAddr is in the form of "name:port" /// localAddr is either NULL for any or in the form of "ip:port" to bind to an explicit address - /// returns a lokinet_stream_result * that contains the result that can be free()'d - struct lokinet_stream_result* + void lokinet_outbound_stream( - const char* remoteAddr, const char* localAddr, struct lokinet_context* context); + struct lokinet_stream_result* result, + const char* remoteAddr, + const char* localAddr, + struct lokinet_context* context); /// stream accept filter determines if we should accept a stream or not /// return 0 to accept diff --git a/llarp/config/config.cpp b/llarp/config/config.cpp index 437c25365..53000e974 100644 --- a/llarp/config/config.cpp +++ b/llarp/config/config.cpp @@ -1356,7 +1356,10 @@ namespace llarp std::shared_ptr Config::EmbeddedConfig() { - auto config = std::make_shared(fs::path{""}); + auto config = std::make_shared(fs::current_path()); + config->Load(); + config->logging.m_logLevel = eLogNone; + config->api.m_enableRPCServer = false; config->network.m_endpointType = "null"; return config; } diff --git a/llarp/lokinet_shared.cpp b/llarp/lokinet_shared.cpp index c21b0b2a2..b8753ccb3 100644 --- a/llarp/lokinet_shared.cpp +++ b/llarp/lokinet_shared.cpp @@ -51,26 +51,24 @@ struct lokinet_context namespace { - struct lokinet_context g_context - {}; + std::unique_ptr g_context; - lokinet_stream_result* - stream_error(int err) + void + stream_error(lokinet_stream_result* result, int err) { - return new lokinet_stream_result{err, {0}, 0, 0}; + result->error = err; } - lokinet_stream_result* - stream_okay(std::string host, int port, int stream_id) + void + stream_okay(lokinet_stream_result* result, std::string host, int port, int stream_id) { - auto* result = new lokinet_stream_result{}; + stream_error(result, 0); std::copy_n( host.c_str(), std::min(host.size(), sizeof(result->local_address) - 1), result->local_address); result->local_port = port; result->stream_id = stream_id; - return result; } std::pair @@ -90,7 +88,7 @@ namespace return {host, serv->s_port}; } else - throw(errno ? errno : EINVAL); + return {host, std::stoi(portStr)}; } int @@ -111,7 +109,21 @@ extern "C" struct lokinet_context* lokinet_default() { - return &g_context; + if (not g_context) + g_context = std::make_unique(); + return g_context.get(); + } + + char* + lokinet_address(struct lokinet_context* ctx) + { + if (not ctx) + return nullptr; + auto lock = ctx->acquire(); + auto ep = ctx->impl->router->hiddenServiceContext().GetEndpointByName("default"); + const auto addr = ep->GetIdentity().pub.Addr(); + const auto addrStr = addr.ToString(); + return strdup(addrStr.c_str()); } struct lokinet_context* @@ -134,11 +146,26 @@ extern "C" return; auto lock = ctx->acquire(); ctx->runner = std::make_unique([ctx]() { + llarp::util::SetThreadName("llarp-mainloop"); ctx->impl->Configure(llarp::Config::EmbeddedConfig()); const llarp::RuntimeOptions opts{}; - ctx->impl->Setup(opts); - ctx->impl->Run(opts); + try + { + ctx->impl->Setup(opts); + ctx->impl->Run(opts); + } + catch (std::exception& ex) + { + std::cerr << ex.what() << std::endl; + ctx->impl->CloseAsync(); + } }); + while (not ctx->impl->IsUp()) + { + if (ctx->impl->IsStopping()) + return; + std::this_thread::sleep_for(5ms); + } } void @@ -160,20 +187,28 @@ extern "C" ctx->runner.reset(); } - struct lokinet_stream_result* - lokinet_outbound_stream(const char* remote, const char* local, struct lokinet_context* ctx) + void + lokinet_outbound_stream( + struct lokinet_stream_result* result, + const char* remote, + const char* local, + struct lokinet_context* ctx) { if (ctx == nullptr) - return stream_error(EHOSTDOWN); - - std::promise promise; + { + stream_error(result, EHOSTDOWN); + return; + } + std::promise promise; { auto lock = ctx->acquire(); if (not ctx->impl->IsUp()) - return stream_error(EHOSTDOWN); - + { + stream_error(result, EHOSTDOWN); + return; + } std::string remotehost; int remoteport; try @@ -184,7 +219,8 @@ extern "C" } catch (int err) { - return stream_error(err); + stream_error(result, err); + return; } // TODO: make configurable (?) std::string endpoint{"default"}; @@ -199,10 +235,12 @@ extern "C" } catch (std::exception& ex) { - return stream_error(EINVAL); + stream_error(result, EINVAL); + return; } auto call = [&promise, ctx, + result, router = ctx->impl->router, remotehost, remoteport, @@ -211,31 +249,35 @@ extern "C" auto ep = router->hiddenServiceContext().GetEndpointByName(endpoint); if (ep == nullptr) { - promise.set_value(stream_error(EHOSTUNREACH)); + stream_error(result, ENOTSUP); + promise.set_value(); return; } auto* quic = ep->GetQUICTunnel(); if (quic == nullptr) { - promise.set_value(stream_error(ENOTSUP)); + stream_error(result, ENOTSUP); + promise.set_value(); return; } try { auto [addr, id] = quic->open( - remotehost, remoteport, [](auto&&) {}, localAddr); + remotehost, remoteport, [](auto) {}, localAddr); auto [host, port] = split_host_port(addr.toString()); ctx->outbound_stream(id); - promise.set_value(stream_okay(host, port, id)); + stream_okay(result, host, port, id); } catch (std::exception& ex) { - promise.set_value(stream_error(ECANCELED)); + std::cout << ex.what() << std::endl; + stream_error(result, ECANCELED); } catch (int err) { - promise.set_value(stream_error(err)); + stream_error(result, err); } + promise.set_value(); }; ctx->impl->CallSafe([call]() { @@ -255,17 +297,16 @@ extern "C" if (auto status = future.wait_for(std::chrono::seconds{10}); status == std::future_status::ready) { - return future.get(); + future.get(); } else { - promise.set_value(stream_error(ETIMEDOUT)); - return future.get(); + stream_error(result, ETIMEDOUT); } } catch (std::exception& ex) { - return stream_error(EBADF); + stream_error(result, EBADF); } }