Fix more address sanitiser issues

pull/480/head
Michael 5 years ago
parent 15a9086d57
commit 25f10d5b11
No known key found for this signature in database
GPG Key ID: 2D51757B47E2434C

@ -190,7 +190,7 @@ if(NOT DEBIAN)
endif(NOT DEBIAN) endif(NOT DEBIAN)
if(ASAN) if(ASAN)
set(DEBUG_FLAGS ${DEBUG_FLAGS} -fsanitize=undefined -fno-omit-frame-pointer) set(DEBUG_FLAGS ${DEBUG_FLAGS} -fsanitize=address -fno-omit-frame-pointer)
set(OPTIMIZE_FLAGS "-O0") set(OPTIMIZE_FLAGS "-O0")
endif(ASAN) endif(ASAN)

@ -91,14 +91,14 @@ namespace llarp
} }
} }
ILinkSession* std::shared_ptr< ILinkSession >
LinkLayer::NewOutboundSession(const RouterContact& rc, LinkLayer::NewOutboundSession(const RouterContact& rc,
const AddressInfo& ai) const AddressInfo& ai)
{ {
(void)rc; (void)rc;
(void)ai; (void)ai;
// TODO: implement me // TODO: implement me
return nullptr; return {};
} }
void void

@ -21,12 +21,19 @@ namespace llarp
TimeoutHandler timeout, SessionClosedHandler closed); TimeoutHandler timeout, SessionClosedHandler closed);
~LinkLayer(); ~LinkLayer();
Crypto *const crypto; Crypto *const crypto;
Crypto *
OurCrypto() override
{
return crypto;
}
bool bool
Start(Logic *l) override; Start(Logic *l) override;
ILinkSession * std::shared_ptr< ILinkSession >
NewOutboundSession(const RouterContact &rc, NewOutboundSession(const RouterContact &rc,
const AddressInfo &ai) override; const AddressInfo &ai) override;

@ -143,10 +143,10 @@ namespace llarp
{ {
if(m_AuthedLinks.count(pk) > MaxSessionsPerKey) if(m_AuthedLinks.count(pk) > MaxSessionsPerKey)
{ {
s->SendClose(); s->Close();
return false; return false;
} }
m_AuthedLinks.emplace(pk, std::move(itr->second)); m_AuthedLinks.emplace(pk, itr->second);
itr = m_Pending.erase(itr); itr = m_Pending.erase(itr);
return true; return true;
} }
@ -206,13 +206,12 @@ namespace llarp
if(!PickAddress(rc, to)) if(!PickAddress(rc, to))
return false; return false;
llarp::Addr addr(to); llarp::Addr addr(to);
auto s = NewOutboundSession(rc, to); std::shared_ptr< ILinkSession > s = NewOutboundSession(rc, to);
if(PutSession(s)) if(PutSession(s))
{ {
s->Start(); s->Start();
return true; return true;
} }
delete s;
return false; return false;
} }
@ -258,7 +257,7 @@ namespace llarp
auto itr = m_AuthedLinks.begin(); auto itr = m_AuthedLinks.begin();
while(itr != m_AuthedLinks.end()) while(itr != m_AuthedLinks.end())
{ {
itr->second->SendClose(); itr->second->Close();
++itr; ++itr;
} }
} }
@ -267,7 +266,7 @@ namespace llarp
auto itr = m_Pending.begin(); auto itr = m_Pending.begin();
while(itr != m_Pending.end()) while(itr != m_Pending.end())
{ {
itr->second->SendClose(); itr->second->Close();
++itr; ++itr;
} }
} }
@ -283,7 +282,7 @@ namespace llarp
auto itr = range.first; auto itr = range.first;
while(itr != range.second) while(itr != range.second)
{ {
itr->second->SendClose(); itr->second->Close();
itr = m_AuthedLinks.erase(itr); itr = m_AuthedLinks.erase(itr);
} }
} }
@ -380,14 +379,14 @@ namespace llarp
} }
bool bool
ILinkLayer::PutSession(ILinkSession* s) ILinkLayer::PutSession(const std::shared_ptr< ILinkSession >& s)
{ {
static constexpr size_t MaxSessionsPerEndpoint = 5; static constexpr size_t MaxSessionsPerEndpoint = 5;
Lock lock(&m_PendingMutex); Lock lock(&m_PendingMutex);
llarp::Addr addr = s->GetRemoteEndpoint(); llarp::Addr addr = s->GetRemoteEndpoint();
if(m_Pending.count(addr) >= MaxSessionsPerEndpoint) if(m_Pending.count(addr) >= MaxSessionsPerEndpoint)
return false; return false;
m_Pending.emplace(addr, std::unique_ptr< ILinkSession >(s)); m_Pending.emplace(addr, s);
return true; return true;
} }

