pull/3/head
Ryan Tharp 6 years ago
commit 07f0653e83

@ -143,6 +143,7 @@ set(LIB_SRC
llarp/api/parser.cpp
llarp/routing/message_parser.cpp
llarp/routing/path_confirm.cpp
llarp/routing/path_latency.cpp
vendor/cppbackport-master/lib/fs/rename.cpp
vendor/cppbackport-master/lib/fs/filestatus.cpp
vendor/cppbackport-master/lib/fs/filetype.cpp

@ -66,7 +66,7 @@ testnet-build: testnet-configure
testnet: testnet-build
mkdir -p $(TESTNET_ROOT)
python3 contrib/testnet/genconf.py --bin=$(REPO)/llarpd --svc=30 --clients=300 --dir=$(TESTNET_ROOT) --out $(TESTNET_CONF)
python3 contrib/testnet/genconf.py --bin=$(REPO)/llarpd --svc=10 --clients=1 --dir=$(TESTNET_ROOT) --out $(TESTNET_CONF)
supervisord -n -d $(TESTNET_ROOT) -l $(TESTNET_LOG) -c $(TESTNET_CONF)
test: debug-configure

@ -528,8 +528,6 @@ sent inside a TDFM encrypted to the hidden service's public encryption key.
transfer data fragment message (TDFM)
variant 1 (with path id):
transfer data between paths.
{
@ -544,12 +542,12 @@ transfer data to another path with id P on the local router place Y and T values
into y and z values into a LRDM message (respectively) and send it in the
downstream direction.
variant 2 (no path id):
transfer ip traffic message (TITM)
transfer ip traffic for exit
{
A: "T",
A: "E",
V: 0,
X: "<N bytes ipv6 packet>",
Y: "<16 bytes nounce>",
@ -563,7 +561,7 @@ cannot be found or the signature is invalid this message is dropped, otherwise
the X value is sent on the appropriate exit network interface.
When we recieve an ip packet from the internet to an exit address, we put it
into a TDFM, signed with the exit info's signing key and send it downstream the
into a TITM, signed with the exit info's signing key and send it downstream the
corrisponding path in an LRDM.
update exit path message (UXPM)

@ -50,6 +50,8 @@ struct llarp_udp_io
void *user;
void *impl;
struct llarp_ev_loop *parent;
/// called every event loop tick after reads
void (*tick)(struct llarp_udp_io *);
void (*recvfrom)(struct llarp_udp_io *, const struct sockaddr *, const void *,
ssize_t);
};

@ -25,7 +25,7 @@ namespace llarp
BDecode(llarp_buffer_t* buf);
bool
HandleMessage(IMessageHandler* h) const;
HandleMessage(IMessageHandler* h, llarp_router* r) const;
};
} // namespace routing
} // namespace llarp

@ -20,7 +20,7 @@ namespace llarp
DecodeKey(llarp_buffer_t key, llarp_buffer_t* val);
bool
HandleMessage(IMessageHandler* r) const;
HandleMessage(IMessageHandler* h, llarp_router* r) const;
};
} // namespace routing
} // namespace llarp

@ -0,0 +1,22 @@
#ifndef LLARP_MESSAGES_PATH_TRANSFER_HPP
#define LLARP_MESSAGES_PATH_TRANSFER_HPP
#include <llarp/crypto.hpp>
#include <llarp/encrypted.hpp>
#include <llarp/routing/message.hpp>
namespace llarp
{
namespace routing
{
struct PathTransferMessage : public IMessage
{
PathID_t P;
Encrypted T;
TunnelNonce Y;
};
} // namespace routing
} // namespace llarp
#endif

@ -112,9 +112,10 @@ namespace llarp
llarp_router* r) = 0;
};
struct TransitHop : public IHopHandler
struct TransitHop : public IHopHandler,
public llarp::routing::IMessageHandler
{
TransitHop() = default;
TransitHop();
TransitHop(const TransitHop& other);
@ -125,6 +126,8 @@ namespace llarp
llarp_time_t lifetime = DEFAULT_PATH_LIFETIME;
llarp_proto_version_t version;
llarp::routing::InboundMessageParser m_MessageParser;
friend std::ostream&
operator<<(std::ostream& out, const TransitHop& h)
{
@ -135,9 +138,31 @@ namespace llarp
bool
Expired(llarp_time_t now) const;
// send routing message when end of path
bool
SendRoutingMessage(const llarp::routing::IMessage* msg, llarp_router* r);
// handle routing message when end of path
bool
HandleRoutingMessage(const llarp::routing::IMessage* msg,
llarp_router* r);
bool
HandlePathConfirmMessage(const llarp::routing::PathConfirmMessage* msg,
llarp_router* r);
bool
HandlePathTransferMessage(const llarp::routing::PathTransferMessage* msg,
llarp_router* r);
bool
HandlePathLatencyMessage(const llarp::routing::PathLatencyMessage* msg,
llarp_router* r);
bool
HandleDHTMessage(const llarp::dht::IMessage* msg, llarp_router* r);
bool
HandleHiddenServiceData(llarp_buffer_t buf, llarp_router* r);
// handle data in upstream direction
bool
HandleUpstream(llarp_buffer_t X, const TunnelNonce& Y, llarp_router* r);
@ -190,19 +215,25 @@ namespace llarp
SendRoutingMessage(const llarp::routing::IMessage* msg, llarp_router* r);
bool
HandlePathConfirmMessage(const llarp::routing::PathConfirmMessage* msg);
HandlePathConfirmMessage(const llarp::routing::PathConfirmMessage* msg,
llarp_router* r);
bool
HandlePathLatencyMessage(const llarp::routing::PathLatencyMessage* msg);
HandlePathLatencyMessage(const llarp::routing::PathLatencyMessage* msg,
llarp_router* r);
bool
HandleDHTMessage(const llarp::dht::IMessage* msg);
HandlePathTransferMessage(const llarp::routing::PathTransferMessage* msg,
llarp_router* r);
bool
HandleDHTMessage(const llarp::dht::IMessage* msg, llarp_router* r);
bool
HandleRoutingMessage(llarp_buffer_t buf, llarp_router* r);
bool
HandleHiddenServiceData(llarp_buffer_t buf);
HandleHiddenServiceData(llarp_buffer_t buf, llarp_router* r);
// handle data in upstream direction
bool
@ -223,11 +254,15 @@ namespace llarp
RouterID
Upstream() const;
llarp_time_t Latency = 0;
protected:
llarp::routing::InboundMessageParser m_InboundMessageParser;
private:
BuildResultHookFunc m_BuiltHook;
llarp_time_t m_LastLatencyTestTime = 0;
uint64_t m_LastLatencyTestID = 0;
};
enum PathBuildStatus

@ -49,12 +49,11 @@ namespace llarp
ShouldBuildMore() const;
private:
typedef std::map< PathID_t, Path* > PathMap_t;
// (tx,rx)
typedef std::tuple< PathMap_t, PathMap_t > PathContainer_t;
typedef std::pair< RouterID, PathID_t > PathInfo_t;
typedef std::map< PathInfo_t, Path* > PathMap_t;
size_t m_NumPaths;
PathContainer_t m_Paths;
PathMap_t m_Paths;
};
} // namespace path

