on logic queue overflow put job on timer instead

pull/737/head
Jeff Becker 5 years ago
parent b8904ab0f4
commit b0406e1a76
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -58,28 +58,20 @@ namespace llarp
bool
Logic::queue_func(std::function< void(void) > f)
{
size_t left = 10;
while(!this->thread->QueueFunc(f))
if(!this->thread->QueueFunc(f))
{
// our queue is full
if(this->can_flush())
{
// we can flush the queue here so let's do it
this->tick(llarp::time_now_ms());
}
else
{
// wait a bit and retry queuing because we are not in the same thread as
// we are calling the jobs in
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
left--;
if(left == 0) // too many retries
return false;
// try calling it later if the job queue overflows
this->call_later(1, f);
}
return true;
}
void
Logic::call_later(llarp_time_t timeout, std::function< void(void) > func)
{
llarp_timer_call_func_later(this->timer, timeout, func);
}
uint32_t
Logic::call_later(const llarp_timeout_job& job)
{

@ -47,6 +47,9 @@ namespace llarp
uint32_t
call_later(const llarp_timeout_job& job);
void
call_later(llarp_time_t later, std::function< void(void) > func);
void
cancel_call(uint32_t id);

@ -18,6 +18,7 @@ namespace llarp
uint64_t started;
uint64_t timeout;
llarp_timer_handler_func func;
std::function< void(void) > deferredFunc;
bool done;
bool canceled;
@ -117,12 +118,24 @@ struct llarp_timer_context
{
llarp::util::Lock lock(&timersMutex);
uint32_t id = ++currentId;
const uint32_t id = ++currentId;
timers.emplace(
id, std::make_unique< llarp::timer >(m_Now, timeout_ms, user, func));
return id;
}
uint32_t
call_func_later(std::function< void(void) > func, llarp_time_t timeout)
{
llarp::util::Lock lock(&timersMutex);
const uint32_t id = ++currentId;
timers.emplace(
id, std::make_unique< llarp::timer >(m_Now, timeout, nullptr, nullptr));
timers[id]->deferredFunc = func;
return id;
}
void
cancel_all() LOCKS_EXCLUDED(timersMutex)
{
@ -157,6 +170,13 @@ llarp_timer_call_later(struct llarp_timer_context* t,
return t->call_later(job.user, job.handler, job.timeout);
}
uint32_t
llarp_timer_call_func_later(struct llarp_timer_context* t, llarp_time_t timeout,
std::function< void(void) > func)
{
return t->call_func_later(func, timeout);
}
void
llarp_free_timer(struct llarp_timer_context** t)
{
@ -285,6 +305,8 @@ namespace llarp
else
call(user, timeout, diff);
}
if(deferredFunc)
deferredFunc();
done = true;
}
} // namespace llarp

@ -27,6 +27,10 @@ uint32_t
llarp_timer_call_later(struct llarp_timer_context *t,
struct llarp_timeout_job job);
uint32_t
llarp_timer_call_func_later(llarp_timer_context *t, llarp_time_t timeout,
std::function< void(void) > func);
void
llarp_timer_cancel_job(struct llarp_timer_context *t, uint32_t id);

@ -92,10 +92,12 @@ namespace llarp
utp_context_set_option(_utp_ctx, UTP_LOG_NORMAL, 1);
utp_context_set_option(_utp_ctx, UTP_LOG_MTU, 1);
utp_context_set_option(_utp_ctx, UTP_LOG_DEBUG, 1);
utp_context_set_option(_utp_ctx, UTP_SNDBUF,
(MAX_LINK_MSG_SIZE * MaxSendQueueSize * size_t{3}) / size_t{2});
utp_context_set_option(_utp_ctx, UTP_RCVBUF,
(MAX_LINK_MSG_SIZE * MaxSendQueueSize * size_t{3}) / size_t{2});
utp_context_set_option(
_utp_ctx, UTP_SNDBUF,
(MAX_LINK_MSG_SIZE * MaxSendQueueSize * size_t{3}) / size_t{2});
utp_context_set_option(
_utp_ctx, UTP_RCVBUF,
(MAX_LINK_MSG_SIZE * MaxSendQueueSize * size_t{3}) / size_t{2});
}
LinkLayer::~LinkLayer()

@ -10,9 +10,9 @@ namespace llarp
{
LinkLayer_ptr
NewOutboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc,
LinkMessageHandler h, SessionEstablishedHandler est,
SessionRenegotiateHandler reneg, SignBufferFunc sign,
TimeoutHandler timeout, SessionClosedHandler closed)
LinkMessageHandler h, SessionEstablishedHandler est,
SessionRenegotiateHandler reneg, SignBufferFunc sign,
TimeoutHandler timeout, SessionClosedHandler closed)
{
return std::make_shared< LinkLayer >(routerEncSecret, getrc, h, sign, est,
reneg, timeout, closed, false);

Loading…
Cancel
Save