Merge pull request #359 from majestrate/fix-libabyss-352

Fix #352
This commit is contained in:
Jeff 2019-02-27 11:18:46 -05:00 committed by GitHub
commit c10f7d82cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 698 additions and 559 deletions

View File

@ -51,7 +51,7 @@ GRADLE ?= gradle
JAVA_HOME ?= /usr/lib/jvm/default-java
# jsonrpc server
JSONRPC ?= OFF
JSONRPC ?= ON
# native avx2 code
AVX2 ?= OFF
# non x86 target
@ -63,7 +63,7 @@ NETNS ?= OFF
# cross compile?
CROSS ?= OFF
# build liblokinet-shared.so
SHARED_LIB ?= ON
SHARED_LIB ?= OFF
# enable generating coverage
COVERAGE ?= OFF
COVERAGE_OUTDIR ?= "$(TMPDIR)/lokinet-coverage"

View File

@ -9,23 +9,66 @@ LLARP supports by default an authenticated message transport over a
datagram based network layer.
protocol phases:
first phase: proof of flow
second phase: session handshake
thrid phase: data transmission
proof of flow:
At any time before the data transfer phase a reject message
is sent the session is reset.
Alice (A) is the sender and Bob (B) is the recipiant.
A asks for a flow id from B.
B MAY send a flow id to A or MAY reject the message from A.
session handshake:
an encrypted session is established using establish wire session messages
using a newly created flow id.
outer message format:
every outer message MAY be obfsucated via symettric encryption for dpi
resistance reasons, this is not authenticated encryption.
the message is first assumed to be sent in clear first.
if parsing of clear variant fails then the recipiant MUST fall back to assuming
the protocol is in obfuscated mode.
<16 bytes nounce, n>
<remaining bytes obsfucated, m>
obfuscated via:
K = HS(B_k)
N = HS(n + K)
X = SD(K, m, N[0:24])
outer-header:
<1 byte command>
<1 byte reserved, R, set to '=' (0x3d)>
<32 bytes flow id, B>
<1 byte reserved set to 0x3d>
command 'O' - obtain flow id
obtain a handshake cookie
obtain a flow id
<outer-header>
<32 bytes random>
<8 bytes net id>
<remaining discarded>
<6 magic bytes "netid?">
<8 bytes netid, I>
<8 bytes timestamp milliseconds since epoch, T>
<32 bytes ed25519 public key of sender, A_k>
<0-N bytes discarded>
<last 64 bytes signature of unobfuscated packet, Z>
the if the network id differs from the current network's id a reject message
MUST be sent
@ -35,106 +78,59 @@ MUST be replied to with a message rejected or a give handshake cookie
command 'G' - give flow id
<outer-header>
<32 byte new flow-id, X>
<remaining discarded>
<6 magic bytes "netid!">
<16 bytes new flow id>
<32 bytes ed25519 public key of sender, A_k>
<0-N bytes discarded>
<last 64 bytes signature of unobfsucated packet, Z>
give a flow id to a remote endpoint that asks for one
after recieving a give flow id message a session negotiation can happen with that flow id.
B is the B value from the request flow id message
X is a 32 byte the flow id, calcuated via:
command 'R' - flow rejected
r = RAND(32)
a = "::ffff.<ascii representation of ipv4 address>" + ":" + "<port number>"
X = HS(a + B + r + net id)
resulting message:
"G=<32 byte flowid><32 bytes new flowid>"
after recieving a give flow id message a session negotiation can happen
command 'R' - message rejected
reject new flow
<outer-header>
<N arbitrary bytes reason for rejection>
<14 ascii bytes reason for rejection null padded>
<8 bytes timestamp>
<32 bytes ed25519 public key of sender, A_k>
<0-N bytes discarded>
<last 64 bytes signature of unobsfucated packet, Z>
reject a message on a flow id
command 'E' - establish wire session
B is the flow id from the recipiant
resulting message of reject with reason "no you"
"R=<32 byte flow-id>no you"
command 'S' - session negotiation
negotiate encrypted session
establish an encrypted session using a flow id
<outer-header>
<24 bytes nounce, N>
<encrypted session negotiation data, X>
<last 32 bytes keyed hash, Z>
<16 bytes flow id, B>
<32 bytes ephemeral public encryption key, E>
<8 bytes packet counter starting at 0>
<optional 32 bytes authenticated credentials, A>
<last 64 bytes signature of unobfuscated packet using identity key, Z>
B is the flow id from the recipiant generated by the give flow id message (from outer header)
N is a random nounce
X is encrypted session negotiation data
Z is a keyed hash
every time we try establishing a wire session we increment the counter
by 1 for the next message we send.
Z is generated via:
when we get an establish wire session message
we reply with an establish wire session message with counter being counter + 1
msg.Z = '0x00' * 32
msg.Z = MDS(msg, tx_K)
if A is provided that is interpreted as being generated via:
session negotiation:
h0 = HS('<insert some password here>')
h1 = EDDH(us, them)
A = HS(B + h0 + h1)
The session starts out with each side having 2 session keys rx_K and tx_K for
decrypting inbound messages and encrypting outbound messages respectively.
each side establishes their own rx key using this message.
when each side has both established thier rx key data can be transmitted.
The initiator (alice) and the recipiant (bob) start out with static session keys
command 'D' - encrypted data transmission
k_a = HS(a.k)
k_b = HS(b.k)
a.rx_K = k_a
b.rx_K = k_b
a.tx_K = k_b
b.tx_K = k_a
decryption is done via:
SD(msg.X, rx_K, msg.N)
encrypted payload is bencoded LIM (see proto_v0.txt)
the initiator starts out by sending a LIM a_LIM to the recipiant.
the recipiant replies with a LIM b_LIM to the initiator.
when the initiator gets a valid LIM from the recipiant the session keys for data
transmission are set to:
k_a = TKE(a.k, b.k, a.sk, a_LIM.n)
k_b = TKE(b.k, a.k, b.sk, b_LIM.n)
a.rx_K = k_a
b.rx_K = k_b
a.tx_K = k_b
b.tx_K = k_a
afterwards data transmission may happen.
the intiator's remote address is permitted to change during data transmission.
remote address of the last successfully
D - encrypted data transmission
transmit encrypted data on session
transmit encrypted data on a wire session
<outer-header>
<16 bytes flow-id, F>
<24 bytes nonce, N>
<encrypted data, X>
<N encrypted data, X>
<last 32 bytes keyed hash of entire payload, Z>
@ -158,7 +154,7 @@ header:
<1 byte command>
command: 'K' (keep alive)
command: 'k' (keep alive)
tell other side to acknoledge they are alive
@ -171,7 +167,7 @@ tell other side to acknoledge they are alive
<8 bytes milliseconds since epoch our current time>
<remaining bytes discarded>
command: 'L' (keep alive ack)
command: 'l' (keep alive ack)
acknolege keep alive message
@ -183,7 +179,29 @@ acknolege keep alive message
<remaining bytes discarded>
command: 'C' (congestion)
command: 'n' (advertise neighboors)
tell peer about neighboors, only sent by non service nodes to other non service
nodes.
<header>
<route between us and them>
<0 or more intermediate routes>
<route from a service node>
route:
<1 byte route version (currently 0)>
<1 byte flags, lsb set indicates src is a service node>
<2 bytes latency in ms>
<2 bytes backpressure>
<2 bytes number of connected peers>
<8 bytes publish timestamp ms since epoch>
<32 bytes pubkey neighboor>
<32 bytes pubkey src>
<64 bytes signature of entire route signed by src>
command: 'c' (congestion)
tell other side to slow down
@ -192,7 +210,7 @@ tell other side to slow down
<4 bytes milliseconds slowdown lifetime>
<remaining bytes discarded>
command: 'D' (anti-congestion)
command: 'd' (anti-congestion)
tell other side to speed up
@ -201,26 +219,35 @@ tell other side to speed up
<4 bytes milliseconds speedup lifetime>
<remaining bytes discarded>
command: 'T' (transmit)
command: 't' (transmit data)
transit fragment
transmit a message to a peer
if this fragment is not addressed to us we route it to the neighboor
with the shortest route to the recipiant as advertised by all neighboors.
<header>
<1 byte number of 16 byte blocks offset from beginning of message, O>
<1 byte number of 16 byte blocks size of fragment data, N>
<4 bytes sequence number>
<32 bytes expected digest of message, present if O is 0, otherwise omitted>
<16 * N bytes of data>
<remaining bytes discarded>
<32 bytes public identity key of recipiant>
<32 bytes public identity key of sender>
<24 bytes nounce, N>
<N bytes encrypted message, X>
<last 32 bytes keyed hash, Z>
command: 'A' (ack)
encrypted via:
acknoledge fragments
K = EDDH(recipiant, sender)
X = SE(msg, K, N)
Z = MDS(X, K)
<header>
<1 byte number of acks following, N>
<8 * N bytes acks>
<remaining bytes discarded>
encrypted message format:
<1 byte version, currently 0>
<1 byte number of acks following, aN>
<8 * aN bytes acks>
<4 byte sequence number of fragment or 0 if no fragment is included>
<2 byte 16 byte block offset in message of this fragment if it is included>
<remaining bytes fragment data aligned to 16 bytes>
<discard anything not aligned to 16 bytes>
ack format:
@ -231,7 +258,7 @@ ack format:
<1 byte bitmask fragments posative ack (msb is fragment 0, lsb is fragment 7)>
command: 'R' (rotate keys)
command: 'r' (rotate keys)
inform remote that their RX key should be rotated
@ -248,7 +275,7 @@ B.rx_K = n_K
<32 bytes next public encryption key, K>
<remaining bytes discarded>
command: 'U' (upgrade)
command: 'u' (upgrade)
request protocol upgrade
@ -257,7 +284,7 @@ request protocol upgrade
<1 byte protocol max version to upgrade to>
<remaining bytes discarded>
command: 'V' (version upgrade)
command: 'v' (version upgrade)
sent in response to upgrade message

