use configurable number of net io threads

pull/1/head
Jeff Becker 6 years ago
parent bfaa837bce
commit 956973cb84
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -1,10 +1,15 @@
cmake_minimum_required(VERSION 2.8.10) cmake_minimum_required(VERSION 2.8.10)
set(DEBUG_FLAGS "-O1 -g -fsanitize=address -fno-omit-frame-pointer") set(DEBUG_FLAGS "")
if(ASAN)
set(DEBUG_FLAGS "${DEBUG_FLAGS} -O1 -g -fsanitize=address -fno-omit-frame-pointer")
endif(ASAN)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=c11 -Wall -fPIC ${DEBUG_FLAGS}") set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=c11 -Wall -fPIC ${DEBUG_FLAGS}")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++17 -Wall -fPIC ${DEBUG_FLAGS}") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++17 -Wall -fPIC ${DEBUG_FLAGS}")
set(CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake ${CMAKE_MODULE_PATH}) set(CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake ${CMAKE_MODULE_PATH})
set(EXE llarpd) set(EXE llarpd)

@ -1,17 +1,20 @@
all: remove-build clean compile all: debug
remove-build: clean:
rm -f build.ninja rules.ninja cmake_install.cmake CMakeCache.txt rm -f build.ninja rules.ninja cmake_install.cmake CMakeCache.txt
rm -rf CMakeFiles rm -rf CMakeFiles
clean: build.ninja debug-configure: clean
ninja clean cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DASAN=true
build.ninja: configure: clean
cmake -GNinja -DCMAKE_BUILD_TYPE=Debug cmake -GNinja -DCMAKE_BUILD_TYPE=Release
compile: build.ninja compile: configure
ninja
debug: debug-configure
ninja ninja
format: format:

@ -1,5 +1,6 @@
[router] [router]
threads=2 worker-threads=8
net-threads=2
contact-file=other.signed contact-file=other.signed
ident-privkey=server-ident.key ident-privkey=server-ident.key

