You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
lokinet/llarp/path/transit_hop.cpp

263 lines
6.6 KiB
C++

#include "transit_hop.hpp"
#include <llarp/router/router.hpp>
#include <llarp/util/buffer.hpp>
1 year ago
namespace llarp::path
{
1 year ago
std::string
TransitHopInfo::ToString() const
{
1 year ago
return fmt::format(
"[TransitHopInfo tx={} rx={} upstream={} downstream={}]", txID, rxID, upstream, downstream);
}
TransitHop::TransitHop()
: AbstractHopHandler{}
, m_UpstreamGather{TRANSIT_HOP_QUEUE_SIZE}
, m_DownstreamGather{TRANSIT_HOP_QUEUE_SIZE}
1 year ago
{
m_UpstreamGather.enable();
m_DownstreamGather.enable();
m_UpstreamWorkCounter = 0;
m_DownstreamWorkCounter = 0;
}
12 months ago
bool
TransitHop::send_path_control_message(
11 months ago
std::string, std::string, std::function<void(oxen::quic::message m)>)
12 months ago
{
return true;
}
1 year ago
bool
TransitHop::Expired(llarp_time_t now) const
{
return destroy || (now >= ExpireTime());
}
1 year ago
llarp_time_t
TransitHop::ExpireTime() const
{
return started + lifetime;
}
TransitHopInfo::TransitHopInfo(const RouterID& down) : downstream(down)
1 year ago
{}
/** Note: this is one of two places where AbstractRoutingMessage::bt_encode() is called, the
other of which is llarp/path/path.cpp in Path::SendRoutingMessage(). For now,
we will default to the override of ::bt_encode() that returns an std::string. The role that
llarp_buffer_t plays here is likely superfluous, and can be replaced with either a leaner
llarp_buffer, or just handled using strings.
One important consideration is the frequency at which routing messages are sent, making
superfluous copies important to optimize out here. We have to instantiate at least one
std::string whether we pass a bt_dict_producer as a reference or create one within the
::bt_encode() call.
If we decide to stay with std::strings, the function Path::HandleUpstream (along with the
functions it calls and so on) will need to be modified to take an std::string that we can
std::move around.
*/
1 year ago
bool
TransitHop::SendRoutingMessage(std::string payload, Router* r)
1 year ago
{
if (!IsEndpoint(r->pubkey()))
return false;
1 year ago
TunnelNonce N;
N.Randomize();
// pad to nearest MESSAGE_PAD_SIZE bytes
auto dlt = payload.size() % PAD_SIZE;
1 year ago
if (dlt)
{
dlt = PAD_SIZE - dlt;
1 year ago
// randomize padding
crypto::randbytes(reinterpret_cast<uint8_t*>(payload.data()), dlt);
1 year ago
}
// TODO: relay message along
return true;
1 year ago
}
1 year ago
void
TransitHop::DownstreamWork(TrafficQueue_t msgs, Router* r)
1 year ago
{
auto flushIt = [self = shared_from_this(), r]() {
std::vector<RelayDownstreamMessage> msgs;
while (auto maybe = self->m_DownstreamGather.tryPopFront())
{
1 year ago
msgs.push_back(*maybe);
}
1 year ago
self->HandleAllDownstream(std::move(msgs), r);
};
for (auto& ev : msgs)
{
RelayDownstreamMessage msg;
// const llarp_buffer_t buf(ev.first);
uint8_t* buf = ev.first.data();
size_t sz = ev.first.size();
1 year ago
msg.pathid = info.rxID;
msg.nonce = ev.second ^ nonceXOR;
crypto::xchacha20(buf, sz, pathKey, ev.second);
std::memcpy(msg.enc.data(), buf, sz);
1 year ago
llarp::LogDebug(
"relay ",
msg.enc.size(),
" bytes downstream from ",
info.upstream,
" to ",
info.downstream);
if (m_DownstreamGather.full())
{
1 year ago
r->loop()->call(flushIt);
}
1 year ago
if (m_DownstreamGather.enabled())
m_DownstreamGather.pushBack(msg);
}
1 year ago
r->loop()->call(flushIt);
}
1 year ago
void
TransitHop::UpstreamWork(TrafficQueue_t msgs, Router* r)
1 year ago
{
for (auto& ev : msgs)
{
1 year ago
RelayUpstreamMessage msg;
uint8_t* buf = ev.first.data();
size_t sz = ev.first.size();
crypto::xchacha20(buf, sz, pathKey, ev.second);
1 year ago
msg.pathid = info.txID;
msg.nonce = ev.second ^ nonceXOR;
std::memcpy(msg.enc.data(), buf, sz);
1 year ago
if (m_UpstreamGather.tryPushBack(msg) != thread::QueueReturn::Success)
break;
}
1 year ago
// Flush it:
r->loop()->call([self = shared_from_this(), r] {
std::vector<RelayUpstreamMessage> msgs;
while (auto maybe = self->m_UpstreamGather.tryPopFront())
{
1 year ago
msgs.push_back(*maybe);
}
1 year ago
self->HandleAllUpstream(std::move(msgs), r);
});
}
1 year ago
void
TransitHop::HandleAllUpstream(std::vector<RelayUpstreamMessage> msgs, Router* r)
1 year ago
{
if (IsEndpoint(r->pubkey()))
{
1 year ago
for (const auto& msg : msgs)
{
1 year ago
const llarp_buffer_t buf(msg.enc);
if (!r->ParseRoutingMessageBuffer(buf, *this, info.rxID))
{
1 year ago
LogWarn("invalid upstream data on endpoint ", info);
}
m_LastActivity = r->now();
}
1 year ago
FlushDownstream(r);
for (const auto& other : m_FlushOthers)
{
1 year ago
other->FlushDownstream(r);
}
1 year ago
m_FlushOthers.clear();
}
1 year ago
else
{
for (const auto& msg : msgs)
{
llarp::LogDebug(
"relay ",
1 year ago
msg.enc.size(),
" bytes upstream from ",
info.downstream,
" to ",
1 year ago
info.upstream);
r->send_data_message(info.upstream, msg.bt_encode());
}
}
1 year ago
r->TriggerPump();
}
1 year ago
void
TransitHop::HandleAllDownstream(std::vector<RelayDownstreamMessage> msgs, Router* r)
1 year ago
{
for (const auto& msg : msgs)
{
log::debug(
path_cat,
"Relaying {} bytes downstream from {} to {}",
1 year ago
msg.enc.size(),
info.upstream,
info.downstream);
// TODO: is this right?
r->send_data_message(info.downstream, msg.bt_encode());
}
1 year ago
r->TriggerPump();
}
1 year ago
void
TransitHop::FlushUpstream(Router* r)
1 year ago
{
if (not m_UpstreamQueue.empty())
{
r->queue_work([self = shared_from_this(),
data = std::exchange(m_UpstreamQueue, {}),
r]() mutable { self->UpstreamWork(std::move(data), r); });
}
1 year ago
}
1 year ago
void
TransitHop::FlushDownstream(Router* r)
1 year ago
{
if (not m_DownstreamQueue.empty())
{
r->queue_work([self = shared_from_this(),
data = std::exchange(m_DownstreamQueue, {}),
r]() mutable { self->DownstreamWork(std::move(data), r); });
}
1 year ago
}
1 year ago
std::string
TransitHop::ToString() const
{
return fmt::format(
"[TransitHop {} started={} lifetime={}", info, started.count(), lifetime.count());
}
void
TransitHop::Stop()
{
m_UpstreamGather.disable();
m_DownstreamGather.disable();
}
void
TransitHop::SetSelfDestruct()
{
destroy = true;
}
void
TransitHop::QueueDestroySelf(Router* r)
1 year ago
{
r->loop()->call([self = shared_from_this()] { self->SetSelfDestruct(); });
}
} // namespace llarp::path