@ -2,28 +2,37 @@
#define LLARP_ROUTING_HANDLER_HPP
#include <llarp/buffer.h>
#include <llarp/router.h>
#include <llarp/dht.hpp>
#include <llarp/messages/path_confirm.hpp>
#include <llarp/messages/path_latency.hpp>
#include <llarp/messages/path_transfer.hpp>
namespace llarp
{
namespace routing
{
// handles messages on owned paths
// handles messages on the routing level
struct IMessageHandler
{
virtual bool
HandleHiddenServiceData(llarp_buffer_t buf) = 0;
HandlePathTransferMessage(const PathTransferMessage* msg,
llarp_router* r) = 0;
virtual bool
HandlePathConfirmMessage(const PathConfirmMessage* msg) = 0;
HandleHiddenServiceData(llarp_buffer_t buf, llarp_router* r) = 0;
virtual bool
HandlePathLatencyMessage(const PathLatencyMessage* msg) = 0;
HandlePathConfirmMessage(const PathConfirmMessage* msg,
llarp_router* r) = 0;
virtual bool
HandleDHTMessage(const llarp::dht::IMessage* msg) = 0;
HandlePathLatencyMessage(const PathLatencyMessage* msg,
llarp_router* r) = 0;
virtual bool
HandleDHTMessage(const llarp::dht::IMessage* msg, llarp_router* r) = 0;
};
} // namespace routing
} // namespace llarp

