make threadpool consice

pull/686/head
Jeff Becker 5 years ago
parent 0eb6431eb1
commit b9bcc2b775
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -14,7 +14,6 @@
struct llarp_ev_loop;
struct llarp_nodedb;
struct llarp_nodedb_iter;
struct llarp_threadpool;
namespace llarp
{
@ -25,6 +24,10 @@ namespace llarp
struct CryptoManager;
struct MetricsConfig;
struct RouterContact;
namespace thread
{
struct ThreadPool;
}
namespace metrics
{
@ -54,7 +57,7 @@ namespace llarp
std::unique_ptr< Crypto > crypto;
std::unique_ptr< CryptoManager > cryptoManager;
std::unique_ptr< AbstractRouter > router;
std::unique_ptr< llarp_threadpool > worker;
std::shared_ptr< thread::ThreadPool > worker;
std::shared_ptr< Logic > logic;
std::unique_ptr< Config > config;
std::unique_ptr< llarp_nodedb > nodedb;

@ -116,11 +116,10 @@ main(ABSL_ATTRIBUTE_UNUSED int argc, ABSL_ATTRIBUTE_UNUSED char* argv[])
absl::SetMutexDeadlockDetectionMode(absl::OnDeadlockCycle::kAbort);
#endif
llarp::SetLogLevel(llarp::eLogDebug);
llarp_threadpool* threadpool = llarp_init_same_process_threadpool();
// Now that libuv is the single non-Windows event loop impl, we can
// go back to using the normal function
llarp_ev_loop_ptr loop = llarp_make_ev_loop();
auto logic = std::make_shared< llarp::Logic >(threadpool);
auto logic = std::make_shared< llarp::Logic >();
sockaddr_in addr;
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
addr.sin_port = htons(1222);
@ -135,7 +134,7 @@ main(ABSL_ATTRIBUTE_UNUSED int argc, ABSL_ATTRIBUTE_UNUSED char* argv[])
{
client.RunAsync(loop, a.ToString());
client.DoDemoRequest();
llarp_ev_loop_run_single_process(loop, threadpool, logic);
llarp_ev_loop_run_single_process(loop, logic);
return 0;
}
else

@ -62,8 +62,8 @@ namespace llarp
// Router config
if(!singleThreaded && config->router.workerThreads > 0 && !worker)
{
worker.reset(
llarp_init_threadpool(config->router.workerThreads, "llarp-worker"));
worker = std::make_shared< llarp::thread::ThreadPool >(
config->router.workerThreads, 1024);
}
if(singleThreaded)
@ -212,21 +212,7 @@ __ ___ ____ _ _ ___ _ _ ____
llarp::LogInfo(LLARP_VERSION, " ", LLARP_RELEASE_MOTTO);
llarp::LogInfo("starting up");
mainloop = llarp_make_ev_loop();
// ensure worker thread pool
if(!worker && !singleThreaded)
worker.reset(llarp_init_threadpool(2, "llarp-worker"));
else if(singleThreaded)
{
llarp::LogInfo("running in single threaded mode");
worker.reset(llarp_init_same_process_threadpool());
}
// ensure netio thread
if(singleThreaded)
{
logic = std::make_shared< Logic >(worker.get());
}
else
logic = std::make_shared< Logic >();
logic = std::make_shared< Logic >();
if(debug)
{
@ -257,7 +243,7 @@ __ ___ ____ _ _ ___ _ _ ____
}
cryptoManager = std::make_unique< CryptoManager >(crypto.get());
router = std::make_unique< Router >(worker.get(), mainloop, logic);
router = std::make_unique< Router >(worker, mainloop, logic);
if(!router->Configure(config.get()))
{
llarp::LogError("Failed to configure router");
@ -288,7 +274,7 @@ __ ___ ____ _ _ ___ _ _ ____
// run net io thread
llarp::LogInfo("running mainloop");
llarp_ev_loop_run_single_process(mainloop, worker.get(), logic);
llarp_ev_loop_run_single_process(mainloop, logic);
// waits for router graceful stop
return 0;
}
@ -390,17 +376,13 @@ __ ___ ____ _ _ ___ _ _ ____
{
llarp::LogDebug("stop workers");
if(worker)
llarp_threadpool_stop(worker.get());
llarp::LogDebug("join workers");
if(worker)
llarp_threadpool_join(worker.get());
worker->stop();
llarp::LogDebug("free config");
config.release();
llarp::LogDebug("free workers");
worker.release();
worker.reset();
llarp::LogDebug("free nodedb");
nodedb.release();

@ -79,12 +79,12 @@ namespace llarp
EncryptedFrame target;
void
AsyncDecrypt(llarp_threadpool* worker, const EncryptedFrame& frame,
User_ptr u)
AsyncDecrypt(const std::shared_ptr< thread::ThreadPool >& worker,
const EncryptedFrame& frame, User_ptr u)
{
target = frame;
user = u;
llarp_threadpool_queue_job(worker, {this, &Decrypt});
worker->addJob(std::bind(&Decrypt, this));
}
};
} // namespace llarp

@ -34,7 +34,6 @@ llarp_make_ev_loop()
void
llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev,
struct llarp_threadpool *tp,
std::shared_ptr< llarp::Logic > logic)
{
while(ev->running())
@ -45,7 +44,7 @@ llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev,
{
ev->update_time();
logic->tick_async(ev->time_now());
llarp_threadpool_tick(tp);
llarp_threadpool_tick(logic->thread);
}
llarp::LogContext::Instance().logStream->Tick(ev->time_now());
}

