remove obsolete timer-related code

pull/990/head
Thomas Winget 5 years ago
parent e53e3db171
commit f4c9e09d44

@ -53,7 +53,6 @@ set(LIB_UTIL_SRC
util/thread/thread_pool.cpp
util/thread/threading.cpp
util/thread/threadpool.cpp
util/thread/timer.cpp
util/thread/timerqueue.cpp
util/time.cpp
util/types.cpp

@ -31,11 +31,6 @@ llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev,
{
ev->update_time();
ev->tick(EV_TICK_INTERVAL);
if(ev->running())
{
ev->update_time();
logic->tick(ev->time_now());
}
llarp::LogContext::Instance().logStream->Tick(ev->time_now());
}
logic->clear_event_loop();

@ -1,5 +1,4 @@
#include <util/thread/logic.hpp>
#include <util/thread/timer.hpp>
#include <util/logging/logger.hpp>
#include <util/mem.h>
#include <util/metrics/metrics.hpp>
@ -8,22 +7,8 @@
namespace llarp
{
void
Logic::tick(llarp_time_t now)
{
if(m_Queue)
{
llarp_timer_set_time(m_Timer, now);
if(llarp_timer_should_call(m_Timer))
m_Queue(std::bind(&llarp_timer_tick_all, m_Timer));
return;
}
llarp_timer_tick_all_async(m_Timer, m_Thread, now);
}
Logic::Logic(size_t sz)
: m_Thread(llarp_init_threadpool(1, "llarp-logic", sz))
, m_Timer(llarp_init_timer())
{
llarp_threadpool_start(m_Thread);
/// set thread id
@ -41,7 +26,6 @@ namespace llarp
Logic::~Logic()
{
delete m_Thread;
llarp_free_timer(m_Timer);
}
size_t
@ -61,8 +45,6 @@ namespace llarp
Logic::stop()
{
llarp::LogDebug("logic thread stop");
// stop all timers from happening in the future
LogicCall(this, std::bind(&llarp_timer_stop, m_Timer));
// stop all operations on threadpool
llarp_threadpool_stop(m_Thread);
}
@ -150,20 +132,9 @@ namespace llarp
return 0;
}
uint32_t
Logic::call_later(const llarp_timeout_job& job)
{
llarp_timeout_job j;
j.user = job.user;
j.timeout = job.timeout;
j.handler = job.handler;
return llarp_timer_call_later(m_Timer, j);
}
void
Logic::cancel_call(uint32_t id)
{
llarp_timer_cancel_job(m_Timer, id);
auto loop = m_Loop;
if(loop != nullptr)
{
@ -174,7 +145,6 @@ namespace llarp
void
Logic::remove_call(uint32_t id)
{
llarp_timer_remove_job(m_Timer, id);
auto loop = m_Loop;
if(loop != nullptr)
{

@ -4,7 +4,6 @@
#include <ev/ev.hpp>
#include <util/mem.h>
#include <util/thread/threadpool.h>
#include <util/thread/timer.hpp>
#include <absl/types/optional.h>
namespace llarp
@ -16,10 +15,6 @@ namespace llarp
~Logic();
/// trigger times as needed
void
tick(llarp_time_t now);
/// stop all operation and wait for that to die
void
stop();
@ -31,9 +26,6 @@ namespace llarp
_traceLogicCall(std::function< void(void) > func, const char* filename,
int lineo);
uint32_t
call_later(const llarp_timeout_job& job);
uint32_t
call_later(llarp_time_t later, std::function< void(void) > func);
@ -62,7 +54,6 @@ namespace llarp
using ID_t = std::thread::id;
llarp_threadpool* const m_Thread;
llarp_ev_loop* m_Loop;
llarp_timer_context* const m_Timer;
absl::optional< ID_t > m_ID;
util::ContentionKiller m_Killer;
std::function< void(std::function< void(void) >) > m_Queue;

@ -1,305 +0,0 @@
#include <util/thread/timer.hpp>
#include <util/logging/logger.hpp>
#include <util/time.hpp>
#include <atomic>
#include <condition_variable>
#include <list>
#include <memory>
#include <queue>
#include <unordered_map>
#include <utility>
namespace llarp
{
struct timer
{
void* user;
uint64_t called_at;
uint64_t started;
uint64_t timeout;
llarp_timer_handler_func func;
std::function< void(void) > deferredFunc;
bool done;
bool canceled;
timer(llarp_time_t now, uint64_t ms = 0, void* _user = nullptr,
llarp_timer_handler_func _func = nullptr)
: user(_user)
, called_at(0)
, started(now)
, timeout(ms)
, func(std::move(_func))
, done(false)
, canceled(false)
{
}
~timer() = default;
void
exec();
static void
call(void* user)
{
static_cast< timer* >(user)->exec();
}
};
} // namespace llarp
struct llarp_timer_context
{
llarp::util::Mutex timersMutex; // protects timers
std::unordered_map< uint32_t, std::unique_ptr< llarp::timer > > timers
GUARDED_BY(timersMutex);
llarp::util::Mutex tickerMutex;
std::unique_ptr< llarp::util::Condition > ticker;
absl::Duration nextTickLen = absl::Milliseconds(100);
llarp_time_t m_Now;
llarp_time_t m_NextRequiredTickAt =
std::numeric_limits< llarp_time_t >::max();
size_t m_NumPendingTimers;
llarp_timer_context()
{
m_Now = llarp::time_now_ms();
}
uint32_t currentId = 0;
bool _run = true;
~llarp_timer_context() = default;
bool
run()
{
return _run;
}
void
stop()
{
_run = false;
}
void
cancel(uint32_t id) LOCKS_EXCLUDED(timersMutex)
{
llarp::util::Lock lock(&timersMutex);
const auto& itr = timers.find(id);
if(itr == timers.end())
return;
itr->second->canceled = true;
}
void
remove(uint32_t id) LOCKS_EXCLUDED(timersMutex)
{
llarp::util::Lock lock(&timersMutex);
const auto& itr = timers.find(id);
if(itr == timers.end())
return;
itr->second->func = nullptr;
itr->second->canceled = true;
}
uint32_t
call_later(void* user, llarp_timer_handler_func func, uint64_t timeout_ms)
LOCKS_EXCLUDED(timersMutex)
{
llarp::util::Lock lock(&timersMutex);
const uint32_t id = ++currentId;
timers.emplace(
id, std::make_unique< llarp::timer >(m_Now, timeout_ms, user, func));
m_NextRequiredTickAt = std::min(m_NextRequiredTickAt, m_Now + timeout_ms);
m_NumPendingTimers = timers.size();
return id;
}
uint32_t
call_func_later(std::function< void(void) > func, llarp_time_t timeout_ms)
{
llarp::util::Lock lock(&timersMutex);
const uint32_t id = ++currentId;
timers.emplace(
id,
std::make_unique< llarp::timer >(m_Now, timeout_ms, nullptr, nullptr));
timers[id]->deferredFunc = func;
m_NextRequiredTickAt = std::min(m_NextRequiredTickAt, m_Now + timeout_ms);
m_NumPendingTimers = timers.size();
return id;
}
void
cancel_all() LOCKS_EXCLUDED(timersMutex)
{
{
llarp::util::Lock lock(&timersMutex);
for(auto& item : timers)
{
item.second->func = nullptr;
item.second->canceled = true;
}
}
}
bool
ShouldTriggerTimers(llarp_time_t peekAhead) const
{
return m_NumPendingTimers > 0
and (m_Now + peekAhead) >= m_NextRequiredTickAt;
}
};
struct llarp_timer_context*
llarp_init_timer()
{
return new llarp_timer_context();
}
uint32_t
llarp_timer_call_later(struct llarp_timer_context* t,
struct llarp_timeout_job job)
{
return t->call_later(job.user, job.handler, job.timeout);
}
uint32_t
llarp_timer_call_func_later(struct llarp_timer_context* t, llarp_time_t timeout,
std::function< void(void) > func)
{
return t->call_func_later(func, timeout);
}
void
llarp_free_timer(struct llarp_timer_context* t)
{
delete t;
}
void
llarp_timer_remove_job(struct llarp_timer_context* t, uint32_t id)
{
t->remove(id);
}
void
llarp_timer_stop(struct llarp_timer_context* t)
{
llarp::LogDebug("timers stopping");
// destroy all timers
// don't call callbacks on timers
{
llarp::util::Lock lock(&t->timersMutex);
t->timers.clear();
t->stop();
}
if(t->ticker)
t->ticker->SignalAll();
llarp::LogDebug("timers stopped");
}
void
llarp_timer_cancel_job(struct llarp_timer_context* t, uint32_t id)
{
t->cancel(id);
}
void
llarp_timer_set_time(struct llarp_timer_context* t, llarp_time_t now)
{
if(now == 0)
now = llarp::time_now_ms();
t->m_Now = now;
}
void
llarp_timer_tick_all(struct llarp_timer_context* t)
{
if(!t->run())
return;
const auto now = llarp::time_now_ms();
t->m_Now = now;
std::list< std::unique_ptr< llarp::timer > > hit;
{
llarp::util::Lock lock(&t->timersMutex);
auto itr = t->timers.begin();
while(itr != t->timers.end())
{
if(now - itr->second->started >= itr->second->timeout
|| itr->second->canceled)
{
// timer hit
hit.emplace_back(std::move(itr->second));
itr = t->timers.erase(itr);
}
else
{
++itr;
}
}
}
while(not hit.empty())
{
const auto& h = hit.front();
h->called_at = now;
h->exec();
hit.pop_front();
}
// reindex next tick info
{
llarp::util::Lock lock(&t->timersMutex);
t->m_Now = now;
t->m_NextRequiredTickAt = std::numeric_limits< llarp_time_t >::max();
for(const auto& item : t->timers)
{
t->m_NextRequiredTickAt =
std::min(t->m_NextRequiredTickAt, item.second->timeout + t->m_Now);
}
t->m_NumPendingTimers = t->timers.size();
}
}
bool
llarp_timer_should_call(struct llarp_timer_context* t)
{
return t->ShouldTriggerTimers(0);
}
void
llarp_timer_tick_all_async(struct llarp_timer_context* t,
struct llarp_threadpool* pool, llarp_time_t now)
{
llarp_timer_set_time(t, now);
if(t->ShouldTriggerTimers(0))
llarp_threadpool_queue_job(pool, std::bind(&llarp_timer_tick_all, t));
}
namespace llarp
{
void
timer::exec()
{
if(func)
{
auto diff = called_at - started;
// zero out function pointer before call to prevent multiple calls being
// queued if call takes longer than 1 timer tick
auto call = func;
func = nullptr;
if(diff >= timeout)
call(user, timeout, 0);
else
call(user, timeout, diff);
}
if(deferredFunc && not canceled)
deferredFunc();
deferredFunc = nullptr;
done = true;
}
} // namespace llarp

@ -1,64 +0,0 @@
#ifndef LLARP_TIMER_HPP
#define LLARP_TIMER_HPP
#include <util/common.hpp>
#include <util/thread/threadpool.h>
#include <util/time.hpp>
#include <functional>
/** called with userptr, original timeout, left */
using llarp_timer_handler_func =
std::function< void(void *, uint64_t, uint64_t) >;
struct llarp_timeout_job
{
uint64_t timeout;
void *user;
llarp_timer_handler_func handler;
};
struct llarp_timer_context;
struct llarp_timer_context *
llarp_init_timer();
uint32_t
llarp_timer_call_later(struct llarp_timer_context *t,
struct llarp_timeout_job job);
uint32_t
llarp_timer_call_func_later(llarp_timer_context *t, llarp_time_t timeout,
std::function< void(void) > func);
void
llarp_timer_cancel_job(struct llarp_timer_context *t, uint32_t id);
void
llarp_timer_remove_job(struct llarp_timer_context *t, uint32_t id);
bool
llarp_timer_should_call(struct llarp_timer_context *t);
// cancel all
void
llarp_timer_stop(struct llarp_timer_context *t);
/// set timer's timestamp, if now is 0 use the current time from system clock,
/// llarp_time_t now
void
llarp_timer_set_time(struct llarp_timer_context *t, llarp_time_t now);
/// single threaded run timer, tick all timers
void
llarp_timer_tick_all(struct llarp_timer_context *t);
/// tick all timers into a threadpool asynchronously
void
llarp_timer_tick_all_async(struct llarp_timer_context *t,
struct llarp_threadpool *pool, llarp_time_t now);
void
llarp_free_timer(struct llarp_timer_context *t);
#endif

@ -29,12 +29,10 @@ struct AbyssTestBase : public ::testing::Test
ASSERT_EQ(meth, method);
}
static void
CancelIt(void* u, ABSL_ATTRIBUTE_UNUSED uint64_t orig, uint64_t left)
void
CancelIt()
{
if(left)
return;
static_cast< AbyssTestBase* >(u)->Stop();
Stop();
}
static void
@ -59,7 +57,7 @@ struct AbyssTestBase : public ::testing::Test
if(server->ServeAsync(loop, logic, a))
{
client->RunAsync(loop, a.ToString());
logic->call_later({1000, this, &CancelIt});
logic->call_later(1000, std::bind(&AbyssTestBase::CancelIt, this));
return;
}
std::this_thread::sleep_for(std::chrono::seconds(1));

Loading…
Cancel
Save