@ -17,14 +17,15 @@ namespace llarp
llarp::PathID_t from;
virtual bool
HandleMessage(IMessageHandler* r) const = 0;
HandleMessage(IMessageHandler* h, llarp_router* r) const = 0;
};
struct InboundMessageParser
{
InboundMessageParser();
bool
ParseMessageBuffer(llarp_buffer_t buf, IMessageHandler* handler);
ParseMessageBuffer(llarp_buffer_t buf, IMessageHandler* handler,
llarp_router* r);
private:
static bool

@ -5,46 +5,53 @@
#include <functional>
#include <mutex>
#include <queue>
#include <string>
namespace llarp
{
namespace util
{
template < typename T, typename GetTime, llarp_time_t dropMs = 200,
llarp_time_t initialIntervalMs = 100 >
template < typename T, typename GetTime, typename PutTime,
llarp_time_t dropMs = 20, llarp_time_t initialIntervalMs = 100 >
struct CoDelQueue
{
CoDelQueue(const std::string& name) : m_name(name)
{
}
struct CoDelCompare
{
GetTime getTime = GetTime();
bool
operator()(const T& left, const T& right) const
{
return getTime(left) < getTime(right);
return GetTime()(left) < GetTime()(right);
}
};
void
Put(T* item)
Put(const T& i)
{
std::unique_lock< std::mutex > lock(m_QueueMutex);
m_Queue.push(*item);
//llarp::Info("CoDelQueue::Put - adding item, queue now has ", m_Queue.size(), " items at ", getTime(*item));
PutTime()(i);
m_Queue.push(i);
if(firstPut == 0)
firstPut = GetTime()(i);
}
void
Process(std::queue< T >& result)
{
llarp_time_t lowest = 0xFFFFFFFFFFFFFFFFUL;
auto start = llarp_time_now_ms();
//auto start = llarp_time_now_ms();
//llarp::Info("CoDelQueue::Process - start at ", start);
std::unique_lock< std::mutex > lock(m_QueueMutex);
auto start = firstPut;
while(m_Queue.size())
{
//llarp::Info("CoDelQueue::Process - queue has ", m_Queue.size());
const auto& item = m_Queue.top();
auto dlt = start - getTime(item);
auto dlt = start - GetTime()(item);
//llarp::Info("CoDelQueue::Process - dlt ", dlt);
lowest = std::min(dlt, lowest);
if(m_Queue.size() == 1)
@ -53,10 +60,12 @@ namespace llarp
if(lowest > dropMs)
{
// drop
llarp::Error("CoDelQueue::Process - dropping");
nextTickInterval = initialIntervalMs / std::sqrt(++dropNum);
nextTickInterval += initialIntervalMs / std::sqrt(++dropNum);
llarp::Warn("CoDel queue ", m_name, " drop ", nextTickInterval,
" ms next interval lowest=", lowest);
delete item;
m_Queue.pop();
return;
break;
}
else
{
@ -68,13 +77,15 @@ namespace llarp
result.push(item);
m_Queue.pop();
}
firstPut = 0;
}
GetTime getTime = GetTime();
llarp_time_t firstPut = 0;
size_t dropNum = 0;
llarp_time_t nextTickInterval = initialIntervalMs;
std::mutex m_QueueMutex;
std::priority_queue< T, std::vector< T >, CoDelCompare > m_Queue;
std::string m_name;
};
} // namespace util
} // namespace llarp

@ -13,7 +13,6 @@
#endif
extern "C" {
void
llarp_ev_loop_alloc(struct llarp_ev_loop **ev)
{

@ -3,6 +3,7 @@
#include <llarp/ev.h>
#include <unistd.h>
#include <list>
namespace llarp
{
@ -12,6 +13,7 @@ namespace llarp
ev_io(int f) : fd(f){};
virtual int
read(void* buf, size_t sz) = 0;
virtual int
sendto(const sockaddr* dst, const void* data, size_t sz) = 0;
virtual ~ev_io()
@ -42,6 +44,8 @@ struct llarp_ev_loop
close_ev(llarp::ev_io* ev) = 0;
virtual ~llarp_ev_loop(){};
std::list< llarp_udp_io* > udp_listeners;
};
#endif

@ -131,6 +131,9 @@ struct llarp_epoll_loop : public llarp_ev_loop
++idx;
}
}
for(auto& l : udp_listeners)
if(l->tick)
l->tick(l);
return result;
}
@ -166,6 +169,9 @@ struct llarp_epoll_loop : public llarp_ev_loop
++idx;
}
}
for(auto& l : udp_listeners)
if(l->tick)
l->tick(l);
} while(epollfd != -1);
return result;
}
@ -238,6 +244,7 @@ struct llarp_epoll_loop : public llarp_ev_loop
return false;
}
l->impl = listener;
udp_listeners.push_back(l);
return true;
}
@ -252,6 +259,7 @@ struct llarp_epoll_loop : public llarp_ev_loop
close_ev(listener);
l->impl = nullptr;
delete listener;
udp_listeners.remove(l);
}
return ret;
}

@ -123,6 +123,9 @@ struct llarp_kqueue_loop : public llarp_ev_loop
++idx;
}
}
for(auto& l : udp_listeners)
if(l->tick)
l->tick(l);
return result;
}
@ -151,6 +154,9 @@ struct llarp_kqueue_loop : public llarp_ev_loop
++idx;
}
}
for(auto& l : udp_listeners)
if(l->tick)
l->tick(l);
} while(result != -1);
return result;
}
@ -235,7 +241,7 @@ struct llarp_kqueue_loop : public llarp_ev_loop
delete listener;
return false;
}
udp_listeners.push_back(l);
l->impl = listener;
return true;
}
@ -250,6 +256,7 @@ struct llarp_kqueue_loop : public llarp_ev_loop
ret = close_ev(listener);
delete listener;
l->impl = nullptr;
udp_listeners.remove(l);
}
return ret;
}