@ -59,7 +59,6 @@ llarp_make_ev_loop();
// run mainloop
void
llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev,
struct llarp_threadpool *tp,
std::shared_ptr< llarp::Logic > logic);
/// get the current time on the event loop

@ -443,8 +443,7 @@ llarp_nodedb_async_verify(struct llarp_async_verify_rc *job)
{
// switch to crypto threadpool and continue with
// crypto_threadworker_verifyrc
llarp_threadpool_queue_job(job->cryptoworker,
{job, &crypto_threadworker_verifyrc});
job->cryptoworker->addJob(std::bind(&crypto_threadworker_verifyrc, job));
}
// disabled for now

@ -44,7 +44,7 @@ struct llarp_nodedb_iter
struct llarp_nodedb
{
explicit llarp_nodedb(llarp::thread::ThreadPool *diskworker)
explicit llarp_nodedb(std::shared_ptr< llarp::thread::ThreadPool > diskworker)
: disk(diskworker)
{
}
@ -54,7 +54,7 @@ struct llarp_nodedb
Clear();
}
llarp::thread::ThreadPool *disk;
std::shared_ptr< llarp::thread::ThreadPool > disk;
mutable llarp::util::Mutex access; // protects entries
struct NetDBEntry
@ -178,8 +178,8 @@ struct llarp_async_verify_rc
llarp_nodedb *nodedb;
// llarp::Logic for queue_job
std::shared_ptr< llarp::Logic > logic; // includes a llarp_threadpool
llarp_threadpool *cryptoworker;
llarp::thread::ThreadPool *diskworker;
std::shared_ptr< llarp::thread::ThreadPool > cryptoworker;
std::shared_ptr< llarp::thread::ThreadPool > diskworker;
/// router contact
llarp::RouterContact rc;

@ -25,7 +25,7 @@ namespace llarp
return m_AllowTransit;
}
llarp_threadpool*
std::shared_ptr< thread::ThreadPool >
PathContext::Worker()
{
return m_Router->threadpool();

@ -129,7 +129,7 @@ namespace llarp
}
};
llarp_threadpool*
std::shared_ptr< thread::ThreadPool >
Worker();
std::shared_ptr< Logic >

