pull/606/head
Jeff Becker 5 years ago
parent d3a98db267
commit 5d388bc9f2
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -49,15 +49,11 @@ llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev,
{
ev->update_time();
ev->tick(EV_TICK_INTERVAL);
if(ev->running())
{
ev->update_time();
logic->tick_async(ev->time_now());
llarp_threadpool_tick(tp);
// tick log stream at the VERY END of the tick cycle so that all logs
// flush
llarp::LogContext::Instance().logStream->Tick(ev->time_now());
}
ev->update_time();
logic->tick_async(ev->time_now());
llarp_threadpool_tick(tp);
ev->update_time();
llarp::LogContext::Instance().logStream->Tick(ev->time_now());
}
}

@ -257,6 +257,8 @@ namespace llarp
Mutex m_PendingMutex ACQUIRED_AFTER(m_AuthedLinksMutex);
Pending m_Pending GUARDED_BY(m_PendingMutex);
};
using LinkLayer_ptr = std::shared_ptr<ILinkLayer>;
} // namespace llarp
#endif

@ -41,10 +41,10 @@ struct TryConnectJob
{
llarp_time_t lastAttempt = 0;
const llarp::RouterContact rc;
llarp::ILinkLayer *link;
llarp::LinkLayer_ptr link;
llarp::Router *router;
uint16_t triesLeft;
TryConnectJob(const llarp::RouterContact &remote, llarp::ILinkLayer *l,
TryConnectJob(const llarp::RouterContact &remote, llarp::LinkLayer_ptr l,
uint16_t tries, llarp::Router *r)
: rc(remote), link(l), router(r), triesLeft(tries)
{
@ -116,10 +116,8 @@ struct TryConnectJob
};
static void
on_try_connecting(void *u)
on_try_connecting(std::shared_ptr<TryConnectJob> j)
{
TryConnectJob *j = static_cast< TryConnectJob * >(u);
if(j->Attempt())
j->router->pendingEstablishJobs.erase(j->rc.pubkey);
}
@ -193,15 +191,13 @@ namespace llarp
{
if(!link->IsCompatable(remote))
continue;
std::unique_ptr< TryConnectJob > j = std::make_unique< TryConnectJob >(
remote, link.get(), numretries, this);
auto itr = pendingEstablishJobs.emplace(remote.pubkey, std::move(j));
std::shared_ptr< TryConnectJob > job = std::make_shared< TryConnectJob >(
remote, link, numretries, this);
auto itr = pendingEstablishJobs.emplace(remote.pubkey, job);
if(itr.second)
{
// only try establishing if we inserted a new element
TryConnectJob *job = itr.first->second.get();
// try establishing async
_logic->queue_job({job, &on_try_connecting});
_logic->queue_func(std::bind(&on_try_connecting, job));
return true;
}
else
@ -1973,12 +1969,12 @@ namespace llarp
if(outboundLinks.size() > 0)
return true;
static std::list< std::function< std::unique_ptr< ILinkLayer >(Router *) > >
static std::list< std::function< LinkLayer_ptr(Router *) > >
linkFactories = {utp::NewServerFromRouter, iwp::NewServerFromRouter};
for(const auto &factory : linkFactories)
{
std::unique_ptr< ILinkLayer > link = factory(this);
auto link = factory(this);
if(!link)
continue;
if(!link->EnsureKeys(transport_keyfile.string().c_str()))

@ -279,7 +279,6 @@ namespace llarp
std::string lokidRPCUser = "";
std::string lokidRPCPassword = "";
using LinkLayer_ptr = std::unique_ptr< ILinkLayer >;
using LinkSet = std::set< LinkLayer_ptr, ComparePtr< LinkLayer_ptr > >;
LinkSet outboundLinks;
@ -298,7 +297,7 @@ namespace llarp
std::unordered_map< RouterID, RouterContact, RouterID::Hash > validRouters;
// pending establishing session with routers
std::unordered_map< RouterID, std::unique_ptr< TryConnectJob >,
std::unordered_map< RouterID, std::shared_ptr< TryConnectJob >,
RouterID::Hash >
pendingEstablishJobs;

@ -55,10 +55,10 @@ namespace llarp
llarp_timer_run(this->timer, this->thread);
}
void
bool
Logic::queue_func(std::function< void(void) > f)
{
this->thread->QueueFunc(f);
return this->thread->QueueFunc(f);
}
uint32_t

@ -43,7 +43,7 @@ namespace llarp
void
queue_job(struct llarp_thread_job job);
void
bool
queue_func(std::function< void(void) > func);
uint32_t

@ -58,7 +58,12 @@ llarp_threadpool_queue_job(struct llarp_threadpool *pool,
struct llarp_thread_job job)
{
if(pool->impl)
pool->impl->addJob(std::bind(job.work, job.user));
{
while(!pool->impl->tryAddJob(std::bind(job.work, job.user)))
{
::usleep(1000);
}
}
else
{
// single threaded mode

@ -33,15 +33,16 @@ struct llarp_threadpool
return jobs.size();
}
void
bool
QueueFunc(std::function< void(void) > f) LOCKS_EXCLUDED(m_access)
{
if(impl)
impl->addJob(f);
return impl->tryAddJob(f);
else
{
llarp::util::Lock lock(&m_access);
jobs.emplace(f);
return true;
}
}
};

@ -176,9 +176,11 @@ llarp_timer_stop(struct llarp_timer_context* t)
{
// destroy all timers
// don't call callbacks on timers
llarp::util::Lock lock(&t->timersMutex);
t->timers.clear();
t->stop();
{
llarp::util::Lock lock(&t->timersMutex);
t->timers.clear();
t->stop();
}
if(t->ticker)
t->ticker->SignalAll();
}

@ -9,18 +9,17 @@ namespace llarp
{
using namespace std::placeholders;
std::unique_ptr< ILinkLayer >
LinkLaye_ptr
NewServer(Crypto* crypto, const SecretKey& routerEncSecret, GetRCFunc getrc,
LinkMessageHandler h, SessionEstablishedHandler est,
SessionRenegotiateHandler reneg, SignBufferFunc sign,
TimeoutHandler timeout, SessionClosedHandler closed)
{
return std::unique_ptr< ILinkLayer >(
new LinkLayer(crypto, routerEncSecret, getrc, h, sign, est, reneg,
timeout, closed));
return std::make_shared< LinkLayer >(crypto, routerEncSecret, getrc, h, sign, est, reneg,
timeout, closed);
}
std::unique_ptr< ILinkLayer >
LinkLayer_ptr
NewServerFromRouter(AbstractRouter* r)
{
using namespace std::placeholders;

@ -10,13 +10,13 @@ namespace llarp
namespace utp
{
std::unique_ptr< ILinkLayer >
LinkLayer_ptr
NewServer(Crypto* crypto, const SecretKey& routerEncSecret, GetRCFunc getrc,
LinkMessageHandler h, SessionEstablishedHandler est,
SessionRenegotiateHandler reneg, SignBufferFunc sign,
TimeoutHandler timeout, SessionClosedHandler closed);
std::unique_ptr< ILinkLayer >
LinkLayer_ptr
NewServerFromRouter(AbstractRouter* r);
} // namespace utp
} // namespace llarp

Loading…
Cancel
Save