@ -557,7 +557,7 @@ namespace iwp
if(itr == rx.end())
{
llarp::Warn("no such RX fragment, msgid=", msgid);
return false;
return true;
}
auto fragsize = itr->second->msginfo.fragsize();
if(fragsize != sz - 9)
@ -657,7 +657,7 @@ namespace iwp
llarp::Debug("iwp_link::frame_state::process Got frag");
return got_frag(hdr, sz - 6);
default:
llarp::Error("iwp_link::frame_state::process - unknown header message type: ", (int)hdr.msgtype());
llarp::Warn("iwp_link::frame_state::process - unknown header message type: ", (int)hdr.msgtype());
return false;
}
}
@ -667,9 +667,18 @@ namespace iwp
struct FrameGetTime
{
llarp_time_t
operator()(const iwp_async_frame &frame) const
operator()(const iwp_async_frame *frame) const
{
return frame->created;
}
};
struct FramePutTime
{
void
operator()(iwp_async_frame *frame) const
{
return frame.created;
frame->created = llarp_time_now_ms();
}
};
@ -693,18 +702,20 @@ namespace iwp
llarp_link_establish_job *establish_job = nullptr;
/// cached timestamp for frame creation
llarp_time_t now;
llarp_time_t now, inboundNow;
uint32_t establish_job_id = 0;
uint32_t frames = 0;
bool working = false;
llarp::util::CoDelQueue< iwp_async_frame, FrameGetTime > inboundFrames;
llarp::util::CoDelQueue< iwp_async_frame, FrameGetTime > outboundFrames;
std::mutex m_DecryptedFramesMutex;
std::queue< iwp_async_frame > decryptedFrames;
llarp::util::CoDelQueue< iwp_async_frame *, FrameGetTime, FramePutTime >
outboundFrames;
/*
std::mutex m_EncryptedFramesMutex;
std::queue< iwp_async_frame > encryptedFrames;
llarp::util::CoDelQueue< iwp_async_frame *, FrameGetTime, FramePutTime >
decryptedFrames;
*/
uint32_t pump_send_timer_id = 0;
uint32_t pump_recv_timer_id = 0;
@ -713,6 +724,7 @@ namespace iwp
iwp_async_introack introack;
iwp_async_session_start start;
frame_state frame;
bool started_inbound_codel = false;
byte_t token[32];
byte_t workbuf[MAX_PAD + 128];
@ -734,7 +746,14 @@ namespace iwp
session(llarp_udp_io *u, llarp_async_iwp *i, llarp_crypto *c,
llarp_logic *l, const byte_t *seckey, const llarp::Addr &a)
: udp(u), crypto(c), iwp(i), logic(l), addr(a), state(eInitial)
: udp(u)
, crypto(c)
, iwp(i)
, logic(l)
, outboundFrames("iwp_outbound")
//, decryptedFrames("iwp_inbound")
, addr(a)
, state(eInitial)
{
eph_seckey = seckey;
llarp::Zero(&remote_router, sizeof(llarp_rc));
@ -779,6 +798,7 @@ namespace iwp
frame.queue_tx(id, msg);
pump();
PumpCryptoOutbound();
}
static void
@ -803,30 +823,74 @@ namespace iwp
return false;
}
static void
handle_codel_inbound_pump(void *u, uint64_t orig, uint64_t left);
static void
handle_codel_outbound_pump(void *u, uint64_t orig, uint64_t left);
void
PumpCrypto();
PumpCryptoOutbound();
/*
void
HandleInboundCodel()
{
std::queue< iwp_async_frame * > outq;
decryptedFrames.Process(outq);
while(outq.size())
{
auto &front = outq.front();
handle_frame_decrypt(front);
delete front;
outq.pop();
}
PumpCryptoOutbound();
}
void
PumpCodelInbound()
{
pump_recv_timer_id = llarp_logic_call_later(
logic,
{inboundFrames.nextTickInterval, this, &handle_codel_inbound_pump});
}
static void
handle_inbound_codel_delayed(void *user, uint64_t orig, uint64_t left)
{
if(left)
return;
session *self = static_cast< session * >(user);
self->pump_recv_timer_id = 0;
self->HandleInboundCodel();
self->PumpCodelInbound();
}
void
PumpCodelOutbound()
{
pump_send_timer_id = llarp_logic_call_later(
logic,
{outboundFrames.nextTickInterval, this, &handle_codel_outbound_pump});
}
static void
handle_start_inbound_codel(void *user)
{
session *self = static_cast< session * >(user);
self->HandleInboundCodel();
self->PumpCodelInbound();
}
void
StartInboundCodel()
{
if(started_inbound_codel)
return;
started_inbound_codel = true;
llarp_logic_queue_job(logic, {this, &handle_start_inbound_codel});
}
static void
handle_pump_inbound_codel(void *user)
{
session *self = static_cast< session * >(user);
self->HandleInboundCodel();
}
void
ManualPumpInboundCodel()
{
llarp_logic_queue_job(logic, {this, &handle_pump_inbound_codel});
}
void
PumpCodelInbound()
{
pump_recv_timer_id =
llarp_logic_call_later(logic,
{decryptedFrames.nextTickInterval, this,
&handle_inbound_codel_delayed});
}
*/
void
pump()
{
@ -840,7 +904,6 @@ namespace iwp
encrypt_frame_async_send(buf.base, buf.sz);
frame.pop_next_frame();
}
PumpCrypto();
}
// this is called from net thread
@ -907,11 +970,10 @@ namespace iwp
crypto->shorthash(digest, buf);
auto id = frame.txids++;
auto msg = new transit_message(buf, digest, id);
// enter state
EnterState(eLIMSent);
// put into outbound send queue
add_outbound_message(id, msg);
// enter state
EnterState(eLIMSent);
}
else
llarp::Error("LIM Encode failed");
@ -1046,12 +1108,13 @@ namespace iwp
handle_frame_decrypt(iwp_async_frame *frame)
{
session *self = static_cast< session * >(frame->user);
llarp::Debug("rx ", frame->sz, " frames=", self->frames);
llarp::Debug("rx ", frame->sz);
if(frame->success)
{
if(self->frame.process(frame->buf + 64, frame->sz - 64))
{
self->frame.alive();
self->pump();
}
else
llarp::Error("invalid frame from ", self->addr);
@ -1065,16 +1128,38 @@ namespace iwp
{
if(sz > 64)
{
auto frame = alloc_frame(inboundFrames, buf, sz);
inboundFrames.Put(frame);
//auto frame = alloc_frame(inboundFrames, buf, sz);
//inboundFrames.Put(frame);
auto f = alloc_frame(buf, sz);
/*
if(iwp_decrypt_frame(f))
{
decryptedFrames.Put(f);
if(state == eEstablished)
{
if(pump_recv_timer_id == 0)
PumpCodelInbound();
}
else
ManualPumpInboundCodel();
}
else
llarp::Warn("decrypt frame fail");
*/
f->hook = &handle_frame_decrypt;
iwp_call_async_frame_decrypt(iwp, f);
}
else
llarp::Warn("short packet of ", sz, " bytes");
}
//static void
//handle_crypto_pump(void *u);
static void
handle_crypto_pump(void *u);
handle_crypto_outbound(void *u);
/*
void
DecryptInboundFrames()
{
@ -1103,20 +1188,20 @@ namespace iwp
}
}
}
*/
static void
handle_frame_encrypt(iwp_async_frame *frame)
{
session *self = static_cast< session * >(frame->user);
llarp::Debug("tx ", frame->sz, " frames=", self->frames);
llarp::Debug("tx ", frame->sz);
if(llarp_ev_udp_sendto(self->udp, self->addr, frame->buf, frame->sz)
== -1)
llarp::Warn("sendto failed");
}
template < typename Queue >
iwp_async_frame *
alloc_frame(Queue &q, const void *buf, size_t sz)
alloc_frame(const void *buf, size_t sz)
{
// TODO don't hard code 1500
if(sz > 1500)
@ -1133,7 +1218,7 @@ namespace iwp
frame->user = this;
frame->sessionkey = sessionkey;
/// TODO: this could be rather slow
frame->created = now;
//frame->created = now;
//llarp::Info("alloc_frame putting into q");
//q.Put(frame);
return frame;
@ -1143,7 +1228,7 @@ namespace iwp
encrypt_frame_async_send(const void *buf, size_t sz)
{
// 64 bytes frame overhead for nonce and hmac
iwp_async_frame *frame = alloc_frame(outboundFrames, nullptr, sz + 64);
iwp_async_frame *frame = alloc_frame(nullptr, sz + 64);
memcpy(frame->buf + 64, buf, sz);
// maybe add upto 128 random bytes to the packet
auto padding = rand() % MAX_PAD;
@ -1157,25 +1242,19 @@ namespace iwp
void
EncryptOutboundFrames()
{
std::queue< iwp_async_frame > q;
std::queue< iwp_async_frame > outq;
std::queue< iwp_async_frame * > outq;
outboundFrames.Process(outq);
while(outq.size())
{
auto &front = outq.front();
if(iwp_encrypt_frame(&front))
q.push(front);
//if(iwp_encrypt_frame(&front))
//q.push(front);
if(iwp_encrypt_frame(front))
handle_frame_encrypt(front);
delete front;
outq.pop();
}
{
std::unique_lock< std::mutex > lock(m_EncryptedFramesMutex);
while(q.size())
{
encryptedFrames.push(q.front());
q.pop();
}
}
}
static void
@ -1315,14 +1394,16 @@ namespace iwp
llarp::Debug("EnterState - entering state: ", st, state==eLIMSent?"eLIMSent":"", state==eSessionStartSent?"eSessionStartSent":"");
frame.alive();
state = st;
if(state == eLIMSent || state == eSessionStartSent)
if(state == eSessionStartSent || state == eIntroAckSent)
{
//llarp::Info("EnterState - ", state==eLIMSent?"eLIMSent":"", state==eSessionStartSent?"eSessionStartSent":"");
PumpCodelInbound();
PumpCodelOutbound();
//PumpCodelInbound();
//PumpCodelOutbound();
PumpCryptoOutbound();
// StartInboundCodel();
}
}
};
}; // namespace iwp
struct server
{
@ -1571,7 +1652,6 @@ namespace iwp
server *link = static_cast< server * >(l);
link->timeout_job_id = 0;
link->TickSessions();
// TODO: exponential backoff for cleanup timer ?
link->issue_cleanup_timer(orig);
}
@ -1752,6 +1832,8 @@ namespace iwp
auto buf = llarp::StackBuffer< decltype(tmp) >(tmp);
self->now = llarp_time_now_ms();
self->encrypt_frame_async_send(buf.base, buf.sz);
self->pump();
self->PumpCryptoOutbound();
}
bool
@ -1778,8 +1860,8 @@ namespace iwp
auto itr = tx.find(msgid);
if(itr == tx.end())
{
llarp::Error("ACK for missing TX frame msgid=", msgid);
return false;
llarp::Debug("ACK for missing TX frame msgid=", msgid);
return true;
}
transit_message *msg = itr->second;
@ -1855,67 +1937,23 @@ namespace iwp
//llarp::Debug("Tick - pumping and retransmitting because we're eEstablished");
frame.retransmit();
pump();
PumpCryptoOutbound();
}
// TODO: determine if we are too idle
return false;
}
void
session::handle_codel_outbound_pump(void *u, uint64_t orig, uint64_t left)
{
if(left)
return;
session *self = static_cast< session * >(u);
self->pump_send_timer_id = 0;
if(self->timedout(llarp_time_now_ms()))
return;
{
std::unique_lock< std::mutex > lock(self->m_EncryptedFramesMutex);
while(self->encryptedFrames.size())
{
auto &front = self->encryptedFrames.front();
handle_frame_encrypt(&front);
self->encryptedFrames.pop();
}
}
self->PumpCodelOutbound();
self->PumpCrypto();
}
void
session::handle_codel_inbound_pump(void *u, uint64_t orig, uint64_t left)
{
if(left)
return;
session *self = static_cast< session * >(u);
self->pump_recv_timer_id = 0;
if(self->timedout(llarp_time_now_ms()))
return;
{
std::unique_lock< std::mutex > lock(self->m_DecryptedFramesMutex);
while(self->decryptedFrames.size())
{
auto &front = self->decryptedFrames.front();
handle_frame_decrypt(&front);
self->decryptedFrames.pop();
}
}
self->PumpCodelInbound();
self->PumpCrypto();
}
void
session::PumpCrypto()
session::PumpCryptoOutbound()
{
llarp_threadpool_queue_job(serv->worker, {this, &handle_crypto_pump});
llarp_threadpool_queue_job(serv->worker, {this, &handle_crypto_outbound});
}
void
session::handle_crypto_pump(void *u)
session::handle_crypto_outbound(void *u)
{
session *self = static_cast< session * >(u);
self->EncryptOutboundFrames();
self->DecryptInboundFrames();
}
void
@ -2030,6 +2068,7 @@ namespace iwp
link->netloop = netloop;
link->udp.recvfrom = &server::handle_recvfrom;
link->udp.user = link;
link->udp.tick = nullptr;
llarp::Debug("bind IWP link to ", link->addr);
if(llarp_ev_add_udp(link->netloop, &link->udp, link->addr) == -1)
{
@ -2048,7 +2087,7 @@ namespace iwp
link->timeout_job_id = 0;
link->logic = logic;
// start cleanup timer
link->issue_cleanup_timer(1000);
link->issue_cleanup_timer(100);
return true;
}

