mirror of
https://github.com/oxen-io/lokinet.git
synced 2024-11-02 03:40:12 +00:00
232 lines
5.4 KiB
C++
232 lines
5.4 KiB
C++
#ifndef LLARP_SCHEDULER_HPP
|
|
#define LLARP_SCHEDULER_HPP
|
|
|
|
#include <util/meta/object.hpp>
|
|
#include <util/thread/timerqueue.hpp>
|
|
|
|
#include <absl/time/time.h>
|
|
#include <atomic>
|
|
#include <functional>
|
|
#include <thread>
|
|
#include <utility>
|
|
#include <vector>
|
|
|
|
namespace llarp
|
|
{
|
|
namespace thread
|
|
{
|
|
/// This is a general purpose event scheduler, supporting both one-off and
|
|
/// repeated events.
|
|
///
|
|
/// Notes:
|
|
/// - Events should not be started before their begin time
|
|
/// - Events may start an arbitrary amount of time after they are scheduled,
|
|
/// if there is a previous long running event.
|
|
class Scheduler
|
|
{
|
|
public:
|
|
using Callback = std::function< void() >;
|
|
using Handle = int;
|
|
static const Handle INVALID_HANDLE;
|
|
|
|
// Define our own clock so we can test easier
|
|
using Clock = std::function< absl::Time() >;
|
|
|
|
private:
|
|
/// struct for repeated events
|
|
struct RepeatData
|
|
{
|
|
Callback m_callback;
|
|
absl::Duration m_period;
|
|
std::atomic_bool m_isCancelled;
|
|
Handle m_handle;
|
|
|
|
RepeatData(Callback callback, absl::Duration period)
|
|
: m_callback(std::move(callback))
|
|
, m_period(period)
|
|
, m_isCancelled(false)
|
|
, m_handle(0)
|
|
{
|
|
}
|
|
};
|
|
|
|
using RepeatDataPtr = std::shared_ptr< RepeatData >;
|
|
using RepeatQueue = TimerQueue< RepeatDataPtr >;
|
|
// Just for naming purposes.
|
|
using Event = Callback;
|
|
using EventQueue = TimerQueue< Event >;
|
|
using EventItem = TimerQueueItem< Event >;
|
|
|
|
public:
|
|
// Looks more horrible than it is.
|
|
using EventDispatcher = std::function< void(const Callback&) >;
|
|
|
|
using EventKey = EventQueue::Key;
|
|
|
|
private:
|
|
Clock m_clock;
|
|
EventQueue m_eventQueue;
|
|
RepeatQueue m_repeatQueue;
|
|
object::Catalog< RepeatDataPtr > m_repeats;
|
|
|
|
util::Mutex m_threadMutex ACQUIRED_BEFORE(m_mutex); // protects running
|
|
util::Mutex m_mutex ACQUIRED_AFTER(m_threadMutex); // master mutex
|
|
absl::CondVar m_condition;
|
|
|
|
EventDispatcher m_dispatcher;
|
|
std::thread m_thread;
|
|
|
|
std::atomic_bool m_running;
|
|
std::atomic_size_t m_iterationCount;
|
|
|
|
std::vector< EventItem > m_events;
|
|
std::vector< EventItem >::iterator m_eventIt;
|
|
|
|
std::atomic_size_t m_repeatCount;
|
|
std::atomic_size_t m_eventCount;
|
|
|
|
Scheduler(const Scheduler&) = delete;
|
|
Scheduler&
|
|
operator=(const Scheduler&) = delete;
|
|
|
|
friend class DispatcherImpl;
|
|
friend class Tardis;
|
|
|
|
/// Dispatch thread function
|
|
void
|
|
dispatch();
|
|
|
|
/// Yield to the dispatch thread
|
|
void
|
|
yield();
|
|
|
|
public:
|
|
/// Return the epoch from which to create `Durations` from.
|
|
static absl::Time
|
|
epoch()
|
|
{
|
|
return absl::UnixEpoch();
|
|
}
|
|
|
|
static EventDispatcher
|
|
defaultDispatcher()
|
|
{
|
|
return [](const Callback& callback) { callback(); };
|
|
}
|
|
|
|
static Clock
|
|
defaultClock()
|
|
{
|
|
return &absl::Now;
|
|
}
|
|
|
|
Scheduler() : Scheduler(defaultDispatcher(), defaultClock())
|
|
{
|
|
}
|
|
|
|
explicit Scheduler(const EventDispatcher& dispatcher)
|
|
: Scheduler(dispatcher, defaultClock())
|
|
{
|
|
}
|
|
|
|
explicit Scheduler(const Clock& clock)
|
|
: Scheduler(defaultDispatcher(), clock)
|
|
{
|
|
}
|
|
|
|
Scheduler(EventDispatcher dispatcher, Clock clock);
|
|
|
|
~Scheduler();
|
|
|
|
/// Start the scheduler
|
|
/// Note that currently this "can't fail" and return `false`. If thread
|
|
/// spawning fails, an exception will be thrown.
|
|
bool
|
|
start();
|
|
|
|
void
|
|
stop();
|
|
|
|
Handle
|
|
schedule(absl::Time time, const Callback& callback,
|
|
const EventKey& key = EventKey(nullptr));
|
|
|
|
bool
|
|
reschedule(Handle handle, absl::Time time, bool wait = false);
|
|
bool
|
|
reschedule(Handle handle, const EventKey& key, absl::Time time,
|
|
bool wait = false);
|
|
|
|
bool
|
|
cancel(Handle handle, bool wait = false)
|
|
{
|
|
return cancel(handle, EventKey(nullptr), wait);
|
|
}
|
|
bool
|
|
cancel(Handle handle, const EventKey& key, bool wait = false);
|
|
|
|
void
|
|
cancelAll(bool wait = false);
|
|
|
|
Handle
|
|
scheduleRepeat(absl::Duration interval, const Callback& callback,
|
|
absl::Time startTime = absl::Time());
|
|
|
|
bool
|
|
cancelRepeat(Handle handle, bool wait = false);
|
|
|
|
void
|
|
cancelAllRepeats(bool wait = false);
|
|
|
|
size_t
|
|
repeatCount() const
|
|
{
|
|
return m_repeatCount;
|
|
}
|
|
|
|
size_t
|
|
eventCount() const
|
|
{
|
|
return m_eventCount;
|
|
}
|
|
};
|
|
|
|
class Tardis
|
|
{
|
|
mutable util::Mutex m_mutex;
|
|
absl::Time m_time;
|
|
Scheduler& m_scheduler;
|
|
|
|
public:
|
|
Tardis(Scheduler& scheduler) : m_time(absl::Now()), m_scheduler(scheduler)
|
|
{
|
|
m_scheduler.m_clock = std::bind(&Tardis::now, this);
|
|
}
|
|
|
|
void
|
|
advanceTime(absl::Duration duration)
|
|
{
|
|
{
|
|
absl::WriterMutexLock l(&m_mutex);
|
|
m_time += duration;
|
|
}
|
|
|
|
{
|
|
absl::WriterMutexLock l(&m_scheduler.m_mutex);
|
|
m_scheduler.m_condition.Signal();
|
|
}
|
|
}
|
|
|
|
absl::Time
|
|
now() const
|
|
{
|
|
absl::ReaderMutexLock l(&m_mutex);
|
|
return m_time;
|
|
}
|
|
};
|
|
} // namespace thread
|
|
|
|
} // namespace llarp
|
|
|
|
#endif
|