diff --git a/libabyss/CMakeLists.txt b/libabyss/CMakeLists.txt index 7e3ce1a28..e516e0dcd 100644 --- a/libabyss/CMakeLists.txt +++ b/libabyss/CMakeLists.txt @@ -4,4 +4,4 @@ add_library(${ABYSS_LIB} "${CMAKE_CURRENT_SOURCE_DIR}/src/md5.cpp" "${CMAKE_CURRENT_SOURCE_DIR}/src/server.cpp") target_include_directories(${ABYSS_LIB} PUBLIC include) -target_link_libraries(${ABYSS_LIB} PUBLIC ${PLATFORM_LIB}) +target_link_libraries(${ABYSS_LIB} PUBLIC ${PLATFORM_LIB} uv) diff --git a/libabyss/main.cpp b/libabyss/main.cpp index cf4b30caf..bb80dc869 100644 --- a/libabyss/main.cpp +++ b/libabyss/main.cpp @@ -117,7 +117,7 @@ main(ABSL_ATTRIBUTE_UNUSED int argc, ABSL_ATTRIBUTE_UNUSED char* argv[]) #endif llarp::SetLogLevel(llarp::eLogDebug); llarp_threadpool* threadpool = llarp_init_same_process_threadpool(); - llarp_ev_loop_ptr loop = llarp_make_ev_loop(); + llarp_ev_loop_ptr loop = llarp_make_uv_loop(); auto logic = std::make_shared< llarp::Logic >(threadpool); sockaddr_in addr; addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); diff --git a/libabyss/src/client.cpp b/libabyss/src/client.cpp index e721a6476..4c1e7e788 100644 --- a/libabyss/src/client.cpp +++ b/libabyss/src/client.cpp @@ -63,6 +63,7 @@ namespace abyss static void OnClosed(llarp_tcp_conn* conn) { + llarp::LogDebug("connection closed"); ConnImpl* self = static_cast< ConnImpl* >(conn->user); self->state = eCloseMe; } diff --git a/libabyss/src/server.cpp b/libabyss/src/server.cpp index bae67bf09..739c72dfa 100644 --- a/libabyss/src/server.cpp +++ b/libabyss/src/server.cpp @@ -212,6 +212,7 @@ namespace abyss bool ProcessRead(const char* buf, size_t sz) { + llarp::LogDebug("http read ", sz, " bytes"); if(m_Bad) { return false; @@ -266,6 +267,7 @@ namespace abyss static void OnClosed(llarp_tcp_conn* conn) { + llarp::LogDebug("connection closed"); ConnImpl* self = static_cast< ConnImpl* >(conn->user); self->_conn = nullptr; } @@ -280,6 +282,8 @@ namespace abyss void Tick() { + if(m_Bad) + Close(); } /// mark bad so next tick we are closed diff --git a/llarp/ev/ev.cpp b/llarp/ev/ev.cpp index 78b7e1efe..8b2fb96d2 100644 --- a/llarp/ev/ev.cpp +++ b/llarp/ev/ev.cpp @@ -186,11 +186,14 @@ llarp_tcp_conn_async_write(struct llarp_tcp_conn *conn, const llarp_buffer_t &b) { ssize_t amount = conn->write(conn, buf.underlying.cur, EV_WRITE_BUF_SZ); if(amount <= 0) + { + llarp::LogError("write underrun"); return false; + } buf.underlying.cur += amount; sz -= amount; } - return conn->write(conn, buf.underlying.cur, sz); + return conn->write(conn, buf.underlying.cur, sz) > 0; } void @@ -241,12 +244,7 @@ llarp_tcp_serve(struct llarp_ev_loop *loop, struct llarp_tcp_acceptor *tcp, void llarp_tcp_acceptor_close(struct llarp_tcp_acceptor *tcp) { - llarp::ev_io *impl = static_cast< llarp::ev_io * >(tcp->user); - tcp->impl = nullptr; - tcp->loop->close_ev(impl); - if(tcp->closed) - tcp->closed(tcp); - // dont free acceptor because it may be stack allocated + tcp->close(tcp); } void diff --git a/llarp/ev/ev.h b/llarp/ev/ev.h index 4c282c485..b208370e8 100644 --- a/llarp/ev/ev.h +++ b/llarp/ev/ev.h @@ -178,6 +178,8 @@ struct llarp_tcp_acceptor void (*accepted)(struct llarp_tcp_acceptor *, struct llarp_tcp_conn *); /// handle after server socket closed (free-ing is handled by event loop) void (*closed)(struct llarp_tcp_acceptor *); + /// set by impl + void (*close)(struct llarp_tcp_acceptor *); }; /// bind to an address and start serving async diff --git a/llarp/ev/ev_libuv.cpp b/llarp/ev/ev_libuv.cpp index 5250bd96e..bc5fdcf02 100644 --- a/llarp/ev/ev_libuv.cpp +++ b/llarp/ev/ev_libuv.cpp @@ -14,14 +14,18 @@ namespace libuv llarp_tcp_conn m_Conn; llarp::Addr m_Addr; + std::deque< std::vector< char > > m_WriteQueue; + conn_glue(uv_loop_t* loop, llarp_tcp_connecter* tcp, const sockaddr* addr) : m_TCP(tcp), m_Accept(nullptr), m_Addr(*addr) { m_Connect.data = this; - m_Handle.data = tcp; + m_Handle.data = this; uv_tcp_init(loop, &m_Handle); m_Ticker.data = this; uv_timer_init(loop, &m_Ticker); + m_Conn.close = &ExplicitClose; + m_Conn.write = &ExplicitWrite; } conn_glue(uv_loop_t* loop, llarp_tcp_acceptor* tcp, const sockaddr* addr) @@ -31,10 +35,15 @@ namespace libuv uv_tcp_init(loop, &m_Handle); m_Ticker.data = this; uv_timer_init(loop, &m_Ticker); + m_Accept->close = &ExplicitCloseAccept; + m_Conn.write = nullptr; + m_Conn.closed = nullptr; } conn_glue(conn_glue* parent) : m_TCP(nullptr), m_Accept(nullptr) { + m_Conn.close = &ExplicitClose; + m_Conn.write = &ExplicitWrite; m_Handle.data = this; uv_tcp_init(parent->m_Handle.loop, &m_Handle); m_Ticker.data = this; @@ -60,19 +69,25 @@ namespace libuv { static_cast< conn_glue* >(conn->impl)->Close(); } + static void + ExplicitCloseAccept(llarp_tcp_acceptor* tcp) + { + static_cast< conn_glue* >(tcp->impl)->Close(); + } static ssize_t ExplicitWrite(llarp_tcp_conn* conn, const byte_t* ptr, size_t sz) { - if(static_cast< conn_glue* >(conn->impl)->WriteAsync(ptr, sz)) - return sz; - return 0; + return static_cast< conn_glue* >(conn->impl)->WriteAsync((char*)ptr, sz); } static void OnRead(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { - static_cast< conn_glue* >(stream->data)->Read(buf->base, nread); + if(nread >= 0) + static_cast< conn_glue* >(stream->data)->Read(buf->base, nread); + else if(nread < 0) + static_cast< conn_glue* >(stream->data)->Close(); delete[] buf->base; } @@ -86,8 +101,12 @@ namespace libuv void Read(const char* ptr, ssize_t sz) { - const llarp_buffer_t buf(ptr, sz); - m_Conn.read(&m_Conn, buf); + if(m_Conn.read) + { + llarp::LogDebug("tcp read ", sz, " bytes"); + const llarp_buffer_t buf(ptr, sz); + m_Conn.read(&m_Conn, buf); + } } void @@ -105,38 +124,53 @@ namespace libuv Start(); } else if(m_TCP->error) + { + llarp::LogError("failed to connect tcp ", uv_strerror(status)); m_TCP->error(m_TCP); + } } } void WriteFail() { - m_Conn.close(&m_Conn); + if(m_Conn.close) + m_Conn.close(&m_Conn); + } + + uv_stream_t* + Stream() + { + return (uv_stream_t*)&m_Handle; } static void OnWritten(uv_write_t* req, int status) { - conn_glue* self = static_cast< conn_glue* >(req->data); if(status) - self->WriteFail(); - delete req; + { + llarp::LogError("write failed on tcp: ", uv_strerror(status)); + static_cast< conn_glue* >(req->data)->Close(); + return; + } + static_cast< conn_glue* >(req->data)->DrainOne(); } - uv_stream_t* - Stream() + void + DrainOne() { - return (uv_stream_t*)&m_Handle; + m_WriteQueue.pop_front(); } - bool - WriteAsync(const void* data, size_t sz) + int + WriteAsync(char* data, size_t sz) { - uv_write_t* request = new uv_write_t(); - request->data = this; - auto buf = uv_buf_init((char*)data, sz); - return uv_write(request, Stream(), &buf, 1, &OnWritten) != -1; + m_WriteQueue.emplace_back(sz); + std::copy_n(data, sz, m_WriteQueue.back().begin()); + auto buf = uv_buf_init(m_WriteQueue.back().data(), sz); + uv_write_t* req = new uv_write_t(); + req->data = this; + return uv_write(req, Stream(), &buf, 1, &OnWritten) == 0 ? sz : 0; } static void @@ -150,15 +184,22 @@ namespace libuv { m_Handle.data = nullptr; if(m_Accept && m_Accept->closed) + { + m_Accept->impl = nullptr; m_Accept->closed(m_Accept); + } + m_Conn.impl = nullptr; if(m_Conn.closed) + { m_Conn.closed(&m_Conn); + } delete this; } void Close() { + llarp::LogDebug("close tcp connection"); uv_timer_stop(&m_Ticker); uv_close((uv_handle_t*)&m_Handle, &OnClosed); } @@ -170,6 +211,10 @@ namespace libuv { static_cast< conn_glue* >(stream->data)->Accept(); } + else + { + llarp::LogError("tcp accept failed: ", uv_strerror(status)); + } } static void @@ -191,8 +236,12 @@ namespace libuv void Start() { - uv_timer_start((uv_timer_t*)&m_Ticker, &OnTick, 10, 10); - uv_read_start(Stream(), &Alloc, &OnRead); + auto result = uv_timer_start((uv_timer_t*)&m_Ticker, &OnTick, 10, 10); + if(result) + llarp::LogError("failed to start timer ", uv_strerror(result)); + result = uv_read_start(Stream(), &Alloc, &OnRead); + if(result) + llarp::LogError("failed to start reader ", uv_strerror(result)); } void @@ -201,11 +250,18 @@ namespace libuv if(m_Accept && m_Accept->accepted) { conn_glue* child = new conn_glue(this); - uv_accept(Stream(), child->Stream()); - child->m_Conn.impl = this; + llarp::LogDebug("accepted new connection"); + child->m_Conn.impl = child; child->m_Conn.loop = m_Accept->loop; child->m_Conn.close = &ExplicitClose; child->m_Conn.write = &ExplicitWrite; + auto res = uv_accept(Stream(), child->Stream()); + if(res) + { + llarp::LogError("failed to accept tcp connection ", uv_strerror(res)); + child->Close(); + return; + } m_Accept->accepted(m_Accept, &child->m_Conn); child->Start(); } @@ -214,6 +270,7 @@ namespace libuv bool Server() { + m_Accept->close = &ExplicitCloseAccept; return uv_tcp_bind(&m_Handle, m_Addr, 0) == 0 && uv_listen(Stream(), 5, &OnAccept) == 0; } @@ -256,7 +313,7 @@ namespace libuv void RecvFrom(ssize_t sz, const uv_buf_t* buf, const struct sockaddr* fromaddr) { - if(sz >= 0) + if(sz >= 0 && m_UDP && m_UDP->recvfrom) { const size_t pktsz = sz; const llarp_buffer_t pkt{(const byte_t*)buf->base, pktsz}; @@ -274,7 +331,6 @@ namespace libuv void Tick() { - llarp::LogDebug("udp tick"); if(m_UDP && m_UDP->tick) m_UDP->tick(m_UDP); uv_timer_again(&m_Ticker); @@ -467,6 +523,7 @@ namespace libuv m_Impl.reset(uv_loop_new()); if(uv_loop_init(m_Impl.get()) == -1) return false; + uv_loop_configure(m_Impl.get(), UV_LOOP_BLOCK_SIGNAL, SIGPIPE); m_TickTimer.data = this; m_Run.store(true); return uv_timer_init(m_Impl.get(), &m_TickTimer) != -1;