diff --git a/llarp/ev/ev.cpp b/llarp/ev/ev.cpp index 364f5e19b..50b2ee6f2 100644 --- a/llarp/ev/ev.cpp +++ b/llarp/ev/ev.cpp @@ -26,12 +26,7 @@ llarp_make_ev_loop() void llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev, std::shared_ptr logic) { - while (ev->running()) - { - ev->update_time(); - ev->tick(EV_TICK_INTERVAL); - llarp::LogContext::Instance().logStream->Tick(ev->time_now()); - } + ev->run(); logic->clear_event_loop(); ev->stopped(); } diff --git a/llarp/ev/ev.h b/llarp/ev/ev.h index 8733fad8f..98f5168ff 100644 --- a/llarp/ev/ev.h +++ b/llarp/ev/ev.h @@ -62,10 +62,6 @@ llarp_ev_loop_time_now_ms(const llarp_ev_loop_ptr& ev); void llarp_ev_loop_stop(const llarp_ev_loop_ptr& ev); -/// list of packets we recv'd -/// forward declared -struct llarp_pkt_list; - /// UDP handling configuration struct llarp_udp_io { @@ -83,11 +79,6 @@ struct llarp_udp_io int (*sendto)(struct llarp_udp_io*, const llarp::SockAddr&, const byte_t*, size_t); }; -/// get all packets recvieved last tick -/// return true if we got packets return false if we didn't -bool -llarp_ev_udp_recvmany(struct llarp_udp_io* udp, struct llarp_pkt_list* pkts); - /// add UDP handler int llarp_ev_add_udp(struct llarp_ev_loop* ev, struct llarp_udp_io* udp, const llarp::SockAddr& src); diff --git a/llarp/ev/ev.hpp b/llarp/ev/ev.hpp index fe53129b3..ee9789e51 100644 --- a/llarp/ev/ev.hpp +++ b/llarp/ev/ev.hpp @@ -812,72 +812,4 @@ struct llarp_ev_loop call_soon(std::function f) = 0; }; -struct PacketBuffer -{ - PacketBuffer(PacketBuffer&& other) - { - _ptr = other._ptr; - _sz = other._sz; - other._ptr = nullptr; - other._sz = 0; - } - - PacketBuffer(const PacketBuffer&) = delete; - - PacketBuffer& - operator=(const PacketBuffer&) = delete; - - PacketBuffer() : PacketBuffer(nullptr, 0){}; - explicit PacketBuffer(size_t sz) : _sz{sz} - { - _ptr = new char[sz]; - } - PacketBuffer(char* buf, size_t sz) - { - _ptr = buf; - _sz = sz; - } - ~PacketBuffer() - { - if (_ptr) - delete[] _ptr; - } - byte_t* - data() - { - return (byte_t*)_ptr; - } - size_t - size() - { - return _sz; - } - byte_t& operator[](size_t sz) - { - return data()[sz]; - } - void - reserve(size_t sz) - { - if (_ptr) - delete[] _ptr; - _ptr = new char[sz]; - _sz = sz; - } - - private: - char* _ptr = nullptr; - size_t _sz = 0; -}; - -struct PacketEvent -{ - llarp::SockAddr remote; - PacketBuffer pkt; -}; - -struct llarp_pkt_list : public std::vector -{ -}; - #endif diff --git a/llarp/ev/ev_libuv.cpp b/llarp/ev/ev_libuv.cpp index 21225b8bc..a674e2627 100644 --- a/llarp/ev/ev_libuv.cpp +++ b/llarp/ev/ev_libuv.cpp @@ -397,8 +397,6 @@ namespace libuv uv_check_t m_Ticker; llarp_udp_io* const m_UDP; llarp::SockAddr m_Addr; - llarp_pkt_list m_LastPackets; - std::array m_Buffer; udp_glue(uv_loop_t* loop, llarp_udp_io* udp, const llarp::SockAddr& src) : m_UDP(udp), m_Addr(src) @@ -424,16 +422,7 @@ namespace libuv udp_glue* glue = static_cast(handle->data); if (addr) glue->RecvFrom(nread, buf, llarp::SockAddr(*addr)); - if (nread <= 0 || glue->m_UDP == nullptr || glue->m_UDP->recvfrom != nullptr) - delete[] buf->base; - } - - bool - RecvMany(llarp_pkt_list* pkts) - { - *pkts = std::move(m_LastPackets); - m_LastPackets = llarp_pkt_list(); - return pkts->size() > 0; + delete[] buf->base; } void @@ -447,11 +436,6 @@ namespace libuv const llarp_buffer_t pkt((const byte_t*)buf->base, pktsz); m_UDP->recvfrom(m_UDP, fromaddr, ManagedBuffer{pkt}); } - else - { - PacketBuffer pbuf(buf->base, pktsz); - m_LastPackets.emplace_back(PacketEvent{fromaddr, std::move(pbuf)}); - } } } @@ -760,9 +744,11 @@ namespace libuv OnAsyncWake(uv_async_t* async_handle) { Loop* loop = static_cast(async_handle->data); + loop->update_time(); loop->process_timer_queue(); loop->process_cancel_queue(); loop->FlushLogic(); + llarp::LogContext::Instance().logStream->Tick(loop->time_now()); } Loop::Loop() : llarp_ev_loop(), m_LogicCalls(1024), m_timerQueue(20), m_timerCancelQueue(20) @@ -787,12 +773,13 @@ namespace libuv #endif m_TickTimer = new uv_timer_t; m_TickTimer->data = this; + if (uv_timer_init(&m_Impl, m_TickTimer) == -1) + return false; m_Run.store(true); m_nextID.store(0); - m_WakeUp.data = this; uv_async_init(&m_Impl, &m_WakeUp, &OnAsyncWake); - return uv_timer_init(&m_Impl, m_TickTimer) != -1; + return true; } void @@ -829,6 +816,11 @@ namespace libuv int Loop::run() { + uv_timer_start( + m_TickTimer, + [](uv_timer_t* t) { static_cast(t->loop->data)->FlushLogic(); }, + 1000, + 1000); return uv_run(&m_Impl, UV_RUN_DEFAULT); } @@ -923,11 +915,7 @@ namespace libuv while (not m_timerCancelQueue.empty()) { uint64_t job_id = m_timerCancelQueue.popFront(); - auto itr = m_pendingCalls.find(job_id); - if (itr != m_pendingCalls.end()) - { - m_pendingCalls.erase(itr); - } + m_pendingCalls.erase(job_id); } } @@ -937,8 +925,9 @@ namespace libuv auto itr = m_pendingCalls.find(job_id); if (itr != m_pendingCalls.end()) { - LogicCall(m_Logic, itr->second); - m_pendingCalls.erase(itr); + if (itr->second) + itr->second(); + m_pendingCalls.erase(itr->first); } } @@ -1063,9 +1052,3 @@ namespace libuv } } // namespace libuv - -bool -llarp_ev_udp_recvmany(struct llarp_udp_io* u, struct llarp_pkt_list* pkts) -{ - return static_cast(u->impl)->RecvMany(pkts); -} diff --git a/llarp/iwp/message_buffer.cpp b/llarp/iwp/message_buffer.cpp index 67d137c83..cda8477ab 100644 --- a/llarp/iwp/message_buffer.cpp +++ b/llarp/iwp/message_buffer.cpp @@ -19,6 +19,7 @@ namespace llarp { const llarp_buffer_t buf(m_Data); CryptoManager::instance()->shorthash(m_Digest, buf); + m_Acks.set(0); } ILinkSession::Packet_t diff --git a/llarp/iwp/session.cpp b/llarp/iwp/session.cpp index c0d6265bc..aaa44b5ca 100644 --- a/llarp/iwp/session.cpp +++ b/llarp/iwp/session.cpp @@ -143,10 +143,8 @@ namespace llarp Session::EncryptWorker(CryptoQueue_ptr msgs) { LogDebug("encrypt worker ", msgs->size(), " messages"); - auto itr = msgs->begin(); - while (itr != msgs->end()) + for (auto& pkt : *msgs) { - Packet_t pkt = std::move(*itr); llarp_buffer_t pktbuf(pkt); const TunnelNonce nonce_ptr{pkt.data() + HMACSIZE}; pktbuf.base += PacketOverhead; @@ -157,7 +155,6 @@ namespace llarp pktbuf.sz = pkt.size() - HMACSIZE; CryptoManager::instance()->hmac(pkt.data(), pktbuf, m_SessionKey); Send_LL(pkt.data(), pkt.size()); - ++itr; } } @@ -911,11 +908,6 @@ namespace llarp return false; } } - else - { - // this case should never happen - ::abort(); - } break; case State::Introduction: if (m_Inbound) diff --git a/llarp/link/server.cpp b/llarp/link/server.cpp index 9c997aba7..1733a116b 100644 --- a/llarp/link/server.cpp +++ b/llarp/link/server.cpp @@ -116,7 +116,13 @@ namespace llarp { m_Loop = loop; m_udp.user = this; - m_udp.recvfrom = nullptr; + m_udp.recvfrom = [](llarp_udp_io* udp, const llarp::SockAddr& from, ManagedBuffer pktbuf) { + ILinkSession::Packet_t pkt; + auto& buf = pktbuf.underlying; + pkt.resize(buf.sz); + std::copy_n(buf.base, buf.sz, pkt.data()); + static_cast(udp->user)->RecvFrom(from, std::move(pkt)); + }; m_udp.tick = &ILinkLayer::udp_tick; if (ifname == "*") { @@ -495,23 +501,7 @@ namespace llarp ILinkLayer::udp_tick(llarp_udp_io* udp) { ILinkLayer* link = static_cast(udp->user); - auto pkts = std::make_shared(); - llarp_ev_udp_recvmany(&link->m_udp, pkts.get()); - auto logic = link->logic(); - if (logic == nullptr) - return; - LogicCall(logic, [pkts, link]() { - auto itr = pkts->begin(); - while (itr != pkts->end()) - { - if (link->m_RecentlyClosed.find(itr->remote) == link->m_RecentlyClosed.end()) - { - link->RecvFrom(itr->remote, std::move(itr->pkt)); - } - ++itr; - } - link->Pump(); - }); + link->Pump(); } } // namespace llarp diff --git a/llarp/link/session.hpp b/llarp/link/session.hpp index 941ec61c0..cb8920acf 100644 --- a/llarp/link/session.hpp +++ b/llarp/link/session.hpp @@ -45,7 +45,7 @@ namespace llarp /// message delivery result hook function using CompletionHandler = std::function; - using Packet_t = PacketBuffer; + using Packet_t = std::vector; using Message_t = std::vector; /// send a message buffer to the remote endpoint