diff --git a/libabyss/src/client.cpp b/libabyss/src/client.cpp index ecac9576c..df237660b 100644 --- a/libabyss/src/client.cpp +++ b/libabyss/src/client.cpp @@ -137,7 +137,7 @@ namespace abyss ProcessRead(const char* buf, size_t sz) { if(state == eInitial) - return false; + return true; bool done = false; while(state < eReadResponseBody) { @@ -202,13 +202,16 @@ namespace abyss json::ToString(m_RequestBody, body); // request base char buf[512] = {0}; - snprintf(buf, sizeof(buf), - "POST /rpc HTTP/1.0\r\nContent-Type: " - "application/json\r\nContent-Length: %lu\r\nAccept: " - "application/json\r\n", - body.size()); - if(!llarp_tcp_conn_async_write(m_Conn, buf, strnlen(buf, sizeof(buf)))) + int sz = snprintf(buf, sizeof(buf), + "POST /rpc HTTP/1.0\r\nContent-Type: " + "application/json\r\nContent-Length: %lu\r\nAccept: " + "application/json\r\n", + body.size()); + if(sz <= 0) + return; + if(!llarp_tcp_conn_async_write(m_Conn, buf, sz)) { + llarp::LogError("failed to write first part of request"); CloseError(); return; } diff --git a/libabyss/src/server.cpp b/libabyss/src/server.cpp index 0dc408264..114e09945 100644 --- a/libabyss/src/server.cpp +++ b/libabyss/src/server.cpp @@ -97,44 +97,24 @@ namespace abyss return name == "content-type" || name == "content-length"; } - bool - WriteStatusLine(int code, const std::string& message) - { - char buf[128] = {0}; - int sz = snprintf(buf, sizeof(buf), "HTTP/1.0 %d %s\r\n", code, - message.c_str()); - if(sz > 0) - { - return llarp_tcp_conn_async_write(_conn, buf, sz); - } - else - return false; - } - bool WriteResponseSimple(int code, const std::string& msg, const char* contentType, const char* content) { - if(!WriteStatusLine(code, msg)) - return false; - char buf[128] = {0}; - int sz = - snprintf(buf, sizeof(buf), "Content-Type: %s\r\n", contentType); - if(sz <= 0) - return false; - if(!llarp_tcp_conn_async_write(_conn, buf, sz)) - return false; + char buf[512] = {0}; size_t contentLength = strlen(content); - sz = snprintf(buf, sizeof(buf), "Content-Length: %zu\r\n\r\n", - contentLength); + int sz = snprintf(buf, sizeof(buf), + "HTTP/1.0 %d %s\r\nContent-Type: " + "%s\r\nContent-Length: %zu\r\n\r\n", + code, msg.c_str(), contentType, contentLength); if(sz <= 0) return false; if(!llarp_tcp_conn_async_write(_conn, buf, sz)) return false; - if(!llarp_tcp_conn_async_write(_conn, content, contentLength)) - return false; + m_State = eWriteHTTPBody; - return true; + + return llarp_tcp_conn_async_write(_conn, content, contentLength); } bool @@ -236,8 +216,13 @@ namespace abyss { return false; } + + if(sz == 0) + return true; + bool done = false; m_LastActive = _parent->now(); + if(m_State < eReadHTTPBody) { const char* end = strstr(buf, "\r\n"); diff --git a/llarp/ev.cpp b/llarp/ev.cpp index 070478ebc..865719282 100644 --- a/llarp/ev.cpp +++ b/llarp/ev.cpp @@ -121,7 +121,7 @@ llarp_ev_add_tun(struct llarp_ev_loop *loop, struct llarp_tun_io *tun) tun->impl = dev; if(dev) { - return loop->add_ev(dev); + return loop->add_ev(dev, false); } return false; } @@ -133,7 +133,10 @@ llarp_tcp_conn_async_write(struct llarp_tcp_conn *conn, const void *pkt, const byte_t *ptr = (const byte_t *)pkt; llarp::tcp_conn *impl = static_cast< llarp::tcp_conn * >(conn->impl); if(impl->_shouldClose) + { + llarp::LogError("write on closed connection"); return false; + } while(sz > EV_WRITE_BUF_SZ) { if(!impl->queue_write((const byte_t *)ptr, EV_WRITE_BUF_SZ)) @@ -173,15 +176,12 @@ llarp_tcp_async_try_connect(struct llarp_ev_loop *loop, // actually parse address llarp::Addr addr(addr_str, port_str); - llarp::ev_io *conn = loop->tcp_connect(tcp, addr); - if(conn && loop->add_ev(conn, true)) + if(!loop->tcp_connect(tcp, addr)) { - llarp::LogDebug("async connect to ", addr); - return; + llarp::LogError("async connect failed"); + if(tcp->error) + tcp->error(tcp); } - llarp::LogError("async connect failed"); - if(tcp->error) - tcp->error(tcp); } bool @@ -192,7 +192,7 @@ llarp_tcp_serve(struct llarp_ev_loop *loop, struct llarp_tcp_acceptor *tcp, llarp::ev_io *impl = loop->bind_tcp(tcp, bindaddr); if(impl) { - return loop->add_ev(impl); + return loop->add_ev(impl, false); } return false; } diff --git a/llarp/ev.hpp b/llarp/ev.hpp index 9caab0050..b26188f49 100644 --- a/llarp/ev.hpp +++ b/llarp/ev.hpp @@ -10,6 +10,7 @@ #include #include #include +#include #ifdef _WIN32 #include @@ -287,6 +288,11 @@ namespace llarp int fd; int flags = 0; +#if defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) \ + || (__APPLE__ && __MACH__) + struct kevent change; +#endif + posix_ev_io(int f) : fd(f) { } @@ -347,11 +353,17 @@ namespace llarp return false; } + virtual void + flush_write() + { + flush_write_buffers(0); + } + /// called in event loop when fd is ready for writing /// requeues anything not written /// this assumes fd is set to non blocking virtual void - flush_write() + flush_write_buffers(size_t amount) { if(m_LossyWriteQueue) m_LossyWriteQueue->Process([&](WriteBuffer& buffer) { @@ -361,28 +373,53 @@ namespace llarp }); else if(m_BlockingWriteQueue) { - // write buffers - while(m_BlockingWriteQueue->size()) + if(amount) { - auto& itr = m_BlockingWriteQueue->front(); - ssize_t result = do_write(itr.buf, itr.bufsz); - if(result == -1) - return; - ssize_t dlt = itr.bufsz - result; - if(dlt > 0) + while(amount && m_BlockingWriteQueue->size()) { - // queue remaining to front of queue - WriteBuffer buff(itr.buf + dlt, itr.bufsz - dlt); + auto& itr = m_BlockingWriteQueue->front(); + ssize_t result = do_write(itr.buf, std::min(amount, itr.bufsz)); + if(result == -1) + return; + ssize_t dlt = itr.bufsz - result; + if(dlt > 0) + { + // queue remaining to front of queue + WriteBuffer buff(itr.buf + dlt, itr.bufsz - dlt); + m_BlockingWriteQueue->pop_front(); + m_BlockingWriteQueue->push_front(buff); + // TODO: errno? + return; + } m_BlockingWriteQueue->pop_front(); - m_BlockingWriteQueue->push_front(buff); - // TODO: errno? - return; + amount -= result; } - m_BlockingWriteQueue->pop_front(); - if(errno == EAGAIN || errno == EWOULDBLOCK) + } + else + { + // write buffers + while(m_BlockingWriteQueue->size()) { - errno = 0; - return; + auto& itr = m_BlockingWriteQueue->front(); + ssize_t result = do_write(itr.buf, itr.bufsz); + if(result == -1) + return; + ssize_t dlt = itr.bufsz - result; + if(dlt > 0) + { + // queue remaining to front of queue + WriteBuffer buff(itr.buf + dlt, itr.bufsz - dlt); + m_BlockingWriteQueue->pop_front(); + m_BlockingWriteQueue->push_front(buff); + // TODO: errno? + return; + } + m_BlockingWriteQueue->pop_front(); + if(errno == EAGAIN || errno == EWOULDBLOCK) + { + errno = 0; + return; + } } } } @@ -475,7 +512,22 @@ namespace llarp flush_write(); void - error(); + flush_write_buffers(size_t a) + { + connected(); + ev_io::flush_write_buffers(a); + } + + void + error() + { + if(_conn) + { + llarp::LogError("tcp_conn error: ", strerror(errno)); + if(_conn->error) + _conn->error(_conn); + } + } virtual ssize_t do_write(void* buf, size_t sz); @@ -548,12 +600,13 @@ struct llarp_ev_loop virtual llarp::ev_io* bind_tcp(llarp_tcp_acceptor* tcp, const sockaddr* addr) = 0; - virtual llarp::ev_io* + /// return false on socket error (non blocking) + virtual bool tcp_connect(llarp_tcp_connecter* tcp, const sockaddr* addr) = 0; /// register event listener virtual bool - add_ev(llarp::ev_io* ev, bool write = false) = 0; + add_ev(llarp::ev_io* ev, bool write) = 0; virtual bool running() const = 0; diff --git a/llarp/ev_epoll.hpp b/llarp/ev_epoll.hpp index 4910cd8ba..034fa208e 100644 --- a/llarp/ev_epoll.hpp +++ b/llarp/ev_epoll.hpp @@ -87,17 +87,6 @@ namespace llarp } } - void - tcp_conn::error() - { - if(_conn) - { - llarp::LogError("tcp_conn error: ", strerror(errno)); - if(_conn->error) - _conn->error(_conn); - } - } - int tcp_serv::read(void*, size_t) { @@ -272,28 +261,29 @@ struct llarp_epoll_loop : public llarp_ev_loop { } - llarp::ev_io* + bool tcp_connect(struct llarp_tcp_connecter* tcp, const sockaddr* remoteaddr) { // create socket int fd = ::socket(remoteaddr->sa_family, SOCK_STREAM, 0); if(fd == -1) - return nullptr; + return false; // set non blocking int flags = fcntl(fd, F_GETFL, 0); if(flags == -1) { ::close(fd); - return nullptr; + return false; } if(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) { ::close(fd); - return nullptr; + return false; } llarp::tcp_conn* conn = new llarp::tcp_conn(this, fd, remoteaddr, tcp); + add_ev(conn, true); conn->connect(); - return conn; + return true; } llarp::ev_io* diff --git a/llarp/ev_kqueue.hpp b/llarp/ev_kqueue.hpp index a06dbaccb..f83ce6084 100644 --- a/llarp/ev_kqueue.hpp +++ b/llarp/ev_kqueue.hpp @@ -29,25 +29,42 @@ namespace llarp int tcp_conn::read(void* buf, size_t sz) { + if(sz == 0) + { + if(tcp.read) + tcp.read(&tcp, 0, 0); + return 0; + } if(_shouldClose) return -1; ssize_t amount = ::read(fd, buf, sz); - if(amount > 0) + if(amount >= 0) { - if(tcp->read) - tcp->read(tcp, buf, amount); + if(tcp.read) + tcp.read(&tcp, buf, amount); } else { - // error + if(errno == EAGAIN || errno == EWOULDBLOCK) + { + errno = 0; + return 0; + } _shouldClose = true; return -1; } return 0; } + void + tcp_conn::flush_write() + { + connected(); + ev_io::flush_write(); + } + ssize_t tcp_conn::do_write(void* buf, size_t sz) { @@ -62,33 +79,65 @@ namespace llarp #endif } + void + tcp_conn::connect() + { + socklen_t slen = sizeof(sockaddr_in); + if(_addr.ss_family == AF_UNIX) + slen = sizeof(sockaddr_un); + else if(_addr.ss_family == AF_INET6) + slen = sizeof(sockaddr_in6); + int result = ::connect(fd, (const sockaddr*)&_addr, slen); + if(result == 0) + { + llarp::LogDebug("Connected"); + connected(); + } + else if(errno == EINPROGRESS) + { + llarp::LogDebug("connect in progress"); + errno = 0; + return; + } + else if(_conn) + { + _conn->error(_conn); + } + } + int tcp_serv::read(void*, size_t) { int new_fd = ::accept(fd, nullptr, nullptr); if(new_fd == -1) { - llarp::LogError("failed to accept on ", fd, ":", strerror(errno)); + llarp::LogError("failed to accept on ", fd, ": ", strerror(errno)); + return -1; + } + // get flags + int flags = fcntl(new_fd, F_GETFL, 0); + if(flags == -1) + { + ::close(new_fd); + return -1; + } + // set flags + if(fcntl(new_fd, F_SETFL, flags | O_NONBLOCK) == -1) + { + llarp::LogError("Failed to set non block on ", fd, ": ", strerror(errno)); + ::close(new_fd); return -1; } - llarp_tcp_conn* conn = new llarp_tcp_conn; - // zero out callbacks - conn->tick = nullptr; - conn->closed = nullptr; - conn->read = nullptr; // build handler - llarp::tcp_conn* connimpl = new tcp_conn(new_fd, conn); - conn->impl = connimpl; - conn->loop = loop; + llarp::tcp_conn* connimpl = new llarp::tcp_conn(loop, new_fd); if(loop->add_ev(connimpl, true)) { // call callback if(tcp->accepted) - tcp->accepted(tcp, conn); + tcp->accepted(tcp, &connimpl->tcp); return 0; } // cleanup error - delete conn; delete connimpl; return -1; } @@ -248,7 +297,6 @@ namespace llarp struct llarp_kqueue_loop : public llarp_ev_loop { int kqueuefd; - struct kevent change; /* event we want to monitor */ llarp_kqueue_loop() : kqueuefd(-1) { @@ -279,6 +327,18 @@ struct llarp_kqueue_loop : public llarp_ev_loop ::close(fd); return nullptr; } + // set non blocking + int flags = fcntl(fd, F_GETFL, 0); + if(flags == -1) + { + ::close(fd); + return nullptr; + } + if(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) + { + ::close(fd); + return nullptr; + } llarp::ev_io* serv = new llarp::tcp_serv(this, fd, tcp); tcp->impl = serv; return serv; @@ -314,6 +374,30 @@ struct llarp_kqueue_loop : public llarp_ev_loop return kqueuefd != -1; } + bool + tcp_connect(llarp_tcp_connecter* tcp, const sockaddr* addr) + { + int fd = ::socket(addr->sa_family, SOCK_STREAM, 0); + if(fd == -1) + return false; + int flags = fcntl(fd, F_GETFL, 0); + if(flags == -1) + { + ::close(fd); + return false; + } + if(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) + { + ::close(fd); + return false; + } + + llarp::tcp_conn* conn = new llarp::tcp_conn(this, fd, addr, tcp); + add_ev(conn, true); + conn->connect(); + return true; + } + int tick(int ms) { @@ -333,9 +417,10 @@ struct llarp_kqueue_loop : public llarp_ev_loop if(ev) { if(events[idx].filter & EVFILT_READ) - ev->read(readbuf, sizeof(readbuf)); + ev->read(readbuf, + std::min(sizeof(readbuf), size_t(events[idx].data))); if(events[idx].filter & EVFILT_WRITE) - ev->flush_write(); + ev->flush_write_buffers(events[idx].data); } ++idx; } @@ -350,7 +435,7 @@ struct llarp_kqueue_loop : public llarp_ev_loop { timespec t; t.tv_sec = 0; - t.tv_nsec = 1000UL * EV_TICK_INTERVAL; + t.tv_nsec = 1000000UL * EV_TICK_INTERVAL; struct kevent events[1024]; int result; do @@ -366,9 +451,10 @@ struct llarp_kqueue_loop : public llarp_ev_loop if(ev) { if(events[idx].filter & EVFILT_READ) - ev->read(readbuf, sizeof(readbuf)); + ev->read(readbuf, + std::min(sizeof(readbuf), size_t(events[idx].data))); if(events[idx].filter & EVFILT_WRITE) - ev->flush_write(); + ev->flush_write_buffers(events[idx].data); } else { @@ -453,8 +539,8 @@ struct llarp_kqueue_loop : public llarp_ev_loop bool close_ev(llarp::ev_io* ev) { - EV_SET(&change, ev->fd, ev->flags, EV_DELETE, 0, 0, nullptr); - return kevent(kqueuefd, &change, 1, nullptr, 0, nullptr) != -1; + EV_SET(&ev->change, ev->fd, ev->flags, EV_DELETE, 0, 0, nullptr); + return kevent(kqueuefd, &ev->change, 1, nullptr, 0, nullptr) != -1; } llarp::ev_io* @@ -469,18 +555,27 @@ struct llarp_kqueue_loop : public llarp_ev_loop } bool - add_ev(llarp::ev_io* ev, bool write) + add_ev(llarp::ev_io* ev, bool w) { ev->flags = EVFILT_READ; - if(write) - ev->flags |= EVFILT_WRITE; - - EV_SET(&change, ev->fd, ev->flags, EV_ADD, 0, 0, ev); - if(kevent(kqueuefd, &change, 1, nullptr, 0, nullptr) == -1) + EV_SET(&ev->change, ev->fd, EVFILT_READ, EV_ADD, 0, 0, ev); + if(kevent(kqueuefd, &ev->change, 1, nullptr, 0, nullptr) == -1) { + llarp::LogError("Failed to add event: ", strerror(errno)); delete ev; return false; } + if(w) + { + ev->flags |= EVFILT_WRITE; + EV_SET(&ev->change, ev->fd, EVFILT_WRITE, EV_ADD, 0, 0, ev); + if(kevent(kqueuefd, &ev->change, 1, nullptr, 0, nullptr) == -1) + { + llarp::LogError("Failed to add event: ", strerror(errno)); + delete ev; + return false; + } + } handlers.emplace_back(ev); return true; } diff --git a/llarp/service/context.cpp b/llarp/service/context.cpp index 4b65f193f..0f430bd3b 100644 --- a/llarp/service/context.cpp +++ b/llarp/service/context.cpp @@ -245,8 +245,7 @@ namespace llarp } // construct - service = std::unique_ptr< llarp::service::Endpoint >( - itr->second(conf.first, m_Router)); + service.reset(itr->second(conf.first, m_Router)); } // configure for(const auto &option : conf.second)