make it work

pull/3/head
Jeff Becker 6 years ago
parent 5d9fe74e39
commit 6633fb151d
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -66,7 +66,7 @@ testnet-build: testnet-configure
testnet: testnet-build
mkdir -p $(TESTNET_ROOT)
python3 contrib/testnet/genconf.py --bin=$(REPO)/llarpd --svc=30 --clients=300 --dir=$(TESTNET_ROOT) --out $(TESTNET_CONF)
python3 contrib/testnet/genconf.py --bin=$(REPO)/llarpd --svc=5 --clients=10 --dir=$(TESTNET_ROOT) --out $(TESTNET_CONF)
supervisord -n -d $(TESTNET_ROOT) -l $(TESTNET_LOG) -c $(TESTNET_CONF)
test: debug-configure

@ -50,11 +50,10 @@ namespace llarp
private:
typedef std::map< PathID_t, Path* > PathMap_t;
// (tx,rx)
typedef std::tuple< PathMap_t, PathMap_t > PathContainer_t;
size_t m_NumPaths;
PathContainer_t m_Paths;
PathMap_t m_Tx;
PathMap_t m_Rx;
};
} // namespace path

@ -10,7 +10,7 @@ namespace llarp
{
namespace util
{
template < typename T, typename GetTime, llarp_time_t dropMs = 20,
template < typename T, typename GetTime, llarp_time_t dropMs = 5,
llarp_time_t initialIntervalMs = 100 >
struct CoDelQueue
{
@ -47,7 +47,8 @@ namespace llarp
if(lowest > dropMs)
{
// drop
nextTickInterval = initialIntervalMs / std::sqrt(++dropNum);
nextTickInterval += initialIntervalMs / std::sqrt(++dropNum);
llarp::Info("CoDel drop ", nextTickInterval, " ms next interval");
m_Queue.pop();
return;
}

@ -653,6 +653,7 @@ namespace iwp
case eFRAG:
return got_frag(hdr, sz - 6);
default:
llarp::Warn("invalid message header");
return false;
}
}
@ -693,13 +694,10 @@ namespace iwp
uint32_t frames = 0;
bool working = false;
llarp::util::CoDelQueue< iwp_async_frame, FrameGetTime > inboundFrames;
llarp::util::CoDelQueue< iwp_async_frame, FrameGetTime > outboundFrames;
std::mutex m_DecryptedFramesMutex;
std::queue< iwp_async_frame > decryptedFrames;
std::mutex m_EncryptedFramesMutex;
std::queue< iwp_async_frame > encryptedFrames;
uint32_t pump_send_timer_id = 0;
uint32_t pump_recv_timer_id = 0;
@ -794,8 +792,6 @@ namespace iwp
return false;
}
static void
handle_codel_inbound_pump(void *u, uint64_t orig, uint64_t left);
static void
handle_codel_outbound_pump(void *u, uint64_t orig, uint64_t left);
@ -803,12 +799,7 @@ namespace iwp
PumpCrypto();
void
PumpCodelInbound()
{
pump_recv_timer_id = llarp_logic_call_later(
logic,
{inboundFrames.nextTickInterval, this, &handle_codel_inbound_pump});
}
HandleCodelOutboundPump();
void
PumpCodelOutbound()
@ -831,6 +822,7 @@ namespace iwp
frame.pop_next_frame();
}
PumpCrypto();
HandleCodelOutboundPump();
}
// this is called from net thread
@ -885,11 +877,10 @@ namespace iwp
crypto->shorthash(digest, buf);
auto id = frame.txids++;
auto msg = new transit_message(buf, digest, id);
// enter state
EnterState(eLIMSent);
// put into outbound send queue
add_outbound_message(id, msg);
// enter state
EnterState(eLIMSent);
}
else
llarp::Error("LIM Encode failed");
@ -1022,12 +1013,13 @@ namespace iwp
handle_frame_decrypt(iwp_async_frame *frame)
{
session *self = static_cast< session * >(frame->user);
llarp::Debug("rx ", frame->sz, " frames=", self->frames);
llarp::Debug("rx ", frame->sz);
if(frame->success)
{
if(self->frame.process(frame->buf + 64, frame->sz - 64))
{
self->frame.alive();
self->pump();
}
else
llarp::Error("invalid frame from ", self->addr);
@ -1041,7 +1033,10 @@ namespace iwp
{
if(sz > 64)
{
alloc_frame(inboundFrames, buf, sz);
llarp::Debug("decrypt frame ", sz);
auto f = alloc_frame(buf, sz);
f->hook = &handle_frame_decrypt;
iwp_call_async_frame_decrypt(iwp, f);
}
else
llarp::Warn("short packet of ", sz, " bytes");
@ -1050,42 +1045,18 @@ namespace iwp
static void
handle_crypto_pump(void *u);
void
DecryptInboundFrames()
{
std::queue< iwp_async_frame > outq;
std::queue< iwp_async_frame > inq;
inboundFrames.Process(inq);
while(inq.size())
{
auto &front = inq.front();
if(iwp_decrypt_frame(&front))
outq.push(front);
inq.pop();
}
{
std::unique_lock< std::mutex > lock(m_DecryptedFramesMutex);
while(outq.size())
{
decryptedFrames.push(outq.front());
outq.pop();
}
}
}
static void
handle_frame_encrypt(iwp_async_frame *frame)
{
session *self = static_cast< session * >(frame->user);
llarp::Debug("tx ", frame->sz, " frames=", self->frames);
llarp::Debug("tx ", frame->sz);
if(llarp_ev_udp_sendto(self->udp, self->addr, frame->buf, frame->sz)
== -1)
llarp::Warn("sendto failed");
}
template < typename Queue >
iwp_async_frame *
alloc_frame(Queue &q, const void *buf, size_t sz)
alloc_frame(const void *buf, size_t sz)
{
// TODO don't hard code 1500
if(sz > 1500)
@ -1098,9 +1069,7 @@ namespace iwp
frame->sz = sz;
frame->user = this;
frame->sessionkey = sessionkey;
/// TODO: this could be rather slow
frame->created = now;
q.Put(frame);
frame->created = now;
return frame;
}
@ -1108,12 +1077,13 @@ namespace iwp
encrypt_frame_async_send(const void *buf, size_t sz)
{
// 64 bytes frame overhead for nonce and hmac
iwp_async_frame *frame = alloc_frame(outboundFrames, nullptr, sz + 64);
iwp_async_frame *frame = alloc_frame(nullptr, sz + 64);
memcpy(frame->buf + 64, buf, sz);
auto padding = rand() % MAX_PAD;
if(padding)
crypto->randbytes(frame->buf + 64 + sz, padding);
frame->sz += padding;
outboundFrames.Put(frame);
}
void
@ -1136,6 +1106,10 @@ namespace iwp
encryptedFrames.push(q.front());
q.pop();
}
if(encryptedFrames.size() && pump_send_timer_id == 0)
{
PumpCodelOutbound();
}
}
}
@ -1273,8 +1247,7 @@ namespace iwp
state = st;
if(state == eLIMSent || state == eSessionStartSent)
{
PumpCodelInbound();
PumpCodelOutbound();
HandleCodelOutboundPump();
}
}
};
@ -1811,47 +1784,30 @@ namespace iwp
}
void
session::handle_codel_outbound_pump(void *u, uint64_t orig, uint64_t left)
session::HandleCodelOutboundPump()
{
if(left)
return;
session *self = static_cast< session * >(u);
self->pump_send_timer_id = 0;
if(self->timedout(llarp_time_now_ms()))
return;
{
std::unique_lock< std::mutex > lock(self->m_EncryptedFramesMutex);
while(self->encryptedFrames.size())
std::unique_lock< std::mutex > lock(m_EncryptedFramesMutex);
while(encryptedFrames.size())
{
auto &front = self->encryptedFrames.front();
auto &front = encryptedFrames.front();
handle_frame_encrypt(&front);
self->encryptedFrames.pop();
encryptedFrames.pop();
}
}
self->PumpCodelOutbound();
self->PumpCrypto();
PumpCodelOutbound();
}
void
session::handle_codel_inbound_pump(void *u, uint64_t orig, uint64_t left)
session::handle_codel_outbound_pump(void *u, uint64_t orig, uint64_t left)
{
if(left)
return;
session *self = static_cast< session * >(u);
self->pump_recv_timer_id = 0;
self->pump_send_timer_id = 0;
if(self->timedout(llarp_time_now_ms()))
return;
{
std::unique_lock< std::mutex > lock(self->m_DecryptedFramesMutex);
while(self->decryptedFrames.size())
{
auto &front = self->decryptedFrames.front();
handle_frame_decrypt(&front);
self->decryptedFrames.pop();
}
}
self->PumpCodelInbound();
self->PumpCrypto();
self->HandleCodelOutboundPump();
}
void
@ -1865,7 +1821,6 @@ namespace iwp
{
session *self = static_cast< session * >(u);
self->EncryptOutboundFrames();
self->DecryptInboundFrames();
}
void

