Merge pull request #732 from tewinget/path-build-status-messages

Adds Link-Relay Status Messages
This commit is contained in:
Jeff 2019-07-28 18:21:43 -04:00 committed by GitHub
commit c08f8361a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 803 additions and 87 deletions

View File

@ -344,11 +344,11 @@ request a commit to relay traffic to another node.
{
a: "c",
c: [ list, of, encrypted, frames ],
c: [ list, of, encrypted, LRCR frames ],
v: 0
}
c and r MUST contain dummy records if the hop length is less than the maximum
c MUST contain dummy records if the hop length is less than the maximum
hop length.
link relay commit record (LRCR)
@ -365,7 +365,7 @@ the path is extended by w.y seconds
{
c: "<32 byte public encryption key used for upstream>",
d: uint_optional_ms_delay,
d: uint_optional_ms_delay, // TODO
i: "<32 byte RC.k of next hop>",
l: uint_optional_lifespan,
n: "<32 bytes nounce for key exchange>",
@ -396,6 +396,39 @@ this proof of work requirement is subject to change
if i is equal to RC.k then any LRDM.x values are decrypted and interpreted as
routing layer messages. This indicates that we are the farthest hop in the path.
link relay status message (LRSM)
response to path creator about build status
{
a: "s",
c: [ list, of, encrypted, LRSR frames],
p: "<16 bytes rx path id>",
s: uint_status_flags, // for now, only set (or don't) success bit
v: 0
}
the creator of the LRSM MUST include dummy LRSR records
to pad out to the maximum path length
link relay status record (LRSR)
record indicating status of path build
{
s: uint_status_flags,
v: 0
}
uint_status_flags (bitwise booleans):
[0] = success
[1] = fail, hop timeout
[2] = fail, congestion
[3] = fail, refusal, next hop is not known to be a snode
[4] = fail, used by hop creator when decrypting frames if decryption fails
[5] = fail, used by hop creator when record decode fails
[4-63] reserved
link relay upstream message (LRUM)
sent to relay data via upstream direction of a previously created path.

View File

@ -189,6 +189,7 @@ set(LIB_SRC
messages/link_message.cpp
messages/relay.cpp
messages/relay_commit.cpp
messages/relay_status.cpp
net/address_info.cpp
net/exit_info.cpp
nodedb.cpp

View File

@ -7,42 +7,31 @@
namespace llarp
{
bool
EncryptedFrame::EncryptInPlace(const SecretKey& ourSecretKey,
const PubKey& otherPubkey)
EncryptedFrame::DoEncrypt(const SharedSecret& shared, bool noDH)
{
// format of frame is
// <32 bytes keyed hash of following data>
// <32 bytes nonce>
// <32 bytes pubkey>
// <N bytes encrypted payload>
//
byte_t* hash = data();
byte_t* noncePtr = hash + SHORTHASHSIZE;
byte_t* pubkey = noncePtr + TUNNONCESIZE;
byte_t* body = pubkey + PUBKEYSIZE;
SharedSecret shared;
auto crypto = CryptoManager::instance();
// if noDH flag, means key exchange has already taken place
// in this case, set pubkey to random noise and choose a
// random nonce here
if(noDH)
{
crypto->randbytes(noncePtr, TUNNONCESIZE);
crypto->randbytes(pubkey, PUBKEYSIZE);
}
TunnelNonce nonce(noncePtr);
llarp_buffer_t buf;
buf.base = body;
buf.cur = buf.base;
buf.sz = size() - EncryptedFrameOverheadSize;
auto crypto = CryptoManager::instance();
// set our pubkey
memcpy(pubkey, ourSecretKey.toPublic().data(), PUBKEYSIZE);
// randomize nonce
crypto->randbytes(noncePtr, TUNNONCESIZE);
TunnelNonce nonce(noncePtr);
// derive shared key
if(!crypto->dh_client(shared, otherPubkey, ourSecretKey, nonce))
{
llarp::LogError("DH failed");
return false;
}
// encrypt body
if(!crypto->xchacha20(buf, shared, nonce))
{
@ -60,11 +49,13 @@ namespace llarp
llarp::LogError("Failed to generate message auth");
return false;
}
return true;
}
bool
EncryptedFrame::DecryptInPlace(const SecretKey& ourSecretKey)
EncryptedFrame::EncryptInPlace(const SecretKey& ourSecretKey,
const PubKey& otherPubkey)
{
// format of frame is
// <32 bytes keyed hash of following data>
@ -72,23 +63,40 @@ namespace llarp
// <32 bytes pubkey>
// <N bytes encrypted payload>
//
ShortHash hash(data());
byte_t* noncePtr = data() + SHORTHASHSIZE;
byte_t* body = data() + EncryptedFrameOverheadSize;
TunnelNonce nonce(noncePtr);
PubKey otherPubkey(noncePtr + TUNNONCESIZE);
byte_t* hash = data();
byte_t* noncePtr = hash + SHORTHASHSIZE;
byte_t* pubkey = noncePtr + TUNNONCESIZE;
SharedSecret shared;
auto crypto = CryptoManager::instance();
// use dh_server because we are not the creator of this message
if(!crypto->dh_server(shared, otherPubkey, ourSecretKey, nonce))
// set our pubkey
memcpy(pubkey, ourSecretKey.toPublic().data(), PUBKEYSIZE);
// randomize nonce
crypto->randbytes(noncePtr, TUNNONCESIZE);
TunnelNonce nonce(noncePtr);
// derive shared key
if(!crypto->dh_client(shared, otherPubkey, ourSecretKey, nonce))
{
llarp::LogError("DH failed");
return false;
}
return DoEncrypt(shared, false);
}
bool
EncryptedFrame::DoDecrypt(const SharedSecret& shared)
{
ShortHash hash(data());
byte_t* noncePtr = data() + SHORTHASHSIZE;
byte_t* body = data() + EncryptedFrameOverheadSize;
TunnelNonce nonce(noncePtr);
auto crypto = CryptoManager::instance();
llarp_buffer_t buf;
buf.base = noncePtr;
buf.cur = buf.base;
@ -116,6 +124,34 @@ namespace llarp
llarp::LogError("decrypt failed");
return false;
}
return true;
}
bool
EncryptedFrame::DecryptInPlace(const SecretKey& ourSecretKey)
{
// format of frame is
// <32 bytes keyed hash of following data>
// <32 bytes nonce>
// <32 bytes pubkey>
// <N bytes encrypted payload>
//
byte_t* noncePtr = data() + SHORTHASHSIZE;
TunnelNonce nonce(noncePtr);
PubKey otherPubkey(noncePtr + TUNNONCESIZE);
SharedSecret shared;
auto crypto = CryptoManager::instance();
// use dh_server because we are not the creator of this message
if(!crypto->dh_server(shared, otherPubkey, ourSecretKey, nonce))
{
llarp::LogError("DH failed");
return false;
}
return DoDecrypt(shared);
}
} // namespace llarp

View File

@ -37,9 +37,15 @@ namespace llarp
}
}
bool
DoEncrypt(const SharedSecret& shared, bool noDH = false);
bool
DecryptInPlace(const SecretKey& seckey);
bool
DoDecrypt(const SharedSecret& shared);
bool
EncryptInPlace(const SecretKey& seckey, const PubKey& other);
};

