lokinet/llarp/iwp/message_buffer.cpp

192 lines
5.1 KiB
C++
Raw Normal View History

2019-08-22 20:53:27 +00:00
#include <iwp/message_buffer.hpp>
#include <iwp/session.hpp>
2019-08-22 20:53:27 +00:00
#include <crypto/crypto.hpp>
namespace llarp
{
namespace iwp
{
OutboundMessage::OutboundMessage(
uint64_t msgid,
ILinkSession::Message_t msg,
llarp_time_t now,
ILinkSession::CompletionHandler handler)
2019-09-12 14:34:27 +00:00
: m_Data{std::move(msg)}
2019-08-23 11:32:52 +00:00
, m_MsgID{msgid}
, m_Completed{handler}
, m_LastFlush{now}
, m_StartedAt{now}
2019-08-23 11:32:52 +00:00
{
2019-09-12 14:34:27 +00:00
const llarp_buffer_t buf(m_Data);
CryptoManager::instance()->shorthash(m_Digest, buf);
m_Acks.set(0);
2019-08-23 11:32:52 +00:00
}
2019-09-12 14:34:27 +00:00
ILinkSession::Packet_t
2019-08-23 11:32:52 +00:00
OutboundMessage::XMIT() const
{
size_t extra = std::min(m_Data.size(), FragmentSize);
auto xmit = CreatePacket(Command::eXMIT, 10 + 32 + extra, 0, 0);
htobe16buf(xmit.data() + CommandOverhead + PacketOverhead, m_Data.size());
htobe64buf(xmit.data() + 2 + CommandOverhead + PacketOverhead, m_MsgID);
std::copy_n(
m_Digest.begin(), m_Digest.size(), xmit.data() + 10 + CommandOverhead + PacketOverhead);
std::copy_n(m_Data.data(), extra, xmit.data() + 10 + CommandOverhead + PacketOverhead + 32);
2019-08-22 20:53:27 +00:00
return xmit;
}
void
2019-08-23 11:32:52 +00:00
OutboundMessage::Completed()
2019-08-22 20:53:27 +00:00
{
if (m_Completed)
2019-08-22 20:53:27 +00:00
{
m_Completed(ILinkSession::DeliveryStatus::eDeliverySuccess);
}
m_Completed = nullptr;
}
bool
OutboundMessage::ShouldFlush(llarp_time_t now) const
{
2020-02-24 19:40:45 +00:00
return now - m_LastFlush >= TXFlushInterval;
2019-08-22 20:53:27 +00:00
}
2019-08-23 11:32:52 +00:00
void
2019-08-22 20:53:27 +00:00
OutboundMessage::Ack(byte_t bitmask)
{
m_Acks = std::bitset<8>(bitmask);
2019-08-22 20:53:27 +00:00
}
2019-08-23 11:32:52 +00:00
void
OutboundMessage::FlushUnAcked(
std::function<void(ILinkSession::Packet_t)> sendpkt, llarp_time_t now)
2019-08-22 20:53:27 +00:00
{
2019-09-12 14:34:27 +00:00
/// overhead for a data packet in plaintext
static constexpr size_t Overhead = 10;
uint16_t idx = 0;
const auto datasz = m_Data.size();
while (idx < datasz)
2019-08-22 20:53:27 +00:00
{
if (not m_Acks[idx / FragmentSize])
2019-08-22 20:53:27 +00:00
{
const size_t fragsz = idx + FragmentSize < datasz ? FragmentSize : datasz - idx;
auto frag = CreatePacket(Command::eDATA, fragsz + Overhead, 0, 0);
htobe16buf(frag.data() + 2 + PacketOverhead, idx);
htobe64buf(frag.data() + 4 + PacketOverhead, m_MsgID);
std::copy(
m_Data.begin() + idx,
m_Data.begin() + idx + fragsz,
frag.data() + PacketOverhead + Overhead + 2);
2019-09-12 14:34:27 +00:00
sendpkt(std::move(frag));
2019-08-22 20:53:27 +00:00
}
idx += FragmentSize;
}
m_LastFlush = now;
}
2019-08-23 11:32:52 +00:00
bool
OutboundMessage::IsTransmitted() const
{
2019-09-12 14:34:27 +00:00
const auto sz = m_Data.size();
for (uint16_t idx = 0; idx < sz; idx += FragmentSize)
2019-08-23 11:32:52 +00:00
{
if (not m_Acks.test(idx / FragmentSize))
2019-08-23 11:32:52 +00:00
return false;
}
return true;
2019-08-22 20:53:27 +00:00
}
bool
OutboundMessage::IsTimedOut(const llarp_time_t now) const
{
// TODO: make configurable by outbound message deliverer
2020-02-24 19:40:45 +00:00
return now > m_StartedAt && now - m_StartedAt > DeliveryTimeout;
}
void
OutboundMessage::InformTimeout()
{
if (m_Completed)
{
m_Completed(ILinkSession::DeliveryStatus::eDeliveryDropped);
}
m_Completed = nullptr;
}
InboundMessage::InboundMessage(uint64_t msgid, uint16_t sz, ShortHash h, llarp_time_t now)
: m_Data(size_t{sz}), m_Digset{std::move(h)}, m_MsgID(msgid), m_LastActiveAt{now}
2019-08-23 11:32:52 +00:00
{
}
2019-08-22 20:53:27 +00:00
2019-08-23 11:32:52 +00:00
void
InboundMessage::HandleData(uint16_t idx, const llarp_buffer_t& buf, llarp_time_t now)
2019-08-22 20:53:27 +00:00
{
if (idx + buf.sz > m_Data.size())
{
LogWarn("invalid fragment offset ", idx);
2019-08-22 20:53:27 +00:00
return;
}
byte_t* dst = m_Data.data() + idx;
std::copy_n(buf.base, buf.sz, dst);
2019-08-22 20:53:27 +00:00
m_Acks.set(idx / FragmentSize);
LogDebug("got fragment ", idx / FragmentSize);
m_LastActiveAt = now;
2019-08-22 20:53:27 +00:00
}
2019-09-12 14:34:27 +00:00
ILinkSession::Packet_t
2019-08-23 11:32:52 +00:00
InboundMessage::ACKS() const
2019-08-22 20:53:27 +00:00
{
auto acks = CreatePacket(Command::eACKS, 9);
htobe64buf(acks.data() + CommandOverhead + PacketOverhead, m_MsgID);
acks[PacketOverhead + 10] = AcksBitmask();
2019-08-22 20:53:27 +00:00
return acks;
}
byte_t
InboundMessage::AcksBitmask() const
{
return byte_t{(byte_t)m_Acks.to_ulong()};
}
2019-08-23 11:32:52 +00:00
bool
2019-08-22 20:53:27 +00:00
InboundMessage::IsCompleted() const
{
const auto sz = m_Data.size();
for (size_t idx = 0; idx < sz; idx += FragmentSize)
2019-08-22 20:53:27 +00:00
{
if (not m_Acks.test(idx / FragmentSize))
2019-08-22 20:53:27 +00:00
return false;
}
return true;
}
bool
InboundMessage::ShouldSendACKS(llarp_time_t now) const
{
2020-02-24 19:40:45 +00:00
return now > m_LastACKSent + ACKResendInterval;
2019-08-22 20:53:27 +00:00
}
2019-08-27 12:13:55 +00:00
bool
InboundMessage::IsTimedOut(const llarp_time_t now) const
{
2020-02-24 19:40:45 +00:00
return now > m_LastActiveAt && now - m_LastActiveAt > DeliveryTimeout;
2019-08-27 12:13:55 +00:00
}
2019-08-22 20:53:27 +00:00
void
InboundMessage::SendACKS(std::function<void(ILinkSession::Packet_t)> sendpkt, llarp_time_t now)
2019-08-22 20:53:27 +00:00
{
sendpkt(ACKS());
2019-08-22 20:53:27 +00:00
m_LastACKSent = now;
}
2019-08-23 11:32:52 +00:00
bool
2019-08-22 20:53:27 +00:00
InboundMessage::Verify() const
{
ShortHash gotten;
const llarp_buffer_t buf(m_Data);
2019-08-22 20:53:27 +00:00
CryptoManager::instance()->shorthash(gotten, buf);
2019-09-12 14:34:27 +00:00
return gotten == m_Digset;
2019-08-22 20:53:27 +00:00
}
2019-08-23 11:32:52 +00:00
} // namespace iwp
2019-11-30 02:08:13 +00:00
} // namespace llarp