@ -312,6 +312,7 @@ nodedb_async_load_rc(void *user)
job->loaded = job->nodedb->loadfile(fpath);
if(job->loaded)
{
llarp_rc_clear(&job->rc);
llarp_rc_copy(&job->rc, job->nodedb->getRC(job->pubkey));
}
llarp_logic_queue_job(job->logic, {job, &nodedb_inform_load_rc});

@ -266,10 +266,16 @@ namespace llarp
hops[idx].txID.Randomize();
hops[idx].rxID.Randomize();
}
/*
for(size_t idx = (h->numHops - 1); idx > 0; --idx)
{
hops[idx].txID = hops[idx - 1].rxID;
}
*/
for(size_t idx = 0; idx < h->numHops - 1; ++idx)
{
hops[idx].txID = hops[idx + 1].rxID;
}
}
void
@ -334,7 +340,7 @@ namespace llarp
}
bool
Path::HandleHiddenServiceData(llarp_buffer_t buf)
Path::HandleHiddenServiceData(llarp_buffer_t buf, llarp_router* r)
{
// TODO: implement me
return false;
@ -343,7 +349,7 @@ namespace llarp
bool
Path::HandleRoutingMessage(llarp_buffer_t buf, llarp_router* r)
{
if(!m_InboundMessageParser.ParseMessageBuffer(buf, this))
if(!m_InboundMessageParser.ParseMessageBuffer(buf, this, r))
{
llarp::Warn("Failed to parse inbound routing message");
return false;
@ -368,18 +374,32 @@ namespace llarp
return HandleUpstream(buf, N, r);
}
bool
Path::HandlePathTransferMessage(
const llarp::routing::PathTransferMessage* msg, llarp_router* r)
{
llarp::Warn("unwarrented path transfer message on tx=", TXID(),
" rx=", RXID());
return false;
}
bool
Path::HandlePathConfirmMessage(
const llarp::routing::PathConfirmMessage* msg)
const llarp::routing::PathConfirmMessage* msg, llarp_router* r)
{
if(status == ePathBuilding)
{
// confirm that we build the path
status = ePathEstablished;
llarp::Info("path is confirmed rx=", RXID(), " tx=", TXID());
if(m_BuiltHook)
m_BuiltHook(this);
m_BuiltHook = nullptr;
return true;
llarp::routing::PathLatencyMessage latency;
latency.T = rand();
m_LastLatencyTestID = latency.T;
m_LastLatencyTestTime = llarp_time_now_ms();
return SendRoutingMessage(&latency, r);
}
llarp::Warn("got unwarrented path confirm message on rx=", RXID(),
" tx=", TXID());
@ -388,14 +408,19 @@ namespace llarp
bool
Path::HandlePathLatencyMessage(
const llarp::routing::PathLatencyMessage* msg)
const llarp::routing::PathLatencyMessage* msg, llarp_router* r)
{
// TODO: implement me
if(msg->L == m_LastLatencyTestID)
{
Latency = llarp_time_now_ms() - m_LastLatencyTestTime;
llarp::Info("path latency is ", Latency, " ms");
return true;
}
return false;
}
bool
Path::HandleDHTMessage(const llarp::dht::IMessage* msg)
Path::HandleDHTMessage(const llarp::dht::IMessage* msg, llarp_router* r)
{
// TODO: implement me
return false;

@ -139,7 +139,8 @@ namespace llarp
llarp::Error("failed to send LRCM");
return;
}
ctx->path->status = llarp::path::ePathBuilding;
ctx->path->status = llarp::path::ePathBuilding;
ctx->path->buildStarted = llarp_time_now_ms();
router->paths.AddOwnPath(ctx->pathset, ctx->path);
ctx->user->pathBuildStarted(ctx->user);
}

