fix up liblokinet api to be more friendly to ffi

add lnproxy exmaple of use of liblokinet
pull/1576/head
Jeff Becker 3 years ago
parent 853cc52efb
commit 8bc60a59ac
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -0,0 +1,155 @@
#!/usr/bin/env python3
from http.server import ThreadingHTTPServer as Server
from http.server import BaseHTTPRequestHandler as BaseHandler
import ctypes
from ctypes.util import find_library
import selectors
import socket
import os
class ResultStruct(ctypes.Structure):
_pack_ = 1
_fields_ = [
("err", ctypes.c_int),
("local_address", ctypes.c_char * 256),
("local_port", ctypes.c_int),
("stream_id", ctypes.c_int)
]
def __repr__(self):
return "<Result err={} addr={} port={} id={}>".format(self.err, self.local_address, self.local_port, self.stream_id)
class LNContext(ctypes.Structure):
pass
class Context:
"""
wrapper around liblokinet
"""
def __init__(self):
self._ln = ctypes.CDLL(find_library("lokinet"))
self._c = ctypes.CDLL(find_library("c"))
self._ln.lokinet_context_new.restype = ctypes.POINTER(LNContext)
self._ln.lokinet_address.restype = ctypes.c_char_p
self._ln.lokinet_address.argtypes = (ctypes.POINTER(LNContext), )
self._ln.lokinet_outbound_stream.restype = ctypes.POINTER(ResultStruct)
self._ln.lokinet_outbound_stream.argtypes = (ctypes.POINTER(ResultStruct), ctypes.c_char_p, ctypes.c_char_p, ctypes.POINTER(LNContext))
self._ctx = self._ln.lokinet_context_new()
def free(self, ptr):
self._c.free(ptr)
def addr(self):
return self._ln.lokinet_address(self._ctx).decode('ascii')
def expose(self, port):
return self.ln_call('lokinet_inbound_stream', port)
def ln_call(self, funcname, *args):
args += (self._ctx,)
print("call {}{}".format(funcname, args))
return self._ln[funcname](*args)
def start(self):
self.ln_call("lokinet_context_start")
def stop(self):
self.ln_call("lokinet_context_stop")
def __del__(self):
self.stop()
self._ln_call("lokinet_context_free")
class Stream:
def __init__(self, ctx):
self._ctx = ctx
self._id = None
def connect(self, remote):
result = ResultStruct()
self._ctx.ln_call("lokinet_outbound_stream", ctypes.cast(ctypes.addressof(result), ctypes.POINTER(ResultStruct)), ctypes.create_string_buffer(remote.encode()), ctypes.c_char_p(0))
if result.err:
print(result.err)
return
addr = result.local_address.decode('ascii')
port = result.local_port
self._id = result.stream_id
print("connect to {} made via {}:{} via {}".format(remote, addr, port, self._id))
return addr, port
def close(self):
if self._id is not None:
self._ctx.ln_call("lokinet_close_stream", self._id)
def read_and_forward_or_close(readfd, writefd):
read = 0
while True:
data = os.read(readfd, 128)
read += len(data)
if data and len(data) > 0:
writefd.write(data)
writefd.flush()
else:
return read > 0
ctx = Context()
class Handler(BaseHandler):
def do_CONNECT(self):
self.connect(self.path)
def connect(self, host):
global ctx
stream = Stream(ctx)
result = stream.connect(host)
if not result:
self.send_error(503)
return
sock = socket.socket()
sock.connect(result)
if not sock:
self.send_error(504)
return
self.send_response_only(200)
self.end_headers()
sel = selectors.DefaultSelector()
sock.setblocking(False)
sockfd = sock.makefile('rwb')
sel.register(self.rfile.fileno(), selectors.EVENT_READ, lambda x : read_and_forward_or_close(x, sockfd))
sel.register(sock.fileno(), selectors.EVENT_READ, lambda x : read_and_forward_or_close(x, self.wfile))
print("running")
while True:
events = sel.select(1)
if not events:
continue
for key, mask in events:
if not key.data(key.fileobj):
sel.unregister(self.rfile)
sel.unregister(sock)
sel.close()
stream.close()
return
server = Server(('127.0.0.1', 3000), Handler)
ctx.start()
id = ctx.expose(80)
print("we are {}".format(ctx.addr()))
server.serve_forever()
ctx.ln_call("lokinet_close_stream", id)
ctx.stop()

@ -32,12 +32,18 @@ extern "C"
struct lokinet_context*
lokinet_default();
/// get a free()-able null terminated string that holds our .loki address
/// returns NULL if we dont have one right now
char*
lokinet_address(struct lokinet_context*);
/// the result of a lokinet stream mapping attempt
#pragma pack(1)
struct lokinet_stream_result
{
/// set to zero on success otherwise the error that happened
/// use strerror(3) to get printable string of this error
int errno;
int error;
/// the local ip address we mapped the remote endpoint to
/// null terminated
@ -47,14 +53,17 @@ extern "C"
/// the id of the stream we created
int stream_id;
};
#pragma pack()
/// connect out to a remote endpoint
/// remoteAddr is in the form of "name:port"
/// localAddr is either NULL for any or in the form of "ip:port" to bind to an explicit address
/// returns a lokinet_stream_result * that contains the result that can be free()'d
struct lokinet_stream_result*
void
lokinet_outbound_stream(
const char* remoteAddr, const char* localAddr, struct lokinet_context* context);
struct lokinet_stream_result* result,
const char* remoteAddr,
const char* localAddr,
struct lokinet_context* context);
/// stream accept filter determines if we should accept a stream or not
/// return 0 to accept

