fix windows port and make it compile

This commit is contained in:
jeff 2019-10-02 09:06:14 -04:00
parent c3451fc77a
commit 3c1d5518d8
15 changed files with 213 additions and 84 deletions

View File

@ -36,7 +36,7 @@ void
llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev, llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev,
std::shared_ptr< llarp::Logic > logic) std::shared_ptr< llarp::Logic > logic)
{ {
ev->give_logic(logic); ev->set_logic(logic);
while(ev->running()) while(ev->running())
{ {
ev->update_time(); ev->update_time();

View File

@ -69,6 +69,10 @@ llarp_ev_loop_time_now_ms(const llarp_ev_loop_ptr &ev);
void void
llarp_ev_loop_stop(const llarp_ev_loop_ptr &ev); llarp_ev_loop_stop(const llarp_ev_loop_ptr &ev);
/// list of packets we recv'd
/// forward declared
struct llarp_pkt_list;
/// UDP handling configuration /// UDP handling configuration
struct llarp_udp_io struct llarp_udp_io
{ {
@ -88,6 +92,11 @@ struct llarp_udp_io
size_t); size_t);
}; };
/// get all packets recvieved last tick
/// return true if we got packets return false if we didn't
bool
llarp_ev_udp_recvmany(struct llarp_udp_io *udp, struct llarp_pkt_list *pkts);
/// add UDP handler /// add UDP handler
int int
llarp_ev_add_udp(struct llarp_ev_loop *ev, struct llarp_udp_io *udp, llarp_ev_add_udp(struct llarp_ev_loop *ev, struct llarp_udp_io *udp,

View File

@ -1,6 +1,7 @@
#ifndef LLARP_EV_HPP #ifndef LLARP_EV_HPP
#define LLARP_EV_HPP #define LLARP_EV_HPP
#include <net/net_addr.hpp>
#include <ev/ev.h> #include <ev/ev.h>
#include <util/buffer.hpp> #include <util/buffer.hpp>
#include <util/codel.hpp> #include <util/codel.hpp>
@ -784,7 +785,7 @@ struct llarp_ev_loop
} }
/// give this event loop a logic thread for calling /// give this event loop a logic thread for calling
virtual void give_logic(std::shared_ptr< llarp::Logic >) = 0; virtual void set_logic(std::shared_ptr< llarp::Logic >) = 0;
/// register event listener /// register event listener
virtual bool virtual bool
@ -801,7 +802,7 @@ struct llarp_ev_loop
std::list< std::unique_ptr< llarp::ev_io > > handlers; std::list< std::unique_ptr< llarp::ev_io > > handlers;
void virtual void
tick_listeners() tick_listeners()
{ {
auto itr = handlers.begin(); auto itr = handlers.begin();
@ -818,4 +819,63 @@ struct llarp_ev_loop
} }
}; };
struct PacketBuffer
{
PacketBuffer(PacketBuffer&& other)
{
_ptr = other._ptr;
_sz = other._sz;
other._ptr = nullptr;
other._sz = 0;
}
PacketBuffer() : PacketBuffer(nullptr, 0){};
explicit PacketBuffer(size_t sz) : PacketBuffer(new char[sz], sz)
{
}
PacketBuffer(char* buf, size_t sz) : _ptr{buf}, _sz{sz}
{
}
~PacketBuffer()
{
if(_ptr)
delete[] _ptr;
}
byte_t*
data()
{
return (byte_t*)_ptr;
}
size_t
size()
{
return _sz;
}
byte_t& operator[](size_t sz)
{
return data()[sz];
}
void
reserve(size_t sz)
{
if(_ptr)
delete[] _ptr;
_ptr = new char[sz];
}
private:
char* _ptr = nullptr;
size_t _sz = 0;
};
struct PacketEvent
{
llarp::Addr remote = {};
PacketBuffer pkt = {};
};
struct llarp_pkt_list : public std::vector< PacketEvent >
{
};
#endif #endif

View File