@ -12,35 +12,22 @@ namespace llarp
bool
PathSet::ShouldBuildMore() const
{
return std::get< 0 >(m_Paths).size() < m_NumPaths;
return m_Paths.size() < m_NumPaths;
}
void
PathSet::ExpirePaths(llarp_time_t now)
{
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
{
auto& map = std::get< 0 >(m_Paths);
auto itr = map.begin();
while(itr != map.end())
if(itr->second->Expired(now))
{
if(itr->second->Expired(now))
{
itr = map.erase(itr);
}
}
}
{
auto& map = std::get< 1 >(m_Paths);
auto itr = map.begin();
while(itr != map.end())
{
if(itr->second->Expired(now))
{
// delete path on second iteration
delete itr->second;
itr = map.erase(itr);
}
delete itr->second;
itr = m_Paths.erase(itr);
}
else
++itr;
}
}
@ -48,9 +35,8 @@ namespace llarp
PathSet::NumInStatus(PathStatus st) const
{
size_t count = 0;
auto& map = std::get< 0 >(m_Paths);
auto itr = map.begin();
while(itr != map.end())
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
{
if(itr->second->status == st)
++count;
@ -62,29 +48,22 @@ namespace llarp
void
PathSet::AddPath(Path* path)
{
std::get< 0 >(m_Paths).emplace(path->TXID(), path);
std::get< 1 >(m_Paths).emplace(path->RXID(), path);
m_Paths.emplace(std::make_pair(path->Upstream(), path->RXID()), path);
}
void
PathSet::RemovePath(Path* path)
{
std::get< 0 >(m_Paths).erase(path->TXID());
std::get< 1 >(m_Paths).erase(path->RXID());
m_Paths.erase({path->Upstream(), path->RXID()});
}
Path*
PathSet::GetByUpstream(const RouterID& remote, const PathID_t& rxid)
{
auto& set = std::get< 1 >(m_Paths);
auto itr = set.begin();
while(itr != set.end())
{
if(itr->second->Upstream() == remote)
return itr->second;
++itr;
}
return nullptr;
auto itr = m_Paths.find({remote, rxid});
if(itr == m_Paths.end())
return nullptr;
return itr->second;
}
void

@ -257,7 +257,7 @@ namespace llarp
// we pop the front element it was ours
self->frames.pop_front();
// put our response on the end
self->frames.emplace_back(sz);
self->frames.emplace_back(sz - EncryptedFrame::OverheadSize);
// random junk for now
self->frames.back().Randomize();

@ -21,20 +21,51 @@ namespace llarp
bool
RelayUpstreamMessage::BEncode(llarp_buffer_t *buf) const
{
// TODO: implement me
return false;
if(!bencode_start_dict(buf))
return false;
if(!BEncodeWriteDictMsgType(buf, "a", "u"))
return false;
if(!BEncodeWriteDictEntry("p", pathid, buf))
return false;
if(!BEncodeWriteDictInt(buf, "v", LLARP_PROTO_VERSION))
return false;
if(!BEncodeWriteDictEntry("x", X, buf))
return false;
if(!BEncodeWriteDictEntry("y", Y, buf))
return false;
return bencode_end(buf);
}
bool
RelayUpstreamMessage::DecodeKey(llarp_buffer_t key, llarp_buffer_t *buf)
{
return false;
bool read = false;
if(!BEncodeMaybeReadDictEntry("p", pathid, read, key, buf))
return false;
if(!BEncodeMaybeReadVersion("v", version, LLARP_PROTO_VERSION, read, key,
buf))
return false;
if(!BEncodeMaybeReadDictEntry("x", X, read, key, buf))
return false;
if(!BEncodeMaybeReadDictEntry("y", Y, read, key, buf))
return false;
return read;
}
bool
RelayUpstreamMessage::HandleMessage(llarp_router *router) const
{
return false;
auto path = router->paths.GetByDownstream(remote, pathid);
if(path)
{
return path->HandleUpstream(X.Buffer(), Y, router);
}
else
{
llarp::Warn("No such path downstream=", remote, " pathid=", pathid);
return false;
}
}
RelayDownstreamMessage::RelayDownstreamMessage(const RouterID &from)

@ -32,7 +32,7 @@ llarp_router::llarp_router()
: ready(false)
, paths(this)
, dht(llarp_dht_context_new(this))
, inbound_msg_parser(this)
, inbound_link_msg_parser(this)
, explorePool(llarp_pathbuilder_context_new(this, dht))
{
@ -49,7 +49,7 @@ bool
llarp_router::HandleRecvLinkMessage(llarp_link_session *session,
llarp_buffer_t buf)
{
return inbound_msg_parser.ProcessFrom(session, buf);
return inbound_link_msg_parser.ProcessFrom(session, buf);
}
bool
@ -371,19 +371,6 @@ llarp_router::HandleExploritoryPathBuildStarted(llarp_pathbuild_job *job)
delete job;
}
// TODO: do we still need this?
void
llarp_router::BuildExploritoryPath()
{
llarp_pathbuild_job *job = new llarp_pathbuild_job;
job->context = explorePool;
job->selectHop = selectHopFunc;
job->hops.numHops = 4;
job->user = this;
job->pathBuildStarted = &HandleExploritoryPathBuildStarted;
llarp_pathbuilder_build_path(job);
}
void
llarp_router::Tick()
{

@ -14,6 +14,7 @@
#include <llarp/dht.hpp>
#include <llarp/link_message.hpp>
#include <llarp/routing/handler.hpp>
#include "crypto.hpp"
#include "fs.hpp"
@ -27,6 +28,11 @@ namespace llarp
llarp_ai addr;
};
// forward declare
namespace path
{
struct TransitHop;
}
} // namespace llarp
/// c++
@ -74,7 +80,8 @@ struct llarp_router
uint32_t ticker_job_id = 0;
llarp::InboundMessageParser inbound_msg_parser;
llarp::InboundMessageParser inbound_link_msg_parser;
llarp::routing::InboundMessageParser inbound_routing_msg_parser;
llarp_pathbuilder_select_hop_func selectHopFunc = nullptr;
llarp_pathbuilder_context *explorePool = nullptr;
@ -93,7 +100,7 @@ struct llarp_router
std::map< llarp::PubKey, llarp_link_establish_job > pendingEstablishJobs;
llarp_router();
~llarp_router();
virtual ~llarp_router();
bool
HandleRecvLinkMessage(struct llarp_link_session *from, llarp_buffer_t msg);
@ -174,9 +181,6 @@ struct llarp_router
void
Tick();
void
BuildExploritoryPath();
/// schedule ticker to call i ms from now
void
ScheduleTicker(uint64_t i = 1000);

@ -1,4 +1,5 @@
#include <llarp/messages/path_confirm.hpp>
#include <llarp/messages/path_latency.hpp>
#include <llarp/routing/message.hpp>
namespace llarp
@ -36,6 +37,9 @@ namespace llarp
return false;
switch(*strbuf.cur)
{
case 'L':
self->msg = new PathLatencyMessage;
break;
case 'P':
self->msg = new PathConfirmMessage;
break;
@ -53,16 +57,19 @@ namespace llarp
bool
InboundMessageParser::ParseMessageBuffer(llarp_buffer_t buf,
IMessageHandler* h)
IMessageHandler* h,
llarp_router* r)
{
bool result = false;
msg = nullptr;
firstKey = true;
if(bencode_read_dict(&buf, &reader))
{
result = msg->HandleMessage(h);
result = msg->HandleMessage(h, r);
delete msg;
}
else
llarp::Error("read dict failed");
return result;
}
} // namespace routing