@ -23,9 +23,9 @@ namespace llarp
using Handler = std::function< void(const AsyncPathKeyExchangeContext&) >;
Handler result;
size_t idx = 0;
AbstractRouter* router = nullptr;
llarp_threadpool* worker = nullptr;
size_t idx = 0;
AbstractRouter* router = nullptr;
std::shared_ptr< thread::ThreadPool > worker;
std::shared_ptr< Logic > logic;
LR_CommitMessage LRCM;
@ -101,7 +101,7 @@ namespace llarp
else
{
// next hop
worker->QueueFunc(
worker->addJob(
std::bind(&AsyncPathKeyExchangeContext::GenerateNextKey, *this));
}
}
@ -109,7 +109,7 @@ namespace llarp
/// Generate all keys asynchronously and call handler when done
void
AsyncGenerateKeys(Path_t p, std::shared_ptr< Logic > l,
llarp_threadpool* pool, Handler func)
std::shared_ptr< thread::ThreadPool > pool, Handler func)
{
path = p;
logic = l;
@ -120,7 +120,7 @@ namespace llarp
{
LRCM.frames[i].Randomize();
}
pool->QueueFunc(
pool->addJob(
std::bind(&AsyncPathKeyExchangeContext::GenerateNextKey, *this));
}
};

@ -94,10 +94,10 @@ namespace llarp
virtual llarp_ev_loop_ptr
netloop() const = 0;
virtual llarp_threadpool *
virtual std::shared_ptr< thread::ThreadPool >
threadpool() = 0;
virtual thread::ThreadPool *
virtual std::shared_ptr< thread::ThreadPool >
diskworker() = 0;
virtual service::Context &

@ -207,15 +207,15 @@ namespace llarp
return async_verify_RC(s->GetRemoteRC());
}
Router::Router(struct llarp_threadpool *_tp, llarp_ev_loop_ptr __netloop,
std::shared_ptr< Logic > l)
Router::Router(std::shared_ptr< llarp::thread::ThreadPool > _tp,
llarp_ev_loop_ptr __netloop, std::shared_ptr< Logic > l)
: ready(false)
, _netloop(__netloop)
, tp(_tp)
, cryptoworker(_tp)
, _logic(l)
, paths(this)
, _exitContext(this)
, disk(1, 1000)
, disk(std::make_shared< llarp::thread::ThreadPool >(1, 1000))
, _dht(llarp_dht_context_new(this))
, inbound_link_msg_parser(this)
, _hiddenServiceContext(this)
@ -517,8 +517,8 @@ namespace llarp
llarp_ev_loop_stop(_netloop);
inboundLinks.clear();
outboundLinks.clear();
disk.stop();
disk.shutdown();
disk->stop();
disk->shutdown();
}
void
@ -1405,8 +1405,8 @@ namespace llarp
job->nodedb = _nodedb;
job->logic = _logic;
job->cryptoworker = tp;
job->diskworker = &disk;
job->cryptoworker = cryptoworker;
job->diskworker = disk;
if(rc.IsPublicRouter())
job->hook = &Router::on_verify_server_rc;
else
@ -1492,8 +1492,8 @@ namespace llarp
LogInfo("RPC Caller to ", lokidRPCAddr, " started");
}
llarp_threadpool_start(tp);
disk.start();
cryptoworker->start();
disk->start();
for(const auto &rc : bootstrapRCList)
{

@ -156,16 +156,16 @@ namespace llarp
return _netloop;
}
llarp_threadpool *
std::shared_ptr< llarp::thread::ThreadPool >
threadpool() override
{
return tp;
return cryptoworker;
}
thread::ThreadPool *
std::shared_ptr< llarp::thread::ThreadPool >
diskworker() override
{
return &disk;
return disk;
}
// our ipv4 public setting
@ -174,13 +174,13 @@ namespace llarp
AddressInfo addrInfo;
llarp_ev_loop_ptr _netloop;
llarp_threadpool *tp;
std::shared_ptr< llarp::thread::ThreadPool > cryptoworker;
std::shared_ptr< Logic > _logic;
path::PathContext paths;
exit::Context _exitContext;
SecretKey _identity;
SecretKey _encryption;
thread::ThreadPool disk;
std::shared_ptr< thread::ThreadPool > disk;
llarp_dht_context *_dht = nullptr;
llarp_nodedb *_nodedb;
llarp_time_t _startedAt;
@ -301,8 +301,8 @@ namespace llarp
// set to max value right now
std::unordered_map< RouterID, llarp_time_t, PubKey::Hash > lokinetRouters;
Router(struct llarp_threadpool *tp, llarp_ev_loop_ptr __netloop,
std::shared_ptr< Logic > logic);
Router(std::shared_ptr< llarp::thread::ThreadPool > worker,
llarp_ev_loop_ptr __netloop, std::shared_ptr< Logic > logic);
~Router();

