add custom single-threaded allocator for uTP buffers

fix up testnet config generation
pull/601/head
Jeff Becker 5 years ago
parent 85fcb4bd84
commit 018dd008ec

@@ -171,7 +171,7 @@ testnet-build: testnet-configure
testnet:
cp $(EXE) $(TESTNET_EXE)
mkdir -p $(TESTNET_ROOT)
$(PYTHON) $(REPO)/contrib/testnet/genconf.py --bin=$(TESTNET_EXE) --svc=$(TESTNET_SERVERS) --clients=$(TESTNET_CLIENTS) --dir=$(TESTNET_ROOT) --out $(TESTNET_CONF) --connect=4 --ifname=$(TESTNET_IFNAME) --baseport=$(TESTNET_BASEPORT) --ip=$(TESTNET_IP) --netid=$(TESTNET_NETID)
$(PYTHON) $(REPO)/contrib/testnet/genconf.py --bin=$(TESTNET_EXE) --svc=$(TESTNET_SERVERS) --clients=$(TESTNET_CLIENTS) --dir=$(TESTNET_ROOT) --out $(TESTNET_CONF) --ifname=$(TESTNET_IFNAME) --baseport=$(TESTNET_BASEPORT) --ip=$(TESTNET_IP) --netid=$(TESTNET_NETID)
LLARP_DEBUG=$(TESTNET_DEBUG) supervisord -n -d $(TESTNET_ROOT) -l $(TESTNET_LOG) -c $(TESTNET_CONF)
$(TEST_EXE): debug

@@ -45,7 +45,8 @@ def main():
config['router'] = {
'net-threads': '1',
'worker-threads': '4',
'nickname': svcNodeName(nodeid)
'nickname': svcNodeName(nodeid),
'min-connections': "{}".format(args.connect)
}
if args.netid:
config['router']['netid'] = args.netid
@@ -72,11 +73,8 @@ def main():
fp = os.path.join(d, 'daemon.ini')
with open(fp, 'w') as f:
config.write(f)
if nodeid == 0:
otherID = 1
else:
otherID = nodeid - 1
f.write("[bootstrap]\nadd-node={}\n".format(os.path.join(basedir,svcNodeName(otherID), 'rc.signed')))
for n in range(args.connect):
f.write("[bootstrap]\nadd-node={}\n".format(os.path.join(basedir,svcNodeName((nodeid + 1 + n) % args.svc), 'rc.signed')))
for nodeid in range(args.clients):
@@ -106,14 +104,12 @@ def main():
config['services'] = {
'testnet': hiddenservice
}
fp = os.path.join(d, 'daemon.ini')
fp = os.path.join(d, 'client.ini')
with open(fp, 'w') as f:
config.write(f)
if nodeid == 0:
otherID = 1
else:
otherID = nodeid - 1
f.write("[bootstrap]\nadd-node={}\n".format(os.path.join(basedir,svcNodeName(otherID), 'rc.signed')))
for n in range(args.connect):
otherID = (n + nodeid) % args.svc
f.write("[bootstrap]\nadd-node={}\n".format(os.path.join(basedir,svcNodeName(otherID), 'rc.signed')))
with open(hiddenservice, 'w') as f:
f.write('''[test-service]
tag=test
@@ -133,9 +129,9 @@ stdout_logfile_maxbytes=0
process_name = svc-node-%(process_num)03d
numprocs = {}
'''.format(os.path.join(args.dir, 'svc-node-%(process_num)03d'), exe, args.dir, args.svc))
f.write('''[program:client-node]
f.write('''[program:Client-node]
directory = {}
command = {} daemon.ini
command = bash -c "sleep 5 && {} client.ini"
autorestart=true
redirect_stderr=true
#stdout_logfile=/dev/fd/1
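
The bash -c "sleep 5 && …" wrapper staggers client startup, presumably so the service nodes have a few seconds to come up and publish their RCs before the clients try to bootstrap from them.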

@@ -648,7 +648,7 @@ namespace llarp
// ourKey should never be in the connected list
// requester is likely in the connected list
// 4 or the number of connected nodes (minus a potential requester), whichever is less
const size_t want = std::min(size_t(4), nodeCount - 1);
const size_t want = std::min(size_t(4), nodeCount);
llarp::LogDebug("We want ", want, " connected nodes in the DHT");
if(!_nodes->GetManyNearExcluding(t, found, want,
std::set< Key_t >{ourKey, requester}))
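
Worked example of the changed bound: with nodeCount = 4, the old std::min(size_t(4), nodeCount - 1) asked for only 3 nodes while the new expression asks for 4; the exclusion of ourKey and the requester is already handled explicitly by GetManyNearExcluding, so the extra - 1 under-asked.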

@@ -39,6 +39,7 @@ namespace llarp
struct TryConnectJob
{
llarp_time_t lastAttempt = 0;
llarp::RouterContact rc;
llarp::ILinkLayer *link;
llarp::Router *router;
@@ -53,6 +54,13 @@ struct TryConnectJob
{
}
bool
TimeoutReached() const
{
const auto now = router->Now();
return now > lastAttempt && now - lastAttempt > 5000;
}
void
Failed()
{
@@ -69,13 +77,12 @@ struct TryConnectJob
router->FlushOutboundFor(rc.pubkey, link);
}
void
AttemptTimedout()
bool
Timeout()
{
if(ShouldRetry())
{
Attempt();
return;
return Attempt();
}
router->routerProfiling().MarkConnectTimeout(rc.pubkey);
if(router->routerProfiling().IsBad(rc.pubkey))
@@ -83,19 +90,19 @@ struct TryConnectJob
if(!router->IsBootstrapNode(rc.pubkey))
router->nodedb()->Remove(rc.pubkey);
}
// delete this
router->pendingEstablishJobs.erase(rc.pubkey);
return true;
}
void
bool
Attempt()
{
--triesLeft;
if(!link->TryEstablishTo(rc))
{
// delete this
router->pendingEstablishJobs.erase(rc.pubkey);
return true;
}
lastAttempt = router->Now();
return false;
}
bool
@@ -110,7 +117,8 @@ on_try_connecting(void *u)
{
TryConnectJob *j = static_cast< TryConnectJob * >(u);
j->Attempt();
if(!j->Attempt())
j->router->pendingEstablishJobs.erase(j->rc.pubkey);
}
bool
@@ -689,7 +697,8 @@ namespace llarp
auto itr = pendingEstablishJobs.find(session->GetPubKey());
if(itr != pendingEstablishJobs.end())
{
itr->second->AttemptTimedout();
if(itr->second->Timeout())
pendingEstablishJobs.erase(itr);
}
}
@@ -1249,6 +1258,16 @@ namespace llarp
}
// expire transit paths
paths.ExpirePaths(now);
{
auto itr = pendingEstablishJobs.begin();
while(itr != pendingEstablishJobs.end())
{
if(itr->second->TimeoutReached() && itr->second->Timeout())
itr = pendingEstablishJobs.erase(itr);
else
++itr;
}
}
{
auto itr = m_PersistingSessions.begin();
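
The TryConnectJob refactor above follows an erase-on-true convention: Attempt() and Timeout() now return true when the job is finished, and whoever owns pendingEstablishJobs erases (and thereby destroys) the entry, instead of the old "// delete this" self-erase from inside a member function. A minimal standalone sketch of that pattern (Job and its fields are hypothetical, not from this commit):

#include <map>
#include <memory>

struct Job
{
  int triesLeft = 2;

  // returns true when the owner should erase (and destroy) this job
  bool
  Timeout()
  {
    --triesLeft;
    return triesLeft <= 0;
  }
};

int
main()
{
  std::map< int, std::unique_ptr< Job > > jobs;
  jobs.emplace(0, std::make_unique< Job >());
  auto itr = jobs.begin();
  while(itr != jobs.end())
  {
    if(itr->second->Timeout())
      itr = jobs.erase(itr);  // safe: the job never deletes itself
    else
      ++itr;
  }
  return 0;
}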

@@ -701,6 +701,22 @@ namespace llarp
m_PendingServiceLookups.erase(addr);
}
void
Endpoint::HandleVerifyGotRouter(dht::GotRouterMessage_constptr msg,
llarp_async_verify_rc* j)
{
auto itr = m_PendingRouters.find(msg->R[0].pubkey);
if(itr != m_PendingRouters.end())
{
if(j->valid)
itr->second.InformResult(msg->R);
else
itr->second.InformResult({});
m_PendingRouters.erase(itr);
}
delete j;
}
bool
Endpoint::HandleGotRouterMessage(dht::GotRouterMessage_constptr msg)
{
@@ -711,16 +727,9 @@ namespace llarp
job->cryptoworker = m_Router->threadpool();
job->diskworker = m_Router->diskworker();
job->logic = m_Router->logic();
job->hook = [=](llarp_async_verify_rc* j) {
auto itr = m_PendingRouters.find(msg->R[0].pubkey);
if(j->valid)
itr->second.InformResult(msg->R);
else
itr->second.InformResult({});
m_PendingRouters.erase(itr);
delete j;
};
job->rc = msg->R[0];
job->hook = std::bind(&Endpoint::HandleVerifyGotRouter, this, msg,
std::placeholders::_1);
job->rc = msg->R[0];
llarp_nodedb_async_verify(job);
}
else
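
The hook change above swaps a capturing lambda for std::bind over the new HandleVerifyGotRouter member, which gives the verify callback a name and keeps the msg shared_ptr alive inside the bound functor. A toy sketch of the same member-function-as-callback shape (all types hypothetical):

#include <functional>
#include <iostream>

struct Job
{
  bool valid = true;
};

struct Endpoint
{
  void
  HandleVerify(int msgid, Job *j)
  {
    std::cout << "msg " << msgid << " valid=" << j->valid << std::endl;
    delete j;
  }
};

int
main()
{
  Endpoint ep;
  std::function< void(Job *) > hook =
      std::bind(&Endpoint::HandleVerify, &ep, 42, std::placeholders::_1);
  hook(new Job());  // invokes ep.HandleVerify(42, job), which frees the job
  return 0;
}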

@@ -1,6 +1,7 @@
#ifndef LLARP_SERVICE_ENDPOINT_HPP
#define LLARP_SERVICE_ENDPOINT_HPP
#include <dht/messages/gotrouter.hpp>
#include <ev/ev.h>
#include <exit/session.hpp>
#include <net/net.hpp>
@@ -21,6 +22,8 @@
#define MIN_SHIFT_INTERVAL (5 * 1000)
#endif
struct llarp_async_verify_rc;
namespace llarp
{
namespace service
@@ -322,6 +325,10 @@ namespace llarp
RunIsolatedMainLoop(void*);
private:
void
HandleVerifyGotRouter(dht::GotRouterMessage_constptr msg,
llarp_async_verify_rc* j);
bool
OnLookup(const service::Address& addr, const IntroSet* i,
const RouterID& endpoint); /* */
@@ -374,11 +381,11 @@
{
if(now < started)
return false;
return now - started > 5000;
return now - started > 30000;
}
void
InformResult(const std::vector< RouterContact >& result)
InformResult(std::vector< RouterContact > result)
{
if(handler)
handler(result);

@@ -0,0 +1,109 @@
#ifndef LLARP_UTIL_ALLOC_HPP
#define LLARP_UTIL_ALLOC_HPP
#include <memory>
#include <experimental/memory_resource>
#include <bitset>
#include <array>

namespace llarp
{
  namespace util
  {
    /// simple single-threaded fixed-capacity allocation pool template
    template < typename Value_t, std::size_t maxEntries >
    struct AllocPool
    {
      using Ptr_t = Value_t *;

      AllocPool()
      {
        mem = new Memory();
      }

      ~AllocPool()
      {
        delete mem;
      }

      /// allocate a slot and default-construct a Value_t in it
      Ptr_t
      NewPtr()
      {
        void *ptr = mem->allocate();
        return new(ptr) Value_t();
      }

      /// destruct p and release its slot back to the pool
      void
      DelPtr(Ptr_t p)
      {
        p->~Value_t();
        mem->deallocate(p);
      }

      bool
      Full() const
      {
        return mem->full();
      }

      bool
      HasRoomFor(size_t numItems)
      {
        return mem->hasRoomFor(numItems);
      }

     private:
      struct Memory
      {
        uint8_t _buffer[maxEntries * sizeof(Value_t)];
        std::bitset< maxEntries > _allocated = {0};
        std::size_t _pos = 0;

        bool
        full() const
        {
          return _allocated.size() == _allocated.count();
        }

        bool
        hasRoomFor(size_t num)
        {
          return _allocated.count() + num <= _allocated.size();
        }

        void
        deallocate(void *ptr)
        {
          if(ptr == nullptr)
            throw std::bad_alloc();
          uint8_t *v_ptr = (uint8_t *)ptr;
          // recover the slot index from the pointer's offset into the buffer
          const std::size_t _idx = (v_ptr - _buffer) / sizeof(Value_t);
          _allocated.reset(_idx);
        }

        [[nodiscard]] void *
        allocate()
        {
          // linear probe from the cursor for the next free slot
          const std::size_t _started = _pos;
          while(_allocated.test(_pos))
          {
            _pos = (_pos + 1) % maxEntries;
            if(_pos == _started)
            {
              // we are full
              throw std::bad_alloc();
            }
          }
          _allocated.set(_pos);
          Value_t *ptr = (Value_t *)&_buffer[_pos * sizeof(Value_t)];
          _pos = (_pos + 1) % maxEntries;
          return ptr;
        }
      };

      Memory *mem;
    };
  }  // namespace util
}  // namespace llarp
#endif
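
For reference, a minimal usage sketch of the pool above (the Frame type is hypothetical, not part of this commit): NewPtr() placement-constructs a value inside the pool's fixed buffer, DelPtr() destructs it and frees the slot, and NewPtr() throws std::bad_alloc once all maxEntries slots are taken:

#include <util/alloc.hpp>

struct Frame
{
  int id = 0;
};

int
main()
{
  llarp::util::AllocPool< Frame, 8 > pool;
  Frame *f = pool.NewPtr();  // placement-new inside the pool's buffer
  f->id = 1;
  pool.DelPtr(f);  // runs ~Frame() and marks the slot free for reuse
  return pool.Full() ? 1 : 0;
}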

@@ -46,7 +46,7 @@ namespace llarp
void
OStreamLogStream::Print(LogLevel, const char*, const std::string& msg)
{
m_Out << msg;
m_Out << msg << std::flush;
}
} // namespace llarp

@@ -7,13 +7,13 @@ namespace llarp
namespace utp
{
bool
InboundMessage::IsExpired(llarp_time_t now) const
_InboundMessage::IsExpired(llarp_time_t now) const
{
return now > lastActive && now - lastActive >= 2000;
}
bool
InboundMessage::AppendData(const byte_t* ptr, uint16_t sz)
_InboundMessage::AppendData(const byte_t* ptr, uint16_t sz)
{
if(buffer.size_left() < sz)
return false;
@@ -21,6 +21,8 @@ namespace llarp
buffer.cur += sz;
return true;
}
IBMsgPool_t IBPool;
} // namespace utp
} // namespace llarp

@@ -9,6 +9,8 @@
#include <string.h>
#include <util/alloc.hpp>
namespace llarp
{
namespace utp
@@ -46,7 +48,7 @@ namespace llarp
using MessageBuffer = AlignedBuffer< MAX_LINK_MSG_SIZE >;
/// pending inbound message being received
struct InboundMessage
struct _InboundMessage
{
/// timestamp of last activity
llarp_time_t lastActive;
@@ -56,17 +58,6 @@ namespace llarp
/// for accessing message buffer
llarp_buffer_t buffer;
InboundMessage() : lastActive(0), _msg(), buffer(_msg)
{
}
InboundMessage(const InboundMessage& other)
: lastActive(other.lastActive), _msg(other._msg), buffer(_msg)
{
buffer.cur = buffer.base + (other.buffer.cur - other.buffer.base);
buffer.sz = other.buffer.sz;
}
/// return true if this inbound message can be removed due to expiration
bool
IsExpired(llarp_time_t now) const;
@@ -77,13 +68,23 @@ namespace llarp
/// return true on success
bool
AppendData(const byte_t* ptr, uint16_t sz);
_InboundMessage() : lastActive(0), _msg(), buffer(_msg)
{
}
};
inline bool
operator==(const InboundMessage& lhs, const InboundMessage& rhs)
operator==(const _InboundMessage& lhs, const _InboundMessage& rhs)
{
return lhs.buffer.base == rhs.buffer.base;
}
using IBMsgPool_t = util::AllocPool< _InboundMessage, 1024 >;
extern IBMsgPool_t IBPool;
using InboundMessage = _InboundMessage*;
} // namespace utp
} // namespace llarp
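
Note the renaming trick here: the struct becomes _InboundMessage while the old name is re-pointed at a pool-owned raw pointer (using InboundMessage = _InboundMessage *), so call sites keep the InboundMessage spelling and only change value access (.) to pointer access (->), as the session.cpp hunks below show.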

@@ -9,6 +9,10 @@ namespace llarp
{
namespace utp
{
using SendBufferPool = util::AllocPool< FragmentBuffer, 1024 * 4 >;
static SendBufferPool OBPool;
using namespace std::placeholders;
void
@@ -35,8 +39,11 @@ namespace llarp
std::vector< utp_iovec > send;
for(const auto& vec : vecq)
{
expect += vec.iov_len;
send.emplace_back(vec);
if(vec.iov_len)
{
expect += vec.iov_len;
send.emplace_back(vec);
}
}
if(expect)
{
@@ -72,8 +79,11 @@ namespace llarp
auto itr = m_RecvMsgs.begin();
while(itr != m_RecvMsgs.end())
{
if(itr->second.IsExpired(now))
if(itr->second->IsExpired(now))
{
IBPool.DelPtr(itr->second);
itr = m_RecvMsgs.erase(itr);
}
else
++itr;
}
@@ -264,7 +274,12 @@ namespace llarp
// this means we're stalled
return false;
}
size_t sz = buf.sz;
size_t sz = buf.sz;
if(!OBPool.HasRoomFor(sz / FragmentBodyPayloadSize))
{
LogError("Send buffers are full");
return false;
}
byte_t* ptr = buf.base;
uint32_t msgid = m_NextTXMsgID++;
while(sz)
@@ -357,6 +372,8 @@ namespace llarp
utp_set_userdata(sock, nullptr);
sock = nullptr;
}
for(auto& item : m_RecvMsgs)
IBPool.DelPtr(item.second);
}
bool
@@ -364,14 +381,15 @@ namespace llarp
uint16_t remaining)
{
sendq.emplace_back();
sendq.emplace_back(OBPool.NewPtr(),
[](FragmentBuffer* ptr) { OBPool.DelPtr(ptr); });
auto& buf = sendq.back();
vecq.emplace_back();
auto& vec = vecq.back();
vec.iov_base = buf.data();
vec.iov_base = buf->data();
vec.iov_len = FragmentBufferSize;
buf.Randomize();
byte_t* noncePtr = buf.data() + FragmentHashSize;
buf->Randomize();
byte_t* noncePtr = buf->data() + FragmentHashSize;
byte_t* body = noncePtr + FragmentNonceSize;
byte_t* base = body;
AlignedBuffer< 24 > A(base);
@@ -402,7 +420,7 @@ namespace llarp
payload.cur = payload.base;
payload.sz = FragmentBufferSize - FragmentHashSize;
// key'd hash
if(!OurCrypto()->hmac(buf.data(), payload, txKey))
if(!OurCrypto()->hmac(buf->data(), payload, txKey))
return false;
return MutateKey(txKey, A);
}
@@ -539,14 +557,19 @@ namespace llarp
// get message
if(m_RecvMsgs.find(msgid) == m_RecvMsgs.end())
{
m_RecvMsgs.emplace(msgid, InboundMessage{});
if(IBPool.Full())
{
LogError("inbound buffer mempool full");
return false;
}
m_RecvMsgs.emplace(msgid, IBPool.NewPtr());
}
auto itr = m_RecvMsgs.find(msgid);
// add message activity
itr->second.lastActive = parent->Now();
itr->second->lastActive = parent->Now();
// append data
if(!itr->second.AppendData(out.cur, length))
if(!itr->second->AppendData(out.cur, length))
{
LogError("inbound buffer is full");
return false; // not enough room
@@ -561,8 +584,8 @@ namespace llarp
if(remaining == 0)
{
// we're done with this message, prune it next tick
itr->second.lastActive = 0;
ManagedBuffer buf(itr->second.buffer);
itr->second->lastActive = 0;
ManagedBuffer buf(itr->second->buffer);
// resize
buf.underlying.sz = buf.underlying.cur - buf.underlying.base;
// rewind
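
The sendq change above pairs each OBPool allocation with a shared_ptr whose deleter returns the fragment to the pool rather than calling delete, so queued fragments recycle their fixed slots automatically. A self-contained sketch of that idiom (Buf and pool are hypothetical, reusing the AllocPool above):

#include <memory>
#include <util/alloc.hpp>

struct Buf
{
  unsigned char data[64];
};

static llarp::util::AllocPool< Buf, 4 > pool;

int
main()
{
  // the custom deleter hands storage back to the pool; no heap delete runs
  std::shared_ptr< Buf > b(pool.NewPtr(), [](Buf *ptr) { pool.DelPtr(ptr); });
  return 0;
}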

@@ -14,6 +14,8 @@ namespace llarp
{
struct LinkLayer;
using SendFragmentBuffer = std::shared_ptr< FragmentBuffer >;
struct Session : public ILinkSession
{
/// remote router's rc
@@ -42,7 +44,7 @@ namespace llarp
/// send queue for utp
std::deque< utp_iovec > vecq;
/// tx fragment queue
std::deque< FragmentBuffer > sendq;
std::deque< SendFragmentBuffer > sendq;
/// current rx fragment buffer
FragmentBuffer recvBuf;
/// current offset in current rx fragment buffer