@ -11,6 +11,7 @@
#include <util/status.hpp> #include <util/status.hpp>
#include <list> #include <list>
#include <memory>
#include <unordered_map> #include <unordered_map>
namespace llarp namespace llarp
@ -60,6 +61,9 @@ namespace llarp
return llarp_ev_loop_time_now_ms(m_Loop); return llarp_ev_loop_time_now_ms(m_Loop);
} }
virtual Crypto*
OurCrypto() = 0;
bool bool
HasSessionTo(const RouterID& pk); HasSessionTo(const RouterID& pk);
@ -105,7 +109,7 @@ namespace llarp
Configure(llarp_ev_loop* loop, const std::string& ifname, int af, Configure(llarp_ev_loop* loop, const std::string& ifname, int af,
uint16_t port); uint16_t port);
virtual ILinkSession* virtual std::shared_ptr< ILinkSession >
NewOutboundSession(const RouterContact& rc, const AddressInfo& ai) = 0; NewOutboundSession(const RouterContact& rc, const AddressInfo& ai) = 0;
virtual void virtual void
@ -225,7 +229,7 @@ namespace llarp
using Mutex = util::NullMutex; using Mutex = util::NullMutex;
bool bool
PutSession(ILinkSession* s); PutSession(const std::shared_ptr< ILinkSession >& s);
llarp::Logic* m_Logic = nullptr; llarp::Logic* m_Logic = nullptr;
llarp_ev_loop* m_Loop = nullptr; llarp_ev_loop* m_Loop = nullptr;
@ -234,10 +238,10 @@ namespace llarp
SecretKey m_SecretKey; SecretKey m_SecretKey;
using AuthedLinks = using AuthedLinks =
std::unordered_multimap< RouterID, std::unique_ptr< ILinkSession >, std::unordered_multimap< RouterID, std::shared_ptr< ILinkSession >,
RouterID::Hash >; RouterID::Hash >;
using Pending = using Pending =
std::unordered_multimap< llarp::Addr, std::unique_ptr< ILinkSession >, std::unordered_multimap< llarp::Addr, std::shared_ptr< ILinkSession >,
llarp::Addr::Hash >; llarp::Addr::Hash >;
Mutex m_AuthedLinksMutex ACQUIRED_BEFORE(m_PendingMutex); Mutex m_AuthedLinksMutex ACQUIRED_BEFORE(m_PendingMutex);