@ -696,8 +696,7 @@ namespace llarp
m_IsolatedNetLoop = llarp_make_ev_loop();
m_IsolatedLogic = std::make_shared< llarp::Logic >();
if(SetupNetworking())
llarp_ev_loop_run_single_process(
m_IsolatedNetLoop, m_IsolatedLogic->thread, m_IsolatedLogic);
llarp_ev_loop_run_single_process(m_IsolatedNetLoop, m_IsolatedLogic);
else
{
m_IsolatedNetLoop.reset();
@ -1282,7 +1281,7 @@ namespace llarp
return m_IsolatedLogic ? m_IsolatedLogic : m_Router->logic();
}
llarp_threadpool*
std::shared_ptr< llarp::thread::ThreadPool >
Endpoint::CryptoWorker()
{
return m_Router->threadpool();

@ -129,7 +129,7 @@ namespace llarp
EndpointNetLoop();
/// crypto worker threadpool
llarp_threadpool*
std::shared_ptr< llarp::thread::ThreadPool >
CryptoWorker();
AbstractRouter*

@ -173,8 +173,8 @@ namespace llarp
ex->msg.PutBuffer(payload);
ex->msg.introReply = path->intro;
ex->frame.F = ex->msg.introReply.pathID;
llarp_threadpool_queue_job(m_Endpoint->CryptoWorker(),
{ex, &AsyncKeyExchange::Encrypt});
m_Endpoint->CryptoWorker()->addJob(
std::bind(&AsyncKeyExchange::Encrypt, ex));
}
std::string

@ -368,11 +368,10 @@ namespace llarp
}
bool
ProtocolFrame::AsyncDecryptAndVerify(std::shared_ptr< Logic > logic,
path::Path_ptr recvPath,
llarp_threadpool* worker,
const Identity& localIdent,
IDataHandler* handler) const
ProtocolFrame::AsyncDecryptAndVerify(
std::shared_ptr< Logic > logic, path::Path_ptr recvPath,
const std::shared_ptr< llarp::thread::ThreadPool >& worker,
const Identity& localIdent, IDataHandler* handler) const
{
auto msg = std::make_shared< ProtocolMessage >();
if(T.IsZero())
@ -382,8 +381,7 @@ namespace llarp
auto dh = new AsyncFrameDecrypt(logic, localIdent, handler, msg, *this,
recvPath->intro);
dh->path = recvPath;
llarp_threadpool_queue_job(worker, {dh, &AsyncFrameDecrypt::Work});
return true;
return worker->addJob(std::bind(&AsyncFrameDecrypt::Work, dh));
}
SharedSecret shared;
if(!handler->GetCachedSessionKeyFor(T, shared))

@ -126,10 +126,10 @@ namespace llarp
Sign(const Identity& localIdent);
bool
AsyncDecryptAndVerify(std::shared_ptr< Logic > logic,
path::Path_ptr fromPath, llarp_threadpool* worker,
const Identity& localIdent,
IDataHandler* handler) const;
AsyncDecryptAndVerify(
std::shared_ptr< Logic > logic, path::Path_ptr fromPath,
const std::shared_ptr< llarp::thread::ThreadPool >& worker,
const Identity& localIdent, IDataHandler* handler) const;
bool
DecryptPayloadInto(const SharedSecret& sharedkey,

@ -13,8 +13,9 @@ namespace llarp
fflush(f);
}
} // namespace
FileLogStream::FileLogStream(thread::ThreadPool *disk, FILE *f,
llarp_time_t flushInterval, bool closeFile)
FileLogStream::FileLogStream(std::shared_ptr< thread::ThreadPool > disk,
FILE *f, llarp_time_t flushInterval,
bool closeFile)
: m_Disk(disk)
, m_File(f)
, m_FlushInterval(flushInterval)

