finish multithread cryptography first pass

pull/830/head
Jeff Becker 5 years ago
parent 14c9ef15ed
commit 327c545530
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -784,7 +784,7 @@ namespace llarp
}
llarp::LogWarn(Name(), " did not flush packets");
});
SendAllDownstream(Router());
Router()->PumpLL();
}
bool

@ -1,5 +1,6 @@
#include <iwp/linklayer.hpp>
#include <iwp/session.hpp>
#include <unordered_set>
namespace llarp
{
@ -22,13 +23,14 @@ namespace llarp
void
LinkLayer::Pump()
{
std::set< RouterID > sessions;
std::unordered_set< RouterID, RouterID::Hash > sessions;
{
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
auto itr = m_AuthedLinks.begin();
while(itr != m_AuthedLinks.end())
{
sessions.emplace(itr->first);
const RouterID r{itr->first};
sessions.emplace(r);
++itr;
}
}

@ -22,11 +22,11 @@ namespace llarp
ILinkSession::Packet_t
OutboundMessage::XMIT() const
{
auto xmit = CreatePacket(Command::eXMIT, 12 + 32, 0, 0);
htobe16buf(xmit.data() + 2 + PacketOverhead, m_Data.size());
htobe64buf(xmit.data() + 4 + PacketOverhead, m_MsgID);
auto xmit = CreatePacket(Command::eXMIT, 10 + 32);
htobe16buf(xmit.data() + CommandOverhead + PacketOverhead, m_Data.size());
htobe64buf(xmit.data() + 2 + CommandOverhead + PacketOverhead, m_MsgID);
std::copy_n(m_Digest.begin(), m_Digest.size(),
xmit.begin() + 12 + PacketOverhead);
xmit.begin() + 10 + CommandOverhead + PacketOverhead);
return xmit;
}
@ -84,7 +84,7 @@ namespace llarp
const auto sz = m_Data.size();
for(uint16_t idx = 0; idx < sz; idx += FragmentSize)
{
if(!m_Acks.test(idx / FragmentSize))
if(not m_Acks.test(idx / FragmentSize))
return false;
}
return true;
@ -109,8 +109,8 @@ namespace llarp
InboundMessage::InboundMessage(uint64_t msgid, uint16_t sz, ShortHash h,
llarp_time_t now)
: m_Digset{std::move(h)}
, m_Size{sz}
: m_Data(size_t{sz})
, m_Digset{std::move(h)}
, m_MsgID{msgid}
, m_LastActiveAt{now}
{
@ -126,18 +126,18 @@ namespace llarp
return;
}
auto *dst = m_Data.data() + idx;
byte_t *dst = m_Data.data() + idx;
std::copy_n(buf.base, buf.sz, dst);
m_Acks.set(idx / FragmentSize);
LogDebug("got fragment ", idx / FragmentSize, " of ", m_Size);
LogDebug("got fragment ", idx / FragmentSize);
m_LastActiveAt = now;
}
ILinkSession::Packet_t
InboundMessage::ACKS() const
{
auto acks = CreatePacket(Command::eACKS, 9, 0, 0);
htobe64buf(acks.data() + 2 + PacketOverhead, m_MsgID);
auto acks = CreatePacket(Command::eACKS, 9);
htobe64buf(acks.data() + CommandOverhead + PacketOverhead, m_MsgID);
acks[PacketOverhead + 10] = AcksBitmask();
return acks;
}
@ -151,9 +151,10 @@ namespace llarp
bool
InboundMessage::IsCompleted() const
{
for(uint16_t idx = 0; idx < m_Size; idx += FragmentSize)
const auto sz = m_Data.size();
for(size_t idx = 0; idx < sz; idx += FragmentSize)
{
if(!m_Acks.test(idx / FragmentSize))
if(not m_Acks.test(idx / FragmentSize))
return false;
}
return true;
@ -162,7 +163,7 @@ namespace llarp
bool
InboundMessage::ShouldSendACKS(llarp_time_t now) const
{
return now - m_LastACKSent > 1000 || IsCompleted();
return (now > m_LastACKSent && now - m_LastACKSent > 1000);
}
bool
@ -184,7 +185,7 @@ namespace llarp
InboundMessage::Verify() const
{
ShortHash gotten;
const llarp_buffer_t buf(m_Data.data(), m_Size);
const llarp_buffer_t buf(m_Data);
CryptoManager::instance()->shorthash(gotten, buf);
return gotten == m_Digset;
}

@ -26,10 +26,13 @@ namespace llarp
/// multiack
eMACK = 5,
/// close session
eCLOS = 6,
eCLOS = 0xff,
};
/// max size of data fragments
static constexpr size_t FragmentSize = 1024;
/// plaintext header overhead size
static constexpr size_t CommandOverhead = 2;
struct OutboundMessage
{
@ -78,9 +81,8 @@ namespace llarp
InboundMessage(uint64_t msgid, uint16_t sz, ShortHash h,
llarp_time_t now);
AlignedBuffer< MAX_LINK_MSG_SIZE > m_Data;
ILinkSession::Message_t m_Data;
ShortHash m_Digset;
uint16_t m_Size = 0;
uint64_t m_MsgID = 0;
llarp_time_t m_LastACKSent = 0;
llarp_time_t m_LastActiveAt = 0;

@ -11,13 +11,15 @@ namespace llarp
ILinkSession::Packet_t
CreatePacket(Command cmd, size_t plainsize, size_t minpad, size_t variance)
{
const size_t pad = minpad > 0 ? minpad + randint() % variance : 0;
ILinkSession::Packet_t pkt(PacketOverhead + plainsize + pad + 2);
const size_t pad =
minpad > 0 ? minpad + (variance > 0 ? randint() % variance : 0) : 0;
ILinkSession::Packet_t pkt(PacketOverhead + plainsize + pad
+ CommandOverhead);
// randomize pad
if(pad)
{
CryptoManager::instance()->randbytes(
pkt.data() + PacketOverhead + 2 + plainsize, pad);
pkt.data() + PacketOverhead + CommandOverhead + plainsize, pad);
}
// randomize nounce
CryptoManager::instance()->randbytes(pkt.data() + HMACSIZE, TUNNONCESIZE);
@ -39,8 +41,6 @@ namespace llarp
GotLIM = util::memFn(&Session::GotOutboundLIM, this);
CryptoManager::instance()->shorthash(m_SessionKey,
llarp_buffer_t(rc.pubkey));
m_EncryptNext = std::make_shared< CryptoQueue_t >();
m_DecryptNext = std::make_shared< CryptoQueue_t >();
}
Session::Session(LinkLayer* p, Addr from)
@ -54,8 +54,6 @@ namespace llarp
GotLIM = util::memFn(&Session::GotInboundLIM, this);
const PubKey pk = m_Parent->GetOurRC().pubkey;
CryptoManager::instance()->shorthash(m_SessionKey, llarp_buffer_t(pk));
m_EncryptNext = std::make_shared< CryptoQueue_t >();
m_DecryptNext = std::make_shared< CryptoQueue_t >();
}
void
@ -132,11 +130,13 @@ namespace llarp
void
Session::EncryptAndSend(ILinkSession::Packet_t data)
{
if(m_EncryptNext == nullptr)
m_EncryptNext = std::make_shared< CryptoQueue_t >();
m_EncryptNext->emplace_back(std::move(data));
if(!IsEstablished())
{
EncryptWorker(std::move(m_EncryptNext));
m_EncryptNext = std::make_shared< CryptoQueue_t >();
m_EncryptNext = nullptr;
}
}
@ -198,21 +198,21 @@ namespace llarp
Session::SendMACK()
{
// send multi acks
while(m_SendMACKS.size() > 0)
{
const auto sz = m_SendMACKS.size();
const auto max = Session::MaxACKSInMACK;
auto numAcks = std::min(sz, max);
auto mack = CreatePacket(Command::eMACK,
1 + (numAcks * sizeof(uint64_t)), 0, 0);
mack[PacketOverhead + 2] = byte_t{numAcks};
byte_t* ptr = mack.data() + 3 + PacketOverhead;
while(m_SendMACKs.size() > 0)
{
const auto sz = m_SendMACKs.size();
const auto max = Session::MaxACKSInMACK;
auto numAcks = std::min(sz, max);
auto mack =
CreatePacket(Command::eMACK, 1 + (numAcks * sizeof(uint64_t)));
mack[PacketOverhead + CommandOverhead] = byte_t{numAcks};
byte_t* ptr = mack.data() + 3 + PacketOverhead;
LogDebug("send ", numAcks, " macks to ", m_RemoteAddr);
auto itr = m_SendMACKS.begin();
auto itr = m_SendMACKs.begin();
while(numAcks > 0)
{
htobe64buf(ptr, *itr);
itr = m_SendMACKS.erase(itr);
itr = m_SendMACKs.erase(itr);
numAcks--;
ptr += sizeof(uint64_t);
}
@ -246,20 +246,20 @@ namespace llarp
}
}
auto self = shared_from_this();
if(!m_EncryptNext->empty())
if(m_EncryptNext && !m_EncryptNext->empty())
{
m_Parent->QueueWork([self, data = std::move(m_EncryptNext)] {
self->EncryptWorker(data);
});
m_EncryptNext = std::make_shared< CryptoQueue_t >();
m_EncryptNext = nullptr;
}
if(!m_DecryptNext->empty())
if(m_DecryptNext && !m_DecryptNext->empty())
{
m_Parent->QueueWork([self, data = std::move(m_DecryptNext)] {
self->DecryptWorker(data);
});
m_DecryptNext = std::make_shared< CryptoQueue_t >();
m_DecryptNext = nullptr;
}
}
@ -531,6 +531,8 @@ namespace llarp
void
Session::HandleSessionData(Packet_t pkt)
{
if(m_DecryptNext == nullptr)
m_DecryptNext = std::make_shared< CryptoQueue_t >();
m_DecryptNext->emplace_back(pkt);
}
@ -603,14 +605,15 @@ namespace llarp
LogError("impossibly short mack from ", m_RemoteAddr);
return;
}
byte_t numAcks = data[2 + PacketOverhead];
if(data.size() < 3 + PacketOverhead + (numAcks * sizeof(uint64_t)))
byte_t numAcks = data[CommandOverhead + PacketOverhead];
if(data.size()
< 1 + CommandOverhead + PacketOverhead + (numAcks * sizeof(uint64_t)))
{
LogError("short mack from ", m_RemoteAddr);
return;
}
LogDebug("got ", int(numAcks), " mack from ", m_RemoteAddr);
byte_t* ptr = data.data() + 3 + PacketOverhead;
byte_t* ptr = data.data() + CommandOverhead + PacketOverhead + 1;
while(numAcks > 0)
{
uint64_t acked = bufbe64toh(ptr);
@ -633,18 +636,18 @@ namespace llarp
void
Session::HandleNACK(std::vector< byte_t > data)
{
if(data.size() < 10 + PacketOverhead)
if(data.size() < CommandOverhead + sizeof(uint64_t) + PacketOverhead)
{
LogError("short nack from ", m_RemoteAddr);
return;
}
uint64_t txid = bufbe64toh(data.data() + 2 + PacketOverhead);
uint64_t txid =
bufbe64toh(data.data() + CommandOverhead + PacketOverhead);
LogDebug("got nack on ", txid, " from ", m_RemoteAddr);
auto itr = m_TXMsgs.find(txid);
if(itr != m_TXMsgs.end())
{
auto xmit = itr->second.XMIT();
EncryptAndSend(std::move(xmit));
EncryptAndSend(itr->second.XMIT());
}
m_LastRX = m_Parent->Now();
}
@ -652,14 +655,17 @@ namespace llarp
void
Session::HandleXMIT(std::vector< byte_t > data)
{
if(data.size() < 44 + PacketOverhead)
if(data.size() < CommandOverhead + PacketOverhead + sizeof(uint16_t)
+ sizeof(uint64_t) + ShortHash::SIZE)
{
LogError("short XMIT from ", m_RemoteAddr);
return;
}
uint16_t sz = bufbe16toh(data.data() + 2 + PacketOverhead);
uint64_t rxid = bufbe64toh(data.data() + 4 + PacketOverhead);
ShortHash h{data.data() + 12 + PacketOverhead};
uint16_t sz = bufbe16toh(data.data() + CommandOverhead + PacketOverhead);
uint64_t rxid = bufbe64toh(data.data() + CommandOverhead
+ sizeof(uint16_t) + PacketOverhead);
ShortHash h{data.data() + CommandOverhead + sizeof(uint16_t)
+ sizeof(uint64_t) + PacketOverhead};
LogDebug("rxid=", rxid, " sz=", sz, " h=", h.ToHex());
m_LastRX = m_Parent->Now();
{
@ -667,6 +673,7 @@ namespace llarp
auto itr = m_ReplayFilter.find(rxid);
if(itr != m_ReplayFilter.end())
{
m_SendMACKs.emplace(rxid);
LogDebug("duplicate rxid=", rxid, " from ", m_RemoteAddr);
return;
}
@ -684,14 +691,16 @@ namespace llarp
void
Session::HandleDATA(std::vector< byte_t > data)
{
if(data.size() <= 12 + PacketOverhead)
if(data.size() <= CommandOverhead + sizeof(uint16_t) + sizeof(uint64_t)
+ PacketOverhead)
{
LogError("short DATA from ", m_RemoteAddr, " ", data.size());
return;
}
m_LastRX = m_Parent->Now();
uint16_t sz = bufbe16toh(data.data() + 2 + PacketOverhead);
uint64_t rxid = bufbe64toh(data.data() + 4 + PacketOverhead);
m_LastRX = m_Parent->Now();
uint16_t sz = bufbe16toh(data.data() + CommandOverhead + PacketOverhead);
uint64_t rxid = bufbe64toh(data.data() + CommandOverhead
+ sizeof(uint16_t) + PacketOverhead);
auto itr = m_RXMsgs.find(rxid);
if(itr == m_RXMsgs.end())
{
@ -699,13 +708,13 @@ namespace llarp
{
LogDebug("no rxid=", rxid, " for ", m_RemoteAddr);
auto nack = CreatePacket(Command::eNACK, 8);
htobe64buf(nack.data() + PacketOverhead + 2, rxid);
htobe64buf(nack.data() + PacketOverhead + CommandOverhead, rxid);
EncryptAndSend(std::move(nack));
}
else
{
LogDebug("replay hit for rxid=", rxid, " for ", m_RemoteAddr);
m_SendMACKS.emplace(rxid);
m_SendMACKs.emplace(rxid);
}
return;
}
@ -721,10 +730,10 @@ namespace llarp
if(itr->second.Verify())
{
auto msg = std::move(itr->second);
const llarp_buffer_t buf(msg.m_Data.data(), msg.m_Size);
const llarp_buffer_t buf(msg.m_Data);
m_Parent->HandleMessage(this, buf);
m_ReplayFilter.emplace(itr->first, m_Parent->Now());
m_SendMACKS.emplace(itr->first);
m_SendMACKs.emplace(itr->first);
}
else
{

@ -4,11 +4,13 @@
#include <link/session.hpp>
#include <iwp/linklayer.hpp>
#include <iwp/message_buffer.hpp>
#include <unordered_set>
namespace llarp
{
namespace iwp
{
/// packet crypto overhead size
static constexpr size_t PacketOverhead = HMACSIZE + TUNNONCESIZE;
/// creates a packet with plaintext size + wire overhead + random pad
ILinkSession::Packet_t
@ -19,20 +21,19 @@ namespace llarp
public std::enable_shared_from_this< Session >
{
/// Time how long we try delivery for
static constexpr llarp_time_t DeliveryTimeout = 2000;
static constexpr llarp_time_t DeliveryTimeout = 1000;
/// Time how long we wait to recieve a message
static constexpr llarp_time_t RecievalTimeout = (DeliveryTimeout * 8) / 5;
/// How long to keep a replay window for
static constexpr llarp_time_t ReplayWindow = (RecievalTimeout * 3) / 2;
/// How often to acks RX messages
static constexpr llarp_time_t ACKResendInterval = 250;
static constexpr llarp_time_t ACKResendInterval = DeliveryTimeout / 4;
/// How often to retransmit TX fragments
static constexpr llarp_time_t TXFlushInterval = ACKResendInterval * 2;
static constexpr llarp_time_t TXFlushInterval = (DeliveryTimeout / 5) * 2;
/// How often we send a keepalive
static constexpr llarp_time_t PingInterval = 2000;
static constexpr llarp_time_t PingInterval = 5000;
/// How long we wait for a session to die with no tx from them
static constexpr llarp_time_t SessionAliveTimeout =
(PingInterval * 13) / 3;
static constexpr llarp_time_t SessionAliveTimeout = PingInterval * 5;
/// maximum number of messages we can ack in a multiack
static constexpr std::size_t MaxACKSInMACK = 1024 / sizeof(uint64_t);
@ -163,10 +164,10 @@ namespace llarp
/// maps rxid to time recieved
std::unordered_map< uint64_t, llarp_time_t > m_ReplayFilter;
/// list of rx messages to send in next set of multiacks
std::set< uint64_t > m_SendMACKS;
/// set of rx messages to send in next round of multiacks
std::unordered_set< uint64_t > m_SendMACKs;
using CryptoQueue_t = std::vector< Packet_t >;
using CryptoQueue_t = std::list< Packet_t >;
using CryptoQueue_ptr = std::shared_ptr< CryptoQueue_t >;
CryptoQueue_ptr m_EncryptNext;
CryptoQueue_ptr m_DecryptNext;

@ -1,6 +1,7 @@
#include <path/path.hpp>
#include <exit/exit_messages.hpp>
#include <link/i_link_manager.hpp>
#include <messages/discard.hpp>
#include <messages/relay_commit.hpp>
#include <messages/relay_status.hpp>
@ -341,6 +342,7 @@ namespace llarp
m_LastLatencyTestID = latency.T;
m_LastLatencyTestTime = now;
SendRoutingMessage(latency, r);
FlushUpstream(r);
return;
}
if(m_LastRecvMessage && now > m_LastRecvMessage)
@ -374,6 +376,7 @@ namespace llarp
LogDebug("failed to send upstream to ", Upstream());
}
}
r->linkManager().PumpLinks();
}
void
@ -485,6 +488,7 @@ namespace llarp
}
m_LastRecvMessage = r->Now();
}
FlushUpstream(r);
}
bool
@ -586,7 +590,10 @@ namespace llarp
latency.T = randint();
m_LastLatencyTestID = latency.T;
m_LastLatencyTestTime = now;
return SendRoutingMessage(latency, r);
if(!SendRoutingMessage(latency, r))
return false;
FlushUpstream(r);
return true;
}
LogWarn("got unwarranted path confirm message on tx=", RXID(),
" rx=", RXID());