@ -17,53 +17,70 @@ namespace llarp
{ {
virtual ~ILinkSession(){}; virtual ~ILinkSession(){};
/// hook for utp for when we have established a connection
virtual void
OnLinkEstablished(ILinkLayer *p) = 0;
/// called every event loop tick /// called every event loop tick
std::function< void(void) > Pump; virtual void
Pump() = 0;
/// called every timer tick /// called every timer tick
std::function< void(llarp_time_t) > Tick; virtual void Tick(llarp_time_t) = 0;
/// send a message buffer to the remote endpoint /// send a message buffer to the remote endpoint
std::function< bool(const llarp_buffer_t &) > SendMessageBuffer; virtual bool
SendMessageBuffer(const llarp_buffer_t &) = 0;
/// start the connection /// start the connection
std::function< void(void) > Start; virtual void
Start() = 0;
/// send a keepalive to the remote endpoint virtual void
std::function< bool(void) > SendKeepAlive; Close() = 0;
/// send close message /// send a keepalive to the remote endpoint
std::function< void(void) > SendClose; virtual bool
SendKeepAlive() = 0;
/// return true if we are established /// return true if we are established
std::function< bool(void) > IsEstablished; virtual bool
IsEstablished() = 0;
/// return true if this session has timed out /// return true if this session has timed out
std::function< bool(llarp_time_t) > TimedOut; virtual bool
TimedOut(llarp_time_t now) const = 0;
/// get remote public identity key /// get remote public identity key
std::function< PubKey(void) > GetPubKey; virtual PubKey
GetPubKey() const = 0;
/// get remote address /// get remote address
std::function< Addr(void) > GetRemoteEndpoint; virtual Addr
GetRemoteEndpoint() const = 0;
// get remote rc // get remote rc
std::function< RouterContact(void) > GetRemoteRC; virtual RouterContact
GetRemoteRC() const = 0;
/// handle a valid LIM /// handle a valid LIM
std::function< bool(const LinkIntroMessage *msg) > GotLIM; std::function< bool(const LinkIntroMessage *msg) > GotLIM;
/// send queue current blacklog /// send queue current blacklog
std::function< size_t(void) > SendQueueBacklog; virtual size_t
SendQueueBacklog() const = 0;
/// get parent link layer /// get parent link layer
std::function< ILinkLayer *(void) > GetLinkLayer; virtual ILinkLayer *
GetLinkLayer() const = 0;
/// renegotiate session when we have a new RC locally /// renegotiate session when we have a new RC locally
std::function< bool(void) > RenegotiateSession; virtual bool
RenegotiateSession() = 0;
/// return true if we should send an explicit keepalive message /// return true if we should send an explicit keepalive message
std::function< bool(void) > ShouldPing; virtual bool
ShouldPing() const = 0;
}; };
} // namespace llarp } // namespace llarp

@ -24,12 +24,6 @@ namespace llarp
{ {
namespace utp namespace utp
{ {
Crypto*
LinkLayer::OurCrypto()
{
return _crypto;
}
uint64 uint64
LinkLayer::OnConnect(utp_callback_arguments* arg) LinkLayer::OnConnect(utp_callback_arguments* arg)
{ {
@ -175,6 +169,8 @@ namespace llarp
LinkLayer::~LinkLayer() LinkLayer::~LinkLayer()
{ {
m_Pending.clear();
m_AuthedLinks.clear();
utp_destroy(_utp_ctx); utp_destroy(_utp_ctx);
} }
@ -302,7 +298,7 @@ namespace llarp
void void
LinkLayer::Stop() LinkLayer::Stop()
{ {
ForEachSession([](ILinkSession* s) { s->SendClose(); }); ForEachSession([](ILinkSession* s) { s->Close(); });
} }
bool bool
@ -331,11 +327,12 @@ namespace llarp
return "utp"; return "utp";
} }
ILinkSession* std::shared_ptr< ILinkSession >
LinkLayer::NewOutboundSession(const RouterContact& rc, LinkLayer::NewOutboundSession(const RouterContact& rc,
const AddressInfo& addr) const AddressInfo& addr)
{ {
return new Session(this, utp_create_socket(_utp_ctx), rc, addr); return std::make_shared< OutboundSession >(
this, utp_create_socket(_utp_ctx), rc, addr);
} }
uint64 uint64
@ -391,14 +388,16 @@ namespace llarp
static_cast< LinkLayer* >(utp_context_get_userdata(arg->context)); static_cast< LinkLayer* >(utp_context_get_userdata(arg->context));
Addr remote(*arg->address); Addr remote(*arg->address);
LogDebug("utp accepted from ", remote); LogDebug("utp accepted from ", remote);
Session* session = new Session(self, arg->socket, remote); std::shared_ptr< ILinkSession > session =
std::make_shared< InboundSession >(self, arg->socket, remote);
if(!self->PutSession(session)) if(!self->PutSession(session))
{ {
session->Close(); session->Close();
delete session;
} }
else else
{
session->OnLinkEstablished(self); session->OnLinkEstablished(self);
}
return 0; return 0;
} }

@ -58,11 +58,11 @@ namespace llarp
/// get AI rank /// get AI rank
uint16_t uint16_t
Rank() const; Rank() const override;
/// handle low level recv /// handle low level recv
void void
RecvFrom(const Addr& from, const void* buf, size_t sz); RecvFrom(const Addr& from, const void* buf, size_t sz) override;
#ifdef __linux__ #ifdef __linux__
/// process ICMP stuff on linux /// process ICMP stuff on linux
@ -71,11 +71,14 @@ namespace llarp
#endif #endif
Crypto* Crypto*
OurCrypto(); OurCrypto() override
{
return _crypto;
}
/// pump sessions /// pump sessions
void void
Pump(); Pump() override;
/// stop link layer /// stop link layer
void void
@ -83,15 +86,16 @@ namespace llarp
/// regenerate transport keypair /// regenerate transport keypair
bool bool
KeyGen(SecretKey& k); KeyGen(SecretKey& k) override;
/// do tick /// do tick
void void
Tick(llarp_time_t now); Tick(llarp_time_t now);
/// create new outbound session /// create new outbound session
ILinkSession* std::shared_ptr< ILinkSession >
NewOutboundSession(const RouterContact& rc, const AddressInfo& addr); NewOutboundSession(const RouterContact& rc,
const AddressInfo& addr) override;
/// create new socket /// create new socket
utp_socket* utp_socket*
@ -99,7 +103,7 @@ namespace llarp
/// get ai name /// get ai name
const char* const char*
Name() const; Name() const override;
}; };
} // namespace utp } // namespace utp