View File

@ -24,15 +24,28 @@ struct DemoHandler : public abyss::httpd::IRPCHandler
struct DemoCall : public abyss::http::IRPCClientHandler
{
DemoCall(abyss::http::ConnImpl* impl) : abyss::http::IRPCClientHandler(impl)
std::function< void(void) > m_Callback;
llarp::Logic* m_Logic;
DemoCall(abyss::http::ConnImpl* impl, llarp::Logic* logic,
std::function< void(void) > callback)
: abyss::http::IRPCClientHandler(impl)
, m_Callback(callback)
, m_Logic(logic)
{
llarp::LogInfo("new call");
}
bool
HandleResponse(abyss::http::RPC_Response resp) override
static void
CallCallback(void* u)
{
llarp::json::ToString(resp, std::cout);
static_cast< DemoCall* >(u)->m_Callback();
}
bool HandleResponse(abyss::http::RPC_Response) override
{
llarp::LogInfo("response get");
m_Logic->queue_job({this, &CallCallback});
return true;
}
@ -51,10 +64,18 @@ struct DemoCall : public abyss::http::IRPCClientHandler
struct DemoClient : public abyss::http::JSONRPC
{
llarp_ev_loop* m_Loop;
llarp::Logic* m_Logic;
DemoClient(llarp_ev_loop* l, llarp::Logic* logic)
: abyss::http::JSONRPC(), m_Loop(l), m_Logic(logic)
{
}
abyss::http::IRPCClientHandler*
NewConn(abyss::http::ConnImpl* impl)
{
return new DemoCall(impl);
return new DemoCall(impl, m_Logic, std::bind(&llarp_ev_loop_stop, m_Loop));
}
void
@ -109,7 +130,7 @@ main(__attribute__((unused)) int argc, __attribute__((unused)) char* argv[])
addr.sin_port = htons(1222);
addr.sin_family = AF_INET;
DemoServer serv;
DemoClient client;
DemoClient client(loop, logic);
llarp::Addr a(addr);
while(true)
{

View File

@ -70,7 +70,7 @@ namespace abyss
ConnImpl* self = static_cast< ConnImpl* >(conn->user);
if(!self->ProcessRead((const char*)buf.base, buf.sz))
{
self->CloseError();
self->CloseError("on read failed");
}
}
@ -138,8 +138,8 @@ namespace abyss
Close();
return true;
case json::IParser::eParseError:
handler->HandleError();
return false;
CloseError("json parse error");
return true;
default:
return false;
}
@ -195,8 +195,9 @@ namespace abyss
}
void
CloseError()
CloseError(const char* msg)
{
LogError("CloseError: ", msg);
if(handler)
handler->HandleError();
handler = nullptr;
@ -221,72 +222,20 @@ namespace abyss
std::stringstream ss;
json::ToString(m_RequestBody, ss);
body = ss.str();
// request base
char buf[512] = {0};
int sz = snprintf(buf, sizeof(buf),
"POST /rpc HTTP/1.0\r\nContent-Type: "
"application/json\r\nContent-Length: %zu\r\nAccept: "
"application/json\r\n",
body.size());
if(sz <= 0)
return;
if(!llarp_tcp_conn_async_write(m_Conn, llarp_buffer_t(buf, sz)))
{
llarp::LogError("failed to write first part of request");
CloseError();
return;
}
// header delimiter
buf[0] = ':';
buf[1] = ' ';
// CRLF
buf[2] = '\r';
buf[3] = '\n';
// write extra request header
m_SendHeaders.emplace("Content-Type", "application/json");
m_SendHeaders.emplace("Content-Length", std::to_string(body.size()));
m_SendHeaders.emplace("Accept", "application/json");
std::stringstream request;
request << "POST /json_rpc HTTP/1.0\r\n";
for(const auto& item : m_SendHeaders)
{
// header name
if(!llarp_tcp_conn_async_write(
m_Conn, llarp_buffer_t(item.first.c_str(), item.first.size())))
{
CloseError();
return;
}
// header delimiter
request << item.first << ": " << item.second << "\r\n";
request << "\r\n" << body;
std::string buf = request.str();
if(!llarp_tcp_conn_async_write(m_Conn,
llarp_buffer_t(buf, 2 * sizeof(char))))
llarp_buffer_t(buf.c_str(), buf.size())))
{
CloseError();
return;
}
// header value
if(!llarp_tcp_conn_async_write(
m_Conn,
llarp_buffer_t(item.second.c_str(), item.second.size())))
{
CloseError();
return;
}
// CRLF
if(!llarp_tcp_conn_async_write(
m_Conn, llarp_buffer_t(buf + 2, 2 * sizeof(char))))
{
CloseError();
return;
}
}
// CRLF
if(!llarp_tcp_conn_async_write(
m_Conn, llarp_buffer_t(buf + 2, 2 * sizeof(char))))
{
CloseError();
return;
}
// request body
if(!llarp_tcp_conn_async_write(
m_Conn, llarp_buffer_t(body.c_str(), body.size())))
{
CloseError();
CloseError("failed to write request");
return;
}
llarp::LogDebug("request sent");