@ -260,7 +260,7 @@ namespace llarp
PathContext::PumpUpstream()
{
m_TransitPaths.ForEach([&](auto& ptr) { ptr->FlushUpstream(m_Router); });
m_OurPaths.ForEach([&](auto& ptr) { ptr->SendAllUpstream(m_Router); });
m_OurPaths.ForEach([&](auto& ptr) { ptr->UpstreamFlush(m_Router); });
}
void
@ -268,7 +268,7 @@ namespace llarp
{
m_TransitPaths.ForEach(
[&](auto& ptr) { ptr->FlushDownstream(m_Router); });
m_OurPaths.ForEach([&](auto& ptr) { ptr->SendAllDownstream(m_Router); });
m_OurPaths.ForEach([&](auto& ptr) { ptr->DownstreamFlush(m_Router); });
}
void

@ -376,13 +376,13 @@ namespace llarp
}
void
PathSet::SendAllUpstream(AbstractRouter* r)
PathSet::UpstreamFlush(AbstractRouter* r)
{
ForEachPath([r](const Path_ptr& p) { p->FlushUpstream(r); });
}
void
PathSet::SendAllDownstream(AbstractRouter* r)
PathSet::DownstreamFlush(AbstractRouter* r)
{
ForEachPath([r](const Path_ptr& p) { p->FlushDownstream(r); });
}

