From 6ef89ea8b2d1a4a8710b6027d415ce3df3cc6f66 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 6 Sep 2018 09:16:24 -0400 Subject: [PATCH] un break (kinda) --- Makefile | 2 +- contrib/testnet/genconf.py | 10 +++- include/llarp/link/server.hpp | 12 ++--- llarp/link/server.cpp | 43 +++++++++++++++-- llarp/link/utp.cpp | 91 ++++++++++++++++++++++++----------- llarp/router.cpp | 20 ++++---- llarp/router.hpp | 6 +-- 7 files changed, 128 insertions(+), 56 deletions(-) diff --git a/Makefile b/Makefile index af7ace1cf..2b8c6a5c2 100644 --- a/Makefile +++ b/Makefile @@ -43,7 +43,7 @@ clean: rm -f *.a *.so debug-configure: - cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DCMAKE_C_COMPILER=$(CC) -DCMAKE_CXX_COMPILER=$(CXX) -DTUNTAP=ON -DJEMALLOC=ON + cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DCMAKE_C_COMPILER=$(CC) -DCMAKE_CXX_COMPILER=$(CXX) -DTUNTAP=ON release-configure: clean cmake -GNinja -DSTATIC_LINK=ON -DCMAKE_BUILD_TYPE=Release -DRELEASE_MOTTO="$(shell cat motto.txt)" -DCMAKE_C_COMPILER=$(CC) -DCMAKE_CXX_COMPILER=$(CXX) -DTUNTAP=ON diff --git a/contrib/testnet/genconf.py b/contrib/testnet/genconf.py index dca190f2a..05bc50c23 100644 --- a/contrib/testnet/genconf.py +++ b/contrib/testnet/genconf.py @@ -18,6 +18,7 @@ def clientNodeName(id): return 'client-node-%03d' % id def main(): ap = AP() + ap.add_argument('--valgrind', type=bool, default=False) ap.add_argument('--dir', type=str, default='testnet_tmp') ap.add_argument('--svc', type=int, default=20, help='number of service nodes') @@ -31,6 +32,11 @@ def main(): args = ap.parse_args() + if args.valgrind: + exe = 'valgrind {}'.format(args.bin) + else: + exe = args.bin + basedir = os.path.abspath(args.dir) for nodeid in range(args.svc): @@ -107,7 +113,7 @@ stdout_logfile={}/svc-node-%(process_num)03d-log.txt stdout_logfile_maxbytes=0 process_name = svc-node-%(process_num)03d numprocs = {} -'''.format(os.path.join(args.dir, 'svc-node-%(process_num)03d'), args.bin, args.dir, args.svc)) +'''.format(os.path.join(args.dir, 'svc-node-%(process_num)03d'), exe, args.dir, args.svc)) f.write('''[program:client-node] directory = {} command = {} @@ -118,7 +124,7 @@ stdout_logfile={}/client-node-%(process_num)03d-log.txt stdout_logfile_maxbytes=0 process_name = client-node-%(process_num)03d numprocs = {} -'''.format(os.path.join(args.dir, 'client-node-%(process_num)03d'),args.bin, args.dir, args.clients)) +'''.format(os.path.join(args.dir, 'client-node-%(process_num)03d'), exe, args.dir, args.clients)) f.write('[supervisord]\ndirectory=.\n') diff --git a/include/llarp/link/server.hpp b/include/llarp/link/server.hpp index 32e044e45..a3e8aff2b 100644 --- a/include/llarp/link/server.hpp +++ b/include/llarp/link/server.hpp @@ -40,7 +40,7 @@ namespace llarp Configure(llarp_ev_loop* loop, const std::string& ifname, int af, uint16_t port); - virtual std::unique_ptr< ILinkSession > + virtual ILinkSession* NewOutboundSession(const RouterContact& rc, const AddressInfo& ai) = 0; virtual void @@ -118,12 +118,7 @@ namespace llarp protected: void - PutSession(const Addr& addr, ILinkSession* s) - { - util::Lock l(m_SessionsMutex); - m_Sessions.insert( - std::make_pair(addr, std::unique_ptr< ILinkSession >(s))); - } + PutSession(const Addr& addr, ILinkSession* s); llarp_logic* m_Logic = nullptr; Addr m_ourAddr; @@ -132,8 +127,7 @@ namespace llarp util::Mutex m_LinksMutex; util::Mutex m_SessionsMutex; std::unordered_map< PubKey, Addr, PubKey::Hash > m_Links; - std::unordered_map< Addr, std::unique_ptr< ILinkSession >, Addr::Hash > - m_Sessions; + std::unordered_map< Addr, ILinkSession*, Addr::Hash > m_Sessions; }; } // namespace llarp diff --git a/llarp/link/server.cpp b/llarp/link/server.cpp index b97542b7a..e68dcac86 100644 --- a/llarp/link/server.cpp +++ b/llarp/link/server.cpp @@ -53,7 +53,18 @@ namespace llarp ++itr; } else + { + { + util::Lock lock(m_LinksMutex); + auto i = m_Links.find(itr->second->GetPubKey()); + if(i != m_Links.end()) + { + m_Links.erase(i); + } + } + delete itr->second; itr = m_Sessions.erase(itr); + } } } @@ -83,9 +94,11 @@ namespace llarp llarp::Addr addr(to); auto itr = m_Sessions.find(addr); if(itr == m_Sessions.end()) - m_Sessions - .insert(std::make_pair(addr, std::move(NewOutboundSession(rc, to)))) - .first->second->Start(); + { + auto s = NewOutboundSession(rc, to); + s->Start(); + m_Sessions.emplace(addr, s); + } } bool @@ -120,6 +133,7 @@ namespace llarp if(itr == m_Sessions.end()) return; itr->second->SendClose(); + delete itr->second; m_Sessions.erase(itr); } } @@ -155,12 +169,17 @@ namespace llarp if(itr == m_Links.end()) return false; addr = itr->second; + llarp::LogDebug("found addr for ", remote, ", ", addr); } { util::Lock l(m_SessionsMutex); auto itr = m_Sessions.find(addr); if(itr == m_Sessions.end()) + { + llarp::LogWarn("no session to ", addr, " for ", remote); return false; + } + llarp::LogDebug("SendMessageBuffer ", buf.sz, "bytes"); result = itr->second->SendMessageBuffer(buf); } return result; @@ -189,6 +208,13 @@ namespace llarp return m_SecretKey; } + void + ILinkLayer::PutSession(const Addr& addr, ILinkSession* s) + { + util::Lock l(m_SessionsMutex); + m_Sessions.emplace(addr, s); + } + bool ILinkLayer::EnsureKeys(const char* f) { @@ -225,7 +251,18 @@ namespace llarp ++itr; } else + { + { + util::Lock lock(m_LinksMutex); + auto i = m_Links.find(itr->second->GetPubKey()); + if(i != m_Links.end()) + { + m_Links.erase(i); + } + } + delete itr->second; itr = m_Sessions.erase(itr); + } } ScheduleTick(interval); } diff --git a/llarp/link/utp.cpp b/llarp/link/utp.cpp index 2d9ed910f..56b6361e0 100644 --- a/llarp/link/utp.cpp +++ b/llarp/link/utp.cpp @@ -38,6 +38,12 @@ namespace llarp size_t recvBufOffset; std::vector< byte_t > recvMsg; + void + Alive() + { + lastActive = llarp_time_now_ms(); + } + /// base BaseSession(llarp_router* r); @@ -69,6 +75,7 @@ namespace llarp parent = p; EnterState(eLinkEstablished); llarp::LogDebug("link established with ", remoteAddr); + Alive(); } void @@ -78,22 +85,27 @@ namespace llarp virtual ~BaseSession(); void - PumpWrite(utp_socket* s) + PumpWrite() { // TODO: use utp_writev while(sendq.size()) { auto& front = sendq.front(); - write_ll(s, front.data(), front.size()); + write_ll(front.data(), front.size()); sendq.pop(); } } void - write_ll(utp_socket* s, void* buf, size_t sz) + write_ll(void* buf, size_t sz) { + if(sock == nullptr) + { + llarp::LogWarn("write_ll failed: no socket"); + return; + } llarp::LogDebug("utp_write ", sz, " bytes to ", remoteAddr); - ssize_t wrote = utp_write(s, buf, sz); + ssize_t wrote = utp_write(sock, buf, sz); if(wrote < 0) { llarp::LogWarn("utp_write returned ", wrote); @@ -111,8 +123,11 @@ namespace llarp bool QueueWriteBuffers(llarp_buffer_t buf) { + llarp::LogDebug("write ", buf.sz, " bytes to ", remoteAddr); if(state != eSessionReady) { + llarp::LogWarn("failed to send ", buf.sz, + " bytes on non ready session state=", state); return false; } size_t sz = buf.sz; @@ -178,7 +193,7 @@ namespace llarp llarp_buffer_put_uint32(&buf, LLARP_PROTO_VERSION); llarp_buffer_put_uint32(&buf, sz); // send it - write_ll(s, recvBuf.data(), sz + (sizeof(uint32_t) * 2)); + write_ll(recvBuf.data(), sz + (sizeof(uint32_t) * 2)); sock = s; } @@ -211,6 +226,8 @@ namespace llarp void SendClose() { + if(sock) + Close(sock); } void @@ -232,6 +249,7 @@ namespace llarp bool Recv(const void* buf, size_t sz) { + Alive(); const byte_t* ptr = (const byte_t*)buf; llarp::LogDebug("utp read ", sz, " from ", remoteAddr); while(sz + recvBufOffset > FragmentBufferSize) @@ -262,7 +280,7 @@ namespace llarp } const PubKey& - GetPubKey() const + RemotePubKey() const { return remoteRC.pubkey; } @@ -334,8 +352,9 @@ namespace llarp void Pump() { - ILinkLayer::Pump(); + utp_check_timeouts(_utp_ctx); utp_issue_deferred_acks(_utp_ctx); + ILinkLayer::Pump(); } void @@ -352,8 +371,7 @@ namespace llarp router->crypto.encryption_keygen(k); return true; } - - std::unique_ptr< ILinkSession > + ILinkSession* NewOutboundSession(const RouterContact& rc, const AddressInfo& addr); utp_socket* @@ -381,11 +399,13 @@ namespace llarp TimedOut = [&](llarp_time_t now) -> bool { return this->IsTimedOut(now); }; + GetPubKey = std::bind(&BaseSession::RemotePubKey, this); lastActive = llarp_time_now_ms(); - Pump = [&]() { PumpWrite(this->sock); }; + Pump = [&]() { PumpWrite(); }; Tick = std::bind(&BaseSession::TickImpl, this, std::placeholders::_1); - SendMessageBuffer = std::bind(&BaseSession::QueueWriteBuffers, this, - std::placeholders::_1); + SendMessageBuffer = [&](llarp_buffer_t buf) -> bool { + return this->QueueWriteBuffers(buf); + }; IsEstablished = [&]() { return this->state == eSessionReady; }; HandleLinkIntroMessage = [](const LinkIntroMessage*) -> bool { return false; @@ -401,6 +421,7 @@ namespace llarp remoteRC = rc; sock = s; assert(utp_set_userdata(sock, this) == this); + assert(s == sock); remoteAddr = addr; Start = std::bind(&BaseSession::Connect, this); } @@ -410,6 +431,7 @@ namespace llarp { remoteRC.Clear(); sock = s; + assert(s == sock); assert(utp_set_userdata(sock, this) == this); remoteAddr = addr; Start = []() {}; @@ -423,14 +445,18 @@ namespace llarp BaseSession::~BaseSession() { + if(sock) + { + utp_close(sock); + utp_set_userdata(sock, nullptr); + } } - std::unique_ptr< ILinkSession > + ILinkSession* LinkLayer::NewOutboundSession(const RouterContact& rc, const AddressInfo& addr) { - return std::make_unique< BaseSession >( - router, utp_create_socket(_utp_ctx), rc, addr); + return new BaseSession(router, utp_create_socket(_utp_ctx), rc, addr); } uint64 @@ -440,9 +466,11 @@ namespace llarp static_cast< BaseSession* >(utp_get_userdata(arg->socket)); if(self) { - assert(self->sock); - assert(self->sock == arg->socket); - if(self->state == BaseSession::eSessionReady) + if(self->state == BaseSession::eClose) + { + return 0; + } + else if(self->state == BaseSession::eSessionReady) self->Recv(arg->buf, arg->len); else if(self->state == BaseSession::eLinkEstablished) { @@ -466,15 +494,20 @@ namespace llarp static_cast< LinkLayer* >(utp_context_get_userdata(arg->context)); BaseSession* session = static_cast< BaseSession* >(utp_get_userdata(arg->socket)); - if(arg->state == UTP_STATE_CONNECT) - { - assert(session->sock); - assert(session->sock == arg->socket); - session->OutboundLinkEstablished(l); - } - else if(arg->state == UTP_STATE_EOF) + if(session) { - session->SendClose(); + if(arg->state == UTP_STATE_CONNECT) + { + if(session->state == BaseSession::eClose) + { + return 0; + } + session->OutboundLinkEstablished(l); + } + else if(arg->state == UTP_STATE_EOF) + { + session->SendClose(); + } } return 0; } @@ -507,6 +540,7 @@ namespace llarp uint32_t sz, bool isLastFragment) { + llarp::LogDebug("encrypt then hash ", sz, " bytes"); buf.Randomize(); const byte_t* nonce = buf.data() + FragmentHashSize; byte_t* body = buf.data() + FragmentOverheadSize; @@ -525,8 +559,8 @@ namespace llarp void BaseSession::EnterState(State st) { - state = st; - lastActive = llarp_time_now_ms(); + Alive(); + state = st; if(st == eSessionReady) { parent->MapAddr(this->remoteAddr, remoteRC.pubkey); @@ -589,6 +623,7 @@ namespace llarp BaseSession::RecvHandshake(const void* buf, size_t bufsz, LinkLayer* p, utp_socket* s) { + Alive(); size_t sz = bufsz; parent = p; sock = s; diff --git a/llarp/router.cpp b/llarp/router.cpp index 7e27e27b4..b888a9774 100644 --- a/llarp/router.cpp +++ b/llarp/router.cpp @@ -108,7 +108,7 @@ llarp_router_try_connect(struct llarp_router *router, } void -llarp_router::HandleLinkSessionEstablished(const llarp::RouterContact &rc) +llarp_router::HandleLinkSessionEstablished(llarp::RouterContact rc) { async_verify_RC(rc); } @@ -154,10 +154,8 @@ llarp_router::PersistSessionUntil(const llarp::RouterID &remote, bool llarp_router::SendToOrQueue(const llarp::RouterID &remote, - const llarp::ILinkMessage *m) + const llarp::ILinkMessage *msg) { - std::unique_ptr< const llarp::ILinkMessage > msg = - std::unique_ptr< const llarp::ILinkMessage >(m); llarp::ILinkLayer *chosen = nullptr; if(inboundLinks.size() == 0) @@ -338,10 +336,13 @@ llarp_router::on_verify_server_rc(llarp_async_verify_rc *job) { router->validRouters.erase(pk); } - router->validRouters.insert(std::make_pair(pk, job->rc)); + + llarp::RouterContact rc = job->rc; + + router->validRouters.insert(std::make_pair(pk, rc)); // track valid router in dht - router->dht->impl.nodes->PutNode(job->rc); + router->dht->impl.nodes->PutNode(rc); // this was an outbound establish job if(ctx->establish_job) @@ -449,8 +450,7 @@ llarp_router::Tick() } void -llarp_router::SendTo(llarp::RouterID remote, - std::unique_ptr< const llarp::ILinkMessage > &msg, +llarp_router::SendTo(llarp::RouterID remote, const llarp::ILinkMessage *msg, llarp::ILinkLayer *selected) { llarp_buffer_t buf = @@ -465,9 +465,11 @@ llarp_router::SendTo(llarp::RouterID remote, // set size of message buf.sz = buf.cur - buf.base; buf.cur = buf.base; + llarp::LogDebug("send ", buf.sz, " bytes to ", remote); if(selected) { - selected->SendTo(remote, buf); + if(!selected->SendTo(remote, buf)) + llarp::LogWarn("message to ", remote, " was dropped"); return; } bool sent = outboundLink->SendTo(remote, buf); diff --git a/llarp/router.hpp b/llarp/router.hpp index e32bd9f89..786bc8756 100644 --- a/llarp/router.hpp +++ b/llarp/router.hpp @@ -108,8 +108,7 @@ struct llarp_router llarp_router(); virtual ~llarp_router(); - void - HandleLinkSessionEstablished(const llarp::RouterContact &); + void HandleLinkSessionEstablished(llarp::RouterContact); bool HandleRecvLinkMessageBuffer(llarp::ILinkSession *from, llarp_buffer_t msg); @@ -176,8 +175,7 @@ struct llarp_router /// sendto or drop void - SendTo(llarp::RouterID remote, - std::unique_ptr< const llarp::ILinkMessage > &msg, + SendTo(llarp::RouterID remote, const llarp::ILinkMessage *msg, llarp::ILinkLayer *chosen); /// manually flush outbound message queue for just 1 router