@ -42,9 +42,9 @@ namespace llarp
}
bool
PathConfirmMessage::HandleMessage(IMessageHandler* h) const
PathConfirmMessage::HandleMessage(IMessageHandler* h, llarp_router* r) const
{
return h && h->HandlePathConfirmMessage(this);
return h && h->HandlePathConfirmMessage(this, r);
}
} // namespace routing

@ -1,4 +1,5 @@
#include <llarp/messages/path_latency.hpp>
#include <llarp/routing/handler.hpp>
namespace llarp
{
@ -7,5 +8,43 @@ namespace llarp
PathLatencyMessage::PathLatencyMessage()
{
}
bool
PathLatencyMessage::DecodeKey(llarp_buffer_t key, llarp_buffer_t* val)
{
bool read = false;
if(!BEncodeMaybeReadDictInt("L", L, read, key, val))
return false;
if(!BEncodeMaybeReadDictInt("T", T, read, key, val))
return false;
return read;
}
bool
PathLatencyMessage::BEncode(llarp_buffer_t* buf) const
{
if(!bencode_start_dict(buf))
return false;
if(!BEncodeWriteDictMsgType(buf, "A", "L"))
return false;
if(L)
{
if(!BEncodeWriteDictInt(buf, "L", L))
return false;
}
if(T)
{
if(!BEncodeWriteDictInt(buf, "T", T))
return false;
}
return bencode_end(buf);
}
bool
PathLatencyMessage::HandleMessage(IMessageHandler* h, llarp_router* r) const
{
return h && h->HandlePathLatencyMessage(this, r);
}
} // namespace routing
} // namespace llarp

