diff --git a/include/llarp/ev.h b/include/llarp/ev.h index 1e49978a9..fd9d04615 100644 --- a/include/llarp/ev.h +++ b/include/llarp/ev.h @@ -3,6 +3,7 @@ #include #include +#include #include #ifdef __cplusplus @@ -35,14 +36,19 @@ extern "C" { struct llarp_ev_job { + /** the loop this job belongs to */ struct llarp_ev_loop * loop; + /** user data */ void * user; /** work is called async when ready in the event loop thread */ void (*work)(struct llarp_ev_job *); }; - /** call work async in event loop thread (thread safe) */ - void llarp_ev_async(struct llarp_ev_loop * ev, struct llarp_ev_job job); + /** + call work async in event loop thread (thread safe) + return true if we queued the job otherwise return false + */ + bool llarp_ev_async(struct llarp_ev_loop * ev, struct llarp_ev_job job); #ifdef __cplusplus } diff --git a/include/llarp/threadpool.h b/include/llarp/threadpool.h index 943a89479..c3b92703d 100644 --- a/include/llarp/threadpool.h +++ b/include/llarp/threadpool.h @@ -13,10 +13,14 @@ extern "C" { /** job to be done in worker thread */ struct llarp_thread_job { - /** calls result async after work is executed */ + /** + called async after work is executed + */ struct llarp_ev_job * result; + /** user data to pass to work function */ + void * user; /** called in threadpool worker thread */ - void (*work)(struct llarp_thread_job *); + void (*work)(void *); }; void llarp_threadpool_queue_job(struct llarp_threadpool * tp, struct llarp_thread_job j); diff --git a/llarp/ev.cpp b/llarp/ev.cpp index 50dad3f0f..3b06d7755 100644 --- a/llarp/ev.cpp +++ b/llarp/ev.cpp @@ -27,6 +27,11 @@ namespace llarp { return llarp_g_mem.alloc(sz, alignment()); } + + static void operator delete(void * ptr) + { + llarp_g_mem.free(ptr); + } uv_udp_t _handle; struct llarp_udp_listener * listener; @@ -65,11 +70,29 @@ namespace llarp { udp_listener * l = static_cast(handle->data); l->closed(); - llarp_g_mem.free(l); + delete l; } } +namespace llarp +{ + + static void ev_handle_async_closed(uv_handle_t * handle) + { + struct llarp_ev_job * ev = static_cast(handle->data); + llarp_g_mem.free(ev); + llarp_g_mem.free(handle); + } + + static void ev_handle_async(uv_async_t * handle) + { + struct llarp_ev_job * ev = static_cast(handle->data); + ev->work(ev); + uv_close((uv_handle_t *)handle, ev_handle_async_closed); + } +} + extern "C" { void llarp_ev_loop_alloc(struct llarp_ev_loop ** ev) { @@ -135,4 +158,27 @@ extern "C" { } return ret; } + + void llarp_ev_loop_stop(struct llarp_ev_loop * loop) + { + uv_stop(loop->loop()); + } + + bool llarp_ev_async(struct llarp_ev_loop * loop, struct llarp_ev_job job) + { + struct llarp_ev_job * job_copy = static_cast(llarp_g_mem.alloc(sizeof(struct llarp_ev_job), llarp::alignment())); + job_copy->work = job.work; + job_copy->loop = loop; + job_copy->user = job.user; + uv_async_t * async = static_cast(llarp_g_mem.alloc(sizeof(uv_async_t), llarp::alignment())); + async->data = job_copy; + if(uv_async_init(loop->loop(), async, llarp::ev_handle_async) == 0 && uv_async_send(async)) + return true; + else + { + llarp_g_mem.free(job_copy); + llarp_g_mem.free(async); + return false; + } + } } diff --git a/llarp/link.cpp b/llarp/link.cpp index e83e0d0b4..bc804e011 100644 --- a/llarp/link.cpp +++ b/llarp/link.cpp @@ -7,43 +7,30 @@ bool operator < (const sockaddr_in6 addr0, const sockaddr_in6 addr1) return memcmp(addr0.sin6_addr.s6_addr, addr1.sin6_addr.s6_addr, sizeof(addr0.sin6_addr)) && addr0.sin6_port < addr1.sin6_port; } -namespace llarp -{ +extern "C"{ + struct llarp_link * llarp_link_alloc() + { + return new llarp_link; + } - static void link_recv_from(struct llarp_udp_listener * l, const struct sockaddr * src, char * buff, ssize_t sz) + void llarp_link_free(struct llarp_link ** l) { - if(src && src->sa_family == AF_INET6) - { - Link * link = static_cast(l->user); - struct sockaddr_in6 remote; - memcpy(&remote, src, sizeof(sockaddr_in6)); - auto itr = link->sessions.find(remote); - if(itr == link->sessions.end()) - { - link->sessions[remote] = std::make_shared(link->_crypto, remote); - } - link->sessions[remote]->RecvFrom(buff, sz); - } + if(*l) delete *l; + *l =nullptr; } - - Link::Link(llarp_crypto * crypto) : _crypto(crypto) + + struct llarp_udp_listener * llarp_link_udp_listener(struct llarp_link *l) { - _listener.user = this; - _listener.recvfrom = link_recv_from; + return &l->listener; } - - PeerSession::PeerSession(llarp_crypto * crypto, sockaddr_in6 remote) : - lastRX(0), - remoteAddr(remote), - _crypto(crypto), - state(eHandshakeInboundInit) + bool llarp_link_configure(struct llarp_link * link, const char * ifname, int af) { - memset(sessionKey, 0, sizeof(sessionKey)); + return false; } - void PeerSession::RecvFrom(const char * buff, ssize_t sz) + void llarp_link_stop(struct llarp_link * link) { - lastRX = llarp_time_now_ms(); + llarp_ev_close_udp_listener(&link->listener); } } diff --git a/llarp/link.hpp b/llarp/link.hpp index 0752b530d..63e38a714 100644 --- a/llarp/link.hpp +++ b/llarp/link.hpp @@ -9,66 +9,25 @@ #include #include +#include "mem.hpp" -namespace llarp + +struct llarp_link { - struct Link; - - struct PeerSession + static void * operator new(size_t sz) { - sockaddr_in6 remoteAddr; - llarp_rc rc; - llarp_sharedkey_t sessionKey; - - uint64_t lastRX; - - llarp_crypto * _crypto; - - enum State - { - eStateNULL, - eHandshakeInboundInit, - eHandshakeOutboundInit, - eHandshakeInboundRepliedInit, - eHandshakeOutboundGotReply, - eHandshakeInboundGotAck, - eHandshakeOutboundGotAck, - eEstablished, - eTimeout - }; - - State state; - - /** inbound session */ - PeerSession(llarp_crypto * crypto, sockaddr_in6 remote); - /** outbound session */ - PeerSession(llarp_crypto * crypto, llarp_rc rc); - - void SendTo(Link * link, const char * buff, std::size_t sz); - - void RecvFrom(const char * buff, ssize_t sz); - - - typedef std::shared_ptr Ptr; - }; + return llarp_g_mem.alloc(sz, llarp::alignment()); + } - struct Link + static void operator delete(void * ptr) { - typedef std::map Sessions; - - Sessions sessions; - llarp_seckey_t transportSecKey; - llarp_pubkey_t transportPubKey; - - llarp_crypto * _crypto; - - Link(llarp_crypto * crypto); - - llarp_udp_listener _listener; - - llarp_udp_listener * Listener() { return &_listener; } - - }; -} + llarp_g_mem.free(ptr); + } + + struct sockaddr_in6 localaddr; + int af; + llarp_udp_listener listener; +}; + #endif diff --git a/llarp/router.cpp b/llarp/router.cpp index d36ef0e2c..24b0b4f00 100644 --- a/llarp/router.cpp +++ b/llarp/router.cpp @@ -89,14 +89,17 @@ extern "C" { }); } + void llarp_stop_router(struct llarp_router * router) + { + router->Close(); + } + void llarp_free_router(struct llarp_router ** router) { if(*router) { - llarp_router * r = *router; - r->Close(); - r->ForEachLink([](llarp_link * link) { llarp_g_mem.free(link); }); - delete r; + (*router)->ForEachLink([](llarp_link * link) { llarp_g_mem.free(link); }); + delete *router; } *router = nullptr; } diff --git a/llarp/threadpool.cpp b/llarp/threadpool.cpp new file mode 100644 index 000000000..764b6df7e --- /dev/null +++ b/llarp/threadpool.cpp @@ -0,0 +1,96 @@ +#include "threadpool.hpp" +#include + +namespace llarp +{ + namespace thread + { + Pool::Pool(size_t workers) + { + stop.store(true); + while(workers--) + { + threads.emplace_back( + [this] + { + for(;;) + { + llarp_thread_job job; + { + lock_t lock(this->queue_mutex); + this->condition.wait(lock, [this]{ return this->stop || !this->jobs.empty(); }); + if(this->stop && this->jobs.empty()) + return; + job = std::move(this->jobs.front()); + this->jobs.pop(); + } + // do work + job.work(job.user); + // inform result if needed + if(job.result && job.result->loop) + if(!llarp_ev_async(job.result->loop, *job.result)) + { + std::cerr << "failed to queue result in thread worker" << std::endl; + } + } + }); + } + } + + void Pool::Join() + { + { + lock_t lock(queue_mutex); + stop.store(true); + } + condition.notify_all(); + for(auto & t : threads) + t.join(); + } + + void Pool::QueueJob(llarp_thread_job job) + { + { + lock_t lock(queue_mutex); + + // don't allow enqueueing after stopping the pool + if(stop) + throw std::runtime_error("enqueue on stopped ThreadPool"); + + jobs.emplace(job); + } + condition.notify_one(); + } + + } +} + +struct llarp_threadpool +{ + llarp::thread::Pool impl; + + llarp_threadpool(int workers) : impl(workers) {} +}; + +extern "C" { + + struct llarp_threadpool * llarp_init_threadpool(int workers) + { + if(workers > 0) + return new llarp_threadpool(workers); + else + return nullptr; + } + + void llarp_threadpool_join(struct llarp_threadpool * pool) + { + pool->impl.Join(); + } + + void llarp_free_threadpool(struct llarp_threadpool ** pool) + { + delete *pool; + *pool = nullptr; + } + +} diff --git a/llarp/threadpool.hpp b/llarp/threadpool.hpp new file mode 100644 index 000000000..b20bbb91b --- /dev/null +++ b/llarp/threadpool.hpp @@ -0,0 +1,36 @@ +#ifndef LLARP_THREADPOOL_HPP +#define LLARP_THREADPOOL_HPP + +#include + +#include +#include +#include +#include +#include +#include + +namespace llarp +{ + namespace thread + { + typedef std::mutex mtx_t; + typedef std::unique_lock lock_t; + struct Pool + { + + Pool(size_t sz); + void QueueJob(llarp_thread_job job); + void Join(); + std::vector threads; + std::queue jobs; + + mtx_t queue_mutex; + std::condition_variable condition; + std::atomic stop; + }; + + } +} + +#endif