@ -12,7 +12,7 @@ namespace llarp
using namespace std::placeholders; using namespace std::placeholders;
void void
Session::OnLinkEstablished(LinkLayer* p) Session::OnLinkEstablished(ILinkLayer* p)
{ {
parent = p; parent = p;
EnterState(eLinkEstablished); EnterState(eLinkEstablished);
@ -79,13 +79,6 @@ namespace llarp
} }
} }
void
Session::Connect()
{
utp_connect(sock, remoteAddr, remoteAddr.SockLen());
EnterState(eConnecting);
}
void void
Session::OutboundLinkEstablished(LinkLayer* p) Session::OutboundLinkEstablished(LinkLayer* p)
{ {
@ -136,7 +129,7 @@ namespace llarp
} }
void void
Session::TickImpl(llarp_time_t now) Session::Tick(llarp_time_t now)
{ {
PruneInboundMessages(now); PruneInboundMessages(now);
m_TXRate = 0; m_TXRate = 0;
@ -194,7 +187,7 @@ namespace llarp
} }
bool bool
Session::IsTimedOut(llarp_time_t now) const Session::TimedOut(llarp_time_t now) const
{ {
if(state == eInitial) if(state == eInitial)
return false; return false;
@ -206,14 +199,14 @@ namespace llarp
return state == eClose; return state == eClose;
} }
const PubKey& PubKey
Session::RemotePubKey() const Session::GetPubKey() const
{ {
return remoteRC.pubkey; return remoteRC.pubkey;
} }
Addr Addr
Session::RemoteEndpoint() Session::GetRemoteEndpoint() const
{ {
return remoteAddr; return remoteAddr;
} }
@ -227,163 +220,27 @@ namespace llarp
parent = p; parent = p;
remoteTransportPubKey.Zero(); remoteTransportPubKey.Zero();
SendQueueBacklog = [&]() -> size_t { return sendq.size(); };
ShouldPing = [&]() -> bool {
auto dlt = parent->Now() - lastActive;
return dlt >= 10000;
};
SendKeepAlive = [&]() -> bool {
if(state == eSessionReady)
{
DiscardMessage msg;
std::array< byte_t, 128 > tmp;
llarp_buffer_t buf(tmp);
if(!msg.BEncode(&buf))
return false;
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
return this->QueueWriteBuffers(buf);
}
return true;
};
gotLIM = false; gotLIM = false;
recvBufOffset = 0; recvBufOffset = 0;
TimedOut = std::bind(&Session::IsTimedOut, this, std::placeholders::_1);
GetPubKey = std::bind(&Session::RemotePubKey, this);
GetRemoteRC = [&]() -> RouterContact { return this->remoteRC; };
GetLinkLayer = std::bind(&Session::GetParent, this);
lastActive = parent->Now(); lastActive = parent->Now();
Pump = std::bind(&Session::DoPump, this);
Tick = std::bind(&Session::TickImpl, this, std::placeholders::_1);
SendMessageBuffer =
std::bind(&Session::QueueWriteBuffers, this, std::placeholders::_1);
IsEstablished = [=]() {
return this->state == eSessionReady || this->state == eLinkEstablished;
};
SendClose = std::bind(&Session::Close, this);
GetRemoteEndpoint = std::bind(&Session::RemoteEndpoint, this);
RenegotiateSession = std::bind(&Session::Rehandshake, this);
}
/// outbound session
Session::Session(LinkLayer* p, utp_socket* s, const RouterContact& rc,
const AddressInfo& addr)
: Session(p)
{
remoteTransportPubKey = addr.pubkey;
remoteRC = rc;
RouterID rid = remoteRC.pubkey;
OurCrypto()->shorthash(txKey, llarp_buffer_t(rid));
rid = p->GetOurRC().pubkey;
OurCrypto()->shorthash(rxKey, llarp_buffer_t(rid));
sock = s;
assert(utp_set_userdata(sock, this) == this);
assert(s == sock);
remoteAddr = addr;
Start = std::bind(&Session::Connect, this);
GotLIM = std::bind(&Session::OutboundLIM, this, std::placeholders::_1);
} }
/// inbound session bool
Session::Session(LinkLayer* p, utp_socket* s, const Addr& addr) : Session(p) Session::ShouldPing() const
{ {
RouterID rid = p->GetOurRC().pubkey; auto dlt = parent->Now() - lastActive;
OurCrypto()->shorthash(rxKey, llarp_buffer_t(rid)); return dlt >= 10000;
remoteRC.Clear(); };
sock = s;
assert(s == sock);
assert(utp_set_userdata(sock, this) == this);
remoteAddr = addr;
Start = []() {};
GotLIM = std::bind(&Session::InboundLIM, this, std::placeholders::_1);
}
ILinkLayer* ILinkLayer*
Session::GetParent() Session::GetLinkLayer() const
{ {
return parent; return parent;
} }
bool
Session::InboundLIM(const LinkIntroMessage* msg)
{
if(gotLIM && remoteRC.pubkey != msg->rc.pubkey)
{
Close();
return false;
}
if(!gotLIM)
{
remoteRC = msg->rc;
OurCrypto()->shorthash(txKey, llarp_buffer_t(remoteRC.pubkey));
if(!DoKeyExchange(std::bind(&Crypto::transport_dh_server, OurCrypto(),
_1, _2, _3, _4),
rxKey, msg->N, remoteRC.enckey,
parent->TransportSecretKey()))
return false;
std::array< byte_t, LinkIntroMessage::MaxSize > tmp;
llarp_buffer_t buf(tmp);
LinkIntroMessage replymsg;
replymsg.rc = parent->GetOurRC();
if(!replymsg.rc.Verify(OurCrypto(), parent->Now()))
{
LogError("our RC is invalid? closing session to", remoteAddr);
Close();
return false;
}
replymsg.N.Randomize();
replymsg.P = DefaultLinkSessionLifetime;
if(!replymsg.Sign(parent->Sign))
{
LogError("failed to sign LIM for inbound handshake from ",
remoteAddr);
Close();
return false;
}
// encode
if(!replymsg.BEncode(&buf))
{
LogError("failed to encode LIM for handshake from ", remoteAddr);
Close();
return false;
}
// rewind
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
// send
if(!SendMessageBuffer(buf))
{
LogError("failed to repl to handshake from ", remoteAddr);
Close();
return false;
}
if(!DoKeyExchange(std::bind(&Crypto::transport_dh_client, OurCrypto(),
_1, _2, _3, _4),
txKey, replymsg.N, remoteRC.enckey,
parent->RouterEncryptionSecret()))
return false;
LogDebug("Sent reply LIM");
gotLIM = true;
EnterState(eSessionReady);
/// future LIM are used for session renegotiation
GotLIM = std::bind(&Session::GotSessionRenegotiate, this,
std::placeholders::_1);
}
return true;
}
void void
Session::DoPump() Session::Pump()
{ {
// pump write queue // pump write queue
PumpWrite(); PumpWrite();
@ -392,11 +249,11 @@ namespace llarp
} }
bool bool
Session::QueueWriteBuffers(const llarp_buffer_t& buf) Session::SendMessageBuffer(const llarp_buffer_t& buf)
{ {
if(sendq.size() >= MaxSendQueueSize) if(sendq.size() >= MaxSendQueueSize)
return false; return false;
// premptive pump // preemptive pump
if(sendq.size() >= MaxSendQueueSize / 2) if(sendq.size() >= MaxSendQueueSize / 2)
PumpWrite(); PumpWrite();
size_t sz = buf.sz; size_t sz = buf.sz;
@ -418,27 +275,19 @@ namespace llarp
} }
bool bool
Session::OutboundLIM(const LinkIntroMessage* msg) Session::SendKeepAlive()
{ {
if(gotLIM && remoteRC.pubkey != msg->rc.pubkey) if(state == eSessionReady)
{
return false;
}
remoteRC = msg->rc;
gotLIM = true;
if(!DoKeyExchange(std::bind(&Crypto::transport_dh_server, OurCrypto(), _1,
_2, _3, _4),
rxKey, msg->N, remoteRC.enckey,
parent->RouterEncryptionSecret()))
{ {
Close(); DiscardMessage msg;
return false; std::array< byte_t, 128 > tmp;
llarp_buffer_t buf(tmp);
if(!msg.BEncode(&buf))
return false;
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
return this->SendMessageBuffer(buf);
} }
/// future LIM are used for session renegotiation
GotLIM = std::bind(&Session::GotSessionRenegotiate, this,
std::placeholders::_1);
EnterState(eSessionReady);
return true; return true;
} }
@ -593,7 +442,7 @@ namespace llarp
} }
bool bool
Session::Rehandshake() Session::RenegotiateSession()
{ {
LinkIntroMessage lim; LinkIntroMessage lim;
lim.rc = parent->GetOurRC(); lim.rc = parent->GetOurRC();
@ -741,5 +590,142 @@ namespace llarp
{ {
lastActive = parent->Now(); lastActive = parent->Now();
} }
InboundSession::InboundSession(LinkLayer* p, utp_socket* s,
const Addr& addr)
: Session(p)
{
sock = s;
remoteAddr = addr;
RouterID rid = p->GetOurRC().pubkey;
OurCrypto()->shorthash(rxKey, llarp_buffer_t(rid));
remoteRC.Clear();
assert(s == sock);
assert(utp_set_userdata(sock, this) == this);
GotLIM = std::bind(&InboundSession::InboundLIM, this, _1);
}
bool
InboundSession::InboundLIM(const LinkIntroMessage* msg)
{
if(gotLIM && remoteRC.pubkey != msg->rc.pubkey)
{
Close();
return false;
}
if(!gotLIM)
{
remoteRC = msg->rc;
OurCrypto()->shorthash(txKey, llarp_buffer_t(remoteRC.pubkey));
if(!DoKeyExchange(std::bind(&Crypto::transport_dh_server, OurCrypto(),
_1, _2, _3, _4),
rxKey, msg->N, remoteRC.enckey,
parent->TransportSecretKey()))
return false;
std::array< byte_t, LinkIntroMessage::MaxSize > tmp;
llarp_buffer_t buf(tmp);
LinkIntroMessage replymsg;
replymsg.rc = parent->GetOurRC();
if(!replymsg.rc.Verify(OurCrypto(), parent->Now()))
{
LogError("our RC is invalid? closing session to", remoteAddr);
Close();
return false;
}
replymsg.N.Randomize();
replymsg.P = DefaultLinkSessionLifetime;
if(!replymsg.Sign(parent->Sign))
{
LogError("failed to sign LIM for inbound handshake from ",
remoteAddr);
Close();
return false;
}
// encode
if(!replymsg.BEncode(&buf))
{
LogError("failed to encode LIM for handshake from ", remoteAddr);
Close();
return false;
}
// rewind
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
// send
if(!SendMessageBuffer(buf))
{
LogError("failed to repl to handshake from ", remoteAddr);
Close();
return false;
}
if(!DoKeyExchange(std::bind(&Crypto::transport_dh_client, OurCrypto(),
_1, _2, _3, _4),
txKey, replymsg.N, remoteRC.enckey,
parent->RouterEncryptionSecret()))
return false;
LogDebug("Sent reply LIM");
gotLIM = true;
EnterState(eSessionReady);
/// future LIM are used for session renegotiation
GotLIM = std::bind(&Session::GotSessionRenegotiate, this, _1);
}
return true;
}
OutboundSession::OutboundSession(LinkLayer* p, utp_socket* s,
const RouterContact& rc,
const AddressInfo& addr)
: Session(p)
{
remoteTransportPubKey = addr.pubkey;
remoteRC = rc;
sock = s;
remoteAddr = addr;
RouterID rid = remoteRC.pubkey;
OurCrypto()->shorthash(txKey, llarp_buffer_t(rid));
rid = p->GetOurRC().pubkey;
OurCrypto()->shorthash(rxKey, llarp_buffer_t(rid));
assert(utp_set_userdata(sock, this) == this);
assert(s == sock);
GotLIM = std::bind(&OutboundSession::OutboundLIM, this, _1);
}
void
OutboundSession::Start()
{
utp_connect(sock, remoteAddr, remoteAddr.SockLen());
EnterState(eConnecting);
}
bool
OutboundSession::OutboundLIM(const LinkIntroMessage* msg)
{
if(gotLIM && remoteRC.pubkey != msg->rc.pubkey)
{
return false;
}
remoteRC = msg->rc;
gotLIM = true;
if(!DoKeyExchange(std::bind(&Crypto::transport_dh_server, OurCrypto(), _1,
_2, _3, _4),
rxKey, msg->N, remoteRC.enckey,
parent->RouterEncryptionSecret()))
{
Close();
return false;
}
/// future LIM are used for session renegotiation
GotLIM = std::bind(&Session::GotSessionRenegotiate, this, _1);
EnterState(eSessionReady);
return true;
}
} // namespace utp } // namespace utp
} // namespace llarp } // namespace llarp

