lokinet/llarp/path/transit_hop.cpp
2023-10-16 11:05:07 -07:00

264 lines
6.8 KiB
C++

#include "path.hpp"
#include "path_context.hpp"
#include "transit_hop.hpp"
#include <llarp/exit/context.hpp>
#include <llarp/exit/exit_messages.hpp>
#include <llarp/link/link_manager.hpp>
#include <llarp/messages/discard.hpp>
#include <llarp/router/router.hpp>
#include <llarp/util/buffer.hpp>
#include <oxenc/endian.h>
namespace llarp::path
{
/// Formats txID, rxID, upstream and downstream into a bracketed, one-line summary.
std::string
TransitHopInfo::ToString() const
{
  auto description = fmt::format(
      "[TransitHopInfo tx={} rx={} upstream={} downstream={}]", txID, rxID, upstream, downstream);
  return description;
}
/// Default-constructs a hop: both gather queues are constructed with
/// TRANSIT_HOP_QUEUE_SIZE and enabled, and both work counters are set to zero.
TransitHop::TransitHop()
    : AbstractHopHandler{}
    , m_UpstreamGather{TRANSIT_HOP_QUEUE_SIZE}
    , m_DownstreamGather{TRANSIT_HOP_QUEUE_SIZE}
{
  // Zero the bookkeeping counters.
  m_DownstreamWorkCounter = 0;
  m_UpstreamWorkCounter = 0;

  // Turn both gather queues on.
  m_DownstreamGather.enable();
  m_UpstreamGather.enable();
}
/// Returns true when this hop counts as expired at time `now`: either the `destroy`
/// flag is set, or `now` has reached ExpireTime().
bool
TransitHop::Expired(llarp_time_t now) const
{
  // A hop flagged for destruction is expired regardless of the clock.
  if (destroy)
    return true;
  return now >= ExpireTime();
}
/// Returns the point in time at which this hop expires: `started` plus `lifetime`.
llarp_time_t
TransitHop::ExpireTime() const
{
  const auto deadline = started + lifetime;
  return deadline;
}
/// Builds hop info from a commit record.
/// @param down    router stored as `downstream`
/// @param record  supplies `txID` (record.txid), `rxID` (record.rxid) and
///                `upstream` (record.nextHop)
TransitHopInfo::TransitHopInfo(const RouterID& down, const LR_CommitRecord& record)
    : txID(record.txid)
    , rxID(record.rxid)
    , upstream(record.nextHop)
    , downstream(down)
{
  // Every member is set in the initializer list above.
}
/** Note: this is one of two places where AbstractRoutingMessage::bt_encode() is called; the
other is Path::SendRoutingMessage() in llarp/path/path.cpp. For now, we default to the
override of ::bt_encode() that returns a std::string. The role that llarp_buffer_t plays
here is likely superfluous, and it can be replaced with either a leaner llarp_buffer or
plain strings.
Routing messages are sent frequently, so it is important to optimize out superfluous copies
here. We have to instantiate at least one std::string whether we pass a bt_dict_producer by
reference or create one within the ::bt_encode() call.
If we decide to stay with std::strings, the function Path::HandleUpstream (along with the
functions it calls, and so on) will need to be modified to take a std::string that we can
std::move around.
*/
/// Encodes a routing message, pads it with random bytes up to the next multiple of
/// PAD_SIZE, and passes it to HandleDownstream() together with a freshly randomized
/// nonce.
///
/// @param msg  routing message to encode via bt_encode()
/// @param r    router; its pubkey is checked with IsEndpoint()
/// @return false if IsEndpoint(r->pubkey()) is false, otherwise whatever
///         HandleDownstream() returns
bool
TransitHop::SendRoutingMessage(const routing::AbstractRoutingMessage& msg, Router* r)
{
  if (!IsEndpoint(r->pubkey()))
    return false;

  auto buf = msg.bt_encode();

  TunnelNonce N;
  N.Randomize();

  // Pad to the nearest multiple of PAD_SIZE bytes. The padding is appended after the
  // encoded message; the encoded bytes themselves must not be touched.
  const auto encoded_size = buf.size();
  if (const auto remainder = encoded_size % PAD_SIZE)
  {
    const auto pad_len = PAD_SIZE - remainder;
    buf.resize(encoded_size + pad_len);
    // randomize padding (only the newly appended tail)
    CryptoManager::instance()->randbytes(
        reinterpret_cast<uint8_t*>(buf.data()) + encoded_size, pad_len);
  }
  return HandleDownstream(buf, N, r);
}
/// Processes a batch of downstream traffic events.
///
/// For each event the payload (`first`) is run through xchacha20 in place with this
/// hop's pathKey and the event nonce (`second`), then copied into a
/// RelayDownstreamMessage addressed with info.rxID and carrying the nonce XORed with
/// nonceXOR. Messages are pushed onto m_DownstreamGather; a drain of that queue into
/// HandleAllDownstream() is scheduled on the router's loop whenever the queue reports
/// full, and once more after the whole batch.
void
TransitHop::DownstreamWork(TrafficQueue_t msgs, Router* r)
{
  // Pops everything currently gathered and hands it off as a single batch.
  auto drain_gathered = [self = shared_from_this(), r]() {
    std::vector<RelayDownstreamMessage> batch;
    while (auto next = self->m_DownstreamGather.tryPopFront())
    {
      batch.push_back(*next);
    }
    self->HandleAllDownstream(std::move(batch), r);
  };

  for (auto& event : msgs)
  {
    RelayDownstreamMessage relay;
    auto& payload = event.first;
    auto& nonce = event.second;
    uint8_t* const bytes = payload.data();
    const size_t byte_count = payload.size();

    relay.pathid = info.rxID;
    relay.nonce = nonce ^ nonceXOR;
    CryptoManager::instance()->xchacha20(bytes, byte_count, pathKey, nonce);
    std::memcpy(relay.enc.data(), bytes, byte_count);

    llarp::LogDebug(
        "relay ",
        relay.enc.size(),
        " bytes downstream from ",
        info.upstream,
        " to ",
        info.downstream);

    // Ask the loop to drain the queue before adding to a full one.
    if (m_DownstreamGather.full())
    {
      r->loop()->call(drain_gathered);
    }
    if (m_DownstreamGather.enabled())
      m_DownstreamGather.pushBack(relay);
  }

  // Final drain for whatever this batch left in the queue.
  r->loop()->call(drain_gathered);
}
/// Processes a batch of upstream traffic events.
///
/// For each event the payload (`first`) is run through xchacha20 in place with this
/// hop's pathKey and the event nonce (`second`), then copied into a
/// RelayUpstreamMessage addressed with info.txID and carrying the nonce XORed with
/// nonceXOR. Processing stops at the first message m_UpstreamGather refuses. Afterwards
/// a drain of the queue into HandleAllUpstream() is scheduled on the router's loop.
void
TransitHop::UpstreamWork(TrafficQueue_t msgs, Router* r)
{
  for (auto& event : msgs)
  {
    RelayUpstreamMessage relay;
    auto& payload = event.first;
    auto& nonce = event.second;
    uint8_t* const bytes = payload.data();
    const size_t byte_count = payload.size();

    CryptoManager::instance()->xchacha20(bytes, byte_count, pathKey, nonce);
    relay.pathid = info.txID;
    relay.nonce = nonce ^ nonceXOR;
    std::memcpy(relay.enc.data(), bytes, byte_count);

    // Stop as soon as the gather queue will not take another message.
    const auto push_result = m_UpstreamGather.tryPushBack(relay);
    if (push_result != thread::QueueReturn::Success)
      break;
  }

  // Flush it:
  const auto& loop = r->loop();
  auto drain_gathered = [self = shared_from_this(), r] {
    std::vector<RelayUpstreamMessage> batch;
    while (auto next = self->m_UpstreamGather.tryPopFront())
    {
      batch.push_back(*next);
    }
    self->HandleAllUpstream(std::move(batch), r);
  };
  loop->call(std::move(drain_gathered));
}
/// Handles a batch of gathered upstream relay messages.
///
/// When IsEndpoint(r->pubkey()) is true, each message's `enc` payload is handed to
/// r->ParseRoutingMessageBuffer() (a parse failure is logged and processing continues),
/// m_LastActivity is refreshed per message, and then FlushDownstream() is invoked on
/// this hop and on every hop in m_FlushOthers, which is cleared afterwards.
/// Otherwise each message is bt-encoded and sent to info.upstream, matching how
/// HandleAllDownstream() sends to info.downstream.
/// In both cases r->TriggerPump() is called at the end.
///
/// @param msgs  messages to process, in order
/// @param r     router used for parsing, sending and pumping
void
TransitHop::HandleAllUpstream(std::vector<RelayUpstreamMessage> msgs, Router* r)
{
  if (IsEndpoint(r->pubkey()))
  {
    for (const auto& msg : msgs)
    {
      const llarp_buffer_t buf(msg.enc);
      if (!r->ParseRoutingMessageBuffer(buf, this, info.rxID))
      {
        LogWarn("invalid upstream data on endpoint ", info);
      }
      m_LastActivity = r->now();
    }
    FlushDownstream(r);
    for (const auto& other : m_FlushOthers)
    {
      other->FlushDownstream(r);
    }
    m_FlushOthers.clear();
  }
  else
  {
    for (const auto& msg : msgs)
    {
      llarp::LogDebug(
          "relay ",
          msg.enc.size(),
          " bytes upstream from ",
          info.downstream,
          " to ",
          info.upstream);
      // Send the encoded form, as the downstream relay path does.
      r->send_data_message(info.upstream, msg.bt_encode());
    }
  }
  r->TriggerPump();
}
/// Relays a batch of gathered downstream messages: each one is logged, bt-encoded and
/// sent to info.downstream, after which r->TriggerPump() is called once.
void
TransitHop::HandleAllDownstream(std::vector<RelayDownstreamMessage> msgs, Router* r)
{
  for (const auto& relay : msgs)
  {
    log::debug(
        path_cat,
        "Relaying {} bytes downstream from {} to {}",
        relay.enc.size(),
        info.upstream,
        info.downstream);

    auto encoded = relay.bt_encode();
    r->send_data_message(info.downstream, std::move(encoded));
  }
  r->TriggerPump();
}
/// If m_UpstreamQueue holds anything, swaps it for an empty queue and schedules
/// UpstreamWork() on the swapped-out contents via r->queue_work(). Does nothing when
/// the queue is empty.
void
TransitHop::FlushUpstream(Router* r)
{
  if (m_UpstreamQueue.empty())
    return;

  // The job keeps this hop alive through `self` and owns the detached queue.
  r->queue_work([self = shared_from_this(),
                 pending = std::exchange(m_UpstreamQueue, {}),
                 r]() mutable { self->UpstreamWork(std::move(pending), r); });
}
/// If m_DownstreamQueue holds anything, swaps it for an empty queue and schedules
/// DownstreamWork() on the swapped-out contents via r->queue_work(). Does nothing when
/// the queue is empty.
void
TransitHop::FlushDownstream(Router* r)
{
  if (m_DownstreamQueue.empty())
    return;

  // The job keeps this hop alive through `self` and owns the detached queue.
  r->queue_work([self = shared_from_this(),
                 pending = std::exchange(m_DownstreamQueue, {}),
                 r]() mutable { self->DownstreamWork(std::move(pending), r); });
}
/// Formats this hop as a bracketed, one-line summary containing `info` and the raw
/// tick counts of `started` and `lifetime`.
std::string
TransitHop::ToString() const
{
  // The closing ']' balances the opening bracket, as in TransitHopInfo::ToString().
  return fmt::format(
      "[TransitHop {} started={} lifetime={}]", info, started.count(), lifetime.count());
}
/// Disables both gather queues: m_UpstreamGather first, then m_DownstreamGather.
void
TransitHop::Stop()
{
m_UpstreamGather.disable();
m_DownstreamGather.disable();
}
/// Sets the `destroy` flag, which makes Expired() return true regardless of the time.
void
TransitHop::SetSelfDestruct()
{
destroy = true;
}
/// Schedules SetSelfDestruct() to run on the router's loop. The queued callback holds
/// a shared_ptr to this hop, so the hop stays alive until the callback has run.
void
TransitHop::QueueDestroySelf(Router* r)
{
  const auto& loop = r->loop();
  auto mark_destroyed = [self = shared_from_this()] { self->SetSelfDestruct(); };
  loop->call(std::move(mark_destroyed));
}
} // namespace llarp::path