@ -12,8 +12,8 @@ namespace llarp
/// flushable file based log stream
struct FileLogStream : public ILogStream
{
FileLogStream(thread::ThreadPool* disk, FILE* f, llarp_time_t flushInterval,
bool closefile = true);
FileLogStream(std::shared_ptr< thread::ThreadPool > disk, FILE* f,
llarp_time_t flushInterval, bool closefile = true);
~FileLogStream();
@ -42,7 +42,7 @@ namespace llarp
void
FlushLinesToDisk(llarp_time_t now);
thread::ThreadPool* m_Disk;
std::shared_ptr< thread::ThreadPool > m_Disk;
FILE* const m_File;
const llarp_time_t m_FlushInterval;
llarp_time_t m_LastFlush = 0;

@ -7,8 +7,8 @@ namespace llarp
{
struct JSONLogStream : public FileLogStream
{
JSONLogStream(thread::ThreadPool* disk, FILE* f, llarp_time_t flushInterval,
bool closeFile)
JSONLogStream(std::shared_ptr< thread::ThreadPool > disk, FILE* f,
llarp_time_t flushInterval, bool closeFile)
: FileLogStream(disk, f, flushInterval, closeFile)
{
}

@ -58,7 +58,7 @@ namespace llarp
bool
Logic::queue_func(std::function< void(void) > f)
{
size_t left = 1000;
size_t left = 10;
while(!this->thread->QueueFunc(f))
{
// our queue is full
@ -71,7 +71,7 @@ namespace llarp
{
// wait a bit and retry queuing because we are not in the same thread as
// we are calling the jobs in
std::this_thread::sleep_for(std::chrono::milliseconds(5));
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
left--;
if(left == 0) // too many retries
@ -105,7 +105,7 @@ namespace llarp
bool
Logic::can_flush() const
{
return ourPID && ourPID == ::getpid();
return ourID == std::this_thread::get_id();
}
} // namespace llarp

@ -12,17 +12,12 @@ namespace llarp
public:
struct llarp_threadpool* thread;
struct llarp_timer_context* timer;
const pid_t ourPID;
const std::thread::id ourID;
Logic()
: thread(llarp_init_same_process_threadpool())
, timer(llarp_init_timer())
, ourPID(::getpid())
{
}
Logic(struct llarp_threadpool* tp)
: thread(tp), timer(llarp_init_timer()), ourPID(0)
, ourID(std::this_thread::get_id())
{
}

@ -56,7 +56,7 @@ struct EventLoopTest : public ::testing::Test
void
RunLoop()
{
llarp_ev_loop_run_single_process(loop, _logic->thread, _logic);
llarp_ev_loop_run_single_process(loop,_logic);
}
};

@ -149,7 +149,7 @@ struct LinkLayerTest : public test::LlarpTest< NoOpCrypto >
RunMainloop()
{
m_logic->call_later({5000, this, &OnTimeout});
llarp_ev_loop_run_single_process(netLoop, m_logic->thread, m_logic);
llarp_ev_loop_run_single_process(netLoop, m_logic);
}
void

@ -47,10 +47,9 @@ struct AbyssTestBase : public ::testing::Test
void
Start()
{
threadpool = llarp_init_same_process_threadpool();
loop = llarp_make_ev_loop();
logic = std::make_shared< llarp::Logic >(threadpool);
logic = std::make_shared< llarp::Logic >();
threadpool = logic->thread;
sockaddr_in addr;
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
addr.sin_port = htons((llarp::randint() % 2000) + 2000);
@ -177,7 +176,7 @@ struct AbyssTest : public AbyssTestBase,
void
RunLoop()
{
llarp_ev_loop_run_single_process(loop, threadpool, logic);
llarp_ev_loop_run_single_process(loop, logic);
}
};

Loading…
Cancel
Save