@ -371,14 +371,14 @@ namespace libuv
uv_check_t m_Ticker; uv_check_t m_Ticker;
llarp_udp_io* const m_UDP; llarp_udp_io* const m_UDP;
llarp::Addr m_Addr; llarp::Addr m_Addr;
bool gotpkts; llarp_pkt_list m_LastPackets;
std::array< char, 1500 > m_Buffer;
udp_glue(uv_loop_t* loop, llarp_udp_io* udp, const sockaddr* src) udp_glue(uv_loop_t* loop, llarp_udp_io* udp, const sockaddr* src)
: m_UDP(udp), m_Addr(*src) : m_UDP(udp), m_Addr(*src)
{ {
m_Handle.data = this; m_Handle.data = this;
m_Ticker.data = this; m_Ticker.data = this;
gotpkts = false;
uv_udp_init(loop, &m_Handle); uv_udp_init(loop, &m_Handle);
uv_check_init(loop, &m_Ticker); uv_check_init(loop, &m_Ticker);
} }
@ -386,27 +386,44 @@ namespace libuv
static void static void
Alloc(uv_handle_t*, size_t suggested_size, uv_buf_t* buf) Alloc(uv_handle_t*, size_t suggested_size, uv_buf_t* buf)
{ {
buf->base = new char[suggested_size]; const size_t sz = std::min(suggested_size, size_t{1500});
buf->len = suggested_size; buf->base = new char[sz];
buf->len = sz;
} }
static void static void
OnRecv(uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf, OnRecv(uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf,
const struct sockaddr* addr, unsigned) const struct sockaddr* addr, unsigned)
{ {
udp_glue* glue = static_cast< udp_glue* >(handle->data);
if(addr) if(addr)
static_cast< udp_glue* >(handle->data)->RecvFrom(nread, buf, addr); glue->RecvFrom(nread, buf, addr);
delete[] buf->base; if(glue->m_UDP == nullptr || glue->m_UDP->recvfrom != nullptr)
delete[] buf->base;
}
bool
RecvMany(llarp_pkt_list* pkts)
{
*pkts = std::move(m_LastPackets);
m_LastPackets = llarp_pkt_list();
return pkts->size() > 0;
} }
void void
RecvFrom(ssize_t sz, const uv_buf_t* buf, const struct sockaddr* fromaddr) RecvFrom(ssize_t sz, const uv_buf_t* buf, const struct sockaddr* fromaddr)
{ {
if(sz >= 0 && m_UDP && m_UDP->recvfrom) if(sz > 0 && m_UDP)
{ {
const size_t pktsz = sz; const size_t pktsz = sz;
const llarp_buffer_t pkt{(const byte_t*)buf->base, pktsz}; const llarp_buffer_t pkt{(const byte_t*)buf->base, pktsz};
m_UDP->recvfrom(m_UDP, fromaddr, ManagedBuffer{pkt}); if(m_UDP->recvfrom)
m_UDP->recvfrom(m_UDP, fromaddr, ManagedBuffer{pkt});
else
{
m_LastPackets.emplace_back(
PacketEvent{llarp::Addr(*fromaddr), PacketBuffer(buf->base, sz)});
}
} }
} }
@ -855,3 +872,9 @@ namespace libuv
} }
} // namespace libuv } // namespace libuv
bool
llarp_ev_udp_recvmany(struct llarp_udp_io* u, struct llarp_pkt_list* pkts)
{
return static_cast< libuv::udp_glue* >(u->impl)->RecvMany(pkts);
}

View File

@ -90,7 +90,7 @@ namespace libuv
} }
void void
give_logic(std::shared_ptr< llarp::Logic > l) override set_logic(std::shared_ptr< llarp::Logic > l) override
{ {
m_Logic = l; m_Logic = l;
} }

View File

@ -366,10 +366,25 @@ namespace llarp
if(static_cast< size_t >(ret) > sz) if(static_cast< size_t >(ret) > sz)
return -1; return -1;
b.sz = ret; b.sz = ret;
udp->recvfrom(udp, addr, ManagedBuffer{b}); if(udp->recvfrom)
udp->recvfrom(udp, addr, ManagedBuffer{b});
else
{
m_RecvPackets.emplace_back(
PacketEvent{llarp::Addr(*addr), PacketBuffer(ret)});
std::copy_n(buf, ret, m_RecvPackets.back().pkt.data());
}
return 0; return 0;
} }
bool
udp_listener::RecvMany(llarp_pkt_list* pkts)
{
*pkts = std::move(m_RecvPackets);
m_RecvPackets = llarp_pkt_list();
return pkts->size() > 0;
}
static int static int
UDPSendTo(llarp_udp_io* udp, const sockaddr* to, const byte_t* ptr, size_t sz) UDPSendTo(llarp_udp_io* udp, const sockaddr* to, const byte_t* ptr, size_t sz)
{ {
@ -696,4 +711,18 @@ llarp_win32_loop::stop()
llarp::LogDebug("destroy upoll"); llarp::LogDebug("destroy upoll");
} }
void
llarp_win32_loop::tick_listeners()
{
llarp_ev_loop::tick_listeners();
for(auto& func : m_Tickers)
m_Logic->queue_func([func]() { func(); });
}
bool
llarp_ev_udp_recvmany(struct llarp_udp_io* u, struct llarp_pkt_list* pkts)
{
return static_cast< llarp::udp_listener* >(u->impl)->RecvMany(pkts);
}
#endif #endif