@ -312,6 +312,7 @@ nodedb_async_load_rc(void *user)
job->loaded = job->nodedb->loadfile(fpath);
if(job->loaded)
{
llarp_rc_clear(&job->rc);
llarp_rc_copy(&job->rc, job->nodedb->getRC(job->pubkey));
}
llarp_logic_queue_job(job->logic, {job, &nodedb_inform_load_rc});

@ -139,7 +139,8 @@ namespace llarp
llarp::Error("failed to send LRCM");
return;
}
ctx->path->status = llarp::path::ePathBuilding;
ctx->path->status = llarp::path::ePathBuilding;
ctx->path->buildStarted = llarp_time_now_ms();
router->paths.AddOwnPath(ctx->pathset, ctx->path);
ctx->user->pathBuildStarted(ctx->user);
}

@ -12,34 +12,36 @@ namespace llarp
bool
PathSet::ShouldBuildMore() const
{
return std::get< 0 >(m_Paths).size() < m_NumPaths;
return m_Tx.size() < m_NumPaths;
}
void
PathSet::ExpirePaths(llarp_time_t now)
{
{
auto& map = std::get< 0 >(m_Paths);
auto itr = map.begin();
while(itr != map.end())
auto itr = m_Rx.begin();
while(itr != m_Rx.end())
{
if(itr->second->Expired(now))
{
itr = map.erase(itr);
itr = m_Rx.erase(itr);
}
else
++itr;
}
}
{
auto& map = std::get< 1 >(m_Paths);
auto itr = map.begin();
while(itr != map.end())
auto itr = m_Tx.begin();
while(itr != m_Tx.end())
{
if(itr->second->Expired(now))
{
// delete path on second iteration
delete itr->second;
itr = map.erase(itr);
itr = m_Tx.erase(itr);
}
else
++itr;
}
}
}
@ -48,9 +50,8 @@ namespace llarp
PathSet::NumInStatus(PathStatus st) const
{
size_t count = 0;
auto& map = std::get< 0 >(m_Paths);
auto itr = map.begin();
while(itr != map.end())
auto itr = m_Tx.begin();
while(itr != m_Tx.end())
{
if(itr->second->status == st)
++count;
@ -62,23 +63,22 @@ namespace llarp
void
PathSet::AddPath(Path* path)
{
std::get< 0 >(m_Paths).emplace(path->TXID(), path);
std::get< 1 >(m_Paths).emplace(path->RXID(), path);
m_Tx.emplace(path->TXID(), path);
m_Rx.emplace(path->RXID(), path);
}
void
PathSet::RemovePath(Path* path)
{
std::get< 0 >(m_Paths).erase(path->TXID());
std::get< 1 >(m_Paths).erase(path->RXID());
m_Tx.erase(path->TXID());
m_Rx.erase(path->RXID());
}
Path*
PathSet::GetByUpstream(const RouterID& remote, const PathID_t& rxid)
{
auto& set = std::get< 1 >(m_Paths);
auto itr = set.begin();
while(itr != set.end())
auto itr = m_Rx.begin();
while(itr != m_Rx.end())
{
if(itr->second->Upstream() == remote)
return itr->second;

@ -356,19 +356,6 @@ llarp_router::HandleExploritoryPathBuildStarted(llarp_pathbuild_job *job)
delete job;
}
// TODO: do we still need this?
void
llarp_router::BuildExploritoryPath()
{
llarp_pathbuild_job *job = new llarp_pathbuild_job;
job->context = explorePool;
job->selectHop = selectHopFunc;
job->hops.numHops = 4;
job->user = this;
job->pathBuildStarted = &HandleExploritoryPathBuildStarted;
llarp_pathbuilder_build_path(job);
}
void
llarp_router::Tick()
{

Loading…
Cancel
Save