View File

@ -5,6 +5,7 @@
#include <messages/link_intro.hpp>
#include <messages/link_message.hpp>
#include <messages/relay_commit.hpp>
#include <messages/relay_status.hpp>
#include <messages/relay.hpp>
#include <router_contact.hpp>
#include <util/buffer.hpp>
@ -22,6 +23,7 @@ namespace llarp
RelayUpstreamMessage u;
DHTImmediateMessage m;
LR_CommitMessage c;
LR_StatusMessage s;
DiscardMessage x;
msg_holder_t() = default;
@ -88,6 +90,9 @@ namespace llarp
case 'c':
msg = &holder->c;
break;
case 's':
msg = &holder->s;
break;
case 'x':
msg = &holder->x;
break;

View File

@ -1,10 +1,12 @@
#include <messages/relay_commit.hpp>
#include <messages/relay_status.hpp>
#include <crypto/crypto.hpp>
#include <nodedb.hpp>
#include <path/path_context.hpp>
#include <path/transit_hop.hpp>
#include <router/abstractrouter.hpp>
#include <router/i_outbound_message_handler.hpp>
#include <routing/path_confirm_message.hpp>
#include <util/bencode.hpp>
#include <util/buffer.hpp>
@ -195,6 +197,45 @@ namespace llarp
{
}
static void
OnForwardLRCMResult(AbstractRouter* router, const PathID_t pathid,
const RouterID nextHop, const SharedSecret pathKey,
SendStatus sendStatus)
{
uint64_t status = LR_StatusRecord::FAIL_DEST_INVALID;
switch(sendStatus)
{
case SendStatus::Success:
// do nothing, will forward success message later
return;
case SendStatus::Timeout:
status = LR_StatusRecord::FAIL_TIMEOUT;
break;
case SendStatus::NoLink:
status = LR_StatusRecord::FAIL_CANNOT_CONNECT;
break;
case SendStatus::InvalidRouter:
status = LR_StatusRecord::FAIL_DEST_INVALID;
break;
case SendStatus::RouterNotFound:
status = LR_StatusRecord::FAIL_DEST_UNKNOWN;
break;
case SendStatus::Congestion:
status = LR_StatusRecord::FAIL_CONGESTION;
break;
default:
LogError("llarp::SendStatus value not in enum class");
std::abort();
break;
}
auto func = std::bind(&LR_StatusMessage::CreateAndSend, router, pathid,
nextHop, pathKey, status);
router->threadpool()->addJob(func);
}
/// this is done from logic thread
static void
SendLRCM(std::shared_ptr< LRCMFrameDecrypt > self)
@ -205,6 +246,9 @@ namespace llarp
// we are not allowed to forward it ... now what?
llarp::LogError("path to ", self->hop->info.upstream,
"not allowed, dropping build request on the floor");
OnForwardLRCMResult(self->context->Router(), self->hop->info.rxID,
self->hop->info.downstream, self->hop->pathKey,
SendStatus::InvalidRouter);
self->hop = nullptr;
return;
}
@ -227,24 +271,17 @@ namespace llarp
if(self->record.nextRC->IsPublicRouter()
&& self->record.nextRC->Verify(now))
{
llarp_nodedb* n = self->context->Router()->nodedb();
const RouterContact rc = *self->record.nextRC;
// store it into netdb if we don't have it
if(!n->Has(rc.pubkey))
{
std::shared_ptr< Logic > l = self->context->Router()->logic();
n->InsertAsync(rc, l, [=]() {
self->context->ForwardLRCM(self->hop->info.upstream,
self->frames);
self->hop = nullptr;
});
return;
}
self->context->Router()->nodedb()->UpdateAsyncIfNewer(
*self->record.nextRC.get());
}
}
}
// forward to next hop
self->context->ForwardLRCM(self->hop->info.upstream, self->frames);
using std::placeholders::_1;
auto func = std::bind(&OnForwardLRCMResult, self->context->Router(),
self->hop->info.rxID, self->hop->info.downstream,
self->hop->pathKey, _1);
self->context->ForwardLRCM(self->hop->info.upstream, self->frames, func);
self->hop = nullptr;
}
@ -257,9 +294,14 @@ namespace llarp
self->hop->info.downstream, self->hop->ExpireTime() + 10000);
// put hop
self->context->PutTransitHop(self->hop);
// send path confirmation
const llarp::routing::PathConfirmMessage confirm(self->hop->lifetime);
if(!self->hop->SendRoutingMessage(confirm, self->context->Router()))
// TODO: other status flags?
uint64_t status = LR_StatusRecord::SUCCESS;
if(!LR_StatusMessage::CreateAndSend(
self->context->Router(), self->hop->info.rxID,
self->hop->info.downstream, self->hop->pathKey, status))
{
llarp::LogError("failed to send path confirmation for ",
self->hop->info);

View File

@ -0,0 +1,279 @@
#include <messages/relay_status.hpp>
#include <crypto/crypto.hpp>
#include <path/path_context.hpp>
#include <path/ihophandler.hpp>
#include <router/abstractrouter.hpp>
#include <routing/path_confirm_message.hpp>
#include <util/bencode.hpp>
#include <util/buffer.hpp>
#include <util/logger.hpp>
#include <util/logic.hpp>
#include <util/memfn.hpp>
#include <functional>
namespace llarp
{
struct LRSM_AsyncHandler
: public std::enable_shared_from_this< LRSM_AsyncHandler >
{
using HopHandler_ptr = std::shared_ptr< llarp::path::IHopHandler >;
std::array< EncryptedFrame, 8 > frames;
uint64_t status = 0;
HopHandler_ptr path;
AbstractRouter* router;
LRSM_AsyncHandler(const std::array< EncryptedFrame, 8 >& _frames,
uint64_t _status, HopHandler_ptr _path,
AbstractRouter* _router)
: frames(_frames), status(_status), path(_path), router(_router)
{
}
~LRSM_AsyncHandler() = default;
void
handle()
{
path->HandleLRSM(status, frames, router);
}
void
queue_handle()
{
auto func =
std::bind(&llarp::LRSM_AsyncHandler::handle, shared_from_this());
router->threadpool()->addJob(func);
}
};
bool
LR_StatusMessage::DecodeKey(const llarp_buffer_t& key, llarp_buffer_t* buf)
{
bool read = false;
if(key == "c")
{
return BEncodeReadArray(frames, buf);
}
else if(key == "p")
{
if(!BEncodeMaybeReadDictEntry("p", pathid, read, key, buf))
{
return false;
}
}
else if(key == "s")
{
if(!BEncodeMaybeReadDictInt("s", status, read, key, buf))
{
return false;
}
}
else if(key == "v")
{
if(!BEncodeMaybeReadVersion("v", version, LLARP_PROTO_VERSION, read, key,
buf))
{
return false;
}
}
return read;
}
void
LR_StatusMessage::Clear()
{
std::for_each(frames.begin(), frames.end(), [](auto& f) { f.Clear(); });
}
bool
LR_StatusMessage::BEncode(llarp_buffer_t* buf) const
{
if(!bencode_start_dict(buf))
return false;
// msg type
if(!BEncodeWriteDictMsgType(buf, "a", "s"))
return false;
// frames
if(!BEncodeWriteDictArray("c", frames, buf))
return false;
// path id
if(!BEncodeWriteDictEntry("p", pathid, buf))
return false;
// status (for now, only success bit is relevant)
if(!BEncodeWriteDictInt("s", status, buf))
return false;
// version
if(!bencode_write_version_entry(buf))
return false;
return bencode_end(buf);
}
bool
LR_StatusMessage::HandleMessage(AbstractRouter* router) const
{
llarp::LogDebug("Received LR_Status message from (", session->GetPubKey(),
")");
if(frames.size() != path::max_len)
{
llarp::LogError("LRSM invalid number of records, ", frames.size(),
"!=", path::max_len);
return false;
}
auto path =
router->pathContext().GetByUpstream(session->GetPubKey(), pathid);
if(!path)
{
llarp::LogWarn(
"unhandled LR_Status message: no associated IHopHandler found");
return false;
}
auto handler =
std::make_shared< LRSM_AsyncHandler >(frames, status, path, router);
handler->queue_handle();
return true;
}
void
LR_StatusMessage::SetDummyFrames()
{
// TODO
return;
}
// call this from a worker thread
bool
LR_StatusMessage::CreateAndSend(AbstractRouter* router, const PathID_t pathid,
const RouterID nextHop,
const SharedSecret pathKey, uint64_t status)
{
auto message = std::make_shared< LR_StatusMessage >();
message->status = status & LR_StatusRecord::SUCCESS;
message->pathid = pathid;
message->SetDummyFrames();
if(!message->AddFrame(pathKey, status))
{
return false;
}
QueueSendMessage(router, nextHop, message);
return true; // can't guarantee delivery here, as far as we know it's fine
}
bool
LR_StatusMessage::AddFrame(const SharedSecret& pathKey, uint64_t status)
{
frames[7] = frames[6];
frames[6] = frames[5];
frames[5] = frames[4];
frames[4] = frames[3];
frames[3] = frames[2];
frames[2] = frames[1];
frames[1] = frames[0];
auto& frame = frames[0];
frame.Randomize();
LR_StatusRecord record;
record.status = status;
record.version = LLARP_PROTO_VERSION;
llarp_buffer_t buf(frame.data(), frame.size());
buf.cur = buf.base + EncryptedFrameOverheadSize;
// encode record
if(!record.BEncode(&buf))
{
// failed to encode?
LogError(Name(), " Failed to generate Status Record");
DumpBuffer(buf);
return false;
}
// use ephemeral keypair for frame
if(!frame.DoEncrypt(pathKey, true))
{
LogError(Name(), " Failed to encrypt LRSR");
DumpBuffer(buf);
return false;
}
return true;
}
void
LR_StatusMessage::QueueSendMessage(AbstractRouter* router,
const RouterID nextHop,
std::shared_ptr< LR_StatusMessage > msg)
{
auto func = std::bind(&LR_StatusMessage::SendMessage, router, nextHop, msg);
router->pathContext().logic()->queue_func(func);
}
void
LR_StatusMessage::SendMessage(AbstractRouter* router, const RouterID nextHop,
std::shared_ptr< LR_StatusMessage > msg)
{
llarp::LogDebug("Attempting to send LR_Status message to (", nextHop, ")");
if(not router->HasSessionTo(nextHop))
{
llarp::LogError(
"Sending LR_Status message, but no connection to previous hop (",
nextHop, ")");
return;
}
if(not router->SendToOrQueue(nextHop, msg.get()))
{
llarp::LogError("Sending LR_Status message, SendToOrQueue to ", nextHop,
" failed");
}
}
bool
LR_StatusRecord::BEncode(llarp_buffer_t* buf) const
{
return bencode_start_dict(buf) && BEncodeWriteDictInt("s", status, buf)
&& bencode_write_version_entry(buf) && bencode_end(buf);
}
bool
LR_StatusRecord::OnKey(llarp_buffer_t* buffer, llarp_buffer_t* key)
{
if(!key)
return true;
bool read = false;
if(!BEncodeMaybeReadDictInt("s", status, read, *key, buffer))
return false;
if(!BEncodeMaybeReadVersion("v", version, LLARP_PROTO_VERSION, read, *key,
buffer))
return false;
return read;
}
bool
LR_StatusRecord::BDecode(llarp_buffer_t* buf)
{
return bencode_read_dict(util::memFn(&LR_StatusRecord::OnKey, this), buf);
}
bool
LR_StatusRecord::operator==(const LR_StatusRecord& other) const
{
return status == other.status;
}
} // namespace llarp

View File

@ -0,0 +1,107 @@
#ifndef LLARP_RELAY_STATUS_HPP
#define LLARP_RELAY_STATUS_HPP
#include <crypto/encrypted_frame.hpp>
#include <crypto/types.hpp>
#include <messages/link_message.hpp>
#include <path/path_types.hpp>
#include <pow.hpp>
#include <array>
#include <memory>
namespace llarp
{
// forward declare
struct AbstractRouter;
namespace path
{
struct PathContext;
struct IHopHandler;
} // namespace path
struct LR_StatusRecord
{
static constexpr uint64_t SUCCESS = 1;
static constexpr uint64_t FAIL_TIMEOUT = 1 << 1;
static constexpr uint64_t FAIL_CONGESTION = 1 << 2;
static constexpr uint64_t FAIL_DEST_UNKNOWN = 1 << 3;
static constexpr uint64_t FAIL_DECRYPT_ERROR = 1 << 4;
static constexpr uint64_t FAIL_MALFORMED_RECORD = 1 << 5;
static constexpr uint64_t FAIL_DEST_INVALID = 1 << 6;
static constexpr uint64_t FAIL_CANNOT_CONNECT = 1 << 7;
uint64_t status = 0;
uint64_t version = 0;
bool
BDecode(llarp_buffer_t *buf);
bool
BEncode(llarp_buffer_t *buf) const;
bool
operator==(const LR_StatusRecord &other) const;
private:
bool
OnKey(llarp_buffer_t *buffer, llarp_buffer_t *key);
};
struct LR_StatusMessage : public ILinkMessage
{
std::array< EncryptedFrame, 8 > frames;
PathID_t pathid;
uint64_t status = 0;
LR_StatusMessage(const std::array< EncryptedFrame, 8 > &_frames)
: ILinkMessage(), frames(_frames)
{
}
LR_StatusMessage() = default;
~LR_StatusMessage() = default;
void
Clear() override;
bool
DecodeKey(const llarp_buffer_t &key, llarp_buffer_t *buf) override;
bool
BEncode(llarp_buffer_t *buf) const override;
bool
HandleMessage(AbstractRouter *router) const override;
void
SetDummyFrames();
static bool
CreateAndSend(AbstractRouter *router, const PathID_t pathid,
const RouterID nextHop, const SharedSecret pathKey,
uint64_t status);
bool
AddFrame(const SharedSecret &pathKey, uint64_t status);
static void
QueueSendMessage(AbstractRouter *router, const RouterID nextHop,
std::shared_ptr< LR_StatusMessage > msg);
static void
SendMessage(AbstractRouter *router, const RouterID nextHop,
std::shared_ptr< LR_StatusMessage > msg);
const char *
Name() const override
{
return "RelayStatus";
}
};
} // namespace llarp
#endif

View File

@ -3,6 +3,7 @@
#include <crypto/types.hpp>
#include <util/types.hpp>
#include <crypto/encrypted_frame.hpp>
#include <memory>
@ -49,6 +50,10 @@ namespace llarp
virtual llarp_time_t
LastRemoteActivityAt() const = 0;
virtual bool
HandleLRSM(uint64_t status, std::array< EncryptedFrame, 8 >& frames,
AbstractRouter* r) = 0;
uint64_t
NextSeqNo()
{

View File

@ -3,6 +3,7 @@
#include <exit/exit_messages.hpp>
#include <messages/discard.hpp>
#include <messages/relay_commit.hpp>
#include <messages/relay_status.hpp>
#include <path/pathbuilder.hpp>
#include <path/transit_hop.hpp>
#include <profiling.hpp>
@ -100,10 +101,116 @@ namespace llarp
return ss.str();
}
bool
Path::HandleLRSM(uint64_t status, std::array< EncryptedFrame, 8 >& frames,
AbstractRouter* r)
{
uint64_t currentStatus = LR_StatusRecord::SUCCESS;
size_t index = 0;
while(index < hops.size())
{
if(!frames[index].DoDecrypt(hops[index].shared))
{
currentStatus = LR_StatusRecord::FAIL_DECRYPT_ERROR;
break;
}
llarp::LogDebug("decrypted LRSM frame from ", hops[index].rc.pubkey);
llarp_buffer_t* buf = frames[index].Buffer();
buf->cur = buf->base + EncryptedFrameOverheadSize;
LR_StatusRecord record;
// successful decrypt
if(!record.BDecode(buf))
{
llarp::LogWarn("malformed frame inside LRCM from ",
hops[index].rc.pubkey);
currentStatus = LR_StatusRecord::FAIL_MALFORMED_RECORD;
break;
}
llarp::LogDebug("Decoded LR Status Record from ",
hops[index].rc.pubkey);
currentStatus = record.status;
if((currentStatus & LR_StatusRecord::SUCCESS) == 0)
{
break;
}
++index;
}
if((currentStatus & LR_StatusRecord::SUCCESS) == 1)
{
llarp::LogDebug("LR_Status message processed, path build successful");
HandlePathConfirmMessage(r);
}
else
{
r->routerProfiling().MarkPathFail(this);
llarp::LogInfo("LR_Status message processed, path build failed");
if(currentStatus & LR_StatusRecord::FAIL_TIMEOUT)
{
llarp::LogDebug("Path build failed due to timeout");
}
else if(currentStatus & LR_StatusRecord::FAIL_CONGESTION)
{
llarp::LogDebug("Path build failed due to congestion");
}
else if(currentStatus & LR_StatusRecord::FAIL_DEST_UNKNOWN)
{
llarp::LogDebug(
"Path build failed due to one or more nodes giving destination "
"unknown");
}
else if(currentStatus & LR_StatusRecord::FAIL_DEST_INVALID)
{
llarp::LogDebug(
"Path build failed due to one or more nodes considered an "
"invalid destination");
}
else if(currentStatus & LR_StatusRecord::FAIL_CANNOT_CONNECT)
{
llarp::LogDebug(
"Path build failed due to a node being unable to connect to the "
"next hop");
}
else if(currentStatus & LR_StatusRecord::FAIL_MALFORMED_RECORD)
{
llarp::LogDebug(
"Path build failed due to a malformed record in the build status "
"message");
}
else if(currentStatus & LR_StatusRecord::FAIL_DECRYPT_ERROR)
{
llarp::LogDebug(
"Path build failed due to a decrypt error in the build status "
"message");
}
else
{
llarp::LogDebug("Path build failed for an unspecified reason");
}
EnterState(ePathFailed, r->Now());
}
// TODO: meaningful return value?
return true;
}
void
Path::EnterState(PathStatus st, llarp_time_t now)
{
if(st == ePathExpired && _status == ePathBuilding)
if(st == ePathFailed)
{
_status = st;
}
else if(st == ePathExpired && _status == ePathBuilding)
{
_status = st;
m_PathSet->HandlePathBuildTimeout(shared_from_this());
@ -176,6 +283,9 @@ namespace llarp
case ePathExpired:
obj.Put("status", "expired");
break;
case ePathFailed:
obj.Put("status", "failed");
break;
case ePathIgnore:
obj.Put("status", "ignored");
break;
@ -271,6 +381,8 @@ namespace llarp
bool
Path::Expired(llarp_time_t now) const
{
if(_status == ePathFailed)
return true;
if(_status == ePathEstablished || _status == ePathTimeout)
return now >= ExpireTime();
if(_status == ePathBuilding)
@ -385,11 +497,9 @@ namespace llarp
}
bool
Path::HandlePathConfirmMessage(
ABSL_ATTRIBUTE_UNUSED const routing::PathConfirmMessage& msg,
AbstractRouter* r)
Path::HandlePathConfirmMessage(AbstractRouter* r)
{
LogDebug("Path built: ", HopsString());
LogDebug("Path Build Confirm, path: ", HopsString());
auto now = r->Now();
if(_status == ePathBuilding)
{
@ -413,6 +523,14 @@ namespace llarp
return false;
}
bool
Path::HandlePathConfirmMessage(
ABSL_ATTRIBUTE_UNUSED const routing::PathConfirmMessage& msg,
AbstractRouter* r)
{
return HandlePathConfirmMessage(r);
}
bool
Path::HandleHiddenServiceFrame(const service::ProtocolFrame& frame)
{

View File

@ -156,6 +156,10 @@ namespace llarp
return m_LastRecvMessage;
}
bool
HandleLRSM(uint64_t status, std::array< EncryptedFrame, 8 >& frames,
AbstractRouter* r) override;
void
SetBuildResultHook(BuildResultHookFunc func);
@ -255,6 +259,9 @@ namespace llarp
HandleDataDiscardMessage(const routing::DataDiscardMessage& msg,
AbstractRouter* r) override;
bool
HandlePathConfirmMessage(AbstractRouter* r);
bool
HandlePathConfirmMessage(const routing::PathConfirmMessage& msg,
AbstractRouter* r) override;

View File

@ -3,6 +3,7 @@
#include <messages/relay_commit.hpp>
#include <path/path.hpp>
#include <router/abstractrouter.hpp>
#include <router/i_outbound_message_handler.hpp>
namespace llarp
{
@ -65,30 +66,21 @@ namespace llarp
bool
PathContext::ForwardLRCM(const RouterID& nextHop,
const std::array< EncryptedFrame, 8 >& frames)
const std::array< EncryptedFrame, 8 >& frames,
SendStatusHandler handler)
{
if(handler == nullptr)
{
LogError("Calling ForwardLRCM without passing result handler");
return false;
}
auto msg = std::make_shared< const LR_CommitMessage >(frames);
LogDebug("forwarding LRCM to ", nextHop);
if(m_Router->HasSessionTo(nextHop))
{
return m_Router->SendToOrQueue(nextHop, msg.get());
}
const RouterID router = nextHop;
AbstractRouter* const r = m_Router;
m_Router->EnsureRouter(
nextHop, [msg, r, router](const std::vector< RouterContact >& found) {
if(found.size())
{
r->TryConnectAsync(found[0], 1);
r->SendToOrQueue(router, msg.get());
}
else
LogError("dropped LRCM to ", router,
" as we cannot find in via DHT");
});
LogInfo("we are not directly connected to ", router,
" so we need to do a lookup");
m_Router->SendToOrQueue(nextHop, msg.get(), handler);
return true;
}
template < typename Map_t, typename Key_t, typename CheckValue_t,

View File

@ -7,6 +7,7 @@
#include <path/pathset.hpp>
#include <path/transit_hop.hpp>
#include <routing/handler.hpp>
#include <router/i_outbound_message_handler.hpp>
#include <util/compare_ptr.hpp>
#include <util/types.hpp>
@ -79,7 +80,8 @@ namespace llarp
bool
ForwardLRCM(const RouterID& nextHop,
const std::array< EncryptedFrame, 8 >& frames);
const std::array< EncryptedFrame, 8 >& frames,
SendStatusHandler handler);
bool
HopIsUs(const RouterID& k) const;

View File

@ -34,6 +34,7 @@ namespace llarp
ePathBuilding,
ePathEstablished,
ePathTimeout,
ePathFailed,
ePathIgnore,
ePathExpired
};

View File

@ -5,6 +5,7 @@
#include <exit/exit_messages.hpp>
#include <messages/discard.hpp>
#include <messages/relay_commit.hpp>
#include <messages/relay_status.hpp>
#include <path/path_context.hpp>
#include <path/transit_hop.hpp>
#include <router/abstractrouter.hpp>
@ -37,7 +38,7 @@ namespace llarp
bool
TransitHop::Expired(llarp_time_t now) const
{
return now >= ExpireTime();
return destroy || (now >= ExpireTime());
}
llarp_time_t
@ -46,6 +47,36 @@ namespace llarp
return started + lifetime;
}
bool
TransitHop::HandleLRSM(uint64_t status,
std::array< EncryptedFrame, 8 >& frames,
AbstractRouter* r)
{
auto msg = std::make_shared< LR_StatusMessage >(frames);
msg->status = status;
msg->pathid = info.rxID;
// TODO: add to IHopHandler some notion of "path status"
const uint64_t ourStatus = LR_StatusRecord::SUCCESS;
if(!msg->AddFrame(pathKey, ourStatus))
{
return false;
}
LR_StatusMessage::QueueSendMessage(r, info.downstream, msg);
if((status & LR_StatusRecord::SUCCESS) == 0)
{
LogDebug(
"TransitHop received non-successful LR_StatusMessage, queueing "
"self-destruct");
QueueDestroySelf(r);
}
return true;
}
TransitHopInfo::TransitHopInfo(const RouterID& down,
const LR_CommitRecord& record)
: txID(record.txid)
@ -325,5 +356,18 @@ namespace llarp
return stream;
}
void
TransitHop::SetSelfDestruct()
{
destroy = true;
}
void
TransitHop::QueueDestroySelf(AbstractRouter* r)
{
auto func = std::bind(&TransitHop::SetSelfDestruct, this);
r->logic()->queue_func(func);
}
} // namespace path
} // namespace llarp