@ -276,10 +276,10 @@ namespace llarp
}
void
SendAllUpstream(AbstractRouter* r);
UpstreamFlush(AbstractRouter* r);
void
SendAllDownstream(AbstractRouter* r);
DownstreamFlush(AbstractRouter* r);
size_t numPaths;

@ -3,6 +3,7 @@
#include <dht/context.hpp>
#include <exit/context.hpp>
#include <exit/exit_messages.hpp>
#include <link/i_link_manager.hpp>
#include <messages/discard.hpp>
#include <messages/relay_commit.hpp>
#include <messages/relay_status.hpp>
@ -171,6 +172,7 @@ namespace llarp
}
m_LastActivity = r->Now();
}
FlushDownstream(r);
}
else
{
@ -181,6 +183,7 @@ namespace llarp
r->SendToOrQueue(info.upstream, &msg);
}
}
r->linkManager().PumpLinks();
}
void
@ -193,6 +196,7 @@ namespace llarp
info.upstream, " to ", info.downstream);
r->SendToOrQueue(info.downstream, &msg);
}
r->linkManager().PumpLinks();
}
void
@ -430,7 +434,7 @@ namespace llarp
void
TransitHop::QueueDestroySelf(AbstractRouter* r)
{
auto func = std::bind(&TransitHop::SetSelfDestruct, this);
auto func = std::bind(&TransitHop::SetSelfDestruct, shared_from_this());
r->logic()->queue_func(func);
}
} // namespace path

@ -164,9 +164,11 @@ namespace llarp
void
Router::PumpLL()
{
if(_stopping.load())
return;
paths.PumpUpstream();
paths.PumpDownstream();
_linkManager.PumpLinks();
paths.PumpUpstream();
}
bool
@ -1011,6 +1013,7 @@ namespace llarp
}
LogInfo("have ", nodedb->num_loaded(), " routers");
_netloop->add_ticker(std::bind(&Router::PumpLL, this));
ScheduleTicker(1000);
_running.store(true);
_startedAt = Now();

@ -1065,7 +1065,7 @@ namespace llarp
for(const auto& item : m_state->m_SendQueue)
item.second->SendRoutingMessage(*item.first, router);
m_state->m_SendQueue.clear();
SendAllUpstream(Router());
router->PumpLL();
}
bool

@ -55,9 +55,7 @@ namespace llarp
if(this->thread)
{
llarp_threadpool_stop(this->thread);
llarp_threadpool_join(this->thread);
}
llarp_free_threadpool(&this->thread);
llarp::LogDebug("logic timer stop");
if(this->timer)

Loading…
Cancel
Save