// lokinet/llarp/iwp/message_buffer.cpp
#include <iwp/message_buffer.hpp>
#include <iwp/session.hpp>
#include <crypto/crypto.hpp>
namespace llarp
{
namespace iwp
{
/// Construct an outbound message from an already-serialized payload.
/// @param msgid   id stored in m_MsgID and written into every packet built
///                for this message
/// @param msg     payload bytes; moved into m_Data
/// @param now     timestamp recorded in m_StartedAt (used by IsTimedOut)
/// @param handler completion callback; moved into m_Completed
OutboundMessage::OutboundMessage(uint64_t msgid,
                                 ILinkSession::Message_t msg,
                                 llarp_time_t now,
                                 ILinkSession::CompletionHandler handler)
    : m_Data{std::move(msg)}
    , m_MsgID{msgid}
    // handler is a by-value sink parameter: move it rather than copy it
    , m_Completed{std::move(handler)}
    , m_StartedAt{now}
{
  // hash the payload once up front; XMIT() copies m_Digest into its packet
  const llarp_buffer_t buf(m_Data);
  CryptoManager::instance()->shorthash(m_Digest, buf);
}
/// Build the XMIT packet for this message.
/// Layout: [0] protocol version, [1] Command::eXMIT,
/// [2..3] payload size as big endian 16 bit,
/// [4..11] message id as big endian 64 bit,
/// [12..] the bytes of m_Digest.
ILinkSession::Packet_t
OutboundMessage::XMIT() const
{
  constexpr size_t HeaderSize = 12;
  constexpr size_t DigestSize = 32;

  ILinkSession::Packet_t pkt(HeaderSize + DigestSize);
  auto *const hdr = pkt.data();
  hdr[0]          = LLARP_PROTO_VERSION;
  hdr[1]          = Command::eXMIT;
  htobe16buf(hdr + 2, m_Data.size());
  htobe64buf(hdr + 4, m_MsgID);
  // digest goes directly after the fixed header
  std::copy_n(m_Digest.begin(), m_Digest.size(), pkt.begin() + HeaderSize);
  return pkt;
}
void
OutboundMessage::Completed()
5 years ago
{
if(m_Completed)
{
m_Completed(ILinkSession::DeliveryStatus::eDeliverySuccess);
}
m_Completed = nullptr;
}
/// @return true when at least Session::TXFlushInterval has elapsed between
/// m_LastFlush and now.
bool
OutboundMessage::ShouldFlush(llarp_time_t now) const
{
  const auto sinceLastFlush = now - m_LastFlush;
  return sinceLastFlush >= Session::TXFlushInterval;
}
void
5 years ago
OutboundMessage::Ack(byte_t bitmask)
{
m_Acks = std::bitset< 8 >(bitmask);
5 years ago
}
void
OutboundMessage::FlushUnAcked(
5 years ago
std::function< void(ILinkSession::Packet_t) > sendpkt, llarp_time_t now)
5 years ago
{
5 years ago
/// overhead for a data packet in plaintext
static constexpr size_t Overhead = 12;
uint16_t idx = 0;
const auto datasz = m_Data.size();
while(idx < datasz)
5 years ago
{
if(not m_Acks[idx / FragmentSize])
{
5 years ago
const size_t fragsz =
idx + FragmentSize < datasz ? FragmentSize : datasz - idx;
ILinkSession::Packet_t frag(fragsz + Overhead);
frag[0] = LLARP_PROTO_VERSION;
frag[1] = Command::eDATA;
5 years ago
htobe16buf(frag.data() + 2, idx);
htobe64buf(frag.data() + 4, m_MsgID);
5 years ago
std::copy(m_Data.begin() + idx, m_Data.begin() + idx + fragsz,
5 years ago
frag.begin() + Overhead);
sendpkt(std::move(frag));
5 years ago
}
idx += FragmentSize;
}
m_LastFlush = now;
}
bool
OutboundMessage::IsTransmitted() const
{
5 years ago
const auto sz = m_Data.size();
for(uint16_t idx = 0; idx < sz; idx += FragmentSize)
{
if(!m_Acks.test(idx / FragmentSize))
return false;
}
return true;
5 years ago
}
/// @return true when now is later than m_StartedAt by more than
/// Session::DeliveryTimeout.
bool
OutboundMessage::IsTimedOut(const llarp_time_t now) const
{
  // TODO: make configurable by outbound message deliverer
  if(now <= m_StartedAt)
    return false;
  const auto elapsed = now - m_StartedAt;
  return elapsed > Session::DeliveryTimeout;
}
/// Invoke the completion handler (if one is set) with eDeliveryDropped,
/// then clear it.
void
OutboundMessage::InformTimeout()
{
  const auto status     = ILinkSession::DeliveryStatus::eDeliveryDropped;
  const bool hasHandler = static_cast< bool >(m_Completed);
  if(hasHandler)
    m_Completed(status);
  // the handler is reset whether or not it was called
  m_Completed = nullptr;
}
/// Construct the receive-side state for one message.
/// @param msgid id stored in m_MsgID
/// @param sz    size stored in m_Size
/// @param h     digest stored in m_Digset; Verify() compares against it
/// @param now   timestamp stored in m_LastActiveAt
InboundMessage::InboundMessage(uint64_t msgid, uint16_t sz, ShortHash h,
                               llarp_time_t now)
    : m_Digset(std::move(h))
    , m_Size(sz)
    , m_MsgID(msgid)
    , m_LastActiveAt(now)
{
}
void
InboundMessage::HandleData(uint16_t idx, const llarp_buffer_t &buf,
llarp_time_t now)
5 years ago
{
if(idx + buf.sz > m_Data.size())
5 years ago
return;
auto *dst = m_Data.data() + idx;
std::copy_n(buf.base, buf.sz, dst);
5 years ago
m_Acks.set(idx / FragmentSize);
LogDebug("got fragment ", idx / FragmentSize, " of ", m_Size);
m_LastActiveAt = now;
5 years ago
}
/// Build the ACKS packet for this message.
/// Layout: [0] protocol version, [1] Command::eACKS,
/// [2..9] message id as big endian 64 bit, [10] ack bitmask.
///
/// The packet must be 11 bytes: a 9 byte buffer makes the 8 byte message id
/// written at offset 2 run one byte past the end, and storing the bitmask at
/// index 8 overwrites part of that id.
/// NOTE(review): confirm the receiving parser reads the bitmask at offset 10.
ILinkSession::Packet_t
InboundMessage::ACKS() const
{
  static constexpr size_t MsgIDOffset   = 2;
  static constexpr size_t BitmaskOffset = MsgIDOffset + sizeof(uint64_t);

  ILinkSession::Packet_t acks(BitmaskOffset + 1);
  acks[0] = LLARP_PROTO_VERSION;
  acks[1] = Command::eACKS;
  htobe64buf(acks.data() + MsgIDOffset, m_MsgID);
  // bitmask goes after the message id, not on top of it
  acks[BitmaskOffset] = AcksBitmask();
  return acks;
}
/// @return the ack state in m_Acks converted to a single byte
/// (via to_ulong(), cast down to byte_t).
byte_t
InboundMessage::AcksBitmask() const
{
  const auto bits = m_Acks.to_ulong();
  return static_cast< byte_t >(bits);
}
bool
5 years ago
InboundMessage::IsCompleted() const
{
for(uint16_t idx = 0; idx < m_Size; idx += FragmentSize)
{
if(!m_Acks.test(idx / FragmentSize))
return false;
}
return true;
}
/// @return true when more than 1000 time units have passed since
/// m_LastACKSent (presumably milliseconds, confirm against llarp_time_t),
/// or when the message is complete.
bool
InboundMessage::ShouldSendACKS(llarp_time_t now) const
{
  if(now - m_LastACKSent > 1000)
    return true;
  return IsCompleted();
}
/// @return true when now is later than m_LastActiveAt by more than
/// Session::DeliveryTimeout.
bool
InboundMessage::IsTimedOut(const llarp_time_t now) const
{
  if(now <= m_LastActiveAt)
    return false;
  const auto idleFor = now - m_LastActiveAt;
  return idleFor > Session::DeliveryTimeout;
}
void
InboundMessage::SendACKS(
5 years ago
std::function< void(ILinkSession::Packet_t) > sendpkt, llarp_time_t now)
5 years ago
{
auto acks = ACKS();
AddRandomPadding(acks);
5 years ago
sendpkt(std::move(acks));
5 years ago
m_LastACKSent = now;
}
bool
5 years ago
InboundMessage::Verify() const
{
ShortHash gotten;
const llarp_buffer_t buf(m_Data.data(), m_Size);
5 years ago
CryptoManager::instance()->shorthash(gotten, buf);
5 years ago
return gotten == m_Digset;
5 years ago
}
} // namespace iwp
} // namespace llarp