handle frag messages in iwp

pull/1/head
Jeff Becker 6 years ago
parent 6ce74fe130
commit 55be8ee6a3
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -214,7 +214,8 @@ namespace iwp
{
session *parent = nullptr;
xmit msginfo;
std::bitset< 16 > status;
std::bitset< 32 > status;
uint16_t times_acked = 0;
std::map< uint8_t, fragment_t > frags;
fragment_t lastfrag;
@ -228,6 +229,20 @@ namespace iwp
frags.clear();
}
// calculate acked bitmask
uint32_t
get_bitmask() const
{
uint32_t bitmask = 0;
uint8_t idx = 0;
while(idx < 32)
{
bitmask |= (status.test(idx) ? (1 << idx) : 0);
++idx;
}
return bitmask;
}
// inbound
transit_message(const xmit &x) : msginfo(x)
{
@ -238,11 +253,12 @@ namespace iwp
{
}
/// ack packets based off a bitmask
void
ack(uint32_t bitmask)
{
uint8_t idx = 0;
while(idx < 16)
while(idx < 32)
{
if(bitmask & (1 << idx))
{
@ -250,6 +266,15 @@ namespace iwp
}
++idx;
}
++times_acked;
}
bool
should_send_ack() const
{
if(msginfo.numfrags() == 0)
return true;
return times_acked % (1 + (msginfo.numfrags() / 3)) == 0;
}
bool
@ -334,11 +359,21 @@ namespace iwp
}
void
put_lastfrag(uint8_t *buf, size_t sz)
put_lastfrag(byte_t *buf, size_t sz)
{
lastfrag.resize(sz);
memcpy(lastfrag.data(), buf, sz);
}
bool
put_frag(byte_t fragno, byte_t *buf)
{
auto itr = frags.find(fragno);
if(itr == frags.end())
return false;
memcpy(itr->second.data(), buf, msginfo.fragsize());
return true;
}
};
struct frame_state
@ -371,6 +406,7 @@ namespace iwp
void
push_ackfor(uint64_t id, uint32_t bitmask)
{
llarp::Debug(__FILE__, "ACK for msgid=", id, " mask=", bitmask);
sendqueue.emplace();
auto &buf = sendqueue.back();
// TODO: set flags to nonzero as needed
@ -439,7 +475,60 @@ namespace iwp
bool
got_frag(frame_header &hdr, size_t sz)
{
return false;
if(hdr.size() > sz)
{
// overflow
llarp::Warn(__FILE__, "invalid FRAG frame size ", hdr.size(), " > ",
sz);
return false;
}
sz = hdr.size();
if(sz <= 9)
{
// underflow
llarp::Warn(__FILE__, "invalid FRAG frame size ", sz, " <= 9");
return false;
}
uint64_t msgid;
byte_t fragno;
// assumes big endian
// TODO: implement little endian
memcpy(&msgid, hdr.data(), 8);
memcpy(&fragno, hdr.data() + 8, 1);
auto itr = rx.find(msgid);
if(itr == rx.end())
{
llarp::Warn(__FILE__, "no such RX fragment, msgid=", msgid);
return false;
}
auto fragsize = itr->second.msginfo.fragsize();
if(fragsize != sz - 9)
{
llarp::Warn(__FILE__, "RX fragment size missmatch ", fragsize,
" != ", sz - 9);
return false;
}
if(!itr->second.put_frag(fragno, hdr.data() + 9))
{
llarp::Warn(__FILE__,
"inbound message does not have fragment msgid=", msgid,
" fragno=", (int)fragno);
return false;
}
auto mask = itr->second.get_bitmask();
if(itr->second.completed())
{
push_ackfor(msgid, mask);
return inbound_frame_complete(msgid);
}
else if(itr->second.should_send_ack())
{
push_ackfor(msgid, mask);
}
return true;
}
bool
@ -1392,6 +1481,7 @@ namespace iwp
if(itr->second->completed())
{
llarp::Debug(__FILE__, "message transmitted msgid=", msgid);
delete itr->second;
tx.erase(itr);
session *impl = static_cast< session * >(parent->impl);

Loading…
Cancel
Save