View File

@ -39,6 +39,16 @@ namespace llarp
return *this;
}
void
Resize(size_t sz)
{
if(sz <= EncryptedFrameSize)
{
_sz = sz;
UpdateBuffer();
}
}
bool
DecryptInPlace(const SecretKey& seckey, llarp::Crypto* crypto);

View File

@ -554,6 +554,10 @@ namespace llarp
/// calls connected hooks
void
connected()
{
sockaddr_storage st;
socklen_t sl;
if (getpeername(fd, (sockaddr*)&st, &sl) == 0)
{
// we are connected yeh boi
if(_conn)
@ -563,6 +567,11 @@ namespace llarp
}
_calledConnected = true;
}
else
{
error();
}
}
void
flush_write();
@ -575,8 +584,9 @@ namespace llarp
}
void
error()
error() override
{
_shouldClose = true;
if(_conn)
{
#ifndef _WIN32
@ -591,6 +601,7 @@ namespace llarp
if(_conn->error)
_conn->error(_conn);
}
}
virtual ssize_t

View File

@ -20,7 +20,7 @@ namespace llarp
if(tcp.read)
tcp.read(&tcp, llarp_buffer_t(buf, amount));
}
else
else if(amount < 0)
{
// error
_shouldClose = true;
@ -123,7 +123,7 @@ namespace llarp
return -1;
b.sz = ret;
udp->recvfrom(udp, addr, ManagedBuffer{b});
return 0;
return ret;
}
int
@ -323,6 +323,7 @@ llarp_epoll_loop::tick(int ms)
epoll_event events[1024];
int result;
result = epoll_wait(epollfd, events, 1024, ms);
bool didIO = false;
if(result > 0)
{
int idx = 0;
@ -331,22 +332,27 @@ llarp_epoll_loop::tick(int ms)
llarp::ev_io* ev = static_cast< llarp::ev_io* >(events[idx].data.ptr);
if(ev)
{
llarp::LogDebug(idx, " of ", result,
llarp::LogDebug(idx, " of ", result, " on ", ev->fd,
" events=", std::to_string(events[idx].events));
if(events[idx].events & EPOLLERR)
{
llarp::LogDebug("epoll error");
ev->error();
}
else
{
if(events[idx].events & EPOLLIN)
{
ev->read(readbuf, sizeof(readbuf));
}
// write THEN READ don't revert me
if(events[idx].events & EPOLLOUT)
{
llarp::LogDebug("epoll out");
ev->flush_write();
}
if(events[idx].events & EPOLLIN)
{
llarp::LogDebug("epoll in");
if(ev->read(readbuf, sizeof(readbuf)) > 0)
didIO = true;
}
}
}
++idx;
@ -354,6 +360,9 @@ llarp_epoll_loop::tick(int ms)
}
if(result != -1)
tick_listeners();
/// if we didn't get an io events we sleep to avoid 100% cpu use
if(!didIO)
std::this_thread::sleep_for(std::chrono::milliseconds(ms));
return result;
}

View File

