add threadpool for ntcp dh

pull/1122/head
Jeff Becker 6 years ago
parent b02464990b
commit 91fdb038d9
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -0,0 +1,82 @@
#ifndef CRYPTO_WORKER_H_
#define CRYPTO_WORKER_H_
#include <condition_variable>
#include <mutex>
#include <deque>
#include <thread>
#include <vector>
#include <pair>
#include <memory>
namespace i2p
{
namespace worker
{
template<typename Caller>
struct ThreadPool
{
typedef std::function<void(void)> ResultFunc;
typedef std::function<Result(void)> WorkFunc;
typedef std::pair<std::shared_ptr<Caller>, WorkFunc> Job;
typedef std::mutex mtx_t;
typedef std::unique_lock<mtx_t> lock_t;
typedef std::condition_variable cond_t;
ThreadPool(int workers)
{
stop = false;
if(workers > 0)
{
while(workers--)
{
threads.emplace_back([this] {
Job job;
for (;;)
{
{
lock_t lock(this->queue_mutex);
this->condition.wait(
lock, [this] { return this->stop || !this->jobs.empty(); });
if (this->stop && this->jobs.empty()) return;
job = std::move(this->jobs.front());
this->jobs.pop_front();
}
}
job.first->GetService().post(job.second());
});
}
}
};
void Offer(const Job & job)
{
{
lock_t lock(queue_mutex);
if (stop) return;
jobs.emplace_back(job);
}
condition.notify_one();
}
~ThreadPool()
{
{
lock_t lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(auto &t: threads) t.join();
}
std::vector<std::thread> threads;
std::deque<Job> jobs;
mtx_t queue_mutex;
cond_t condition;
bool stop;
};
}
}
#endif

@ -171,27 +171,14 @@ namespace transport
return;
}
}
#if ((__GNUC__ == 4) && (__GNUC_MINOR__ <= 7)) || defined(__NetBSD__)
// due the bug in gcc 4.7. std::shared_future.get() is not const
if (!m_DHKeysPair)
m_DHKeysPair = transports.GetNextDHKeysPair ();
CreateAESKey (m_Establisher->phase1.pubKey);
SendPhase2 ();
#else
// TODO: check for number of pending keys
auto s = shared_from_this ();
auto keyCreated = std::async (std::launch::async, [s] ()
{
m_Server.Work(s, [s]() -> std::function<void(void)> {
if (!s->m_DHKeysPair)
s->m_DHKeysPair = transports.GetNextDHKeysPair ();
s->CreateAESKey (s->m_Establisher->phase1.pubKey);
}).share ();
m_Server.GetService ().post ([s, keyCreated]()
{
keyCreated.get ();
s->SendPhase2 ();
return std::bind(&NTCPSession::SendPhase2, s);
});
#endif
}
}
@ -788,12 +775,14 @@ namespace transport
}
//-----------------------------------------
NTCPServer::NTCPServer ():
NTCPServer::NTCPServer (int workers):
m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service),
m_TerminationTimer (m_Service), m_NTCPAcceptor (nullptr), m_NTCPV6Acceptor (nullptr),
m_ProxyType(eNoProxy), m_Resolver(m_Service), m_ProxyEndpoint(nullptr),
m_SoftLimit(0), m_HardLimit(0)
{
if(workers <= 0) workers = 1;
m_CryptoPool = std::make_shared<Pool>(workers);
}
NTCPServer::~NTCPServer ()

@ -12,6 +12,7 @@
#include "RouterInfo.h"
#include "I2NPProtocol.h"
#include "TransportSession.h"
#include "CryptoWorker.h"
namespace i2p
{
@ -131,6 +132,8 @@ namespace transport
{
public:
typedef i2p::worker::ThreadPool<NTCPSession> Pool;
enum RemoteAddressType
{
eIP4Address,
@ -146,7 +149,7 @@ namespace transport
};
NTCPServer ();
NTCPServer (int workers=4);
~NTCPServer ();
void Start ();
@ -193,6 +196,11 @@ namespace transport
void ScheduleTermination ();
void HandleTerminationTimer (const boost::system::error_code& ecode);
void Work(std::shared_ptr<NTCPSession> conn, Pool::WorkFunc work)
{
m_CryptoPool->Offer({conn, work});
}
private:
bool m_IsRunning;
@ -210,6 +218,8 @@ namespace transport
boost::asio::ip::tcp::resolver m_Resolver;
boost::asio::ip::tcp::endpoint * m_ProxyEndpoint;
std::shared_ptr<Pool> m_CryptoPool;
uint16_t m_SoftLimit, m_HardLimit;
public:

Loading…
Cancel
Save