@ -1,4 +1,5 @@
#include <llarp/path.hpp>
#include <llarp/routing/handler.hpp>
#include "buffer.hpp"
#include "router.hpp"
@ -6,6 +7,10 @@ namespace llarp
{
namespace path
{
TransitHop::TransitHop()
{
}
bool
TransitHop::Expired(llarp_time_t now) const
{
@ -62,7 +67,7 @@ namespace llarp
llarp_router* r)
{
RelayDownstreamMessage* msg = new RelayDownstreamMessage;
msg->pathid = info.txID;
msg->pathid = info.rxID;
msg->Y = Y;
r->crypto.xchacha20(buf, pathKey, Y);
@ -76,15 +81,69 @@ namespace llarp
TransitHop::HandleUpstream(llarp_buffer_t buf, const TunnelNonce& Y,
llarp_router* r)
{
RelayUpstreamMessage* msg = new RelayUpstreamMessage;
msg->pathid = info.rxID;
msg->Y = Y;
r->crypto.xchacha20(buf, pathKey, Y);
msg->X = buf;
llarp::Info("relay ", msg->X.size(), " bytes upstream from ",
info.downstream, " to ", info.upstream);
return r->SendToOrQueue(info.upstream, msg);
if(info.upstream == RouterID(r->pubkey()))
{
return m_MessageParser.ParseMessageBuffer(buf, this, r);
}
else
{
RelayUpstreamMessage* msg = new RelayUpstreamMessage;
msg->pathid = info.txID;
msg->Y = Y;
msg->X = buf;
llarp::Info("relay ", msg->X.size(), " bytes upstream from ",
info.downstream, " to ", info.upstream);
return r->SendToOrQueue(info.upstream, msg);
}
}
bool
TransitHop::HandleDHTMessage(const llarp::dht::IMessage* msg,
llarp_router* r)
{
// TODO: implement me
return false;
}
bool
TransitHop::HandlePathLatencyMessage(
const llarp::routing::PathLatencyMessage* msg, llarp_router* r)
{
llarp::routing::PathLatencyMessage reply;
reply.L = msg->T;
llarp::Info("got latency message ", msg->T);
return SendRoutingMessage(&reply, r);
}
bool
TransitHop::HandlePathConfirmMessage(
const llarp::routing::PathConfirmMessage* msg, llarp_router* r)
{
llarp::Warn("unwarrented path confirm message on ", info);
return false;
}
bool
TransitHop::HandlePathTransferMessage(
const llarp::routing::PathTransferMessage* msg, llarp_router* r)
{
auto path = r->paths.GetByDownstream(r->pubkey(), msg->P);
if(path)
{
return path->HandleDownstream(msg->T.Buffer(), msg->Y, r);
}
llarp::Warn("No such path for path transfer pathid=", msg->P);
return false;
}
bool
TransitHop::HandleHiddenServiceData(llarp_buffer_t buf, llarp_router* r)
{
llarp::Warn("unwarrented hidden service data on ", info);
return false;
}
} // namespace path
} // namespace llarp

Loading…
Cancel
Save