View File

@ -92,6 +92,8 @@ namespace llarp
llarp_proto_version_t version;
llarp_time_t m_LastActivity = 0;
bool destroy = false;
bool
IsEndpoint(const RouterID& us) const
{
@ -107,6 +109,10 @@ namespace llarp
return m_LastActivity;
}
bool
HandleLRSM(uint64_t status, std::array< EncryptedFrame, 8 >& frames,
AbstractRouter* r) override;
std::ostream&
print(std::ostream& stream, int level, int spaces) const;
@ -132,6 +138,9 @@ namespace llarp
HandleDataDiscardMessage(const routing::DataDiscardMessage& msg,
AbstractRouter* r) override;
bool
HandlePathConfirmMessage(AbstractRouter* r);
bool
HandlePathConfirmMessage(const routing::PathConfirmMessage& msg,
AbstractRouter* r) override;
@ -193,6 +202,13 @@ namespace llarp
bool
HandleDownstream(const llarp_buffer_t& X, const TunnelNonce& Y,
AbstractRouter* r) override;
private:
void
SetSelfDestruct();
void
QueueDestroySelf(AbstractRouter* r);
};
inline std::ostream&

View File

@ -3,6 +3,7 @@
#include <util/types.hpp>
#include <util/status.hpp>
#include <router/i_outbound_message_handler.hpp>
#include <vector>
#include <ev/ev.h>
#include <functional>
@ -174,7 +175,8 @@ namespace llarp
GetRandomGoodRouter(RouterID &r) = 0;
virtual bool
SendToOrQueue(const RouterID &remote, const ILinkMessage *msg) = 0;
SendToOrQueue(const RouterID &remote, const ILinkMessage *msg,
SendStatusHandler handler = nullptr) = 0;
virtual void
PersistSessionUntil(const RouterID &remote, llarp_time_t until) = 0;