View File

@ -5,6 +5,7 @@
#include <net/net.h> #include <net/net.h>
#include <net/net.hpp> #include <net/net.hpp>
#include <util/buffer.hpp> #include <util/buffer.hpp>
#include <util/thread/logic.hpp>
#include <windows.h> #include <windows.h>
#include <process.h> #include <process.h>
@ -36,6 +37,7 @@ namespace llarp
struct udp_listener : public ev_io struct udp_listener : public ev_io
{ {
llarp_udp_io* udp; llarp_udp_io* udp;
llarp_pkt_list m_RecvPackets;
udp_listener(int fd, llarp_udp_io* u) : ev_io(fd), udp(u){}; udp_listener(int fd, llarp_udp_io* u) : ev_io(fd), udp(u){};
@ -43,6 +45,9 @@ namespace llarp
{ {
} }
bool
RecvMany(llarp_pkt_list*);
bool bool
tick(); tick();
@ -99,6 +104,8 @@ struct win32_tun_io
struct llarp_win32_loop : public llarp_ev_loop struct llarp_win32_loop : public llarp_ev_loop
{ {
upoll_t* upollfd; upoll_t* upollfd;
std::shared_ptr< llarp::Logic > m_Logic;
std::vector< std::function< void(void) > > m_Tickers;
llarp_win32_loop() : upollfd(nullptr) llarp_win32_loop() : upollfd(nullptr)
{ {
@ -148,6 +155,22 @@ struct llarp_win32_loop : public llarp_ev_loop
void void
stop(); stop();
bool
add_ticker(std::function< void(void) > func) override
{
m_Tickers.emplace_back(func);
return true;
}
void
set_logic(std::shared_ptr< llarp::Logic > l) override
{
m_Logic = l;
}
void
tick_listeners() override;
}; };
#endif #endif

View File

@ -81,7 +81,7 @@ namespace llarp
auto itr = m_AuthedAddrs.find(from); auto itr = m_AuthedAddrs.find(from);
if(itr == m_AuthedAddrs.end()) if(itr == m_AuthedAddrs.end())
{ {
// ACQUIRE_LOCK(Lock_t lock , m_PendingMutex); ACQUIRE_LOCK(Lock_t lock, m_PendingMutex);
if(m_Pending.count(from) == 0) if(m_Pending.count(from) == 0)
{ {
if(not permitInbound) if(not permitInbound)
@ -92,7 +92,7 @@ namespace llarp
} }
else else
{ {
Lock lock(&m_AuthedLinksMutex); ACQUIRE_LOCK(Lock_t lock, m_AuthedLinksMutex);
auto range = m_AuthedLinks.equal_range(itr->second); auto range = m_AuthedLinks.equal_range(itr->second);
session = range.first->second; session = range.first->second;
} }

View File

@ -26,7 +26,7 @@ namespace llarp
htobe16buf(xmit.data() + CommandOverhead + PacketOverhead, m_Data.size()); htobe16buf(xmit.data() + CommandOverhead + PacketOverhead, m_Data.size());
htobe64buf(xmit.data() + 2 + CommandOverhead + PacketOverhead, m_MsgID); htobe64buf(xmit.data() + 2 + CommandOverhead + PacketOverhead, m_MsgID);
std::copy_n(m_Digest.begin(), m_Digest.size(), std::copy_n(m_Digest.begin(), m_Digest.size(),
xmit.begin() + 10 + CommandOverhead + PacketOverhead); xmit.data() + 10 + CommandOverhead + PacketOverhead);
return xmit; return xmit;
} }
@ -70,7 +70,7 @@ namespace llarp
htobe16buf(frag.data() + 2 + PacketOverhead, idx); htobe16buf(frag.data() + 2 + PacketOverhead, idx);
htobe64buf(frag.data() + 4 + PacketOverhead, m_MsgID); htobe64buf(frag.data() + 4 + PacketOverhead, m_MsgID);
std::copy(m_Data.begin() + idx, m_Data.begin() + idx + fragsz, std::copy(m_Data.begin() + idx, m_Data.begin() + idx + fragsz,
frag.begin() + PacketOverhead + Overhead + 2); frag.data() + PacketOverhead + Overhead + 2);
sendpkt(std::move(frag)); sendpkt(std::move(frag));
} }
idx += FragmentSize; idx += FragmentSize;

View File

@ -363,7 +363,7 @@ namespace llarp
ILinkSession::Packet_t req(Introduction::SIZE + PacketOverhead); ILinkSession::Packet_t req(Introduction::SIZE + PacketOverhead);
const auto pk = m_Parent->GetOurRC().pubkey; const auto pk = m_Parent->GetOurRC().pubkey;
const auto e_pk = m_Parent->RouterEncryptionSecret().toPublic(); const auto e_pk = m_Parent->RouterEncryptionSecret().toPublic();
auto itr = req.begin() + PacketOverhead; auto itr = req.data() + PacketOverhead;
std::copy_n(pk.begin(), pk.size(), itr); std::copy_n(pk.begin(), pk.size(), itr);
itr += pk.size(); itr += pk.size();
std::copy_n(e_pk.begin(), e_pk.size(), itr); std::copy_n(e_pk.begin(), e_pk.size(), itr);
@ -373,9 +373,9 @@ namespace llarp
llarp_buffer_t signbuf(req.data() + PacketOverhead, llarp_buffer_t signbuf(req.data() + PacketOverhead,
Introduction::SIZE - Signature::SIZE); Introduction::SIZE - Signature::SIZE);
m_Parent->Sign(Z, signbuf); m_Parent->Sign(Z, signbuf);
std::copy_n(Z.begin(), Z.size(), std::copy_n(
req.begin() + PacketOverhead Z.begin(), Z.size(),
+ (Introduction::SIZE - Signature::SIZE)); req.data() + PacketOverhead + (Introduction::SIZE - Signature::SIZE));
CryptoManager::instance()->randbytes(req.data() + HMACSIZE, TUNNONCESIZE); CryptoManager::instance()->randbytes(req.data() + HMACSIZE, TUNNONCESIZE);
EncryptAndSend(std::move(req)); EncryptAndSend(std::move(req));
m_State = State::Introduction; m_State = State::Introduction;
@ -404,7 +404,7 @@ namespace llarp
token.size() + PacketOverhead, " from ", m_RemoteAddr); token.size() + PacketOverhead, " from ", m_RemoteAddr);
return; return;
} }
const auto begin = pkt.begin() + PacketOverhead; const auto begin = pkt.data() + PacketOverhead;
if(not std::equal(begin, begin + token.size(), token.begin())) if(not std::equal(begin, begin + token.size(), token.begin()))
{ {
LogError("token missmatch from ", m_RemoteAddr); LogError("token missmatch from ", m_RemoteAddr);
@ -450,12 +450,12 @@ namespace llarp
m_RemoteAddr); m_RemoteAddr);
return; return;
} }
std::vector< byte_t > reply(token.size() + PacketOverhead); Packet_t reply(token.size() + PacketOverhead);
// random nonce // random nonce
CryptoManager::instance()->randbytes(reply.data() + HMACSIZE, CryptoManager::instance()->randbytes(reply.data() + HMACSIZE,
TUNNONCESIZE); TUNNONCESIZE);
// set token // set token
std::copy_n(token.begin(), token.size(), reply.begin() + PacketOverhead); std::copy_n(token.begin(), token.size(), reply.data() + PacketOverhead);
m_LastRX = m_Parent->Now(); m_LastRX = m_Parent->Now();
EncryptAndSend(std::move(reply)); EncryptAndSend(std::move(reply));
LogDebug("sent intro ack to ", m_RemoteAddr); LogDebug("sent intro ack to ", m_RemoteAddr);
@ -471,15 +471,15 @@ namespace llarp
token.size() + PacketOverhead, " from ", m_RemoteAddr); token.size() + PacketOverhead, " from ", m_RemoteAddr);
return; return;
} }
std::vector< byte_t > reply(token.size() + PacketOverhead); Packet_t reply(token.size() + PacketOverhead);
if(not DecryptMessageInPlace(pkt)) if(not DecryptMessageInPlace(pkt))
{ {
LogError("intro ack decrypt failed from ", m_RemoteAddr); LogError("intro ack decrypt failed from ", m_RemoteAddr);
return; return;
} }
m_LastRX = m_Parent->Now(); m_LastRX = m_Parent->Now();
std::copy_n(pkt.begin() + PacketOverhead, token.size(), token.begin()); std::copy_n(pkt.data() + PacketOverhead, token.size(), token.begin());
std::copy_n(token.begin(), token.size(), reply.begin() + PacketOverhead); std::copy_n(token.begin(), token.size(), reply.data() + PacketOverhead);
// random nounce // random nounce
CryptoManager::instance()->randbytes(reply.data() + HMACSIZE, CryptoManager::instance()->randbytes(reply.data() + HMACSIZE,
TUNNONCESIZE); TUNNONCESIZE);
@ -533,13 +533,13 @@ namespace llarp
{ {
if(m_DecryptNext == nullptr) if(m_DecryptNext == nullptr)
m_DecryptNext = std::make_shared< CryptoQueue_t >(); m_DecryptNext = std::make_shared< CryptoQueue_t >();
m_DecryptNext->emplace_back(pkt); m_DecryptNext->emplace_back(std::move(pkt));
} }
void void
Session::DecryptWorker(CryptoQueue_ptr msgs) Session::DecryptWorker(CryptoQueue_ptr msgs)
{ {
CryptoQueue_t recvMsgs; CryptoQueue_ptr recvMsgs = std::make_shared< CryptoQueue_t >();
for(auto& pkt : *msgs) for(auto& pkt : *msgs)
{ {
if(not DecryptMessageInPlace(pkt)) if(not DecryptMessageInPlace(pkt))
@ -553,17 +553,17 @@ namespace llarp
" != ", LLARP_PROTO_VERSION); " != ", LLARP_PROTO_VERSION);
continue; continue;
} }
recvMsgs.emplace_back(std::move(pkt)); recvMsgs->emplace_back(std::move(pkt));
} }
LogDebug("decrypted ", recvMsgs.size(), " packets from ", m_RemoteAddr); LogDebug("decrypted ", recvMsgs->size(), " packets from ", m_RemoteAddr);
m_Parent->logic()->queue_func(std::bind( m_Parent->logic()->queue_func(
&Session::HandlePlaintext, shared_from_this(), std::move(recvMsgs))); std::bind(&Session::HandlePlaintext, shared_from_this(), recvMsgs));
} }
void void
Session::HandlePlaintext(CryptoQueue_t msgs) Session::HandlePlaintext(CryptoQueue_ptr msgs)
{ {
for(auto& result : msgs) for(auto& result : *msgs)
{ {
LogDebug("Command ", int(result[PacketOverhead + 1])); LogDebug("Command ", int(result[PacketOverhead + 1]));
switch(result[PacketOverhead + 1]) switch(result[PacketOverhead + 1])
@ -598,7 +598,7 @@ namespace llarp
} }
void void
Session::HandleMACK(std::vector< byte_t > data) Session::HandleMACK(Packet_t data)
{ {
if(data.size() < 3 + PacketOverhead) if(data.size() < 3 + PacketOverhead)
{ {
@ -634,7 +634,7 @@ namespace llarp
} }
void void
Session::HandleNACK(std::vector< byte_t > data) Session::HandleNACK(Packet_t data)
{ {
if(data.size() < CommandOverhead + sizeof(uint64_t) + PacketOverhead) if(data.size() < CommandOverhead + sizeof(uint64_t) + PacketOverhead)
{ {
@ -653,7 +653,7 @@ namespace llarp
} }
void void
Session::HandleXMIT(std::vector< byte_t > data) Session::HandleXMIT(Packet_t data)
{ {
if(data.size() < CommandOverhead + PacketOverhead + sizeof(uint16_t) if(data.size() < CommandOverhead + PacketOverhead + sizeof(uint16_t)
+ sizeof(uint64_t) + ShortHash::SIZE) + sizeof(uint64_t) + ShortHash::SIZE)
@ -689,7 +689,7 @@ namespace llarp
} }
void void
Session::HandleDATA(std::vector< byte_t > data) Session::HandleDATA(Packet_t data)
{ {
if(data.size() <= CommandOverhead + sizeof(uint16_t) + sizeof(uint64_t) if(data.size() <= CommandOverhead + sizeof(uint16_t) + sizeof(uint64_t)
+ PacketOverhead) + PacketOverhead)
@ -744,7 +744,7 @@ namespace llarp
} }
void void
Session::HandleACKS(std::vector< byte_t > data) Session::HandleACKS(Packet_t data)
{ {
if(data.size() < 11 + PacketOverhead) if(data.size() < 11 + PacketOverhead)
{ {
@ -775,13 +775,13 @@ namespace llarp
} }
} }
void Session::HandleCLOS(std::vector< byte_t >) void Session::HandleCLOS(Packet_t)
{ {
LogInfo("remote closed by ", m_RemoteAddr); LogInfo("remote closed by ", m_RemoteAddr);
Close(); Close();
} }
void Session::HandlePING(std::vector< byte_t >) void Session::HandlePING(Packet_t)
{ {
m_LastRX = m_Parent->Now(); m_LastRX = m_Parent->Now();
} }
@ -791,8 +791,7 @@ namespace llarp
{ {
if(m_State == State::Ready) if(m_State == State::Ready)
{ {
auto pkt = CreatePacket(Command::ePING, 0); EncryptAndSend(CreatePacket(Command::ePING, 0));
EncryptAndSend(std::move(pkt));
return true; return true;
} }
return false; return false;

View File

@ -5,6 +5,7 @@
#include <iwp/linklayer.hpp> #include <iwp/linklayer.hpp>
#include <iwp/message_buffer.hpp> #include <iwp/message_buffer.hpp>
#include <unordered_set> #include <unordered_set>
#include <deque>
namespace llarp namespace llarp
{ {
@ -167,7 +168,7 @@ namespace llarp
/// set of rx messages to send in next round of multiacks /// set of rx messages to send in next round of multiacks
std::unordered_set< uint64_t > m_SendMACKs; std::unordered_set< uint64_t > m_SendMACKs;
using CryptoQueue_t = std::list< Packet_t >; using CryptoQueue_t = std::vector< Packet_t >;
using CryptoQueue_ptr = std::shared_ptr< CryptoQueue_t >; using CryptoQueue_ptr = std::shared_ptr< CryptoQueue_t >;
CryptoQueue_ptr m_EncryptNext; CryptoQueue_ptr m_EncryptNext;
CryptoQueue_ptr m_DecryptNext; CryptoQueue_ptr m_DecryptNext;
@ -179,7 +180,7 @@ namespace llarp
DecryptWorker(CryptoQueue_ptr msgs); DecryptWorker(CryptoQueue_ptr msgs);
void void
HandlePlaintext(CryptoQueue_t msgs); HandlePlaintext(CryptoQueue_ptr msgs);
void void
HandleGotIntro(Packet_t pkt); HandleGotIntro(Packet_t pkt);
@ -218,25 +219,25 @@ namespace llarp
SendOurLIM(ILinkSession::CompletionHandler h = nullptr); SendOurLIM(ILinkSession::CompletionHandler h = nullptr);
void void
HandleXMIT(std::vector< byte_t > msg); HandleXMIT(Packet_t msg);
void void
HandleDATA(std::vector< byte_t > msg); HandleDATA(Packet_t msg);
void void
HandleACKS(std::vector< byte_t > msg); HandleACKS(Packet_t msg);
void void
HandleNACK(std::vector< byte_t > msg); HandleNACK(Packet_t msg);
void void
HandlePING(std::vector< byte_t > msg); HandlePING(Packet_t msg);
void void
HandleCLOS(std::vector< byte_t > msg); HandleCLOS(Packet_t msg);
void void
HandleMACK(std::vector< byte_t > msg); HandleMACK(Packet_t msg);
}; };
} // namespace iwp } // namespace iwp
} // namespace llarp } // namespace llarp

View File

@ -1,5 +1,5 @@
#include <link/server.hpp> #include <link/server.hpp>
#include <ev/ev.hpp>
#include <crypto/crypto.hpp> #include <crypto/crypto.hpp>
#include <util/fs.hpp> #include <util/fs.hpp>
#include <utility> #include <utility>
@ -107,7 +107,7 @@ namespace llarp
{ {
m_Loop = loop; m_Loop = loop;
m_udp.user = this; m_udp.user = this;
m_udp.recvfrom = &ILinkLayer::udp_recv_from; m_udp.recvfrom = nullptr;
m_udp.tick = &ILinkLayer::udp_tick; m_udp.tick = &ILinkLayer::udp_tick;
if(ifname == "*") if(ifname == "*")
{ {
@ -357,7 +357,7 @@ namespace llarp
ILinkLayer::SendTo(const RouterID& remote, const llarp_buffer_t& buf, ILinkLayer::SendTo(const RouterID& remote, const llarp_buffer_t& buf,
ILinkSession::CompletionHandler completed) ILinkSession::CompletionHandler completed)
{ {
ILinkSession* s = nullptr; std::shared_ptr< ILinkSession > s;
{ {
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex); ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
auto range = m_AuthedLinks.equal_range(remote); auto range = m_AuthedLinks.equal_range(remote);
@ -367,10 +367,10 @@ namespace llarp
while(itr != range.second) while(itr != range.second)
{ {
auto backlog = itr->second->SendQueueBacklog(); const auto backlog = itr->second->SendQueueBacklog();
if(backlog < min) if(backlog < min)
{ {
s = itr->second.get(); s = itr->second;
min = backlog; min = backlog;
} }
++itr; ++itr;
@ -463,31 +463,18 @@ namespace llarp
ILinkLayer::udp_tick(llarp_udp_io* udp) ILinkLayer::udp_tick(llarp_udp_io* udp)
{ {
ILinkLayer* link = static_cast< ILinkLayer* >(udp->user); ILinkLayer* link = static_cast< ILinkLayer* >(udp->user);
if(link->m_Recv == nullptr) auto pkts = std::make_shared< llarp_pkt_list >();
return; llarp_ev_udp_recvmany(&link->m_udp, pkts.get());
link->logic()->queue_func([traffic = std::move(link->m_Recv), l = link]() {
auto itr = traffic->begin(); link->logic()->queue_func([pkts, link]() {
while(itr != traffic->end()) auto itr = pkts->begin();
while(itr != pkts->end())
{ {
l->RecvFrom(itr->first, std::move(itr->second)); link->RecvFrom(itr->remote, std::move(itr->pkt));
++itr; ++itr;
} }
l->Pump(); link->Pump();
}); });
link->m_Recv.reset(new TrafficQueue_t());
}
void
ILinkLayer::udp_recv_from(llarp_udp_io* udp, const sockaddr* from,
ManagedBuffer buf)
{
ILinkLayer* link = static_cast< ILinkLayer* >(udp->user);
if(link->m_Recv == nullptr)
return;
link->m_Recv->emplace_back(
std::make_pair(Addr(*from), ILinkSession::Packet_t(buf.underlying.sz)));
std::copy_n(buf.underlying.base, buf.underlying.sz,
link->m_Recv->back().second.begin());
} }
} // namespace llarp } // namespace llarp

View File

@ -79,9 +79,6 @@ namespace llarp
static void static void
udp_tick(llarp_udp_io* udp); udp_tick(llarp_udp_io* udp);
static void
udp_recv_from(llarp_udp_io* udp, const sockaddr* from, ManagedBuffer buf);
void void
SendTo_LL(const llarp::Addr& to, const llarp_buffer_t& pkt) SendTo_LL(const llarp::Addr& to, const llarp_buffer_t& pkt)
{ {

View File

@ -3,6 +3,7 @@
#include <crypto/types.hpp> #include <crypto/types.hpp>
#include <net/net.hpp> #include <net/net.hpp>
#include <ev/ev.hpp>
#include <router_contact.hpp> #include <router_contact.hpp>
#include <util/types.hpp> #include <util/types.hpp>
@ -44,7 +45,7 @@ namespace llarp
/// message delivery result hook function /// message delivery result hook function
using CompletionHandler = std::function< void(DeliveryStatus) >; using CompletionHandler = std::function< void(DeliveryStatus) >;
using Packet_t = std::vector< byte_t >; using Packet_t = PacketBuffer;
using Message_t = std::vector< byte_t >; using Message_t = std::vector< byte_t >;
/// send a message buffer to the remote endpoint /// send a message buffer to the remote endpoint

View File

@ -8,7 +8,7 @@
#include <ctime> #include <ctime>
#include <iomanip> #include <iomanip>
#include <sstream> #include <sstream>
#include <thread> #include <util/thread/threading.hpp>
namespace llarp namespace llarp
{ {