pull/5/head
Ryan Tharp 6 years ago
commit dd3c04cff5

@ -57,6 +57,7 @@
"io": "cpp",
"strstream": "cpp",
"numeric": "cpp",
"valarray": "cpp"
"valarray": "cpp",
"*.ipp": "cpp"
}
}

@ -22,8 +22,8 @@ extern "C" {
struct llarp_link;
/**
* wire layer transport session for point to point communication between us and
* another
* wire layer transport session for point to point communication between us
* and another
*/
struct llarp_link_session;
@ -75,8 +75,9 @@ struct llarp_link
const char *(*name)(void);
void (*get_our_address)(struct llarp_link *, struct llarp_ai *);
/*
int (*register_listener)(struct llarp_link *, struct llarp_link_ev_listener);
void (*deregister_listener)(struct llarp_link *, int);
int (*register_listener)(struct llarp_link *, struct
llarp_link_ev_listener); void (*deregister_listener)(struct llarp_link *,
int);
*/
bool (*configure)(struct llarp_link *, struct llarp_ev_loop *, const char *,
int, uint16_t);
@ -101,7 +102,8 @@ llarp_link_initialized(struct llarp_link *link);
struct llarp_link_session
{
void *impl;
/** send an entire message, splits up into smaller pieces and does encryption
/** send an entire message, splits up into smaller pieces and does
* encryption
*/
bool (*sendto)(struct llarp_link_session *, llarp_buffer_t);
/** return true if this session is timed out */

@ -94,7 +94,7 @@ struct llarp_async_verify_rc
struct llarp_threadpool *cryptoworker;
struct llarp_threadpool *diskworker;
/// router contact (should this be a pointer?)
/// router contact
struct llarp_rc rc;
/// result
bool valid;

@ -53,6 +53,9 @@ llarp_rc_verify_sig(struct llarp_crypto *crypto, struct llarp_rc *rc);
void
llarp_rc_copy(struct llarp_rc *dst, const struct llarp_rc *src);
bool
llarp_rc_is_public_router(const struct llarp_rc *const rc);
void
llarp_rc_set_addrs(struct llarp_rc *rc, struct llarp_alloc *mem,
struct llarp_ai_list *addr);

@ -10,7 +10,7 @@
#endif
#ifndef LLARP_VERSION_PATCH
#define LLARP_VERSION_PATCH "0"
#define LLARP_VERSION_PATCH "1"
#endif
#ifndef LLARP_VERSION_NUM

@ -218,6 +218,16 @@ llarp_ai_list_pushback(struct llarp_ai_list *l, struct llarp_ai *ai)
l->list.push_back(*ai);
}
size_t
llarp_ai_list_size(struct llarp_ai_list *l)
{
if(l)
{
return l->list.size();
}
return 0;
}
void
llarp_ai_list_iterate(struct llarp_ai_list *l, struct llarp_ai_list_iter *itr)
{

@ -15,12 +15,10 @@ namespace llarp
{
};
template < typename Mutex_t >
struct DummyLock
{
DummyLock(const Mutex_t& mtx){
DummyLock(const DummyMutex& mtx){};
};
~DummyLock()
{
}
@ -64,8 +62,9 @@ namespace llarp
firstPut = GetTime()(i);
}
template < typename Queue_t >
void
Process(std::queue< T >& result)
Process(Queue_t& result)
{
llarp_time_t lowest = 0xFFFFFFFFFFFFFFFFUL;
// auto start = llarp_time_now_ms();

@ -443,9 +443,10 @@ namespace iwp
}
bool
operator<(const InboundMessage &other) const
operator>(const InboundMessage &other) const
{
return msgid < other.msgid;
// order in ascending order for codel queue
return msgid > other.msgid;
}
llarp_buffer_t
@ -463,6 +464,15 @@ namespace iwp
}
};
struct OrderCompare
{
bool
operator()(const InboundMessage *left, const InboundMessage *right)
{
return left->msgid < right->msgid;
}
};
struct PutTime
{
void
@ -486,8 +496,7 @@ namespace iwp
typedef std::queue< sendbuf_t * > sendqueue_t;
typedef llarp::util::CoDelQueue<
InboundMessage *, InboundMessage::GetTime, InboundMessage::PutTime,
llarp::util::DummyMutex,
llarp::util::DummyLock< llarp::util::DummyMutex > >
llarp::util::DummyMutex, llarp::util::DummyLock >
recvqueue_t;
llarp_router *router = nullptr;
@ -511,17 +520,27 @@ namespace iwp
bool
process_inbound_queue()
{
std::queue< InboundMessage * > q;
std::priority_queue< InboundMessage *, std::vector< InboundMessage * >,
InboundMessage::OrderCompare >
q;
recvqueue.Process(q);
bool increment = false;
while(q.size())
{
// TODO: is this right?
nextMsgID = std::max(nextMsgID, q.front()->msgid);
if(!router->HandleRecvLinkMessage(parent, q.front()->Buffer()))
llarp::Warn("failed to process inbound message ", q.front()->msgid);
delete q.front();
auto &front = q.top();
// the items are already sorted anyways so this doesn't really do much
nextMsgID = std::max(nextMsgID, front->msgid);
if(!router->HandleRecvLinkMessage(parent, front->Buffer()))
{
llarp::Warn("failed to process inbound message ", front->msgid);
}
delete front;
q.pop();
increment = true;
}
if(increment)
++nextMsgID;
// TODO: this isn't right
return true;
}
@ -1557,11 +1576,11 @@ namespace iwp
{
server *serv = static_cast< server * >(l->impl);
{
lock_t lock(serv->m_Connected_Mutex);
// lock_t lock(serv->m_Connected_Mutex);
auto itr = serv->m_Connected.find(pubkey);
if(itr != serv->m_Connected.end())
{
lock_t innerlock(serv->m_sessions_Mutex);
// lock_t innerlock(serv->m_sessions_Mutex);
auto inner_itr = serv->m_sessions.find(itr->second);
if(inner_itr != serv->m_sessions.end())
{
@ -1774,9 +1793,10 @@ namespace iwp
if(id == nextMsgID)
{
session *impl = static_cast< session * >(parent->impl);
success = router->HandleRecvLinkMessage(parent, buf);
if(id == 0)
{
success = router->HandleRecvLinkMessage(parent, buf);
if(impl->CheckRCValid())
{
if(!impl->IsEstablished())
@ -1784,6 +1804,7 @@ namespace iwp
impl->send_LIM();
impl->session_established();
}
++nextMsgID;
}
else
{
@ -1792,15 +1813,17 @@ namespace iwp
impl->parent->close(impl->parent);
success = false;
}
++nextMsgID;
}
else if(recvqueue.Size() > 2)
else
{
return process_inbound_queue();
recvqueue.Put(new InboundMessage(id, msg));
success = process_inbound_queue();
}
}
else
{
llarp::Warn("out of order message expected ", nextMsgID, " but got ",
id);
recvqueue.Put(new InboundMessage(id, msg));
success = true;
}

@ -2,6 +2,7 @@
#include <llarp/router_contact.h>
#include <llarp/messages/link_intro.hpp>
#include "logger.hpp"
#include "router.hpp"
namespace llarp
{
@ -71,7 +72,7 @@ namespace llarp
bool
LinkIntroMessage::HandleMessage(llarp_router* router) const
{
llarp::Info("got LIM from ", remote);
router->async_verify_RC(RC, !llarp_rc_is_public_router(RC));
return true;
}
}
} // namespace llarp

@ -309,7 +309,7 @@ namespace llarp
Path::Tick(llarp_time_t now, llarp_router* r)
{
auto dlt = now - m_LastLatencyTestTime;
if(dlt > 5000)
if(dlt > 5000 && m_LastLatencyTestID == 0)
{
llarp::routing::PathLatencyMessage latency;
latency.T = rand();
@ -432,6 +432,7 @@ namespace llarp
Latency = llarp_time_now_ms() - m_LastLatencyTestTime;
llarp::Info("path latency is ", Latency, " ms for tx=", TXID(),
" rx=", RXID());
m_LastLatencyTestID = 0;
return true;
}
return false;

@ -348,13 +348,15 @@ llarp_router::on_verify_server_rc(llarp_async_verify_rc *job)
llarp_dht_put_peer(router->dht, &router->validRouters[pk]);
// this was an outbound establish job
if(ctx->establish_job->session)
if(ctx->establish_job)
{
auto session = ctx->establish_job->session;
router->FlushOutboundFor(pk, session->get_parent(session));
// this frees the job
router->pendingEstablishJobs.erase(pk);
}
else // this was an inbound session
router->FlushOutboundFor(pk, router->GetLinkWithSessionByPubkey(pk));
}
void
@ -482,6 +484,19 @@ llarp_router::SessionClosed(const llarp::RouterID &remote)
validRouters.erase(itr);
}
llarp_link *
llarp_router::GetLinkWithSessionByPubkey(const llarp::RouterID &pubkey)
{
for(auto &link : inboundLinks)
{
if(link->has_session_to(link, pubkey))
return link;
}
if(outboundLink->has_session_to(outboundLink, pubkey))
return outboundLink;
return nullptr;
}
void
llarp_router::FlushOutboundFor(const llarp::RouterID &remote,
llarp_link *chosen)
@ -492,6 +507,11 @@ llarp_router::FlushOutboundFor(const llarp::RouterID &remote,
{
return;
}
if(!chosen)
{
DiscardOutboundFor(remote);
return;
}
while(itr->second.size())
{
auto buf = llarp::StackBuffer< decltype(linkmsg_buffer) >(linkmsg_buffer);
@ -526,7 +546,7 @@ llarp_router::on_try_connect_result(llarp_link_establish_job *job)
{
// llarp::Debug("try_connect got session");
auto session = job->session;
router->async_verify_RC(session, false, job);
router->async_verify_RC(session->get_remote_router(session), false, job);
return;
}
// llarp::Debug("try_connect no session");
@ -563,8 +583,7 @@ llarp_router::DiscardOutboundFor(const llarp::RouterID &remote)
}
void
llarp_router::async_verify_RC(llarp_link_session *session,
bool isExpectingClient,
llarp_router::async_verify_RC(llarp_rc *rc, bool isExpectingClient,
llarp_link_establish_job *establish_job)
{
llarp_async_verify_rc *job = new llarp_async_verify_rc;
@ -579,7 +598,7 @@ llarp_router::async_verify_RC(llarp_link_session *session,
job->cryptoworker = tp;
job->diskworker = disk;
llarp_rc_copy(&job->rc, session->get_remote_router(session));
llarp_rc_copy(&job->rc, rc);
if(isExpectingClient)
job->hook = &llarp_router::on_verify_client_rc;
else

@ -190,8 +190,11 @@ struct llarp_router
void
ScheduleTicker(uint64_t i = 1000);
llarp_link *
GetLinkWithSessionByPubkey(const llarp::RouterID &remote);
void
async_verify_RC(llarp_link_session *session, bool isExpectingClient,
async_verify_RC(llarp_rc *rc, bool isExpectingClient,
llarp_link_establish_job *job = nullptr);
static bool

@ -101,6 +101,12 @@ llarp_rc_decode_dict(struct dict_reader *r, llarp_buffer_t *key)
return false;
}
bool
llarp_rc_is_public_router(const struct llarp_rc *const rc)
{
return rc->addrs && llarp_ai_list_size(rc->addrs) > 0;
}
void
llarp_rc_copy(struct llarp_rc *dst, const struct llarp_rc *src)
{

Loading…
Cancel
Save