* fix up kqueue tcp connection tracking so it works on mac os

* match changes in epoll for kqueue changes
* additional checks in libabyss
pull/38/head^2
Jeff 6 years ago
parent 2d279e83fd
commit 4b92661f5d

@ -137,7 +137,7 @@ namespace abyss
ProcessRead(const char* buf, size_t sz)
{
if(state == eInitial)
return false;
return true;
bool done = false;
while(state < eReadResponseBody)
{
@ -202,13 +202,16 @@ namespace abyss
json::ToString(m_RequestBody, body);
// request base
char buf[512] = {0};
snprintf(buf, sizeof(buf),
"POST /rpc HTTP/1.0\r\nContent-Type: "
"application/json\r\nContent-Length: %lu\r\nAccept: "
"application/json\r\n",
body.size());
if(!llarp_tcp_conn_async_write(m_Conn, buf, strnlen(buf, sizeof(buf))))
int sz = snprintf(buf, sizeof(buf),
"POST /rpc HTTP/1.0\r\nContent-Type: "
"application/json\r\nContent-Length: %lu\r\nAccept: "
"application/json\r\n",
body.size());
if(sz <= 0)
return;
if(!llarp_tcp_conn_async_write(m_Conn, buf, sz))
{
llarp::LogError("failed to write first part of request");
CloseError();
return;
}

@ -97,44 +97,24 @@ namespace abyss
return name == "content-type" || name == "content-length";
}
bool
WriteStatusLine(int code, const std::string& message)
{
char buf[128] = {0};
int sz = snprintf(buf, sizeof(buf), "HTTP/1.0 %d %s\r\n", code,
message.c_str());
if(sz > 0)
{
return llarp_tcp_conn_async_write(_conn, buf, sz);
}
else
return false;
}
bool
WriteResponseSimple(int code, const std::string& msg,
const char* contentType, const char* content)
{
if(!WriteStatusLine(code, msg))
return false;
char buf[128] = {0};
int sz =
snprintf(buf, sizeof(buf), "Content-Type: %s\r\n", contentType);
if(sz <= 0)
return false;
if(!llarp_tcp_conn_async_write(_conn, buf, sz))
return false;
char buf[512] = {0};
size_t contentLength = strlen(content);
sz = snprintf(buf, sizeof(buf), "Content-Length: %zu\r\n\r\n",
contentLength);
int sz = snprintf(buf, sizeof(buf),
"HTTP/1.0 %d %s\r\nContent-Type: "
"%s\r\nContent-Length: %zu\r\n\r\n",
code, msg.c_str(), contentType, contentLength);
if(sz <= 0)
return false;
if(!llarp_tcp_conn_async_write(_conn, buf, sz))
return false;
if(!llarp_tcp_conn_async_write(_conn, content, contentLength))
return false;
m_State = eWriteHTTPBody;
return true;
return llarp_tcp_conn_async_write(_conn, content, contentLength);
}
bool
@ -236,8 +216,13 @@ namespace abyss
{
return false;
}
if(sz == 0)
return true;
bool done = false;
m_LastActive = _parent->now();
if(m_State < eReadHTTPBody)
{
const char* end = strstr(buf, "\r\n");

@ -121,7 +121,7 @@ llarp_ev_add_tun(struct llarp_ev_loop *loop, struct llarp_tun_io *tun)
tun->impl = dev;
if(dev)
{
return loop->add_ev(dev);
return loop->add_ev(dev, false);
}
return false;
}
@ -133,7 +133,10 @@ llarp_tcp_conn_async_write(struct llarp_tcp_conn *conn, const void *pkt,
const byte_t *ptr = (const byte_t *)pkt;
llarp::tcp_conn *impl = static_cast< llarp::tcp_conn * >(conn->impl);
if(impl->_shouldClose)
{
llarp::LogError("write on closed connection");
return false;
}
while(sz > EV_WRITE_BUF_SZ)
{
if(!impl->queue_write((const byte_t *)ptr, EV_WRITE_BUF_SZ))
@ -173,15 +176,12 @@ llarp_tcp_async_try_connect(struct llarp_ev_loop *loop,
// actually parse address
llarp::Addr addr(addr_str, port_str);
llarp::ev_io *conn = loop->tcp_connect(tcp, addr);
if(conn && loop->add_ev(conn, true))
if(!loop->tcp_connect(tcp, addr))
{
llarp::LogDebug("async connect to ", addr);
return;
llarp::LogError("async connect failed");
if(tcp->error)
tcp->error(tcp);
}
llarp::LogError("async connect failed");
if(tcp->error)
tcp->error(tcp);
}
bool
@ -192,7 +192,7 @@ llarp_tcp_serve(struct llarp_ev_loop *loop, struct llarp_tcp_acceptor *tcp,
llarp::ev_io *impl = loop->bind_tcp(tcp, bindaddr);
if(impl)
{
return loop->add_ev(impl);
return loop->add_ev(impl, false);
}
return false;
}

@ -10,6 +10,7 @@
#include <llarp/codel.hpp>
#include <list>
#include <deque>
#include <algorithm>
#ifdef _WIN32
#include <variant>
@ -287,6 +288,11 @@ namespace llarp
int fd;
int flags = 0;
#if defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) \
|| (__APPLE__ && __MACH__)
struct kevent change;
#endif
posix_ev_io(int f) : fd(f)
{
}
@ -347,11 +353,17 @@ namespace llarp
return false;
}
virtual void
flush_write()
{
flush_write_buffers(0);
}
/// called in event loop when fd is ready for writing
/// requeues anything not written
/// this assumes fd is set to non blocking
virtual void
flush_write()
flush_write_buffers(size_t amount)
{
if(m_LossyWriteQueue)
m_LossyWriteQueue->Process([&](WriteBuffer& buffer) {
@ -361,28 +373,53 @@ namespace llarp
});
else if(m_BlockingWriteQueue)
{
// write buffers
while(m_BlockingWriteQueue->size())
if(amount)
{
auto& itr = m_BlockingWriteQueue->front();
ssize_t result = do_write(itr.buf, itr.bufsz);
if(result == -1)
return;
ssize_t dlt = itr.bufsz - result;
if(dlt > 0)
while(amount && m_BlockingWriteQueue->size())
{
// queue remaining to front of queue
WriteBuffer buff(itr.buf + dlt, itr.bufsz - dlt);
auto& itr = m_BlockingWriteQueue->front();
ssize_t result = do_write(itr.buf, std::min(amount, itr.bufsz));
if(result == -1)
return;
ssize_t dlt = itr.bufsz - result;
if(dlt > 0)
{
// queue remaining to front of queue
WriteBuffer buff(itr.buf + dlt, itr.bufsz - dlt);
m_BlockingWriteQueue->pop_front();
m_BlockingWriteQueue->push_front(buff);
// TODO: errno?
return;
}
m_BlockingWriteQueue->pop_front();
m_BlockingWriteQueue->push_front(buff);
// TODO: errno?
return;
amount -= result;
}
m_BlockingWriteQueue->pop_front();
if(errno == EAGAIN || errno == EWOULDBLOCK)
}
else
{
// write buffers
while(m_BlockingWriteQueue->size())
{
errno = 0;
return;
auto& itr = m_BlockingWriteQueue->front();
ssize_t result = do_write(itr.buf, itr.bufsz);
if(result == -1)
return;
ssize_t dlt = itr.bufsz - result;
if(dlt > 0)
{
// queue remaining to front of queue
WriteBuffer buff(itr.buf + dlt, itr.bufsz - dlt);
m_BlockingWriteQueue->pop_front();
m_BlockingWriteQueue->push_front(buff);
// TODO: errno?
return;
}
m_BlockingWriteQueue->pop_front();
if(errno == EAGAIN || errno == EWOULDBLOCK)
{
errno = 0;
return;
}
}
}
}
@ -475,7 +512,22 @@ namespace llarp
flush_write();
void
error();
flush_write_buffers(size_t a)
{
connected();
ev_io::flush_write_buffers(a);
}
void
error()
{
if(_conn)
{
llarp::LogError("tcp_conn error: ", strerror(errno));
if(_conn->error)
_conn->error(_conn);
}
}
virtual ssize_t
do_write(void* buf, size_t sz);
@ -548,12 +600,13 @@ struct llarp_ev_loop
virtual llarp::ev_io*
bind_tcp(llarp_tcp_acceptor* tcp, const sockaddr* addr) = 0;
virtual llarp::ev_io*
/// return false on socket error (non blocking)
virtual bool
tcp_connect(llarp_tcp_connecter* tcp, const sockaddr* addr) = 0;
/// register event listener
virtual bool
add_ev(llarp::ev_io* ev, bool write = false) = 0;
add_ev(llarp::ev_io* ev, bool write) = 0;
virtual bool
running() const = 0;

@ -87,17 +87,6 @@ namespace llarp
}
}
void
tcp_conn::error()
{
if(_conn)
{
llarp::LogError("tcp_conn error: ", strerror(errno));
if(_conn->error)
_conn->error(_conn);
}
}
int
tcp_serv::read(void*, size_t)
{
@ -272,28 +261,29 @@ struct llarp_epoll_loop : public llarp_ev_loop
{
}
llarp::ev_io*
bool
tcp_connect(struct llarp_tcp_connecter* tcp, const sockaddr* remoteaddr)
{
// create socket
int fd = ::socket(remoteaddr->sa_family, SOCK_STREAM, 0);
if(fd == -1)
return nullptr;
return false;
// set non blocking
int flags = fcntl(fd, F_GETFL, 0);
if(flags == -1)
{
::close(fd);
return nullptr;
return false;
}
if(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
{
::close(fd);
return nullptr;
return false;
}
llarp::tcp_conn* conn = new llarp::tcp_conn(this, fd, remoteaddr, tcp);
add_ev(conn, true);
conn->connect();
return conn;
return true;
}
llarp::ev_io*

@ -29,25 +29,42 @@ namespace llarp
int
tcp_conn::read(void* buf, size_t sz)
{
if(sz == 0)
{
if(tcp.read)
tcp.read(&tcp, 0, 0);
return 0;
}
if(_shouldClose)
return -1;
ssize_t amount = ::read(fd, buf, sz);
if(amount > 0)
if(amount >= 0)
{
if(tcp->read)
tcp->read(tcp, buf, amount);
if(tcp.read)
tcp.read(&tcp, buf, amount);
}
else
{
// error
if(errno == EAGAIN || errno == EWOULDBLOCK)
{
errno = 0;
return 0;
}
_shouldClose = true;
return -1;
}
return 0;
}
void
tcp_conn::flush_write()
{
connected();
ev_io::flush_write();
}
ssize_t
tcp_conn::do_write(void* buf, size_t sz)
{
@ -62,33 +79,65 @@ namespace llarp
#endif
}
void
tcp_conn::connect()
{
socklen_t slen = sizeof(sockaddr_in);
if(_addr.ss_family == AF_UNIX)
slen = sizeof(sockaddr_un);
else if(_addr.ss_family == AF_INET6)
slen = sizeof(sockaddr_in6);
int result = ::connect(fd, (const sockaddr*)&_addr, slen);
if(result == 0)
{
llarp::LogDebug("Connected");
connected();
}
else if(errno == EINPROGRESS)
{
llarp::LogDebug("connect in progress");
errno = 0;
return;
}
else if(_conn)
{
_conn->error(_conn);
}
}
int
tcp_serv::read(void*, size_t)
{
int new_fd = ::accept(fd, nullptr, nullptr);
if(new_fd == -1)
{
llarp::LogError("failed to accept on ", fd, ":", strerror(errno));
llarp::LogError("failed to accept on ", fd, ": ", strerror(errno));
return -1;
}
// get flags
int flags = fcntl(new_fd, F_GETFL, 0);
if(flags == -1)
{
::close(new_fd);
return -1;
}
// set flags
if(fcntl(new_fd, F_SETFL, flags | O_NONBLOCK) == -1)
{
llarp::LogError("Failed to set non block on ", fd, ": ", strerror(errno));
::close(new_fd);
return -1;
}
llarp_tcp_conn* conn = new llarp_tcp_conn;
// zero out callbacks
conn->tick = nullptr;
conn->closed = nullptr;
conn->read = nullptr;
// build handler
llarp::tcp_conn* connimpl = new tcp_conn(new_fd, conn);
conn->impl = connimpl;
conn->loop = loop;
llarp::tcp_conn* connimpl = new llarp::tcp_conn(loop, new_fd);
if(loop->add_ev(connimpl, true))
{
// call callback
if(tcp->accepted)
tcp->accepted(tcp, conn);
tcp->accepted(tcp, &connimpl->tcp);
return 0;
}
// cleanup error
delete conn;
delete connimpl;
return -1;
}
@ -248,7 +297,6 @@ namespace llarp
struct llarp_kqueue_loop : public llarp_ev_loop
{
int kqueuefd;
struct kevent change; /* event we want to monitor */
llarp_kqueue_loop() : kqueuefd(-1)
{
@ -279,6 +327,18 @@ struct llarp_kqueue_loop : public llarp_ev_loop
::close(fd);
return nullptr;
}
// set non blocking
int flags = fcntl(fd, F_GETFL, 0);
if(flags == -1)
{
::close(fd);
return nullptr;
}
if(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
{
::close(fd);
return nullptr;
}
llarp::ev_io* serv = new llarp::tcp_serv(this, fd, tcp);
tcp->impl = serv;
return serv;
@ -314,6 +374,30 @@ struct llarp_kqueue_loop : public llarp_ev_loop
return kqueuefd != -1;
}
bool
tcp_connect(llarp_tcp_connecter* tcp, const sockaddr* addr)
{
int fd = ::socket(addr->sa_family, SOCK_STREAM, 0);
if(fd == -1)
return false;
int flags = fcntl(fd, F_GETFL, 0);
if(flags == -1)
{
::close(fd);
return false;
}
if(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
{
::close(fd);
return false;
}
llarp::tcp_conn* conn = new llarp::tcp_conn(this, fd, addr, tcp);
add_ev(conn, true);
conn->connect();
return true;
}
int
tick(int ms)
{
@ -333,9 +417,10 @@ struct llarp_kqueue_loop : public llarp_ev_loop
if(ev)
{
if(events[idx].filter & EVFILT_READ)
ev->read(readbuf, sizeof(readbuf));
ev->read(readbuf,
std::min(sizeof(readbuf), size_t(events[idx].data)));
if(events[idx].filter & EVFILT_WRITE)
ev->flush_write();
ev->flush_write_buffers(events[idx].data);
}
++idx;
}
@ -350,7 +435,7 @@ struct llarp_kqueue_loop : public llarp_ev_loop
{
timespec t;
t.tv_sec = 0;
t.tv_nsec = 1000UL * EV_TICK_INTERVAL;
t.tv_nsec = 1000000UL * EV_TICK_INTERVAL;
struct kevent events[1024];
int result;
do
@ -366,9 +451,10 @@ struct llarp_kqueue_loop : public llarp_ev_loop
if(ev)
{
if(events[idx].filter & EVFILT_READ)
ev->read(readbuf, sizeof(readbuf));
ev->read(readbuf,
std::min(sizeof(readbuf), size_t(events[idx].data)));
if(events[idx].filter & EVFILT_WRITE)
ev->flush_write();
ev->flush_write_buffers(events[idx].data);
}
else
{
@ -453,8 +539,8 @@ struct llarp_kqueue_loop : public llarp_ev_loop
bool
close_ev(llarp::ev_io* ev)
{
EV_SET(&change, ev->fd, ev->flags, EV_DELETE, 0, 0, nullptr);
return kevent(kqueuefd, &change, 1, nullptr, 0, nullptr) != -1;
EV_SET(&ev->change, ev->fd, ev->flags, EV_DELETE, 0, 0, nullptr);
return kevent(kqueuefd, &ev->change, 1, nullptr, 0, nullptr) != -1;
}
llarp::ev_io*
@ -469,18 +555,27 @@ struct llarp_kqueue_loop : public llarp_ev_loop
}
bool
add_ev(llarp::ev_io* ev, bool write)
add_ev(llarp::ev_io* ev, bool w)
{
ev->flags = EVFILT_READ;
if(write)
ev->flags |= EVFILT_WRITE;
EV_SET(&change, ev->fd, ev->flags, EV_ADD, 0, 0, ev);
if(kevent(kqueuefd, &change, 1, nullptr, 0, nullptr) == -1)
EV_SET(&ev->change, ev->fd, EVFILT_READ, EV_ADD, 0, 0, ev);
if(kevent(kqueuefd, &ev->change, 1, nullptr, 0, nullptr) == -1)
{
llarp::LogError("Failed to add event: ", strerror(errno));
delete ev;
return false;
}
if(w)
{
ev->flags |= EVFILT_WRITE;
EV_SET(&ev->change, ev->fd, EVFILT_WRITE, EV_ADD, 0, 0, ev);
if(kevent(kqueuefd, &ev->change, 1, nullptr, 0, nullptr) == -1)
{
llarp::LogError("Failed to add event: ", strerror(errno));
delete ev;
return false;
}
}
handlers.emplace_back(ev);
return true;
}

@ -245,8 +245,7 @@ namespace llarp
}
// construct
service = std::unique_ptr< llarp::service::Endpoint >(
itr->second(conf.first, m_Router));
service.reset(itr->second(conf.first, m_Router));
}
// configure
for(const auto &option : conf.second)

Loading…
Cancel
Save