@ -1356,7 +1356,10 @@ namespace llarp
std::shared_ptr<Config>
Config::EmbeddedConfig()
{
auto config = std::make_shared<Config>(fs::path{""});
auto config = std::make_shared<Config>(fs::current_path());
config->Load();
config->logging.m_logLevel = eLogNone;
config->api.m_enableRPCServer = false;
config->network.m_endpointType = "null";
return config;
}

@ -51,26 +51,24 @@ struct lokinet_context
namespace
{
struct lokinet_context g_context
{};
std::unique_ptr<lokinet_context> g_context;
lokinet_stream_result*
stream_error(int err)
void
stream_error(lokinet_stream_result* result, int err)
{
return new lokinet_stream_result{err, {0}, 0, 0};
result->error = err;
}
lokinet_stream_result*
stream_okay(std::string host, int port, int stream_id)
void
stream_okay(lokinet_stream_result* result, std::string host, int port, int stream_id)
{
auto* result = new lokinet_stream_result{};
stream_error(result, 0);
std::copy_n(
host.c_str(),
std::min(host.size(), sizeof(result->local_address) - 1),
result->local_address);
result->local_port = port;
result->stream_id = stream_id;
return result;
}
std::pair<std::string, int>
@ -90,7 +88,7 @@ namespace
return {host, serv->s_port};
}
else
throw(errno ? errno : EINVAL);
return {host, std::stoi(portStr)};
}
int
@ -111,7 +109,21 @@ extern "C"
struct lokinet_context*
lokinet_default()
{
return &g_context;
if (not g_context)
g_context = std::make_unique<lokinet_context>();
return g_context.get();
}
char*
lokinet_address(struct lokinet_context* ctx)
{
if (not ctx)
return nullptr;
auto lock = ctx->acquire();
auto ep = ctx->impl->router->hiddenServiceContext().GetEndpointByName("default");
const auto addr = ep->GetIdentity().pub.Addr();
const auto addrStr = addr.ToString();
return strdup(addrStr.c_str());
}
struct lokinet_context*
@ -134,11 +146,26 @@ extern "C"
return;
auto lock = ctx->acquire();
ctx->runner = std::make_unique<std::thread>([ctx]() {
llarp::util::SetThreadName("llarp-mainloop");
ctx->impl->Configure(llarp::Config::EmbeddedConfig());
const llarp::RuntimeOptions opts{};
ctx->impl->Setup(opts);
ctx->impl->Run(opts);
try
{
ctx->impl->Setup(opts);
ctx->impl->Run(opts);
}
catch (std::exception& ex)
{
std::cerr << ex.what() << std::endl;
ctx->impl->CloseAsync();
}
});
while (not ctx->impl->IsUp())
{
if (ctx->impl->IsStopping())
return;
std::this_thread::sleep_for(5ms);
}
}
void
@ -160,20 +187,28 @@ extern "C"
ctx->runner.reset();
}
struct lokinet_stream_result*
lokinet_outbound_stream(const char* remote, const char* local, struct lokinet_context* ctx)
void
lokinet_outbound_stream(
struct lokinet_stream_result* result,
const char* remote,
const char* local,
struct lokinet_context* ctx)
{
if (ctx == nullptr)
return stream_error(EHOSTDOWN);
std::promise<lokinet_stream_result*> promise;
{
stream_error(result, EHOSTDOWN);
return;
}
std::promise<void> promise;
{
auto lock = ctx->acquire();
if (not ctx->impl->IsUp())
return stream_error(EHOSTDOWN);
{
stream_error(result, EHOSTDOWN);
return;
}
std::string remotehost;
int remoteport;
try
@ -184,7 +219,8 @@ extern "C"
}
catch (int err)
{
return stream_error(err);
stream_error(result, err);
return;
}
// TODO: make configurable (?)
std::string endpoint{"default"};
@ -199,10 +235,12 @@ extern "C"
}
catch (std::exception& ex)
{
return stream_error(EINVAL);
stream_error(result, EINVAL);
return;
}
auto call = [&promise,
ctx,
result,
router = ctx->impl->router,
remotehost,
remoteport,
@ -211,31 +249,35 @@ extern "C"
auto ep = router->hiddenServiceContext().GetEndpointByName(endpoint);
if (ep == nullptr)
{
promise.set_value(stream_error(EHOSTUNREACH));
stream_error(result, ENOTSUP);
promise.set_value();
return;
}
auto* quic = ep->GetQUICTunnel();
if (quic == nullptr)
{
promise.set_value(stream_error(ENOTSUP));
stream_error(result, ENOTSUP);
promise.set_value();
return;
}
try
{
auto [addr, id] = quic->open(
remotehost, remoteport, [](auto&&) {}, localAddr);
remotehost, remoteport, [](auto) {}, localAddr);
auto [host, port] = split_host_port(addr.toString());
ctx->outbound_stream(id);
promise.set_value(stream_okay(host, port, id));
stream_okay(result, host, port, id);
}
catch (std::exception& ex)
{
promise.set_value(stream_error(ECANCELED));
std::cout << ex.what() << std::endl;
stream_error(result, ECANCELED);
}
catch (int err)
{
promise.set_value(stream_error(err));
stream_error(result, err);
}
promise.set_value();
};
ctx->impl->CallSafe([call]() {
@ -255,17 +297,16 @@ extern "C"
if (auto status = future.wait_for(std::chrono::seconds{10});
status == std::future_status::ready)
{
return future.get();
future.get();
}
else
{
promise.set_value(stream_error(ETIMEDOUT));
return future.get();
stream_error(result, ETIMEDOUT);
}
}
catch (std::exception& ex)
{
return stream_error(EBADF);
stream_error(result, EBADF);
}
}

Loading…
Cancel
Save