mirror of
https://github.com/oxen-io/lokinet.git
synced 2024-11-07 15:20:31 +00:00
6f055eca4f
to rebase to rebase
367 lines
8.1 KiB
C++
367 lines
8.1 KiB
C++
#include "llarp/iwp/frame_state.hpp"
|
|
#include "llarp/iwp/inbound_message.hpp"
|
|
#include "llarp/iwp/session.hpp"
|
|
|
|
#include "buffer.hpp"
|
|
#include "llarp/crypto.hpp"
|
|
#include "llarp/logger.hpp"
|
|
#include "router.hpp"
|
|
|
|
bool
|
|
frame_state::process_inbound_queue()
|
|
{
|
|
std::priority_queue< InboundMessage *, std::vector< InboundMessage * >,
|
|
InboundMessage::OrderCompare >
|
|
q;
|
|
recvqueue.Process(q);
|
|
bool increment = false;
|
|
while(q.size())
|
|
{
|
|
// TODO: is this right?
|
|
auto &front = q.top();
|
|
// the items are already sorted anyways so this doesn't really do much
|
|
nextMsgID = std::max(nextMsgID, front->msgid);
|
|
if(!router->HandleRecvLinkMessage(parent, front->Buffer()))
|
|
{
|
|
llarp::Warn("failed to process inbound message ", front->msgid);
|
|
}
|
|
delete front;
|
|
q.pop();
|
|
increment = true;
|
|
}
|
|
if(increment)
|
|
++nextMsgID;
|
|
// TODO: this isn't right
|
|
return true;
|
|
}
|
|
|
|
bool
|
|
frame_state::flags_agree(byte_t flags) const
|
|
{
|
|
return ((rxflags & flags) & (txflags & flags)) == flags;
|
|
}
|
|
|
|
void
|
|
frame_state::clear()
|
|
{
|
|
auto _rx = rx;
|
|
auto _tx = tx;
|
|
for(auto &item : _rx)
|
|
delete item.second;
|
|
for(auto &item : _tx)
|
|
delete item.second;
|
|
rx.clear();
|
|
tx.clear();
|
|
}
|
|
|
|
bool
|
|
frame_state::got_xmit(frame_header hdr, size_t sz)
|
|
{
|
|
if(hdr.size() > sz)
|
|
{
|
|
// overflow
|
|
llarp::Warn("invalid XMIT frame size ", hdr.size(), " > ", sz);
|
|
return false;
|
|
}
|
|
sz = hdr.size();
|
|
|
|
// extract xmit data
|
|
xmit x(hdr.data());
|
|
|
|
const auto bufsz = sizeof(x.buffer);
|
|
|
|
if(sz - bufsz < x.lastfrag())
|
|
{
|
|
// bad size of last fragment
|
|
llarp::Warn("XMIT frag size missmatch ", sz - bufsz, " < ", x.lastfrag());
|
|
return false;
|
|
}
|
|
|
|
// check LSB set on flags
|
|
if(x.flags() & 0x01)
|
|
{
|
|
auto id = x.msgid();
|
|
auto itr = rx.find(id);
|
|
if(itr == rx.end())
|
|
{
|
|
auto msg = new transit_message(x);
|
|
rx[id] = msg;
|
|
llarp::Debug("got message XMIT with ", (int)x.numfrags(), " fragments");
|
|
// inserted, put last fragment
|
|
msg->put_lastfrag(hdr.data() + sizeof(x.buffer), x.lastfrag());
|
|
push_ackfor(id, 0);
|
|
if(x.numfrags() == 0)
|
|
{
|
|
return inbound_frame_complete(id);
|
|
}
|
|
return true;
|
|
}
|
|
else
|
|
llarp::Warn("duplicate XMIT msgid=", x.msgid());
|
|
}
|
|
else
|
|
llarp::Warn("LSB not set on flags");
|
|
return false;
|
|
}
|
|
|
|
bool
|
|
frame_state::got_frag(frame_header hdr, size_t sz)
|
|
{
|
|
if(hdr.size() > sz)
|
|
{
|
|
// overflow
|
|
llarp::Warn("invalid FRAG frame size ", hdr.size(), " > ", sz);
|
|
return false;
|
|
}
|
|
sz = hdr.size();
|
|
|
|
if(sz <= 9)
|
|
{
|
|
// underflow
|
|
llarp::Warn("invalid FRAG frame size ", sz, " <= 9");
|
|
return false;
|
|
}
|
|
|
|
uint64_t msgid;
|
|
byte_t fragno;
|
|
// assumes big endian
|
|
// TODO: implement little endian
|
|
memcpy(&msgid, hdr.data(), 8);
|
|
memcpy(&fragno, hdr.data() + 8, 1);
|
|
|
|
auto itr = rx.find(msgid);
|
|
if(itr == rx.end())
|
|
{
|
|
llarp::Warn("no such RX fragment, msgid=", msgid);
|
|
return true;
|
|
}
|
|
auto fragsize = itr->second->msginfo.fragsize();
|
|
if(fragsize != sz - 9)
|
|
{
|
|
llarp::Warn("RX fragment size missmatch ", fragsize, " != ", sz - 9);
|
|
return false;
|
|
}
|
|
llarp::Debug("RX got fragment ", (int)fragno, " msgid=", msgid);
|
|
if(!itr->second->put_frag(fragno, hdr.data() + 9))
|
|
{
|
|
llarp::Warn("inbound message does not have fragment msgid=", msgid,
|
|
" fragno=", (int)fragno);
|
|
return false;
|
|
}
|
|
auto mask = itr->second->get_bitmask();
|
|
if(itr->second->completed())
|
|
{
|
|
push_ackfor(msgid, mask);
|
|
return inbound_frame_complete(msgid);
|
|
}
|
|
else if(itr->second->should_send_ack(llarp_time_now_ms()))
|
|
{
|
|
push_ackfor(msgid, mask);
|
|
}
|
|
return true;
|
|
}
|
|
|
|
void
|
|
frame_state::push_ackfor(uint64_t id, uint32_t bitmask)
|
|
{
|
|
llarp::Debug("ACK for msgid=", id, " mask=", bitmask);
|
|
sendqueue.push(new sendbuf_t(12 + 6));
|
|
auto body_ptr = init_sendbuf(sendqueue.back(), eACKS, 12, txflags);
|
|
// TODO: this assumes big endian
|
|
memcpy(body_ptr, &id, 8);
|
|
memcpy(body_ptr + 8, &bitmask, 4);
|
|
}
|
|
|
|
bool
|
|
frame_state::inbound_frame_complete(uint64_t id)
|
|
{
|
|
bool success = false;
|
|
std::vector< byte_t > msg;
|
|
auto rxmsg = rx[id];
|
|
if(rxmsg->reassemble(msg))
|
|
{
|
|
llarp::ShortHash digest;
|
|
auto buf = llarp::Buffer< decltype(msg) >(msg);
|
|
router->crypto.shorthash(digest, buf);
|
|
if(memcmp(digest, rxmsg->msginfo.hash(), 32))
|
|
{
|
|
llarp::Warn("message hash missmatch ", llarp::AlignedBuffer< 32 >(digest),
|
|
" != ", llarp::AlignedBuffer< 32 >(rxmsg->msginfo.hash()));
|
|
return false;
|
|
}
|
|
if(id == nextMsgID)
|
|
{
|
|
llarp_link_session *impl = parent;
|
|
|
|
if(id == 0)
|
|
{
|
|
success = router->HandleRecvLinkMessage(parent, buf);
|
|
if(impl->CheckRCValid())
|
|
{
|
|
if(!impl->IsEstablished())
|
|
{
|
|
impl->send_LIM();
|
|
impl->session_established();
|
|
}
|
|
++nextMsgID;
|
|
}
|
|
else
|
|
{
|
|
llarp::PubKey k = impl->remote_router.pubkey;
|
|
llarp::Warn("spoofed LIM from ", k);
|
|
impl->parent->close();
|
|
success = false;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
recvqueue.Put(new InboundMessage(id, msg));
|
|
success = process_inbound_queue();
|
|
}
|
|
}
|
|
else
|
|
{
|
|
llarp::Warn("out of order message expected ", nextMsgID, " but got ", id);
|
|
recvqueue.Put(new InboundMessage(id, msg));
|
|
success = true;
|
|
}
|
|
}
|
|
delete rxmsg;
|
|
rx.erase(id);
|
|
|
|
if(!success)
|
|
llarp::Warn("Failed to process inbound message ", id);
|
|
|
|
return success;
|
|
}
|
|
|
|
bool
|
|
frame_state::got_acks(frame_header hdr, size_t sz)
|
|
{
|
|
if(hdr.size() > sz)
|
|
{
|
|
llarp::Error("invalid ACKS frame size ", hdr.size(), " > ", sz);
|
|
return false;
|
|
}
|
|
sz = hdr.size();
|
|
if(sz < 12)
|
|
{
|
|
llarp::Error("invalid ACKS frame size ", sz, " < 12");
|
|
return false;
|
|
}
|
|
|
|
auto ptr = hdr.data();
|
|
uint64_t msgid;
|
|
uint32_t bitmask;
|
|
memcpy(&msgid, ptr, 8);
|
|
memcpy(&bitmask, ptr + 8, 4);
|
|
|
|
auto itr = tx.find(msgid);
|
|
if(itr == tx.end())
|
|
{
|
|
llarp::Debug("ACK for missing TX frame msgid=", msgid);
|
|
return true;
|
|
}
|
|
|
|
transit_message *msg = itr->second;
|
|
|
|
msg->ack(bitmask);
|
|
|
|
if(msg->completed())
|
|
{
|
|
llarp::Debug("message transmitted msgid=", msgid);
|
|
tx.erase(msgid);
|
|
delete msg;
|
|
}
|
|
else
|
|
{
|
|
llarp::Debug("message ", msgid, " retransmit fragments");
|
|
msg->retransmit_frags(sendqueue, txflags);
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
bool
|
|
frame_state::process(byte_t *buf, size_t sz)
|
|
{
|
|
frame_header hdr(buf);
|
|
if(hdr.flags() & eSessionInvalidated)
|
|
{
|
|
rxflags |= eSessionInvalidated;
|
|
}
|
|
switch(hdr.msgtype())
|
|
{
|
|
case eALIV:
|
|
llarp::Debug("iwp_link::frame_state::process Got alive");
|
|
if(rxflags & eSessionInvalidated)
|
|
{
|
|
txflags |= eSessionInvalidated;
|
|
}
|
|
return true;
|
|
case eXMIT:
|
|
llarp::Debug("iwp_link::frame_state::process Got xmit");
|
|
return got_xmit(hdr, sz - 6);
|
|
case eACKS:
|
|
llarp::Debug("iwp_link::frame_state::process Got ack");
|
|
return got_acks(hdr, sz - 6);
|
|
case msgtype::eFRAG:
|
|
llarp::Debug("iwp_link::frame_state::process Got frag");
|
|
return got_frag(hdr, sz - 6);
|
|
default:
|
|
llarp::Warn(
|
|
"iwp_link::frame_state::process - unknown header message type: ",
|
|
(int)hdr.msgtype());
|
|
return false;
|
|
}
|
|
}
|
|
|
|
bool
|
|
frame_state::next_frame(llarp_buffer_t *buf)
|
|
{
|
|
auto left = sendqueue.size();
|
|
llarp::Debug("next frame, ", left, " frames left in send queue");
|
|
if(left)
|
|
{
|
|
sendbuf_t *send = sendqueue.front();
|
|
buf->base = send->data();
|
|
buf->cur = send->data();
|
|
buf->sz = send->size();
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
void
|
|
frame_state::pop_next_frame()
|
|
{
|
|
sendbuf_t *buf = sendqueue.front();
|
|
sendqueue.pop();
|
|
delete buf;
|
|
}
|
|
|
|
void
|
|
frame_state::queue_tx(uint64_t id, transit_message *msg)
|
|
{
|
|
tx.insert(std::make_pair(id, msg));
|
|
msg->generate_xmit(sendqueue, txflags);
|
|
}
|
|
|
|
void
|
|
frame_state::retransmit(llarp_time_t now)
|
|
{
|
|
for(auto &item : tx)
|
|
{
|
|
if(item.second->should_resend_xmit(now))
|
|
{
|
|
item.second->generate_xmit(sendqueue, txflags);
|
|
}
|
|
item.second->retransmit_frags(sendqueue, txflags);
|
|
}
|
|
}
|
|
|
|
void
|
|
frame_state::alive()
|
|
{
|
|
lastEvent = llarp_time_now_ms();
|
|
} |