un break (kinda)

pull/15/head
Jeff Becker 6 years ago
parent 60d5277351
commit 6ef89ea8b2
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -43,7 +43,7 @@ clean:
rm -f *.a *.so
debug-configure:
cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DCMAKE_C_COMPILER=$(CC) -DCMAKE_CXX_COMPILER=$(CXX) -DTUNTAP=ON -DJEMALLOC=ON
cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DCMAKE_C_COMPILER=$(CC) -DCMAKE_CXX_COMPILER=$(CXX) -DTUNTAP=ON
release-configure: clean
cmake -GNinja -DSTATIC_LINK=ON -DCMAKE_BUILD_TYPE=Release -DRELEASE_MOTTO="$(shell cat motto.txt)" -DCMAKE_C_COMPILER=$(CC) -DCMAKE_CXX_COMPILER=$(CXX) -DTUNTAP=ON

@ -18,6 +18,7 @@ def clientNodeName(id): return 'client-node-%03d' % id
def main():
ap = AP()
ap.add_argument('--valgrind', type=bool, default=False)
ap.add_argument('--dir', type=str, default='testnet_tmp')
ap.add_argument('--svc', type=int, default=20,
help='number of service nodes')
@ -31,6 +32,11 @@ def main():
args = ap.parse_args()
if args.valgrind:
exe = 'valgrind {}'.format(args.bin)
else:
exe = args.bin
basedir = os.path.abspath(args.dir)
for nodeid in range(args.svc):
@ -107,7 +113,7 @@ stdout_logfile={}/svc-node-%(process_num)03d-log.txt
stdout_logfile_maxbytes=0
process_name = svc-node-%(process_num)03d
numprocs = {}
'''.format(os.path.join(args.dir, 'svc-node-%(process_num)03d'), args.bin, args.dir, args.svc))
'''.format(os.path.join(args.dir, 'svc-node-%(process_num)03d'), exe, args.dir, args.svc))
f.write('''[program:client-node]
directory = {}
command = {}
@ -118,7 +124,7 @@ stdout_logfile={}/client-node-%(process_num)03d-log.txt
stdout_logfile_maxbytes=0
process_name = client-node-%(process_num)03d
numprocs = {}
'''.format(os.path.join(args.dir, 'client-node-%(process_num)03d'),args.bin, args.dir, args.clients))
'''.format(os.path.join(args.dir, 'client-node-%(process_num)03d'), exe, args.dir, args.clients))
f.write('[supervisord]\ndirectory=.\n')

@ -40,7 +40,7 @@ namespace llarp
Configure(llarp_ev_loop* loop, const std::string& ifname, int af,
uint16_t port);
virtual std::unique_ptr< ILinkSession >
virtual ILinkSession*
NewOutboundSession(const RouterContact& rc, const AddressInfo& ai) = 0;
virtual void
@ -118,12 +118,7 @@ namespace llarp
protected:
void
PutSession(const Addr& addr, ILinkSession* s)
{
util::Lock l(m_SessionsMutex);
m_Sessions.insert(
std::make_pair(addr, std::unique_ptr< ILinkSession >(s)));
}
PutSession(const Addr& addr, ILinkSession* s);
llarp_logic* m_Logic = nullptr;
Addr m_ourAddr;
@ -132,8 +127,7 @@ namespace llarp
util::Mutex m_LinksMutex;
util::Mutex m_SessionsMutex;
std::unordered_map< PubKey, Addr, PubKey::Hash > m_Links;
std::unordered_map< Addr, std::unique_ptr< ILinkSession >, Addr::Hash >
m_Sessions;
std::unordered_map< Addr, ILinkSession*, Addr::Hash > m_Sessions;
};
} // namespace llarp

@ -53,7 +53,18 @@ namespace llarp
++itr;
}
else
{
{
util::Lock lock(m_LinksMutex);
auto i = m_Links.find(itr->second->GetPubKey());
if(i != m_Links.end())
{
m_Links.erase(i);
}
}
delete itr->second;
itr = m_Sessions.erase(itr);
}
}
}
@ -83,9 +94,11 @@ namespace llarp
llarp::Addr addr(to);
auto itr = m_Sessions.find(addr);
if(itr == m_Sessions.end())
m_Sessions
.insert(std::make_pair(addr, std::move(NewOutboundSession(rc, to))))
.first->second->Start();
{
auto s = NewOutboundSession(rc, to);
s->Start();
m_Sessions.emplace(addr, s);
}
}
bool
@ -120,6 +133,7 @@ namespace llarp
if(itr == m_Sessions.end())
return;
itr->second->SendClose();
delete itr->second;
m_Sessions.erase(itr);
}
}
@ -155,12 +169,17 @@ namespace llarp
if(itr == m_Links.end())
return false;
addr = itr->second;
llarp::LogDebug("found addr for ", remote, ", ", addr);
}
{
util::Lock l(m_SessionsMutex);
auto itr = m_Sessions.find(addr);
if(itr == m_Sessions.end())
{
llarp::LogWarn("no session to ", addr, " for ", remote);
return false;
}
llarp::LogDebug("SendMessageBuffer ", buf.sz, "bytes");
result = itr->second->SendMessageBuffer(buf);
}
return result;
@ -189,6 +208,13 @@ namespace llarp
return m_SecretKey;
}
void
ILinkLayer::PutSession(const Addr& addr, ILinkSession* s)
{
util::Lock l(m_SessionsMutex);
m_Sessions.emplace(addr, s);
}
bool
ILinkLayer::EnsureKeys(const char* f)
{
@ -225,7 +251,18 @@ namespace llarp
++itr;
}
else
{
{
util::Lock lock(m_LinksMutex);
auto i = m_Links.find(itr->second->GetPubKey());
if(i != m_Links.end())
{
m_Links.erase(i);
}
}
delete itr->second;
itr = m_Sessions.erase(itr);
}
}
ScheduleTick(interval);
}

@ -38,6 +38,12 @@ namespace llarp
size_t recvBufOffset;
std::vector< byte_t > recvMsg;
void
Alive()
{
lastActive = llarp_time_now_ms();
}
/// base
BaseSession(llarp_router* r);
@ -69,6 +75,7 @@ namespace llarp
parent = p;
EnterState(eLinkEstablished);
llarp::LogDebug("link established with ", remoteAddr);
Alive();
}
void
@ -78,22 +85,27 @@ namespace llarp
virtual ~BaseSession();
void
PumpWrite(utp_socket* s)
PumpWrite()
{
// TODO: use utp_writev
while(sendq.size())
{
auto& front = sendq.front();
write_ll(s, front.data(), front.size());
write_ll(front.data(), front.size());
sendq.pop();
}
}
void
write_ll(utp_socket* s, void* buf, size_t sz)
write_ll(void* buf, size_t sz)
{
if(sock == nullptr)
{
llarp::LogWarn("write_ll failed: no socket");
return;
}
llarp::LogDebug("utp_write ", sz, " bytes to ", remoteAddr);
ssize_t wrote = utp_write(s, buf, sz);
ssize_t wrote = utp_write(sock, buf, sz);
if(wrote < 0)
{
llarp::LogWarn("utp_write returned ", wrote);
@ -111,8 +123,11 @@ namespace llarp
bool
QueueWriteBuffers(llarp_buffer_t buf)
{
llarp::LogDebug("write ", buf.sz, " bytes to ", remoteAddr);
if(state != eSessionReady)
{
llarp::LogWarn("failed to send ", buf.sz,
" bytes on non ready session state=", state);
return false;
}
size_t sz = buf.sz;
@ -178,7 +193,7 @@ namespace llarp
llarp_buffer_put_uint32(&buf, LLARP_PROTO_VERSION);
llarp_buffer_put_uint32(&buf, sz);
// send it
write_ll(s, recvBuf.data(), sz + (sizeof(uint32_t) * 2));
write_ll(recvBuf.data(), sz + (sizeof(uint32_t) * 2));
sock = s;
}
@ -211,6 +226,8 @@ namespace llarp
void
SendClose()
{
if(sock)
Close(sock);
}
void
@ -232,6 +249,7 @@ namespace llarp
bool
Recv(const void* buf, size_t sz)
{
Alive();
const byte_t* ptr = (const byte_t*)buf;
llarp::LogDebug("utp read ", sz, " from ", remoteAddr);
while(sz + recvBufOffset > FragmentBufferSize)
@ -262,7 +280,7 @@ namespace llarp
}
const PubKey&
GetPubKey() const
RemotePubKey() const
{
return remoteRC.pubkey;
}
@ -334,8 +352,9 @@ namespace llarp
void
Pump()
{
ILinkLayer::Pump();
utp_check_timeouts(_utp_ctx);
utp_issue_deferred_acks(_utp_ctx);
ILinkLayer::Pump();
}
void
@ -352,8 +371,7 @@ namespace llarp
router->crypto.encryption_keygen(k);
return true;
}
std::unique_ptr< ILinkSession >
ILinkSession*
NewOutboundSession(const RouterContact& rc, const AddressInfo& addr);
utp_socket*
@ -381,11 +399,13 @@ namespace llarp
TimedOut = [&](llarp_time_t now) -> bool {
return this->IsTimedOut(now);
};
GetPubKey = std::bind(&BaseSession::RemotePubKey, this);
lastActive = llarp_time_now_ms();
Pump = [&]() { PumpWrite(this->sock); };
Pump = [&]() { PumpWrite(); };
Tick = std::bind(&BaseSession::TickImpl, this, std::placeholders::_1);
SendMessageBuffer = std::bind(&BaseSession::QueueWriteBuffers, this,
std::placeholders::_1);
SendMessageBuffer = [&](llarp_buffer_t buf) -> bool {
return this->QueueWriteBuffers(buf);
};
IsEstablished = [&]() { return this->state == eSessionReady; };
HandleLinkIntroMessage = [](const LinkIntroMessage*) -> bool {
return false;
@ -401,6 +421,7 @@ namespace llarp
remoteRC = rc;
sock = s;
assert(utp_set_userdata(sock, this) == this);
assert(s == sock);
remoteAddr = addr;
Start = std::bind(&BaseSession::Connect, this);
}
@ -410,6 +431,7 @@ namespace llarp
{
remoteRC.Clear();
sock = s;
assert(s == sock);
assert(utp_set_userdata(sock, this) == this);
remoteAddr = addr;
Start = []() {};
@ -423,14 +445,18 @@ namespace llarp
BaseSession::~BaseSession()
{
if(sock)
{
utp_close(sock);
utp_set_userdata(sock, nullptr);
}
}
std::unique_ptr< ILinkSession >
ILinkSession*
LinkLayer::NewOutboundSession(const RouterContact& rc,
const AddressInfo& addr)
{
return std::make_unique< BaseSession >(
router, utp_create_socket(_utp_ctx), rc, addr);
return new BaseSession(router, utp_create_socket(_utp_ctx), rc, addr);
}
uint64
@ -440,9 +466,11 @@ namespace llarp
static_cast< BaseSession* >(utp_get_userdata(arg->socket));
if(self)
{
assert(self->sock);
assert(self->sock == arg->socket);
if(self->state == BaseSession::eSessionReady)
if(self->state == BaseSession::eClose)
{
return 0;
}
else if(self->state == BaseSession::eSessionReady)
self->Recv(arg->buf, arg->len);
else if(self->state == BaseSession::eLinkEstablished)
{
@ -466,15 +494,20 @@ namespace llarp
static_cast< LinkLayer* >(utp_context_get_userdata(arg->context));
BaseSession* session =
static_cast< BaseSession* >(utp_get_userdata(arg->socket));
if(arg->state == UTP_STATE_CONNECT)
{
assert(session->sock);
assert(session->sock == arg->socket);
session->OutboundLinkEstablished(l);
}
else if(arg->state == UTP_STATE_EOF)
if(session)
{
session->SendClose();
if(arg->state == UTP_STATE_CONNECT)
{
if(session->state == BaseSession::eClose)
{
return 0;
}
session->OutboundLinkEstablished(l);
}
else if(arg->state == UTP_STATE_EOF)
{
session->SendClose();
}
}
return 0;
}
@ -507,6 +540,7 @@ namespace llarp
uint32_t sz, bool isLastFragment)
{
llarp::LogDebug("encrypt then hash ", sz, " bytes");
buf.Randomize();
const byte_t* nonce = buf.data() + FragmentHashSize;
byte_t* body = buf.data() + FragmentOverheadSize;
@ -525,8 +559,8 @@ namespace llarp
void
BaseSession::EnterState(State st)
{
state = st;
lastActive = llarp_time_now_ms();
Alive();
state = st;
if(st == eSessionReady)
{
parent->MapAddr(this->remoteAddr, remoteRC.pubkey);
@ -589,6 +623,7 @@ namespace llarp
BaseSession::RecvHandshake(const void* buf, size_t bufsz, LinkLayer* p,
utp_socket* s)
{
Alive();
size_t sz = bufsz;
parent = p;
sock = s;

@ -108,7 +108,7 @@ llarp_router_try_connect(struct llarp_router *router,
}
void
llarp_router::HandleLinkSessionEstablished(const llarp::RouterContact &rc)
llarp_router::HandleLinkSessionEstablished(llarp::RouterContact rc)
{
async_verify_RC(rc);
}
@ -154,10 +154,8 @@ llarp_router::PersistSessionUntil(const llarp::RouterID &remote,
bool
llarp_router::SendToOrQueue(const llarp::RouterID &remote,
const llarp::ILinkMessage *m)
const llarp::ILinkMessage *msg)
{
std::unique_ptr< const llarp::ILinkMessage > msg =
std::unique_ptr< const llarp::ILinkMessage >(m);
llarp::ILinkLayer *chosen = nullptr;
if(inboundLinks.size() == 0)
@ -338,10 +336,13 @@ llarp_router::on_verify_server_rc(llarp_async_verify_rc *job)
{
router->validRouters.erase(pk);
}
router->validRouters.insert(std::make_pair(pk, job->rc));
llarp::RouterContact rc = job->rc;
router->validRouters.insert(std::make_pair(pk, rc));
// track valid router in dht
router->dht->impl.nodes->PutNode(job->rc);
router->dht->impl.nodes->PutNode(rc);
// this was an outbound establish job
if(ctx->establish_job)
@ -449,8 +450,7 @@ llarp_router::Tick()
}
void
llarp_router::SendTo(llarp::RouterID remote,
std::unique_ptr< const llarp::ILinkMessage > &msg,
llarp_router::SendTo(llarp::RouterID remote, const llarp::ILinkMessage *msg,
llarp::ILinkLayer *selected)
{
llarp_buffer_t buf =
@ -465,9 +465,11 @@ llarp_router::SendTo(llarp::RouterID remote,
// set size of message
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
llarp::LogDebug("send ", buf.sz, " bytes to ", remote);
if(selected)
{
selected->SendTo(remote, buf);
if(!selected->SendTo(remote, buf))
llarp::LogWarn("message to ", remote, " was dropped");
return;
}
bool sent = outboundLink->SendTo(remote, buf);

@ -108,8 +108,7 @@ struct llarp_router
llarp_router();
virtual ~llarp_router();
void
HandleLinkSessionEstablished(const llarp::RouterContact &);
void HandleLinkSessionEstablished(llarp::RouterContact);
bool
HandleRecvLinkMessageBuffer(llarp::ILinkSession *from, llarp_buffer_t msg);
@ -176,8 +175,7 @@ struct llarp_router
/// sendto or drop
void
SendTo(llarp::RouterID remote,
std::unique_ptr< const llarp::ILinkMessage > &msg,
SendTo(llarp::RouterID remote, const llarp::ILinkMessage *msg,
llarp::ILinkLayer *chosen);
/// manually flush outbound message queue for just 1 router

Loading…
Cancel
Save