View File

@ -168,11 +168,15 @@ namespace llarp
}
bool
Router::SendToOrQueue(const RouterID &remote, const ILinkMessage *msg)
Router::SendToOrQueue(const RouterID &remote, const ILinkMessage *msg,
SendStatusHandler handler)
{
using std::placeholders::_1;
auto func = std::bind(&Router::MessageSent, this, remote, _1);
return _outboundMessageHandler.QueueMessage(remote, msg, func);
if(handler == nullptr)
{
using std::placeholders::_1;
handler = std::bind(&Router::MessageSent, this, remote, _1);
}
return _outboundMessageHandler.QueueMessage(remote, msg, handler);
}
void

View File

@ -387,7 +387,8 @@ namespace llarp
/// NOT threadsafe
/// MUST be called in the logic thread
bool
SendToOrQueue(const RouterID &remote, const ILinkMessage *msg) override;
SendToOrQueue(const RouterID &remote, const ILinkMessage *msg,
SendStatusHandler handler) override;
void
ForEachPeer(std::function< void(const ILinkSession *, bool) > visit,

View File

@ -59,10 +59,17 @@ llarp_threadpool_wait(struct llarp_threadpool *pool)
void
llarp_threadpool_queue_job(struct llarp_threadpool *pool,
struct llarp_thread_job job)
{
return llarp_threadpool_queue_job(pool, (std::bind(job.work, job.user)));
}
void
llarp_threadpool_queue_job(struct llarp_threadpool *pool,
std::function< void() > func)
{
if(pool->impl)
{
while(!pool->impl->tryAddJob(std::bind(job.work, job.user)))
while(!pool->impl->tryAddJob(func))
{
std::this_thread::sleep_for(std::chrono::microseconds(1000));
}
@ -70,8 +77,7 @@ llarp_threadpool_queue_job(struct llarp_threadpool *pool,
else
{
// single threaded mode
while(pool->jobs->tryPushBack(std::bind(job.work, job.user))
!= llarp::thread::QueueReturn::Success)
while(pool->jobs->tryPushBack(func) != llarp::thread::QueueReturn::Success)
{
if(!pool->jobs->enabled())
return;

View File

@ -86,6 +86,15 @@ llarp_threadpool_tick(struct llarp_threadpool *tp);
void
llarp_threadpool_queue_job(struct llarp_threadpool *tp,
struct llarp_thread_job j);
#ifdef __cplusplus
void
llarp_threadpool_queue_job(struct llarp_threadpool *tp,
std::function< void() > func);
#endif
void
llarp_threadpool_start(struct llarp_threadpool *tp);