add link level keepalive

remove debugging messages

start handling more messages
pull/1/head
Jeff Becker 6 years ago
parent 93f0e03958
commit c51d29a0c6
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -62,6 +62,8 @@ typedef bool (*llarp_sign_func)(byte_t *, const byte_t *, llarp_buffer_t);
typedef bool (*llarp_verify_func)(const byte_t *, llarp_buffer_t,
const byte_t *);
typedef bool (*llarp_frame_crypto_func)(llarp_buffer_t *, const byte_t *);
/// library crypto configuration
struct llarp_crypto
{
@ -75,6 +77,8 @@ struct llarp_crypto
llarp_hmac_func hmac;
llarp_sign_func sign;
llarp_verify_func verify;
llarp_frame_crypto_func encrypt_frame;
llarp_frame_crypto_func decrypt_frame;
void (*randomize)(llarp_buffer_t);
void (*randbytes)(void *, size_t);
void (*identity_keygen)(byte_t *);

@ -0,0 +1,12 @@
#ifndef LLARP_ENCRYPTED_FRAME_HPP
#define LLARP_ENCRYPTED_FRAME_HPP
#include <llarp/mem.h>
#include <vector>
namespace llarp
{
typedef std::vector< byte_t > EncryptedFrame;
}
#endif

@ -3,13 +3,14 @@
#include <llarp/bencode.h>
#include <llarp/link.h>
#include <llarp/relay_commit.hpp>
#include <queue>
#include <vector>
namespace llarp
{
typedef std::vector< byte_t > OutboundMessage;
typedef std::vector< byte_t > Message;
struct InboundMessageHandler
{
@ -19,19 +20,37 @@ namespace llarp
static bool
OnKey(dict_reader* r, llarp_buffer_t* buf);
/// start processig message from a link session
bool
ProcessFrom(llarp_link_session* from, llarp_buffer_t buf);
/// called when the message is fully read
/// return true when the message was accepted otherwise returns false
bool
MessageDone();
/// called to send any replies
bool
FlushReplies();
private:
bool
DecodeLIM(llarp_buffer_t key, llarp_buffer_t* buff);
bool
DecodeDHT(llarp_buffer_t key, llarp_buffer_t* buff);
bool
DecodeLRCM(llarp_buffer_t key, llarp_buffer_t* buff);
private:
char msgtype;
bool firstkey;
uint64_t proto;
llarp_router* router;
llarp_link_session* from;
std::queue< OutboundMessage > sendq;
llarp::LR_CommitMessage lrcm;
std::queue< Message > sendq;
};
}

@ -0,0 +1,102 @@
#ifndef LLARP_RELAY_COMMIT_HPP
#define LLARP_RELAY_COMMIT_HPP
#include <llarp/crypto.h>
#include <llarp/encrypted_frame.hpp>
#include <vector>
namespace llarp
{
struct LR_CommitRecord
{
llarp_pubkey_t commkey;
llarp_pubkey_t nextHop;
llarp_tunnel_nonce_t nonce;
uint64_t lifetime;
uint64_t pathid;
uint64_t version;
bool
BDecode(llarp_buffer_t *buf);
bool
BEncode(llarp_buffer_t *buf);
};
struct LR_AcceptRecord
{
uint64_t pathid;
uint64_t version;
std::vector< byte_t > padding;
bool
BDecode(llarp_buffer_t *buf);
bool
BEncode(llarp_buffer_t *buf);
};
struct LR_StatusMessage
{
std::vector< EncryptedFrame > replies;
uint64_t version;
bool
BDecode(llarp_buffer_t *buf);
bool
BEncode(llarp_buffer_t *buf);
};
struct LR_CommitMessage
{
std::vector< EncryptedFrame > frames;
uint64_t version;
void
Clear();
bool
BDecode(llarp_buffer_t *buf);
bool
BEncode(llarp_buffer_t *buf);
};
struct AsyncPathDecryption
{
static void
Decrypted(void *data);
LR_CommitMessage lrcm;
LR_CommitRecord ourRecord;
llarp_threadpool *worker = nullptr;
llarp_crypto *crypto = nullptr;
llarp_logic *logic = nullptr;
llarp_thread_job result;
void
AsyncDecryptOurHop();
};
struct AsyncPathEncryption
{
static void
EncryptedFrame(void *data);
std::vector< LR_CommitRecord > hops;
LR_CommitMessage lrcm;
llarp_threadpool *worker = nullptr;
llarp_crypto *crypto = nullptr;
llarp_logic *logic = nullptr;
llarp_thread_job result;
void
AsyncEncrypt();
private:
void
EncryptNext();
};
}
#endif

@ -0,0 +1,31 @@
#ifndef LLARP_ENCODE_HPP
#define LLARP_ENCODE_HPP
#include "buffer.hpp"
namespace llarp
{
/// encode V as hex to stack
/// null terminate
/// return pointer to base of stack buffer on success otherwise returns
/// nullptr
template < typename V, typename Stack >
const char*
HexEncode(const V& value, Stack& stack)
{
size_t idx = 0;
char* ptr = &stack[0];
char* end = ptr + sizeof(stack);
while(idx < value.size())
{
auto wrote = snprintf(ptr, end - ptr, "%.2x", value[idx]);
if(wrote == -1)
return nullptr;
ptr += wrote;
idx++;
}
*ptr = 0;
return &stack[0];
}
}
#endif

@ -40,17 +40,14 @@ namespace llarp
switch(to->sa_family)
{
case AF_INET:
printf("af_inet\n");
slen = sizeof(struct sockaddr_in);
break;
case AF_INET6:
printf("af_inet6\n");
slen = sizeof(struct sockaddr_in6);
break;
default:
return -1;
}
printf("send %ld via %d\n", sz, fd);
ssize_t sent = ::sendto(fd, data, sz, SOCK_NONBLOCK, to, slen);
if(sent == -1)
perror("sendto()");
@ -90,7 +87,6 @@ struct llarp_epoll_loop : public llarp_ev_loop
result = epoll_wait(epollfd, events, 1024, -1);
if(result > 0)
{
printf("epoll %d\n", result);
int idx = 0;
while(idx < result)
{

@ -290,7 +290,6 @@ namespace iwp
reassemble(std::vector< byte_t > &buffer)
{
auto total = msginfo.totalsize();
printf("reassemble message of size %d\n", total);
buffer.resize(total);
auto fragsz = msginfo.fragsize();
auto ptr = buffer.data();
@ -358,29 +357,7 @@ namespace iwp
}
bool
inbound_frame_complete(uint64_t id)
{
bool success = false;
std::vector< byte_t > msg;
if(rx[id].reassemble(msg))
{
printf("handle message of size: %ld\n", msg.size());
auto buf = llarp::Buffer< decltype(msg) >(msg);
success = router->HandleRecvLinkMessage(parent, buf);
if(success)
{
alive();
if(id == 0)
parent->established(parent);
}
}
else
{
printf("failed to reassemble message %ld\n", id);
}
rx.erase(id);
return success;
}
inbound_frame_complete(uint64_t id);
void
push_ackfor(uint64_t id, uint32_t bitmask)
@ -392,7 +369,6 @@ namespace iwp
// TODO: this assumes big endian
memcpy(buf.data() + 6, &id, 8);
memcpy(buf.data() + 14, &bitmask, 4);
printf("ACK for %ld %d\n", id, bitmask);
}
bool
@ -405,8 +381,6 @@ namespace iwp
return false;
}
sz = hdr.size();
// mark we are alive
alive();
// extract xmit data
xmit x(hdr.data());
@ -431,15 +405,11 @@ namespace iwp
// inserted, put last fragment
itr.first->second.put_lastfrag(hdr.data() + sizeof(x.buffer),
x.lastfrag());
alive();
if(x.numfrags() == 0)
{
printf("short XMIT\n");
push_ackfor(id, 0);
return inbound_frame_complete(id);
}
else
printf("got XMIT with %d fragments\n", x.numfrags());
return true;
}
else
@ -463,51 +433,7 @@ namespace iwp
}
bool
got_acks(frame_header &hdr, size_t sz)
{
if(hdr.size() > sz)
{
printf("invalid ACKS frame size %d > %ld\n", hdr.size(), sz);
return false;
}
sz = hdr.size();
if(sz < 12)
{
printf("invalid ACKS frame size %ld < 12\n", sz);
return false;
}
auto ptr = hdr.data();
uint64_t msgid;
uint32_t bitmask;
memcpy(&msgid, ptr, 8);
memcpy(&bitmask, ptr + 8, 4);
auto itr = tx.find(msgid);
if(itr == tx.end())
{
printf("ACK for missing TX frame: %ld\n", msgid);
return false;
}
alive();
itr->second->ack(bitmask);
if(itr->second->completed())
{
printf("message %ld acknoleged\n", msgid);
delete itr->second;
tx.erase(itr);
}
else
{
printf("message %ld retransmit fragments\n", msgid);
itr->second->retransmit_frags(sendqueue);
}
return true;
}
got_acks(frame_header &hdr, size_t sz);
// queue new outbound message
void
@ -585,7 +511,8 @@ namespace iwp
llarp_link_establish_job *establish_job = nullptr;
uint32_t establish_job_id = 0;
uint32_t establish_job_id = 0;
uint32_t keepalive_timer_id = 0;
llarp::Addr addr;
iwp_async_intro intro;
@ -594,7 +521,7 @@ namespace iwp
frame_state frame;
byte_t token[32];
byte_t workbuf[2048];
byte_t workbuf[256];
enum State
{
@ -711,7 +638,6 @@ namespace iwp
printf("session start verify fail\n");
return;
}
printf("session start verified, sending LIM\n");
self->send_LIM();
}
@ -742,12 +668,50 @@ namespace iwp
printf("failed to encode LIM\n");
}
static void
send_keepalive(void *user)
{
session *self = static_cast< session * >(user);
// all zeros means keepalive
byte_t tmp[64] = {0};
// 8 bytes iwp header overhead
int padsz = rand() % (sizeof(tmp) - 8);
auto buf = llarp::StackBuffer< decltype(tmp) >(tmp);
if(padsz)
self->crypto->randbytes(buf.base + 8, padsz);
buf.sz -= padsz;
// send frame after encrypting
self->encrypt_frame_async_send(buf.base, buf.sz);
// send another keepalive
self->schedule_keepalive();
}
static void
handle_keepalive_timer(void *user, uint64_t orig, uint64_t left)
{
session *self = static_cast< session * >(user);
self->keepalive_timer_id = 0;
// timeout cancelled
if(left)
return;
llarp_logic_queue_job(self->logic, {self, &send_keepalive});
}
void
schedule_keepalive()
{
keepalive_timer_id = llarp_logic_call_later(
logic, {1000UL, this, &handle_keepalive_timer});
}
void
session_established()
{
printf("session established\n");
EnterState(eEstablished);
llarp_logic_cancel_call(logic, establish_job_id);
schedule_keepalive();
}
void
@ -771,7 +735,8 @@ namespace iwp
bool
timedout(llarp_time_t now, llarp_time_t timeout = SESSION_TIMEOUT)
{
return now - frame.lastEvent >= timeout;
auto diff = now - frame.lastEvent;
return diff >= timeout;
}
static bool
@ -781,10 +746,18 @@ namespace iwp
return static_cast< session * >(s->impl)->timedout(now);
}
static void
handle_session_established(void *user)
{
session *impl = static_cast< session * >(user);
impl->session_established();
}
static void
set_established(llarp_link_session *s)
{
static_cast< session * >(s->impl)->session_established();
session *impl = static_cast< session * >(s->impl);
llarp_logic_queue_job(impl->logic, {impl, &handle_session_established});
}
static void
@ -803,7 +776,6 @@ namespace iwp
printf("introack validation failed\n");
return;
}
printf("introack validated\n");
link->EnterState(eIntroAckRecv);
// copy decrypted token
memcpy(link->token, introack->token, 32);
@ -816,7 +788,6 @@ namespace iwp
session *link = static_cast< session * >(start->user);
llarp_ev_udp_sendto(link->udp, link->addr, start->buf, start->sz);
link->EnterState(eSessionStartSent);
printf("session start sent\n");
}
void
@ -847,6 +818,7 @@ namespace iwp
{
if(self->frame.process(frame->buf + 64, frame->sz - 64))
{
self->frame.alive();
self->pump();
}
else
@ -863,7 +835,6 @@ namespace iwp
{
if(sz > 64)
{
printf("decrypt frame of size %ld\n", sz);
auto frame = alloc_frame(buf, sz);
frame->hook = &handle_frame_decrypt;
iwp_call_async_frame_decrypt(iwp, frame);
@ -876,7 +847,6 @@ namespace iwp
handle_frame_encrypt(iwp_async_frame *frame)
{
session *self = static_cast< session * >(frame->user);
printf("sendto %ld\n", frame->sz);
llarp_ev_udp_sendto(self->udp, self->addr, frame->buf, frame->sz);
delete frame;
}
@ -900,7 +870,6 @@ namespace iwp
void
encrypt_frame_async_send(const void *buf, size_t sz)
{
printf("encrypt frame of size %ld\n", sz);
// 64 bytes frame overhead for nonce and hmac
auto frame = alloc_frame(nullptr, sz + 64);
memcpy(frame->buf + 64, buf, sz);
@ -927,7 +896,6 @@ namespace iwp
session *link = static_cast< session * >(i->user);
if(i->buf)
{
printf("sending introack...\n");
llarp_ev_udp_sendto(link->udp, link->addr, i->buf, i->sz);
link->EnterState(eIntroAckSent);
}
@ -967,7 +935,6 @@ namespace iwp
void
on_intro(const void *buf, size_t sz)
{
printf("iwp intro\n");
if(sz >= sizeof(workbuf))
{
// too big?
@ -997,7 +964,6 @@ namespace iwp
void
on_intro_ack(const void *buf, size_t sz)
{
printf("iwp intro ack\n");
if(sz >= sizeof(workbuf))
{
// too big?
@ -1026,7 +992,6 @@ namespace iwp
session *link = static_cast< session * >(i->user);
if(i->buf)
{
printf("sending intro...\n");
llarp_ev_udp_sendto(link->udp, link->addr, i->buf, i->sz);
link->EnterState(eIntroSent);
}
@ -1065,7 +1030,6 @@ namespace iwp
// randomize w0
if(w0sz)
{
printf("random padding %ld bytes\n", w0sz);
crypto->randbytes(intro.buf + (32 * 3), w0sz);
}
@ -1205,7 +1169,6 @@ namespace iwp
{
auto now = llarp_time_now_ms();
std::set< llarp::Addr > remove;
printf("cleanup dead at %ld\n", now);
{
lock_t lock(m_sessions_Mutex);
for(auto &itr : m_sessions)
@ -1223,6 +1186,8 @@ namespace iwp
printf("remove session for %s\n", addr.to_string().c_str());
session *s = static_cast< session * >(itr->second.impl);
m_sessions.erase(addr);
if(s->keepalive_timer_id)
llarp_logic_cancel_call(logic, s->keepalive_timer_id);
delete s;
}
}
@ -1288,7 +1253,6 @@ namespace iwp
{
server *link = static_cast< server * >(udp->user);
llarp::Addr addr(*saddr);
printf("got %ld from %s\n", sz, addr.to_string().c_str());
session *s = link->ensure_session(addr);
s->recv(buf, sz);
}
@ -1311,6 +1275,87 @@ namespace iwp
}
};
bool
frame_state::inbound_frame_complete(uint64_t id)
{
bool success = false;
std::vector< byte_t > msg;
if(rx[id].reassemble(msg))
{
printf("handle message of size: %ld\n", msg.size());
auto buf = llarp::Buffer< decltype(msg) >(msg);
success = router->HandleRecvLinkMessage(parent, buf);
if(success)
{
alive();
session *impl = static_cast< session * >(parent->impl);
if(id == 0 && impl->state == session::eSessionStartSent)
{
// send our LIM
impl->send_LIM();
}
}
}
else
{
printf("failed to reassemble message %ld\n", id);
}
rx.erase(id);
return success;
}
bool
frame_state::got_acks(frame_header &hdr, size_t sz)
{
if(hdr.size() > sz)
{
printf("invalid ACKS frame size %d > %ld\n", hdr.size(), sz);
return false;
}
sz = hdr.size();
if(sz < 12)
{
printf("invalid ACKS frame size %ld < 12\n", sz);
return false;
}
auto ptr = hdr.data();
uint64_t msgid;
uint32_t bitmask;
memcpy(&msgid, ptr, 8);
memcpy(&bitmask, ptr + 8, 4);
auto itr = tx.find(msgid);
if(itr == tx.end())
{
printf("ACK for missing TX frame: %ld\n", msgid);
return false;
}
alive();
itr->second->ack(bitmask);
if(itr->second->completed())
{
delete itr->second;
tx.erase(itr);
session *impl = static_cast< session * >(parent->impl);
if(impl->state == session::eLIMSent && msgid == 0)
{
// first message acked we are established?
impl->session_established();
}
}
else
{
printf("message %ld retransmit fragments\n", msgid);
itr->second->retransmit_frags(sendqueue);
}
return true;
}
server *
link_alloc(struct llarp_router *router, const char *keyfile,
struct llarp_crypto *crypto, struct llarp_logic *logic,

@ -0,0 +1,41 @@
#ifndef LLARP_LINK_ENCODER_HPP
#define LLARP_LINK_ENCODER_HPP
#include <llarp/bencode.h>
#include <llarp/buffer.h>
#include <llarp/router_contact.h>
namespace llarp
{
/// encode Link Introduce Message onto a buffer
/// if router is nullptr then the LIM's r member is omitted.
bool
EncodeLIM(llarp_buffer_t* buff, llarp_rc* router)
{
if(!bencode_start_dict(buff))
return false;
// message type
if(!bencode_write_bytestring(buff, "a", 1))
return false;
if(!bencode_write_bytestring(buff, "i", 1))
return false;
// router contact
if(router)
{
if(!bencode_write_bytestring(buff, "r", 1))
return false;
if(!llarp_rc_bencode(router, buff))
return false;
}
// version
if(!bencode_write_version_entry(buff))
return false;
return bencode_end(buff);
}
}
#endif

@ -1,6 +1,7 @@
#include <llarp/router_contact.h>
#include <llarp/link_message.hpp>
#include "buffer.hpp"
#include "router.hpp"
namespace llarp
{
@ -46,47 +47,82 @@ namespace llarp
handler->firstkey = false;
return true;
}
// check for not the last element
// check for last element
if(!key)
return true;
return handler->MessageDone();
switch(handler->msgtype)
{
// LIM
// link introduce
case 'i':
if(llarp_buffer_eq(*key, "r"))
{
if(!llarp_rc_bdecode(handler->from->get_remote_router(handler->from),
r->buffer))
{
printf("failed to decode RC\n");
return false;
}
printf("decoded rc\n");
return true;
}
else if(llarp_buffer_eq(*key, "v"))
{
if(!bdecode_read_integer(r->buffer, &handler->proto))
return false;
if(handler->proto != LLARP_PROTO_VERSION)
{
printf("llarp protocol version missmatch\n");
return false;
}
return true;
}
else
{
printf("invalid LIM key: %c\n", *key->cur);
return false;
}
return handler->DecodeLIM(*key, r->buffer);
// immidate dht
case 'd':
return handler->DecodeDHT(*key, r->buffer);
// relay commit
case 'c':
return handler->DecodeLRCM(*key, r->buffer);
// unknown message type
default:
printf("unknown link message type: %c\n", handler->msgtype);
return false;
}
}
bool
InboundMessageHandler::DecodeLIM(llarp_buffer_t key, llarp_buffer_t* buff)
{
if(llarp_buffer_eq(key, "r"))
{
if(!llarp_rc_bdecode(from->get_remote_router(from), buff))
{
printf("failed to decode RC\n");
return false;
}
printf("decoded rc\n");
return true;
}
else if(llarp_buffer_eq(key, "v"))
{
if(!bdecode_read_integer(buff, &proto))
return false;
if(proto != LLARP_PROTO_VERSION)
{
printf("llarp protocol version missmatch\n");
return false;
}
return true;
}
else
{
printf("invalid LIM key: %c\n", *key.cur);
return false;
}
}
bool
InboundMessageHandler::DecodeDHT(llarp_buffer_t key, llarp_buffer_t* buf)
{
return false;
}
bool
InboundMessageHandler::DecodeLRCM(llarp_buffer_t key, llarp_buffer_t* buf)
{
return false;
}
bool
InboundMessageHandler::MessageDone()
{
switch(msgtype)
{
case 'c':
return router->ProcessLRCM(lrcm);
default:
return true;
}
}
bool
InboundMessageHandler::ProcessFrom(llarp_link_session* src,
llarp_buffer_t buf)

@ -6,6 +6,7 @@
#include <llarp/link_message.hpp>
#include "buffer.hpp"
#include "encode.hpp"
#include "net.hpp"
#include "str.hpp"
@ -41,6 +42,12 @@ llarp_router::HandleRecvLinkMessage(llarp_link_session *session,
return false;
}
bool
llarp_router::ProcessLRCM(llarp::LR_CommitMessage msg)
{
return false;
}
void
llarp_router::try_connect(fs::path rcfile)
{
@ -151,9 +158,31 @@ llarp_router::on_try_connect_result(llarp_link_establish_job *job)
{
printf("on_try_connect_result\n");
if(job->session)
printf("session made\n");
else
printf("session not made\n");
{
llarp_rc *remote = job->session->get_remote_router(job->session);
if(remote)
{
llarp_router *router = static_cast< llarp_router * >(job->user);
llarp::pubkey pubkey;
memcpy(&pubkey[0], remote->pubkey, 32);
char tmp[68] = {0};
printf("session made with %s\n",
llarp::HexEncode< decltype(pubkey), decltype(tmp) >(pubkey, tmp));
auto itr = router->pendingMessages.find(pubkey);
if(itr != router->pendingMessages.end())
{
// flush pending
for(auto &msg : itr->second)
{
auto buf = llarp::Buffer< decltype(msg) >(msg);
job->session->sendto(job->session, buf);
}
router->pendingMessages.erase(itr);
}
return;
}
}
printf("session not made\n");
}
void
@ -181,6 +210,14 @@ llarp_router::Run()
}
printf("saved router contact\n");
char tmp[68] = {0};
llarp::pubkey ourPubkey;
memcpy(&ourPubkey[0], pubkey(), 32);
printf("we are %s\n",
llarp::HexEncode< llarp::pubkey, decltype(tmp) >(ourPubkey, tmp));
// start links
for(auto link : links)
{

@ -10,6 +10,7 @@
#include <llarp/link_message.hpp>
#include "crypto.hpp"
#include "fs.hpp"
#include "mem.hpp"
@ -53,6 +54,8 @@ struct llarp_router
std::list< llarp_link * > links;
std::map< llarp::pubkey, std::vector< llarp::Message > > pendingMessages;
llarp_router(llarp_alloc *mem);
~llarp_router();
@ -89,6 +92,12 @@ struct llarp_router
bool
has_session_to(const uint8_t *pubkey) const;
void
QueueSendTo(const byte_t *pubkey, const std::vector< llarp::Message > &msgs);
bool
ProcessLRCM(llarp::LR_CommitMessage msg);
static bool
iter_try_connect(llarp_router_link_iter *i, llarp_router *router,
llarp_link *l);

Loading…
Cancel
Save