@ -5,6 +5,9 @@
#include <string.h> #include <string.h>
#include <llarp/logger.hpp> #include <llarp/logger.hpp>
#include <thread>
#include <vector>
static void static void
progress() progress()
{ {
@ -15,10 +18,11 @@ progress()
struct llarp_main struct llarp_main
{ {
struct llarp_alloc mem; struct llarp_alloc mem;
int num_nethreads = 1;
std::vector< std::thread > netio_threads;
struct llarp_crypto crypto; struct llarp_crypto crypto;
struct llarp_router *router = nullptr; struct llarp_router *router = nullptr;
struct llarp_threadpool *worker = nullptr; struct llarp_threadpool *worker = nullptr;
struct llarp_threadpool *thread = nullptr;
struct llarp_logic *logic = nullptr; struct llarp_logic *logic = nullptr;
struct llarp_config *config = nullptr; struct llarp_config *config = nullptr;
struct llarp_nodedb *nodedb = nullptr; struct llarp_nodedb *nodedb = nullptr;
@ -68,9 +72,19 @@ struct llarp_main
progress(); progress();
llarp_free_logic(&logic); llarp_free_logic(&logic);
progress();
llarp_nodedb_free(&nodedb);
for(auto &t : netio_threads)
{
progress();
t.join();
}
progress(); progress();
netio_threads.clear();
printf("\n"); printf("stopped\n");
fflush(stdout); fflush(stdout);
return exitcode; return exitcode;
} }
@ -84,7 +98,7 @@ iter_main_config(struct llarp_config_iterator *itr, const char *section,
if(!strcmp(section, "router")) if(!strcmp(section, "router"))
{ {
if(!strcmp(key, "threads")) if(!strcmp(key, "worker-threads"))
{ {
int workers = atoi(val); int workers = atoi(val);
if(workers > 0 && m->worker == nullptr) if(workers > 0 && m->worker == nullptr)
@ -92,6 +106,12 @@ iter_main_config(struct llarp_config_iterator *itr, const char *section,
m->worker = llarp_init_threadpool(workers, "llarp-worker"); m->worker = llarp_init_threadpool(workers, "llarp-worker");
} }
} }
if(!strcmp(key, "net-threads"))
{
m->num_nethreads = atoi(val);
if(m->num_nethreads <= 0)
m->num_nethreads = 1;
}
} }
if(!strcmp(section, "netdb")) if(!strcmp(section, "netdb"))
{ {
@ -104,18 +124,25 @@ iter_main_config(struct llarp_config_iterator *itr, const char *section,
llarp_main *sllarp = nullptr; llarp_main *sllarp = nullptr;
void
run_net(void *user)
{
llarp_ev_loop_run(static_cast< llarp_ev_loop * >(user));
}
void void
handle_signal(int sig) handle_signal(int sig)
{ {
printf("\ninterrupted\n"); if(sllarp->logic)
llarp_ev_loop_stop(sllarp->mainloop);
llarp_logic_stop(sllarp->logic); llarp_logic_stop(sllarp->logic);
if(sllarp->mainloop)
llarp_ev_loop_stop(sllarp->mainloop);
if(sllarp)
{
for(auto &t : sllarp->netio_threads)
{
progress();
t.join();
}
progress();
sllarp->netio_threads.clear();
}
printf("\ninterrupted\n");
} }
int int
@ -149,7 +176,6 @@ main(int argc, char *argv[])
if(!sllarp->worker) if(!sllarp->worker)
sllarp->worker = llarp_init_threadpool(2, "llarp-worker"); sllarp->worker = llarp_init_threadpool(2, "llarp-worker");
// ensure netio thread // ensure netio thread
sllarp->thread = llarp_init_threadpool(1, "llarp-netio");
sllarp->logic = llarp_init_logic(mem); sllarp->logic = llarp_init_logic(mem);
sllarp->router = llarp_init_router(mem, sllarp->worker, sllarp->router = llarp_init_router(mem, sllarp->worker,
@ -160,9 +186,15 @@ main(int argc, char *argv[])
signal(SIGINT, handle_signal); signal(SIGINT, handle_signal);
llarp_run_router(sllarp->router); llarp_run_router(sllarp->router);
// run mainloop // run net io thread
llarp_threadpool_queue_job(sllarp->thread, auto netio = sllarp->mainloop;
{sllarp->mainloop, &run_net}); while(sllarp->num_nethreads--)
{
sllarp->netio_threads.emplace_back(
[netio]() { llarp_ev_loop_run(netio); });
pthread_setname_np(sllarp->netio_threads.back().native_handle(),
"llarp-netio");
}
llarp::Info(__FILE__, "running"); llarp::Info(__FILE__, "running");
sllarp->exitcode = 0; sllarp->exitcode = 0;
llarp_logic_mainloop(sllarp->logic); llarp_logic_mainloop(sllarp->logic);
@ -175,7 +207,10 @@ main(int argc, char *argv[])
} }
else else
llarp::Error(__FILE__, "no nodedb defined"); llarp::Error(__FILE__, "no nodedb defined");
return sllarp->shutdown(); auto code = sllarp->shutdown();
delete sllarp;
sllarp = nullptr;
return code;
} }
else else
llarp::Error(__FILE__, "failed to load config"); llarp::Error(__FILE__, "failed to load config");

@ -41,10 +41,6 @@ llarp_buffer_write(llarp_buffer_t *buff, const void *data, size_t sz);
bool bool
llarp_buffer_writef(llarp_buffer_t *buff, const char *fmt, ...); llarp_buffer_writef(llarp_buffer_t *buff, const char *fmt, ...);
/// read from file into buff using allocator "mem"
bool
llarp_buffer_readfile(llarp_buffer_t *buff, FILE *f, struct llarp_alloc *mem);
/// read buffer upto character delimiter /// read buffer upto character delimiter
size_t size_t
llarp_buffer_read_until(llarp_buffer_t *buff, char delim, byte_t *result, llarp_buffer_read_until(llarp_buffer_t *buff, char delim, byte_t *result,

@ -44,25 +44,6 @@ llarp_buffer_write(llarp_buffer_t* buff, const void* data, size_t sz)
return false; return false;
} }
bool
llarp_buffer_readfile(llarp_buffer_t* buff, FILE* f, llarp_alloc* mem)
{
ssize_t len;
fseek(f, 0, SEEK_END);
len = ftell(f);
rewind(f);
if(len > 0)
{
buff->base = static_cast< uint8_t* >(mem->alloc(mem, len, 8));
buff->cur = buff->base;
buff->sz = len;
ssize_t sz = fread(buff->base, len, 1, f);
rewind(f);
return sz == len;
}
return false;
}
size_t size_t
llarp_buffer_read_until(llarp_buffer_t* buff, char delim, byte_t* result, llarp_buffer_read_until(llarp_buffer_t* buff, char delim, byte_t* result,
size_t resultsize) size_t resultsize)

@ -85,7 +85,7 @@ struct llarp_epoll_loop : public llarp_ev_loop
byte_t readbuf[2048]; byte_t readbuf[2048];
do do
{ {
result = epoll_wait(epollfd, events, 1024, -1); result = epoll_wait(epollfd, events, 1024, 100);
if(result > 0) if(result > 0)
{ {
int idx = 0; int idx = 0;

@ -1,6 +1,8 @@
#include <llarp/nodedb.h> #include <llarp/nodedb.h>
#include <llarp/router_contact.h> #include <llarp/router_contact.h>
#include <fstream>
#include <map> #include <map>
#include "buffer.hpp"
#include "crypto.hpp" #include "crypto.hpp"
#include "fs.hpp" #include "fs.hpp"
#include "mem.hpp" #include "mem.hpp"
@ -9,11 +11,10 @@ static const char skiplist_subdirs[] = "0123456789ABCDEF";
struct llarp_nodedb struct llarp_nodedb
{ {
llarp_nodedb(llarp_alloc *m, llarp_crypto *c) : mem(m), crypto(c) llarp_nodedb(llarp_crypto *c) : crypto(c)
{ {
} }
llarp_alloc *mem;
llarp_crypto *crypto; llarp_crypto *crypto;
std::map< llarp::pubkey, llarp_rc * > entries; std::map< llarp::pubkey, llarp_rc * > entries;
@ -23,7 +24,7 @@ struct llarp_nodedb
auto itr = entries.begin(); auto itr = entries.begin();
while(itr != entries.end()) while(itr != entries.end())
{ {
mem->free(mem, itr->second); delete itr->second;
itr = entries.erase(itr); itr = entries.erase(itr);
} }
} }
@ -54,19 +55,27 @@ struct llarp_nodedb
bool bool
loadfile(const fs::path &fpath) loadfile(const fs::path &fpath)
{ {
llarp_buffer_t buff; std::ifstream f(fpath, std::ios::binary);
FILE *f = fopen(fpath.c_str(), "rb"); if(!f.is_open())
if(!f)
return false; return false;
if(!llarp_buffer_readfile(&buff, f, mem))
{ byte_t tmp[MAX_RC_SIZE];
fclose(f);
auto buf = llarp::StackBuffer< decltype(tmp) >(tmp);
f.seekg(0, std::ios::end);
size_t sz = f.tellg();
f.seekg(0, std::ios::beg);
if(sz > buf.sz)
return false; return false;
}
fclose(f); // TODO: error checking
llarp_rc *rc = llarp::Alloc< llarp_rc >(mem); f.read((char *)buf.base, sz);
buf.sz = sz;
llarp_rc *rc = new llarp_rc;
llarp::Zero(rc, sizeof(llarp_rc)); llarp::Zero(rc, sizeof(llarp_rc));
if(llarp_rc_bdecode(rc, &buff)) if(llarp_rc_bdecode(rc, &buf))
{ {
if(llarp_rc_verify_sig(crypto, rc)) if(llarp_rc_verify_sig(crypto, rc))
{ {
@ -77,7 +86,7 @@ struct llarp_nodedb
} }
} }
llarp_rc_free(rc); llarp_rc_free(rc);
mem->free(mem, rc); delete rc;
return false; return false;
} }
@ -99,11 +108,7 @@ extern "C" {
struct llarp_nodedb * struct llarp_nodedb *
llarp_nodedb_new(struct llarp_alloc *mem, struct llarp_crypto *crypto) llarp_nodedb_new(struct llarp_alloc *mem, struct llarp_crypto *crypto)
{ {
void *ptr = return new llarp_nodedb(crypto);
mem->alloc(mem, sizeof(llarp_nodedb), llarp::alignment< llarp_nodedb >());
if(!ptr)
return nullptr;
return new(ptr) llarp_nodedb(mem, crypto);
} }
void void
@ -111,10 +116,8 @@ llarp_nodedb_free(struct llarp_nodedb **n)
{ {
if(*n) if(*n)
{ {
struct llarp_alloc *mem = (*n)->mem;
(*n)->Clear(); (*n)->Clear();
(*n)->~llarp_nodedb(); delete *n;
mem->free(mem, *n);
} }
*n = nullptr; *n = nullptr;
} }

@ -64,7 +64,7 @@ namespace llarp
// don't allow enqueueing after stopping the pool // don't allow enqueueing after stopping the pool
if(stop) if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool"); return;
jobs.emplace_back(job); jobs.emplace_back(job);
} }
@ -131,7 +131,11 @@ llarp_threadpool_queue_job(struct llarp_threadpool *pool,
void void
llarp_free_threadpool(struct llarp_threadpool **pool) llarp_free_threadpool(struct llarp_threadpool **pool)
{ {
if(*pool)
{
(*pool)->impl.Join();
delete *pool; delete *pool;
}
*pool = nullptr; *pool = nullptr;
} }
} }

@ -159,12 +159,12 @@ llarp_timer_stop(struct llarp_timer_context* t)
{ {
std::unique_lock< std::mutex > lock(t->timersMutex); std::unique_lock< std::mutex > lock(t->timersMutex);
// destroy all timers
// don't call callbacks on timers
auto itr = t->timers.begin(); auto itr = t->timers.begin();
while(itr != t->timers.end()) while(itr != t->timers.end())
{ {
// timer expired itr = t->timers.erase(itr);
llarp_threadpool_queue_job(t->threadpool, itr->second);
++itr;
} }
} }
} }

Loading…
Cancel
Save