@ -1,23 +1,207 @@
#include <link/iwp.hpp>
#include <link/iwp_internal.hpp>
#include <router/abstractrouter.hpp>
namespace llarp
{
namespace iwp
{
std::unique_ptr< ILinkLayer >
NewServerFromRouter(AbstractRouter*)
void
OuterMessage::Clear()
{
command = 0;
flow.Zero();
netid.Zero();
nextFlowID.Zero();
rejectReason.clear();
N.Zero();
X.Zero();
Xsize = 0;
Z.Zero();
}
bool
OuterMessage::Decode(llarp_buffer_t* buf)
{
if(buf->size_left() < 34)
return false;
command = *buf->cur;
++buf->cur;
if(*buf->cur != '=')
return false;
std::copy_n(flow.begin(), 32, buf->cur);
buf->cur += 32;
switch(command)
{
case eOCMD_ObtainFlowID:
if(buf->size_left() < 40)
return false;
buf->cur += 32;
std::copy_n(netid.begin(), 8, buf->cur);
return true;
case eOCMD_GiveFlowID:
if(buf->size_left() < 32)
return false;
std::copy_n(nextFlowID.begin(), 32, buf->cur);
return true;
case eOCMD_Reject:
rejectReason = std::string(buf->cur, buf->base + buf->sz);
return true;
case eOCMD_SessionNegotiate:
// explicit fallthrough
case eOCMD_TransmitData:
if(buf->size_left() <= 56)
return false;
std::copy_n(N.begin(), N.size(), buf->cur);
buf->cur += N.size();
Xsize = buf->size_left() - Z.size();
if(Xsize > X.size())
return false;
std::copy_n(X.begin(), Xsize, buf->cur);
buf->cur += Xsize;
std::copy_n(Z.begin(), Z.size(), buf->cur);
return true;
default:
return false;
}
}
LinkLayer::LinkLayer(Crypto* c, const SecretKey& enckey, GetRCFunc getrc,
LinkMessageHandler h, SessionEstablishedHandler est,
SessionRenegotiateHandler reneg, SignBufferFunc sign,
TimeoutHandler t, SessionClosedHandler closed)
: ILinkLayer(enckey, getrc, h, sign, est, reneg, t, closed), crypto(c)
{
}
LinkLayer::~LinkLayer()
{
}
void
LinkLayer::Pump()
{
ILinkLayer::Pump();
}
const char*
LinkLayer::Name() const
{
return "iwp";
}
bool
LinkLayer::KeyGen(SecretKey& k)
{
k.Zero();
crypto->encryption_keygen(k);
return !k.IsZero();
}
uint16_t
LinkLayer::Rank() const
{
return 2;
}
bool
LinkLayer::Start(Logic* l)
{
if(!ILinkLayer::Start(l))
return false;
/// TODO: change me to true when done
return false;
}
void
LinkLayer::RecvFrom(const Addr& from, const void* pkt, size_t sz)
{
m_OuterMsg.Clear();
llarp_buffer_t buf(pkt, sz);
if(!m_OuterMsg.Decode(&buf))
{
LogError("failed to decode outer message");
return;
}
NetID ourNetID;
switch(m_OuterMsg.command)
{
case eOCMD_ObtainFlowID:
if(!ShouldSendFlowID(from))
return; // drop
if(m_OuterMsg.netid == ourNetID)
SendFlowID(from, m_OuterMsg.flow);
else
SendReject(from, m_OuterMsg.flow, "bad net id");
}
}
ILinkSession*
LinkLayer::NewOutboundSession(const RouterContact& rc,
const AddressInfo& ai)
{
(void)rc;
(void)ai;
// TODO: implement me
return nullptr;
}
std::unique_ptr< ILinkLayer >
NewServer(llarp::Crypto*, const SecretKey&, llarp::GetRCFunc,
llarp::LinkMessageHandler, llarp::SessionEstablishedHandler,
llarp::SessionRenegotiateHandler, llarp::SignBufferFunc,
llarp::TimeoutHandler, llarp::SessionClosedHandler)
void
LinkLayer::SendFlowID(const Addr& to, const FlowID_t& flow)
{
// TODO: implement me
(void)to;
(void)flow;
}
bool
LinkLayer::ShouldSendFlowID(const Addr& to) const
{
(void)to;
// TODO: implement me
return false;
}
void
LinkLayer::SendReject(const Addr& to, const FlowID_t& flow, const char* msg)
{
// TODO: implement me
(void)to;
(void)flow;
(void)msg;
}
std::unique_ptr< ILinkLayer >
NewServerFromRouter(AbstractRouter* r)
{
using namespace std::placeholders;
return NewServer(
r->crypto(), r->encryption(), std::bind(&AbstractRouter::rc, r),
std::bind(&AbstractRouter::HandleRecvLinkMessageBuffer, r, _1, _2),
std::bind(&AbstractRouter::OnSessionEstablished, r, _1),
std::bind(&AbstractRouter::CheckRenegotiateValid, r, _1, _2),
std::bind(&AbstractRouter::Sign, r, _1, _2),
std::bind(&AbstractRouter::OnConnectTimeout, r, _1),
std::bind(&AbstractRouter::SessionClosed, r, _1));
}
std::unique_ptr< ILinkLayer >
NewServer(Crypto* c, const SecretKey& enckey, GetRCFunc getrc,
LinkMessageHandler h, SessionEstablishedHandler est,
SessionRenegotiateHandler reneg, SignBufferFunc sign,
TimeoutHandler t, SessionClosedHandler closed)
{
(void)c;
(void)enckey;
(void)getrc;
(void)h;
(void)est;
(void)reneg;
(void)sign;
(void)t;
(void)closed;
// TODO: implement me
return nullptr;
}

View File

@ -3,6 +3,7 @@
#include <constants/link_layer.hpp>
#include <crypto/crypto.hpp>
#include <crypto/encrypted.hpp>
#include <crypto/types.hpp>
#include <link/server.hpp>
#include <link/session.hpp>
@ -17,6 +18,117 @@ namespace llarp
{
struct LinkLayer;
using FlowID_t = llarp::AlignedBuffer< 32 >;
using OuterCommand_t = byte_t;
constexpr OuterCommand_t eOCMD_ObtainFlowID = 'O';
constexpr OuterCommand_t eOCMD_GiveFlowID = 'G';
constexpr OuterCommand_t eOCMD_Reject = 'R';
constexpr OuterCommand_t eOCMD_SessionNegotiate = 'S';
constexpr OuterCommand_t eOCMD_TransmitData = 'D';
using InnerCommand_t = byte_t;
constexpr InnerCommand_t eICMD_KeepAlive = 'k';
constexpr InnerCommand_t eICMD_KeepAliveAck = 'l';
constexpr InnerCommand_t eICMD_Congestion = 'c';
constexpr InnerCommand_t eICMD_AntiCongestion = 'd';
constexpr InnerCommand_t eICMD_Transmit = 't';
constexpr InnerCommand_t eICMD_Ack = 'a';
constexpr InnerCommand_t eICMD_RotateKeys = 'r';
constexpr InnerCommand_t eICMD_UpgradeProtocol = 'u';
constexpr InnerCommand_t eICMD_VersionUpgrade = 'v';
struct OuterMessage
{
// required memebers
byte_t command;
FlowID_t flow;
// optional memebers follow
NetID netid;
FlowID_t nextFlowID;
std::string rejectReason;
AlignedBuffer< 24 > N;
// TODO: compute optimal size
AlignedBuffer< 1440 > X;
size_t Xsize;
ShortHash Z;
/// encode to buffer
bool
Encode(llarp_buffer_t *buf) const;
/// decode from buffer
bool
Decode(llarp_buffer_t *buf);
/// verify signature if needed
bool
Verify(const SharedSecret &K) const;
/// clear members
void
Clear();
};
/// TODO: fixme
constexpr size_t MaxFrags = 8;
using MessageBuffer_t = AlignedBuffer< MAX_LINK_MSG_SIZE >;
using FragmentLen_t = uint16_t;
using SequenceNum_t = uint32_t;
using WritePacketFunc = std::function< void(const llarp_buffer_t &) >;
struct MessageState
{
/// default
MessageState();
/// inbound
MessageState(const ShortHash &digest, SequenceNum_t num);
/// outbound
MessageState(const ShortHash &digest, const llarp_buffer_t &buf,
SequenceNum_t num);
/// the expected hash of the message
const ShortHash expectedHash;
/// which fragments have we got
std::bitset< MaxFrags > acks;
/// the message buffer
MessageBuffer_t msg;
/// the message's size
FragmentLen_t sz;
/// the last activity we have had
llarp_time_t lastActiveAt;
// sequence number
const SequenceNum_t seqno;
/// return true if this message is to be removed
/// because of inactivity
bool
IsExpired(llarp_time_t now) const;
/// return true if we have recvieved or sent the underlying message in
/// full.
bool
IsDone() const;
/// return true if we should retransmit some packets
bool
ShouldRetransmit(llarp_time_t now) const;
/// transmit unacked fragments
bool
TransmitUnacked(WritePacketFunc write_pkt) const;
/// transmit acks packet
bool
TransmitAcks(WritePacketFunc write_pkt);
};
struct Session final : public llarp::ILinkSession
{
/// base
@ -35,12 +147,15 @@ namespace llarp
return {};
}
/// pump ll io
void
PumpIO();
/// tick every 1 s
void
TickIO(llarp_time_t now);
/// queue full message
bool
QueueMessageBuffer(const llarp_buffer_t &buf);
@ -57,6 +172,7 @@ namespace llarp
void
Close();
/// start outbound handshake
void
Connect();
@ -105,196 +221,6 @@ namespace llarp
/// maximum fragment size
static constexpr FragLen_t fragsize = MAX_LINK_MSG_SIZE / maxfrags;
struct FragmentHeader
{
/// protocol version, always LLARP_PROTO_VERSION
Proto_t version = LLARP_PROTO_VERSION;
/// fragment command type
Cmd_t cmd = 0;
/// if cmd is XMIT this is the number of additional fragments this
/// message has
/// if cmd is FACK this is the fragment bitfield of the
/// messages acked otherwise 0
Flags_t flags = 0;
/// if cmd is XMIT this is the fragment index
/// if cmd is FACK this is set to 0xff to indicate message drop
/// otherwise set to 0
/// any other cmd it is set to 0
Fragno_t fragno = 0;
/// if cmd is XMIT then this is the size of the current fragment
/// if cmd is FACK then this MUST be set to 0
FragLen_t fraglen = 0;
/// if cmd is XMIT or FACK this is the sequence number of the message
/// otherwise it's 0
Seqno_t seqno = 0;
bool
Decode(llarp_buffer_t *buf)
{
if(buf->size_left() < fragoverhead)
return false;
version = *buf->cur;
if(version != LLARP_PROTO_VERSION)
return false;
buf->cur++;
cmd = *buf->cur;
buf->cur++;
flags = *buf->cur;
buf->cur++;
fragno = *buf->cur;
buf->cur++;
buf->read_uint16(fraglen);
buf->read_uint32(seqno);
return fraglen <= fragsize;
}
bool
Encode(llarp_buffer_t *buf, const llarp_buffer_t &body)
{
if(body.sz > fragsize)
return false;
fraglen = body.sz;
if(buf->size_left() < (fragoverhead + fraglen))
return false;
*buf->cur = LLARP_PROTO_VERSION;
buf->cur++;
*buf->cur = cmd;
buf->cur++;
*buf->cur = flags;
buf->cur++;
*buf->cur = fragno;
buf->cur++;
buf->put_uint16(fraglen);
buf->put_uint32(seqno);
if(fraglen)
memcpy(buf->cur, body.base, fraglen);
buf->cur += fraglen;
return true;
}
};
struct MessageState
{
/// default
MessageState(){};
/// inbound
MessageState(Flags_t numfrags)
{
acks.set();
if(numfrags <= maxfrags)
{
while(numfrags)
acks.reset(maxfrags - (numfrags--));
}
else // invalid value
return;
}
/// outbound
MessageState(const llarp_buffer_t &buf)
{
sz = std::min(buf.sz, MAX_LINK_MSG_SIZE);
memcpy(msg.data(), buf.base, sz);
size_t idx = 0;
acks.set();
while(idx * fragsize < sz)
acks.reset(idx++);
};
/// which fragments have we got
std::bitset< maxfrags > acks;
/// the message buffer
MessageBuffer_t msg;
/// the message's size
FragLen_t sz;
/// the last activity we have had
llarp_time_t lastActiveAt;
/// return true if this message is to be removed
/// because of inactivity
bool
IsExpired(llarp_time_t now) const
{
return now > lastActiveAt && now - lastActiveAt > 2000;
}
bool
IsDone() const
{
return acks.all();
}
bool
ShouldRetransmit(llarp_time_t now) const
{
if(IsDone())
return false;
return now > lastActiveAt && now - lastActiveAt > 500;
}
template < typename write_pkt_func >
bool
TransmitUnacked(write_pkt_func write_pkt, Seqno_t seqno) const
{
static FragLen_t maxfragsize = fragsize;
FragmentHeader hdr;
hdr.seqno = seqno;
hdr.cmd = XMIT;
AlignedBuffer< fragoverhead + fragsize > frag;
llarp_buffer_t buf(frag);
const byte_t *ptr = msg.data();
Fragno_t idx = 0;
FragLen_t len = sz;
while(idx < maxfrags)
{
const FragLen_t l = std::min(len, maxfragsize);
if(!acks.test(idx))
{
hdr.fragno = idx;
hdr.fraglen = l;
if(!hdr.Encode(&buf, llarp_buffer_t(ptr, l)))
return false;
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
len -= l;
if(write_pkt(buf.base, buf.sz) != int(buf.sz))
return false;
}
ptr += l;
len -= l;
if(l >= fragsize)
++idx;
else
break;
}
return true;
}
template < typename write_pkt_func >
bool
TransmitAcks(write_pkt_func write_pkt, Seqno_t seqno)
{
FragmentHeader hdr;
hdr.seqno = seqno;
hdr.cmd = FACK;
hdr.flags = 0;
byte_t idx = 0;
while(idx < maxfrags)
{
if(acks.test(idx))
hdr.flags |= 1 << idx;
++idx;
}
hdr.fraglen = 0;
hdr.fragno = 0;
AlignedBuffer< fragoverhead > frag;
llarp_buffer_t buf(frag);
if(!hdr.Encode(&buf, llarp_buffer_t(nullptr, nullptr, 0)))
return false;
return write_pkt(buf.base, buf.sz) == int(buf.sz);
}
};
using MessageHolder_t = std::unordered_map< Seqno_t, MessageState >;
MessageHolder_t m_Inbound;
@ -310,11 +236,10 @@ namespace llarp
struct LinkLayer final : public llarp::ILinkLayer
{
LinkLayer(llarp::Crypto *crypto, const SecretKey &encryptionSecretKey,
const SecretKey &identitySecretKey, llarp::GetRCFunc getrc,
llarp::LinkMessageHandler h, llarp::SignBufferFunc sign,
llarp::GetRCFunc getrc, llarp::LinkMessageHandler h,
llarp::SessionEstablishedHandler established,
llarp::SessionRenegotiateHandler reneg,
llarp::TimeoutHandler timeout,
llarp::SignBufferFunc sign, llarp::TimeoutHandler timeout,
llarp::SessionClosedHandler closed);
~LinkLayer();
@ -339,36 +264,37 @@ namespace llarp
uint16_t
Rank() const override;
const byte_t *
IndentityKey() const
{
return m_IdentityKey.data();
}
/// verify that a new flow id matches addresses and old flow id
bool
VerifyFlowID(const FlowID_t &newID, const Addr &from,
const FlowID_t &oldID) const;
const AlignedBuffer< 32 > &
CookieSec() const
{
return m_CookieSec;
}
RouterID
GetRouterID() const
{
return m_IdentityKey.toPublic();
}
void
RecvFrom(const llarp::Addr &from, const void *buf, size_t sz) override;
private:
bool
SignBuffer(llarp::Signature &sig, llarp_buffer_t buf) const
{
return crypto->sign(sig, m_IdentityKey, buf);
}
const llarp::SecretKey m_IdentityKey;
AlignedBuffer< 32 > m_CookieSec;
ShouldSendFlowID(const Addr &from) const;
/// handle ll recv
void
RecvFrom(const llarp::Addr &from, const void *buf, size_t sz) override;
SendReject(const Addr &to, const FlowID_t &flow, const char *msg);
void
SendFlowID(const Addr &to, const FlowID_t &flow);
using ActiveFlows_t =
std::unordered_map< FlowID_t, RouterID, FlowID_t::Hash >;
ActiveFlows_t m_ActiveFlows;
using PendingFlows_t = std::unordered_map< Addr, FlowID_t, Addr::Hash >;
/// flows that are pending authentication
PendingFlows_t m_PendingFlows;
/// cookie used in flow id computation
AlignedBuffer< 32 > m_FlowCookie;
OuterMessage m_OuterMsg;
};
} // namespace iwp
} // namespace llarp

View File

@ -30,7 +30,9 @@ namespace llarp
using GetRCFunc = std::function< const llarp::RouterContact&(void) >;
/// handler of session established
using SessionEstablishedHandler = std::function< void(llarp::RouterContact) >;
/// return false to reject
/// return true to accept
using SessionEstablishedHandler = std::function< bool(ILinkSession*) >;
/// f(new, old)
/// handler of session renegotiation

View File

@ -777,10 +777,10 @@ namespace llarp
Close();
return false;
}
EnterState(eSessionReady);
/// future LIM are used for session renegotiation
GotLIM = std::bind(&Session::GotSessionRenegotiate, this,
std::placeholders::_1);
EnterState(eSessionReady);
return true;
}
@ -982,7 +982,8 @@ namespace llarp
if(st == eSessionReady)
{
parent->MapAddr(remoteRC.pubkey.as_array(), this);
parent->SessionEstablished(remoteRC);
if(!parent->SessionEstablished(this))
Close();
}
}

