mirror of
https://github.com/oxen-io/lokinet.git
synced 2024-11-03 23:15:52 +00:00
1a1f93c171
* use std::unique_ptr where bare pointers aren't absolutely required
199 lines
4.4 KiB
C++
199 lines
4.4 KiB
C++
#include "llarp/iwp/transit_message.hpp"
|
|
#include "llarp/endian.h"
|
|
#include "llarp/iwp/frame_state.hpp"
|
|
#include "llarp/iwp/sendbuf.hpp"
|
|
#include "llarp/time.h"
|
|
|
|
void
|
|
transit_message::clear()
|
|
{
|
|
frags.clear();
|
|
lastfrag.clear();
|
|
}
|
|
|
|
// calculate acked bitmask
|
|
uint32_t
|
|
transit_message::get_bitmask() const
|
|
{
|
|
uint32_t bitmask = 0;
|
|
uint8_t idx = 0;
|
|
while(idx < 32)
|
|
{
|
|
bitmask |= (status.test(idx) ? (1 << idx) : 0);
|
|
++idx;
|
|
}
|
|
return bitmask;
|
|
}
|
|
|
|
// outbound
|
|
transit_message::transit_message(llarp_buffer_t buf, const byte_t *hash,
|
|
uint64_t id, uint16_t mtu)
|
|
{
|
|
started = llarp_time_now_ms();
|
|
put_message(buf, hash, id, mtu);
|
|
}
|
|
|
|
// inbound
|
|
transit_message::transit_message(const xmit &x) : msginfo(x)
|
|
{
|
|
started = llarp_time_now_ms();
|
|
byte_t fragidx = 0;
|
|
uint16_t fragsize = x.fragsize();
|
|
while(fragidx < x.numfrags())
|
|
{
|
|
frags[fragidx].resize(fragsize);
|
|
++fragidx;
|
|
}
|
|
status.reset();
|
|
}
|
|
|
|
/// ack packets based off a bitmask
|
|
void
|
|
transit_message::ack(uint32_t bitmask)
|
|
{
|
|
uint8_t idx = 0;
|
|
while(idx < 32)
|
|
{
|
|
if(bitmask & (1 << idx))
|
|
{
|
|
status.set(idx);
|
|
}
|
|
++idx;
|
|
}
|
|
lastAck = llarp_time_now_ms();
|
|
}
|
|
|
|
bool
|
|
transit_message::should_send_ack(llarp_time_t now) const
|
|
{
|
|
if(now < started)
|
|
return false;
|
|
if(msginfo.numfrags() == 0)
|
|
return true;
|
|
if(status.count() == 0)
|
|
return true;
|
|
if(now < lastRetransmit)
|
|
return false;
|
|
return now - lastRetransmit > 200;
|
|
}
|
|
|
|
bool
|
|
transit_message::should_resend_xmit(llarp_time_t now) const
|
|
{
|
|
if(now < started)
|
|
return false;
|
|
return lastAck == 0 && now - started > 1000;
|
|
}
|
|
|
|
bool
|
|
transit_message::should_resend_frags(llarp_time_t now) const
|
|
{
|
|
if(now < started || now < lastAck)
|
|
return false;
|
|
return lastAck > 0 && now - lastAck > 500 && !completed();
|
|
}
|
|
|
|
bool
|
|
transit_message::completed() const
|
|
{
|
|
for(byte_t idx = 0; idx < msginfo.numfrags(); ++idx)
|
|
{
|
|
if(!status.test(idx))
|
|
return false;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
// template < typename T >
|
|
void
|
|
transit_message::generate_xmit(sendqueue_t &queue, byte_t flags)
|
|
{
|
|
uint16_t sz = lastfrag.size() + sizeof(msginfo.buffer);
|
|
auto pkt = std::unique_ptr< sendbuf_t >(new sendbuf_t(sz + 6));
|
|
auto body_ptr = init_sendbuf(pkt.get(), eXMIT, sz, flags);
|
|
memcpy(body_ptr, msginfo.buffer, sizeof(msginfo.buffer));
|
|
body_ptr += sizeof(msginfo.buffer);
|
|
memcpy(body_ptr, lastfrag.data(), lastfrag.size());
|
|
queue.Put(pkt);
|
|
}
|
|
|
|
// template < typename T >
|
|
void
|
|
transit_message::retransmit_frags(sendqueue_t &queue, byte_t flags)
|
|
{
|
|
auto msgid = msginfo.msgid();
|
|
auto fragsize = msginfo.fragsize();
|
|
for(auto &frag : frags)
|
|
{
|
|
if(status.test(frag.first))
|
|
continue;
|
|
uint16_t sz = 9 + fragsize;
|
|
auto pkt = std::unique_ptr< sendbuf_t >(new sendbuf_t(sz + 6));
|
|
auto body_ptr = init_sendbuf(pkt.get(), eFRAG, sz, flags);
|
|
htobe64buf(body_ptr, msgid);
|
|
body_ptr[8] = frag.first;
|
|
memcpy(body_ptr + 9, frag.second.data(), fragsize);
|
|
queue.Put(pkt);
|
|
}
|
|
lastRetransmit = llarp_time_now_ms();
|
|
}
|
|
|
|
bool
|
|
transit_message::reassemble(std::vector< byte_t > &buffer)
|
|
{
|
|
auto total = msginfo.totalsize();
|
|
buffer.resize(total);
|
|
auto fragsz = msginfo.fragsize();
|
|
auto ptr = &buffer[0];
|
|
for(byte_t idx = 0; idx < msginfo.numfrags(); ++idx)
|
|
{
|
|
if(!status.test(idx))
|
|
return false;
|
|
memcpy(ptr, frags[idx].data(), fragsz);
|
|
ptr += fragsz;
|
|
}
|
|
memcpy(ptr, lastfrag.data(), lastfrag.size());
|
|
return true;
|
|
}
|
|
|
|
void
|
|
transit_message::put_message(llarp_buffer_t buf, const byte_t *hash,
|
|
uint64_t id, uint16_t mtu)
|
|
{
|
|
status.reset();
|
|
uint8_t fragid = 0;
|
|
uint16_t fragsize = mtu;
|
|
size_t left = buf.sz;
|
|
while(left > fragsize)
|
|
{
|
|
auto &frag = frags[fragid];
|
|
frag.resize(fragsize);
|
|
memcpy(frag.data(), buf.cur, fragsize);
|
|
buf.cur += fragsize;
|
|
fragid++;
|
|
left -= fragsize;
|
|
}
|
|
uint16_t lastfrag = buf.sz - (buf.cur - buf.base);
|
|
// set info for xmit
|
|
msginfo.set_info(hash, id, fragsize, lastfrag, fragid);
|
|
put_lastfrag(buf.cur, lastfrag);
|
|
}
|
|
|
|
void
|
|
transit_message::put_lastfrag(byte_t *buf, size_t sz)
|
|
{
|
|
lastfrag.resize(sz);
|
|
memcpy(lastfrag.data(), buf, sz);
|
|
}
|
|
|
|
bool
|
|
transit_message::put_frag(byte_t fragno, byte_t *buf)
|
|
{
|
|
auto itr = frags.find(fragno);
|
|
if(itr == frags.end())
|
|
return false;
|
|
memcpy(itr->second.data(), buf, msginfo.fragsize());
|
|
status.set(fragno);
|
|
return true;
|
|
}
|