mirror of
https://github.com/oxen-io/lokinet.git
synced 2024-11-17 15:25:35 +00:00
b4440094b0
- util::Mutex is now a std::shared_timed_mutex, which is capable of
exclusive and shared locks.
- util::Lock is still present as a std::lock_guard<util::Mutex>.
- the locking annotations are preserved, but updated to the latest
supported by clang rather than using abseil's older/deprecated ones.
- ACQUIRE_LOCK macro is gone since we don't pass mutexes by pointer into
locks anymore (WTF abseil).
- ReleasableLock is gone. Instead there are now some llarp::util helper
methods to obtain unique and/or shared locks:
- `auto lock = util::unique_lock(mutex);` gets an RAII-but-also
unlockable object (std::unique_lock<T>, with T inferred from
`mutex`).
- `auto lock = util::shared_lock(mutex);` gets an RAII shared (i.e.
"reader") lock of the mutex.
- `auto lock = util::unique_locks(mutex1, mutex2, mutex3);` can be
used to atomically lock multiple mutexes at once (returning a
tuple of the locks).
This are templated on the mutex which makes them a bit more flexible
than using a concrete type: they can be used for any type of lockable
mutex, not only util::Mutex. (Some of the code here uses them for
getting locks around a std::mutex). Until C++17, using the RAII types
is painfully verbose:
```C++
// pre-C++17 - needing to figure out the mutex type here is annoying:
std::unique_lock<util::Mutex> lock(mutex);
// pre-C++17 and even more verbose (but at least the type isn't needed):
std::unique_lock<decltype(mutex)> lock(mutex);
// our compromise:
auto lock = util::unique_lock(mutex);
// C++17:
std::unique_lock lock(mutex);
```
All of these functions will also warn (under gcc or clang) if you
discard the return value. You can also do fancy things like
`auto l = util::unique_lock(mutex, std::adopt_lock)` (which lets a
lock take over an already-locked mutex).
- metrics code is gone, which also removes a big pile of code that was
only used by metrics:
- llarp::util::Scheduler
- llarp:🧵:TimerQueue
- llarp::util::Stopwatch
156 lines
3.6 KiB
C++
156 lines
3.6 KiB
C++
#ifndef LLARP_ROUTER_OUTBOUND_MESSAGE_HANDLER_HPP
|
|
#define LLARP_ROUTER_OUTBOUND_MESSAGE_HANDLER_HPP
|
|
|
|
#include <router/i_outbound_message_handler.hpp>
|
|
|
|
#include <util/thread/logic.hpp>
|
|
#include <util/thread/queue.hpp>
|
|
#include <util/thread/threading.hpp>
|
|
#include <path/path_types.hpp>
|
|
#include <router_id.hpp>
|
|
|
|
#include <list>
|
|
#include <unordered_map>
|
|
#include <utility>
|
|
|
|
struct llarp_buffer_t;
|
|
|
|
namespace llarp
|
|
{
|
|
struct ILinkManager;
|
|
class Logic;
|
|
enum class SessionResult;
|
|
|
|
struct OutboundMessageHandler final : public IOutboundMessageHandler
|
|
{
|
|
public:
|
|
~OutboundMessageHandler() override = default;
|
|
|
|
OutboundMessageHandler(size_t maxQueueSize = MAX_OUTBOUND_QUEUE_SIZE);
|
|
|
|
bool
|
|
QueueMessage(const RouterID &remote, const ILinkMessage *msg,
|
|
SendStatusHandler callback) override EXCLUDES(_mutex);
|
|
|
|
void
|
|
Tick() override;
|
|
|
|
void
|
|
QueueRemoveEmptyPath(const PathID_t &pathid) override;
|
|
|
|
util::StatusObject
|
|
ExtractStatus() const override;
|
|
|
|
void
|
|
Init(ILinkManager *linkManager, std::shared_ptr< Logic > logic);
|
|
|
|
private:
|
|
using Message = std::pair< std::vector< byte_t >, SendStatusHandler >;
|
|
|
|
struct MessageQueueEntry
|
|
{
|
|
uint16_t priority;
|
|
Message message;
|
|
PathID_t pathid;
|
|
RouterID router;
|
|
|
|
bool
|
|
operator<(const MessageQueueEntry &other) const
|
|
{
|
|
return other.priority < priority;
|
|
}
|
|
};
|
|
|
|
struct MessageQueueStats
|
|
{
|
|
uint64_t queued = 0;
|
|
uint64_t dropped = 0;
|
|
uint64_t sent = 0;
|
|
uint32_t queueWatermark = 0;
|
|
|
|
uint32_t perTickMax = 0;
|
|
uint32_t numTicks = 0;
|
|
};
|
|
|
|
using MessageQueue = std::priority_queue< MessageQueueEntry >;
|
|
|
|
void
|
|
OnSessionEstablished(const RouterID &router);
|
|
|
|
void
|
|
OnConnectTimeout(const RouterID &router);
|
|
|
|
void
|
|
OnRouterNotFound(const RouterID &router);
|
|
|
|
void
|
|
OnInvalidRouter(const RouterID &router);
|
|
|
|
void
|
|
OnNoLink(const RouterID &router);
|
|
|
|
void
|
|
OnSessionResult(const RouterID &router, const SessionResult result);
|
|
|
|
void
|
|
DoCallback(SendStatusHandler callback, SendStatus status);
|
|
|
|
void
|
|
QueueSessionCreation(const RouterID &remote);
|
|
|
|
bool
|
|
EncodeBuffer(const ILinkMessage *msg, llarp_buffer_t &buf);
|
|
|
|
bool
|
|
Send(const RouterID &remote, const Message &msg);
|
|
|
|
bool
|
|
SendIfSession(const RouterID &remote, const Message &msg);
|
|
|
|
bool
|
|
QueueOutboundMessage(const RouterID &remote, Message &&msg,
|
|
const PathID_t &pathid, uint16_t priority = 0);
|
|
|
|
void
|
|
ProcessOutboundQueue();
|
|
|
|
void
|
|
RemoveEmptyPathQueues();
|
|
|
|
void
|
|
SendRoundRobin();
|
|
|
|
void
|
|
FinalizeSessionRequest(const RouterID &router, SendStatus status)
|
|
EXCLUDES(_mutex);
|
|
|
|
llarp::thread::Queue< MessageQueueEntry > outboundQueue;
|
|
llarp::thread::Queue< PathID_t > removedPaths;
|
|
bool removedSomePaths;
|
|
|
|
mutable util::Mutex _mutex; // protects pendingSessionMessageQueues
|
|
|
|
std::unordered_map< RouterID, MessageQueue, RouterID::Hash >
|
|
pendingSessionMessageQueues GUARDED_BY(_mutex);
|
|
|
|
std::unordered_map< PathID_t, MessageQueue, PathID_t::Hash >
|
|
outboundMessageQueues;
|
|
|
|
std::queue< PathID_t > roundRobinOrder;
|
|
|
|
ILinkManager *_linkManager;
|
|
std::shared_ptr< Logic > _logic;
|
|
|
|
util::ContentionKiller m_Killer;
|
|
|
|
// paths cannot have pathid "0", so it can be used as the "pathid"
|
|
// for non-traffic (control) messages, so they can be prioritized.
|
|
static const PathID_t zeroID;
|
|
|
|
MessageQueueStats m_queueStats;
|
|
};
|
|
|
|
} // namespace llarp
|
|
|
|
#endif // LLARP_ROUTER_OUTBOUND_MESSAGE_HANDLER_HPP
|