@ -13,14 +13,14 @@ namespace llarp
{ {
struct LinkLayer; struct LinkLayer;
struct Session final : public ILinkSession struct Session : public ILinkSession
{ {
/// remote router's rc /// remote router's rc
RouterContact remoteRC; RouterContact remoteRC;
/// underlying socket /// underlying socket
utp_socket* sock; utp_socket* sock;
/// link layer parent /// link layer parent
LinkLayer* parent; ILinkLayer* parent;
/// did we get a LIM from the remote yet? /// did we get a LIM from the remote yet?
bool gotLIM; bool gotLIM;
/// remote router's transport pubkey /// remote router's transport pubkey
@ -68,15 +68,10 @@ namespace llarp
util::StatusObject util::StatusObject
ExtractStatus() const override; ExtractStatus() const override;
/// base virtual ~Session() = 0;
Session(LinkLayer* p);
/// outbound
Session(LinkLayer* p, utp_socket* s, const RouterContact& rc,
const AddressInfo& addr);
/// inbound /// base
Session(LinkLayer* p, utp_socket* s, const Addr& remote); explicit Session(LinkLayer* p);
enum State enum State
{ {
@ -88,41 +83,49 @@ namespace llarp
eClose // utp connection is closed eClose // utp connection is closed
}; };
/// get router
// Router*
// Router();
Crypto*
OurCrypto();
/// session state, call EnterState(State) to set /// session state, call EnterState(State) to set
State state; State state;
/// hook for utp for when we have established a connection /// hook for utp for when we have established a connection
void void
OnLinkEstablished(LinkLayer* p); OnLinkEstablished(ILinkLayer* p) override;
Crypto*
OurCrypto();
/// switch states /// switch states
void void
EnterState(State st); EnterState(State st);
Session();
~Session();
/// handle LIM after handshake /// handle LIM after handshake
bool bool
GotSessionRenegotiate(const LinkIntroMessage* msg); GotSessionRenegotiate(const LinkIntroMessage* msg);
/// re negotiate session with our new local RC /// re negotiate session with our new local RC
bool bool
Rehandshake(); RenegotiateSession() override;
bool
ShouldPing() const override;
/// pump tx queue /// pump tx queue
void void
PumpWrite(); PumpWrite();
void void
DoPump(); Pump() override;
bool
SendKeepAlive() override;
bool
IsEstablished() override
{
return state == eSessionReady || state == eLinkEstablished;
}
bool
TimedOut(llarp_time_t now) const override;
/// verify a fragment buffer and the decrypt it /// verify a fragment buffer and the decrypt it
/// buf is assumed to be FragmentBufferSize bytes long /// buf is assumed to be FragmentBufferSize bytes long
@ -136,7 +139,7 @@ namespace llarp
/// queue a fully formed message /// queue a fully formed message
bool bool
QueueWriteBuffers(const llarp_buffer_t& buf); SendMessageBuffer(const llarp_buffer_t& buf) override;
/// prune expired inbound messages /// prune expired inbound messages
void void
@ -165,42 +168,67 @@ namespace llarp
MutateKey(SharedSecret& K, const AlignedBuffer< 24 >& A); MutateKey(SharedSecret& K, const AlignedBuffer< 24 >& A);
void void
TickImpl(llarp_time_t now); Tick(llarp_time_t now) override;
/// close session /// close session
void void
Close(); Close() override;
/// low level read /// low level read
bool bool
Recv(const byte_t* buf, size_t sz); Recv(const byte_t* buf, size_t sz);
/// handle inbound LIM
bool
InboundLIM(const LinkIntroMessage* msg);
/// handle outbound LIM
bool
OutboundLIM(const LinkIntroMessage* msg);
/// return true if timed out
bool
IsTimedOut(llarp_time_t now) const;
/// get remote identity pubkey /// get remote identity pubkey
const PubKey& PubKey
RemotePubKey() const; GetPubKey() const override;
/// get remote address /// get remote address
Addr Addr
RemoteEndpoint(); GetRemoteEndpoint() const override;
RouterContact
GetRemoteRC() const override
{
return remoteRC;
}
/// get parent link /// get parent link
ILinkLayer* ILinkLayer*
GetParent(); GetLinkLayer() const override;
void void
MarkEstablished(); MarkEstablished();
size_t
SendQueueBacklog() const override
{
return sendq.size();
}
};
struct InboundSession final : public Session
{
InboundSession(LinkLayer* p, utp_socket* s, const Addr& addr);
bool
InboundLIM(const LinkIntroMessage* msg);
void
Start() override
{
}
};
struct OutboundSession final : public Session
{
OutboundSession(LinkLayer* p, utp_socket* s, const RouterContact& rc,
const AddressInfo& addr);
bool
OutboundLIM(const LinkIntroMessage* msg);
void
Start() override;
}; };
} // namespace utp } // namespace utp
} // namespace llarp } // namespace llarp

@ -419,11 +419,11 @@ TEST(TestThreadPool, recurseJob)
static constexpr size_t depth = 10; static constexpr size_t depth = 10;
static constexpr size_t capacity = 100; static constexpr size_t capacity = 100;
ThreadPool pool(threads, capacity);
util::Barrier barrier(threads + 1); util::Barrier barrier(threads + 1);
std::atomic_size_t counter{0}; std::atomic_size_t counter{0};
ThreadPool pool(threads, capacity);
pool.start(); pool.start();
ASSERT_TRUE(pool.addJob(std::bind(recurse, std::ref(barrier), ASSERT_TRUE(pool.addJob(std::bind(recurse, std::ref(barrier),

Loading…
Cancel
Save