View File

@ -89,17 +89,18 @@ namespace llarp
record.nextHop = hop.upstream;
record.commkey = seckey_topublic(hop.commkey);
auto buf = frame.Buffer();
buf->cur = buf->base + EncryptedFrameOverheadSize;
llarp_buffer_t buf(frame.data(), frame.size());
buf.cur = buf.base + EncryptedFrameOverheadSize;
// encode record
if(!record.BEncode(buf))
if(!record.BEncode(&buf))
{
// failed to encode?
LogError("Failed to generate Commit Record");
DumpBuffer(*buf);
DumpBuffer(buf);
delete ctx;
return;
}
frame.Resize(buf.cur - buf.base);
// use ephemeral keypair for frame
SecretKey framekey;
ctx->crypto->encryption_keygen(framekey);

View File

@ -47,8 +47,8 @@ namespace llarp
struct AbstractRouter : public util::IStateful
{
virtual void
OnSessionEstablished(RouterContact rc) = 0;
virtual bool
OnSessionEstablished(ILinkSession *) = 0;
virtual bool
HandleRecvLinkMessageBuffer(ILinkSession *from,
@ -121,6 +121,9 @@ namespace llarp
virtual void
OnConnectTimeout(ILinkSession *session) = 0;
/// connect to N random routers
virtual void
ConnectToRandomRouters(int N) = 0;
/// inject configuration and reconfigure router
virtual bool
Reconfigure(Config *conf) = 0;

View File

@ -192,11 +192,10 @@ namespace llarp
return false;
}
void
Router::OnSessionEstablished(RouterContact rc)
bool
Router::OnSessionEstablished(ILinkSession * s)
{
async_verify_RC(rc, nullptr);
LogInfo("session with ", RouterID(rc.pubkey), " established");
return async_verify_RC(s->GetRemoteRC());
}
Router::Router(struct llarp_threadpool *_tp, struct llarp_ev_loop *__netloop,
@ -851,7 +850,7 @@ namespace llarp
{
whitelistRouters = IsTrueValue(val);
}
if(StrEq(key, "jsonrpc"))
if(StrEq(key, "jsonrpc") || StrEq(key, "addr"))
{
lokidRPCAddr = val;
}
@ -989,7 +988,8 @@ namespace llarp
return false;
// store it in nodedb async
async_verify_RC(newrc, nullptr);
if(!async_verify_RC(newrc))
return false;
// update dht if required
if(dht()->impl->Nodes()->HasNode(dht::Key_t{newrc.pubkey}))
{
@ -1086,15 +1086,18 @@ namespace llarp
LogError("we have no bootstrap nodes specified");
}
if(inboundLinks.size() == 0)
if(!IsServiceNode())
{
size_t connected = NumberOfConnectedRouters();
if(connected < minConnectedRouters)
{
size_t dlt = minConnectedRouters - connected;
LogInfo("connecting to ", dlt, " random routers to keep alive");
ConnectToRandomRouters(dlt);
}
paths.BuildPaths(now);
_hiddenServiceContext.Tick(now);
}
if(NumberOfConnectedRouters() < minConnectedRouters)
{
ConnectToRandomRouters(minConnectedRouters);
}
_exitContext.Tick(now);
if(rpcCaller)
rpcCaller->Tick(now);
@ -1222,20 +1225,21 @@ namespace llarp
return false;
}
void
Router::async_verify_RC(const RouterContact &rc, ILinkLayer *link)
bool
Router::async_verify_RC(const RouterContact &rc)
{
if(pendingVerifyRC.count(rc.pubkey))
return;
if(rc.IsPublicRouter() && whitelistRouters)
if(rc.IsPublicRouter() && whitelistRouters && IsServiceNode())
{
if(lokinetRouters.find(rc.pubkey) == lokinetRouters.end())
{
LogInfo(rc.pubkey, " is NOT a valid service node, rejecting");
link->CloseSessionTo(rc.pubkey);
return;
return false;
}
}
if(pendingVerifyRC.count(rc.pubkey))
return true;
LogInfo("session with ", RouterID(rc.pubkey), " established");
llarp_async_verify_rc *job = &pendingVerifyRC[rc.pubkey];
async_verify_context *ctx = new async_verify_context();
ctx->router = this;
@ -1260,6 +1264,7 @@ namespace llarp
job->hook = &Router::on_verify_client_rc;
llarp_nodedb_async_verify(job);
return true;
}
void

View File

@ -211,7 +211,7 @@ namespace llarp
uint16_t m_OutboundPort = 0;
/// always maintain this many connections to other routers
size_t minConnectedRouters = 1;
size_t minConnectedRouters = 3;
/// hard upperbound limit on the number of router to router connections
size_t maxConnectedRouters = 2000;
@ -321,8 +321,8 @@ namespace llarp
~Router();
void
OnSessionEstablished(RouterContact rc) override;
bool
OnSessionEstablished(ILinkSession *) override;
bool
HandleRecvLinkMessageBuffer(ILinkSession *from,
@ -493,7 +493,7 @@ namespace llarp
const PathID_t &rxid) override;
void
ConnectToRandomRouters(int N);
ConnectToRandomRouters(int N) override;
size_t
NumberOfConnectedRouters() const override;
@ -504,8 +504,8 @@ namespace llarp
bool
GetRandomConnectedRouter(RouterContact &result) const override;
void
async_verify_RC(const RouterContact &rc, ILinkLayer *link);
bool
async_verify_RC(const RouterContact &rc);
void
HandleDHTLookupForSendTo(RouterID remote,

View File

@ -131,6 +131,12 @@ namespace llarp
&& netID == other.netID;
}
bool
operator!=(const RouterContact & other) const
{
return !(*this == other);
}
void
Clear();

View File

@ -1011,9 +1011,6 @@ namespace llarp
p->SetDropHandler(std::bind(
&Endpoint::OutboundContext::HandleDataDrop, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
p->SetDeadChecker(std::bind(&Endpoint::CheckPathIsDead, m_Endpoint,
std::placeholders::_1,
std::placeholders::_2));
}
void
@ -1024,10 +1021,15 @@ namespace llarp
}
bool
Endpoint::CheckPathIsDead(__attribute__((unused)) path::Path* p,
__attribute__((unused)) llarp_time_t latency)
Endpoint::CheckPathIsDead(path::Path*, llarp_time_t)
{
return false;
RouterLogic()->call_later(
{100, this, [](void* u, uint64_t, uint64_t left) {
if(left)
return;
HandlePathDead(u);
}});
return true;
}
bool

View File

@ -31,7 +31,7 @@ namespace llarp
{
/// minimum interval for publishing introsets
static const llarp_time_t INTROSET_PUBLISH_INTERVAL =
DEFAULT_PATH_LIFETIME / 4;
DEFAULT_PATH_LIFETIME / 8;
static const llarp_time_t INTROSET_PUBLISH_RETRY_INTERVAL = 5000;

View File

@ -189,9 +189,9 @@ TEST_F(LinkLayerTest, TestUTPAliceRenegWithBob)
return true;
}
},
[&](llarp::RouterContact rc) {
ASSERT_EQ(rc, Bob.GetRC());
llarp::LogInfo("alice established with bob");
[&](llarp::ILinkSession * s) -> bool {
const auto rc = s->GetRemoteRC();
return rc.pubkey == Bob.GetRC().pubkey;
},
[&](llarp::RouterContact, llarp::RouterContact) -> bool { return true; },
[&](llarp::Signature& sig, const llarp_buffer_t& buf) -> bool {
@ -228,10 +228,11 @@ TEST_F(LinkLayerTest, TestUTPAliceRenegWithBob)
Bob.gotLIM = true;
return sendDiscardMessage(s);
},
[&](llarp::RouterContact rc) {
ASSERT_EQ(rc, Alice.GetRC());
[&](llarp::ILinkSession * s) -> bool {
if(s->GetRemoteRC().pubkey != Alice.GetRC().pubkey)
return false;
llarp::LogInfo("bob established with alice");
Bob.link->VisitSessionByPubkey(Alice.GetRC().pubkey.as_array(),
return Bob.link->VisitSessionByPubkey(Alice.GetRC().pubkey.as_array(),
sendDiscardMessage);
},
[&](llarp::RouterContact newrc, llarp::RouterContact oldrc) -> bool {
@ -252,7 +253,6 @@ TEST_F(LinkLayerTest, TestUTPAliceRenegWithBob)
ASSERT_TRUE(Alice.link->TryEstablishTo(Bob.GetRC()));
RunMainloop();
ASSERT_TRUE(Alice.gotLIM);
ASSERT_TRUE(Bob.gotLIM);
ASSERT_TRUE(success);
}
@ -260,6 +260,74 @@ TEST_F(LinkLayerTest, TestUTPAliceRenegWithBob)
TEST_F(LinkLayerTest, TestUTPAliceConnectToBob)
{
Alice.link = llarp::utp::NewServer(
&crypto, Alice.encryptionKey,
[&]() -> const llarp::RouterContact& { return Alice.GetRC(); },
[&](llarp::ILinkSession*, const llarp_buffer_t& buf) -> bool {
return AliceGotMessage(buf);
},
[&](llarp::ILinkSession * s) -> bool {
if(s->GetRemoteRC().pubkey != Bob.GetRC().pubkey)
return false;
llarp::LogInfo("alice established with bob");
return true;
},
[&](llarp::RouterContact, llarp::RouterContact) -> bool { return true; },
[&](llarp::Signature& sig, const llarp_buffer_t& buf) -> bool {
return crypto.sign(sig, Alice.signingKey, buf);
},
[&](llarp::ILinkSession* session) {
ASSERT_FALSE(session->IsEstablished());
Stop();
},
[&](llarp::RouterID router) { ASSERT_EQ(router, Bob.GetRouterID()); });
Bob.link = llarp::utp::NewServer(
&crypto, Bob.encryptionKey,
[&]() -> const llarp::RouterContact& { return Bob.GetRC(); },
[&](llarp::ILinkSession*, const llarp_buffer_t& ) -> bool {
return true;
},
[&](llarp::ILinkSession * s) -> bool {
if(s->GetRemoteRC().pubkey != Alice.GetRC().pubkey)
return false;
llarp::LogInfo("bob established with alice");
logic->queue_job({s, [](void * u) {
llarp::ILinkSession * self = static_cast<llarp::ILinkSession*>(u);
std::array< byte_t, 32 > tmp;
llarp_buffer_t otherBuf(tmp);
llarp::DiscardMessage discard;
if(!discard.BEncode(&otherBuf))
return;
otherBuf.sz = otherBuf.cur - otherBuf.base;
otherBuf.cur = otherBuf.base;
self->SendMessageBuffer(otherBuf);
}});
return true;
},
[&](llarp::RouterContact, llarp::RouterContact) -> bool { return true; },
[&](llarp::Signature& sig, const llarp_buffer_t& buf) -> bool {
return crypto.sign(sig, Bob.signingKey, buf);
},
[&](llarp::ILinkSession* session) {
ASSERT_FALSE(session->IsEstablished());
},
[&](llarp::RouterID router) { ASSERT_EQ(router, Alice.GetRouterID()); });
ASSERT_TRUE(Alice.Start(logic.get(), netLoop, AlicePort));
ASSERT_TRUE(Bob.Start(logic.get(), netLoop, BobPort));
ASSERT_TRUE(Alice.link->TryEstablishTo(Bob.GetRC()));
RunMainloop();
ASSERT_TRUE(Bob.gotLIM);
ASSERT_TRUE(success);
}
/*
TEST_F(LinkLayerTest, TestIWPAliceConnectToBob)
{
Alice.link = llarp::iwp::NewServer(
&crypto, Alice.encryptionKey,
[&]() -> const llarp::RouterContact& { return Alice.GetRC(); },
[&](llarp::ILinkSession* s, const llarp_buffer_t& buf) -> bool {
@ -305,7 +373,7 @@ TEST_F(LinkLayerTest, TestUTPAliceConnectToBob)
return s->SendMessageBuffer(otherBuf);
};
Bob.link = llarp::utp::NewServer(
Bob.link = llarp::iwp::NewServer(
&crypto, Bob.encryptionKey,
[&]() -> const llarp::RouterContact& { return Bob.GetRC(); },
[&](llarp::ILinkSession* s, const llarp_buffer_t& buf) -> bool {
@ -342,91 +410,5 @@ TEST_F(LinkLayerTest, TestUTPAliceConnectToBob)
ASSERT_TRUE(Alice.gotLIM);
ASSERT_TRUE(Bob.gotLIM);
ASSERT_TRUE(success);
}
TEST_F(LinkLayerTest, TestIWPAliceConnectToBob)
{
/*
Alice.link = llarp::iwp::NewServer(
&crypto, Alice.encryptionKey,
[&]() -> const llarp::RouterContact& { return Alice.GetRC(); },
[&](llarp::ILinkSession* s, llarp_buffer_t buf) -> bool {
if(Alice.gotLIM)
{
return AliceGotMessage(buf);
}
else
{
llarp::LinkIntroMessage msg;
if(!msg.BDecode(&buf))
return false;
if(!s->GotLIM(&msg))
return false;
Alice.gotLIM = true;
return true;
}
},
[&](llarp::RouterContact rc) {
ASSERT_EQ(rc, Bob.GetRC());
llarp::LogInfo("alice established with bob");
},
[&](llarp::RouterContact, llarp::RouterContact) -> bool { return true; },
[&](llarp::Signature& sig, llarp_buffer_t buf) -> bool {
return crypto.sign(sig, Alice.signingKey, buf);
},
[&](llarp::ILinkSession* session) {
ASSERT_FALSE(session->IsEstablished());
Stop();
},
[&](llarp::RouterID router) { ASSERT_EQ(router, Bob.GetRouterID()); });
auto sendDiscardMessage = [](llarp::ILinkSession* s) -> bool {
// send discard message in reply to complete unit test
byte_t tmp[32] = {0};
auto otherBuf = llarp::StackBuffer< decltype(tmp) >(tmp);
llarp::DiscardMessage discard;
if(!discard.BEncode(&otherBuf))
return false;
otherBuf.sz = otherBuf.cur - otherBuf.base;
otherBuf.cur = otherBuf.base;
return s->SendMessageBuffer(otherBuf);
};
Bob.link = llarp::iwp::NewServer(
&crypto, Bob.encryptionKey,
[&]() -> const llarp::RouterContact& { return Bob.GetRC(); },
[&](llarp::ILinkSession* s, llarp_buffer_t buf) -> bool {
llarp::LinkIntroMessage msg;
if(!msg.BDecode(&buf))
return false;
if(!s->GotLIM(&msg))
return false;
Bob.gotLIM = true;
return true;
},
[&](llarp::RouterContact rc) {
ASSERT_EQ(rc, Alice.GetRC());
llarp::LogInfo("bob established with alice");
Bob.link->VisitSessionByPubkey(Alice.GetRC().pubkey.as_array(),
sendDiscardMessage);
},
[&](llarp::RouterContact, llarp::RouterContact) -> bool { return true; },
[&](llarp::Signature& sig, llarp_buffer_t buf) -> bool {
return crypto.sign(sig, Bob.signingKey, buf);
},
[&](llarp::ILinkSession* session) {
ASSERT_FALSE(session->IsEstablished());
},
[&](llarp::RouterID router) { ASSERT_EQ(router, Alice.GetRouterID()); });
ASSERT_TRUE(Alice.Start(logic.get(), netLoop, AlicePort));
ASSERT_TRUE(Bob.Start(logic.get(), netLoop, BobPort));
ASSERT_TRUE(Alice.link->TryEstablishTo(Bob.GetRC()));
RunMainloop();
ASSERT_TRUE(Alice.gotLIM);
ASSERT_TRUE(Bob.gotLIM);
ASSERT_TRUE(success);
*/
}