Fix a possible sync issue between thr load and conn children list on error

Refactor and rename functions, struct fields, and vars
Simplify if conditions and fix/improve logs
Clean up
pull/48/head
Soner Tari 4 years ago
parent 757ed35687
commit 1d75bfb17f

@ -206,7 +206,7 @@ log_dbg_printf(const char *fmt, ...)
}
int
log_dbg_level_printf(int level, const char *function, int thridx, long long unsigned int id, evutil_socket_t fd, evutil_socket_t child_fd, const char *fmt, ...)
log_dbg_level_printf(int level, const char *function, int thrid, long long unsigned int id, evutil_socket_t fd, evutil_socket_t child_fd, const char *fmt, ...)
{
va_list ap;
char *buf;
@ -224,7 +224,7 @@ log_dbg_level_printf(int level, const char *function, int thridx, long long unsi
char *logbuf;
// The *_main and *_main_va macros always pass 0 as fd, and the other macros fd > 0
if (fd) {
rv = asprintf(&logbuf, "[%s] [%d.%llu fd=%d cfd=%d] %s: %s\n", log_dbg_mode_names[level], thridx, id, fd, child_fd, function, buf);
rv = asprintf(&logbuf, "[%s] [%d.%llu fd=%d cfd=%d] %s: %s\n", log_dbg_mode_names[level], thrid, id, fd, child_fd, function, buf);
} else {
rv = asprintf(&logbuf, "[%s] %s: %s\n", log_dbg_mode_names[level], function, buf);
}

@ -68,17 +68,17 @@ void log_dbg_mode(int);
#define log_fine_main_va(format_str, ...) \
log_dbg_level_printf(LOG_DBG_MODE_FINE, __FUNCTION__, 0, 0, 0, 0, (format_str), __VA_ARGS__)
#define log_fine(str) \
log_dbg_level_printf(LOG_DBG_MODE_FINE, __FUNCTION__, ctx->conn->thr ? ctx->conn->thr->thridx : 0, ctx->conn->id, ctx->conn->fd, ctx->conn->child_fd, (str))
log_dbg_level_printf(LOG_DBG_MODE_FINE, __FUNCTION__, ctx->conn->thr ? ctx->conn->thr->id : 0, ctx->conn->id, ctx->conn->fd, ctx->conn->child_fd, (str))
#define log_fine_va(format_str, ...) \
log_dbg_level_printf(LOG_DBG_MODE_FINE, __FUNCTION__, ctx->conn->thr ? ctx->conn->thr->thridx : 0, ctx->conn->id, ctx->conn->fd, ctx->conn->child_fd, (format_str), __VA_ARGS__)
log_dbg_level_printf(LOG_DBG_MODE_FINE, __FUNCTION__, ctx->conn->thr ? ctx->conn->thr->id : 0, ctx->conn->id, ctx->conn->fd, ctx->conn->child_fd, (format_str), __VA_ARGS__)
// FINER
#define log_finer_main_va(format_str, ...) \
log_dbg_level_printf(LOG_DBG_MODE_FINER, __FUNCTION__, 0, 0, 0, 0, (format_str), __VA_ARGS__)
#define log_finer(str) \
log_dbg_level_printf(LOG_DBG_MODE_FINER, __FUNCTION__, ctx->conn->thr ? ctx->conn->thr->thridx : 0, ctx->conn->id, ctx->conn->fd, ctx->conn->child_fd, (str))
log_dbg_level_printf(LOG_DBG_MODE_FINER, __FUNCTION__, ctx->conn->thr ? ctx->conn->thr->id : 0, ctx->conn->id, ctx->conn->fd, ctx->conn->child_fd, (str))
#define log_finer_va(format_str, ...) \
log_dbg_level_printf(LOG_DBG_MODE_FINER, __FUNCTION__, ctx->conn->thr ? ctx->conn->thr->thridx : 0, ctx->conn->id, ctx->conn->fd, ctx->conn->child_fd, (format_str), __VA_ARGS__)
log_dbg_level_printf(LOG_DBG_MODE_FINER, __FUNCTION__, ctx->conn->thr ? ctx->conn->thr->id : 0, ctx->conn->id, ctx->conn->fd, ctx->conn->child_fd, (format_str), __VA_ARGS__)
// FINEST
#define log_finest_main(str) \
@ -86,9 +86,9 @@ void log_dbg_mode(int);
#define log_finest_main_va(format_str, ...) \
log_dbg_level_printf(LOG_DBG_MODE_FINEST, __FUNCTION__, 0, 0, 0, 0, (format_str), __VA_ARGS__)
#define log_finest(str) \
log_dbg_level_printf(LOG_DBG_MODE_FINEST, __FUNCTION__, ctx->conn->thr ? ctx->conn->thr->thridx : 0, ctx->conn->id, ctx->conn->fd, ctx->conn->child_fd, (str))
log_dbg_level_printf(LOG_DBG_MODE_FINEST, __FUNCTION__, ctx->conn->thr ? ctx->conn->thr->id : 0, ctx->conn->id, ctx->conn->fd, ctx->conn->child_fd, (str))
#define log_finest_va(format_str, ...) \
log_dbg_level_printf(LOG_DBG_MODE_FINEST, __FUNCTION__, ctx->conn->thr ? ctx->conn->thr->thridx : 0, ctx->conn->id, ctx->conn->fd, ctx->conn->child_fd, (format_str), __VA_ARGS__)
log_dbg_level_printf(LOG_DBG_MODE_FINEST, __FUNCTION__, ctx->conn->thr ? ctx->conn->thr->id : 0, ctx->conn->id, ctx->conn->fd, ctx->conn->child_fd, (format_str), __VA_ARGS__)
#else /* !DEBUG_PROXY */
#define log_fine_main_va(format_str, ...) ((void)0)
#define log_fine(str) ((void)0)

@ -153,9 +153,6 @@ pxy_conn_ctx_new_child(evutil_socket_t fd, pxy_conn_ctx_t *ctx)
free(child_ctx);
return NULL;
}
// @attention Child connections use the parent's event bases, otherwise we would get multithreading issues
pxy_thr_inc_load(ctx->thr);
return child_ctx;
}
@ -164,8 +161,6 @@ pxy_conn_ctx_free_child(pxy_conn_child_ctx_t *ctx)
{
log_finest("ENTER");
pxy_thr_dec_load(ctx->conn->thr);
// If the proto doesn't have special args, proto_free() callback is NULL
if (ctx->protoctx->proto_free) {
ctx->protoctx->proto_free(ctx);
@ -174,13 +169,34 @@ pxy_conn_ctx_free_child(pxy_conn_child_ctx_t *ctx)
free(ctx);
}
// This function cannot fail.
static void NONNULL(1)
pxy_conn_attach_child(pxy_conn_child_ctx_t *ctx)
{
log_finest("Adding child conn");
// @attention Child connections use the parent's event bases, otherwise we would get multithreading issues
// Always keep thr load and conns list in sync
ctx->conn->thr->load++;
ctx->conn->thr->max_load = MAX(ctx->conn->thr->max_load, ctx->conn->thr->load);
// Prepend child to the children list of parent
ctx->next = ctx->conn->children;
ctx->conn->children = ctx;
if (ctx->next)
ctx->next->prev = ctx;
}
// This function cannot fail.
static void NONNULL(1)
pxy_conn_remove_child(pxy_conn_child_ctx_t *ctx)
pxy_conn_detach_child(pxy_conn_child_ctx_t *ctx)
{
assert(ctx->conn != NULL);
assert(ctx->conn->children != NULL);
log_finest("ENTER");
log_finest("Removing child conn");
ctx->conn->thr->load--;
if (ctx->prev) {
ctx->prev->next = ctx->next;
@ -240,7 +256,7 @@ pxy_conn_free_child(pxy_conn_child_ctx_t *ctx)
ctx->dst.bev = NULL;
}
pxy_conn_remove_child(ctx);
pxy_conn_detach_child(ctx);
pxy_conn_ctx_free_child(ctx);
}
@ -978,16 +994,10 @@ pxy_listener_acceptcb_child(UNUSED struct evconnlistener *listener, evutil_socke
pxy_conn_term(ctx, 1);
goto out;
}
ctx->thr->max_load = MAX(ctx->thr->max_load, pxy_thr_get_load(ctx->thr));
pxy_conn_attach_child(child_ctx);
ctx->child_count++;
// Prepend child to the children list of parent
child_ctx->next = ctx->children;
ctx->children = child_ctx;
if (child_ctx->next)
child_ctx->next->prev = child_ctx;
// @attention Do not enable src events here yet, they will be enabled after dst connects
if (prototcp_setup_src_child(child_ctx) == -1) {
goto out;
@ -1043,14 +1053,14 @@ pxy_opensock_child(pxy_conn_ctx_t *ctx)
evutil_socket_t fd = socket(ctx->spec->child_src_addr.ss_family, SOCK_STREAM, IPPROTO_TCP);
if (fd == -1) {
log_err_level_printf(LOG_CRIT, "Error from socket(): %s (%i)\n", strerror(errno), errno);
log_fine_va("Error from socket(): %s (%i)\n", strerror(errno), errno);
log_fine_va("Error from socket(): %s (%i)", strerror(errno), errno);
evutil_closesocket(fd);
return -1;
}
if (evutil_make_socket_nonblocking(fd) == -1) {
log_err_level_printf(LOG_CRIT, "Error making socket nonblocking: %s (%i)\n", strerror(errno), errno);
log_fine_va("Error making socket nonblocking: %s (%i)\n", strerror(errno), errno);
log_fine_va("Error making socket nonblocking: %s (%i)", strerror(errno), errno);
evutil_closesocket(fd);
return -1;
}
@ -1058,21 +1068,21 @@ pxy_opensock_child(pxy_conn_ctx_t *ctx)
int on = 1;
if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void*)&on, sizeof(on)) == -1) {
log_err_level_printf(LOG_CRIT, "Error from setsockopt(SO_KEEPALIVE): %s (%i)\n", strerror(errno), errno);
log_fine_va("Error from setsockopt(SO_KEEPALIVE): %s (%i)\n", strerror(errno), errno);
log_fine_va("Error from setsockopt(SO_KEEPALIVE): %s (%i)", strerror(errno), errno);
evutil_closesocket(fd);
return -1;
}
if (evutil_make_listen_socket_reuseable(fd) == -1) {
log_err_level_printf(LOG_CRIT, "Error from setsockopt(SO_REUSABLE): %s\n", strerror(errno));
log_fine_va("Error from setsockopt(SO_REUSABLE): %s\n", strerror(errno));
log_err_level_printf(LOG_CRIT, "Error from setsockopt(SO_REUSABLE): %s (%i)\n", strerror(errno), errno);
log_fine_va("Error from setsockopt(SO_REUSABLE): %s (%i)", strerror(errno), errno);
evutil_closesocket(fd);
return -1;
}
if (bind(fd, (struct sockaddr *)&ctx->spec->child_src_addr, ctx->spec->child_src_addrlen) == -1) {
log_err_level_printf(LOG_CRIT, "Error from bind(): %s\n", strerror(errno));
log_fine_va("Error from bind(): %s\n", strerror(errno));
log_err_level_printf(LOG_CRIT, "Error from bind(): %s (%i)\n", strerror(errno), errno);
log_fine_va("Error from bind(): %s (%i)", strerror(errno), errno);
evutil_closesocket(fd);
return -1;
}
@ -1087,9 +1097,10 @@ pxy_setup_child_listener(pxy_conn_ctx_t *ctx)
// Child evcls use the evbase of the parent thread, otherwise we would get multithreading issues.
// We don't need a privsep call to open a socket for child listener,
// because listener port of child conns are assigned by the system, hence are from non-privileged range above 1024
if ((ctx->child_fd = pxy_opensock_child(ctx)) == -1) {
ctx->child_fd = pxy_opensock_child(ctx);
if (ctx->child_fd < 0) {
log_err_level_printf(LOG_CRIT, "Error opening child socket: %s (%i)\n", strerror(errno), errno);
log_fine_va("Error opening child socket: %s (%i)\n", strerror(errno), errno);
log_fine_va("Error opening child socket: %s (%i)", strerror(errno), errno);
pxy_conn_term(ctx, 1);
return -1;
}
@ -1410,7 +1421,7 @@ pxy_bev_eventcb_postexec_logging_and_stats(struct bufferevent *bev, short events
}
if (bev == ctx->srvdst.bev) {
ctx->thr->max_load = MAX(ctx->thr->max_load, pxy_thr_get_load(ctx->thr));
ctx->thr->max_load = MAX(ctx->thr->max_load, ctx->thr->load);
ctx->thr->max_fd = MAX(ctx->thr->max_fd, ctx->fd);
// src and other fd stats are collected in acceptcb functions

@ -34,32 +34,6 @@
#include <assert.h>
#include <sys/param.h>
size_t
pxy_thr_get_load(pxy_thr_ctx_t *tctx)
{
size_t load;
//pthread_mutex_lock(&tctx->mutex);
load = tctx->load;
//pthread_mutex_unlock(&tctx->mutex);
return load;
}
void
pxy_thr_inc_load(pxy_thr_ctx_t *tctx)
{
//pthread_mutex_lock(&tctx->mutex);
tctx->load++;
//pthread_mutex_unlock(&tctx->mutex);
}
void
pxy_thr_dec_load(pxy_thr_ctx_t *tctx)
{
//pthread_mutex_lock(&tctx->mutex);
tctx->load--;
//pthread_mutex_unlock(&tctx->mutex);
}
/*
* Attach a connection to its thread.
* This function cannot fail.
@ -74,7 +48,7 @@ pxy_thr_attach(pxy_conn_ctx_t *ctx)
log_finest("Adding conn");
// Always keep thr load and conns list in sync
pxy_thr_inc_load(ctx->thr);
ctx->thr->load++;
ctx->next = ctx->thr->conns;
ctx->thr->conns = ctx;
@ -97,7 +71,7 @@ pxy_thr_detach(pxy_conn_ctx_t *ctx)
log_finest("Removing conn");
// We increment thr load in pxy_conn_init() only (for parent conns)
pxy_thr_dec_load(ctx->thr);
ctx->thr->load--;
if (ctx->prev) {
ctx->prev->next = ctx->next;
@ -155,14 +129,14 @@ pxy_thr_get_expired_conns(pxy_thr_ctx_t *tctx, pxy_conn_ctx_t **expired_conns)
if (tctx->thrmgr->global->statslog) {
ctx = *expired_conns;
while (ctx) {
log_finest_main_va("thr=%d, fd=%d, child_fd=%d, time=%lld, src_addr=%s:%s, dst_addr=%s:%s, user=%s, valid=%d",
ctx->thr->thridx, ctx->fd, ctx->child_fd, (long long)(now - ctx->atime),
log_finest_main_va("thr=%d, id=%llu, fd=%d, child_fd=%d, time=%lld, src_addr=%s:%s, dst_addr=%s:%s, user=%s, valid=%d",
ctx->thr->id, ctx->id, ctx->fd, ctx->child_fd, (long long)(now - ctx->atime),
STRORDASH(ctx->srchost_str), STRORDASH(ctx->srcport_str), STRORDASH(ctx->dsthost_str), STRORDASH(ctx->dstport_str),
STRORDASH(ctx->user), ctx->protoctx->is_valid);
char *msg;
if (asprintf(&msg, "EXPIRED: thr=%d, time=%lld, src_addr=%s:%s, dst_addr=%s:%s, user=%s, valid=%d\n",
ctx->thr->thridx, (long long)(now - ctx->atime),
if (asprintf(&msg, "EXPIRED: thr=%d, id=%llu, time=%lld, src_addr=%s:%s, dst_addr=%s:%s, user=%s, valid=%d\n",
ctx->thr->id, ctx->id, (long long)(now - ctx->atime),
STRORDASH(ctx->srchost_str), STRORDASH(ctx->srcport_str), STRORDASH(ctx->dsthost_str), STRORDASH(ctx->dstport_str),
STRORDASH(ctx->user), ctx->protoctx->is_valid) < 0) {
break;
@ -180,17 +154,13 @@ pxy_thr_get_expired_conns(pxy_thr_ctx_t *tctx, pxy_conn_ctx_t **expired_conns)
}
static evutil_socket_t
pxy_thr_print_children(pxy_conn_child_ctx_t *ctx,
#ifdef DEBUG_PROXY
unsigned int parent_idx,
#endif /* DEBUG_PROXY */
evutil_socket_t max_fd)
pxy_thr_print_children(pxy_conn_child_ctx_t *ctx)
{
evutil_socket_t max_fd = 0;
while (ctx) {
// @attention No need to log child stats
log_finest_main_va("CHILD CONN: thr=%d, id=%d, pid=%u, src=%d, dst=%d, c=%d-%d",
ctx->conn->thr->thridx, ctx->conn->child_count, parent_idx, ctx->fd, ctx->dst_fd, ctx->src.closed, ctx->dst.closed);
// No need to log child stats
log_finest_main_va("CHILD CONN: thr=%d, id=%llu, cid=%d, src=%d, dst=%d, c=%d-%d",
ctx->conn->thr->id, ctx->conn->id, ctx->conn->child_count, ctx->fd, ctx->dst_fd, ctx->src.closed, ctx->dst.closed);
max_fd = MAX(max_fd, MAX(ctx->fd, ctx->dst_fd));
ctx = ctx->next;
}
@ -200,9 +170,8 @@ pxy_thr_print_children(pxy_conn_child_ctx_t *ctx,
static void
pxy_thr_print_info(pxy_thr_ctx_t *tctx)
{
log_finest_main_va("thr=%d, load=%zu", tctx->thridx, pxy_thr_get_load(tctx));
log_finest_main_va("thr=%d, load=%zu", tctx->id, tctx->load);
unsigned int idx = 1;
evutil_socket_t max_fd = 0;
time_t max_atime = 0;
time_t max_ctime = 0;
@ -217,8 +186,8 @@ pxy_thr_print_info(pxy_thr_ctx_t *tctx)
time_t atime = now - ctx->atime;
time_t ctime = now - ctx->ctime;
log_finest_main_va("PARENT CONN: thr=%d, id=%u, fd=%d, child_fd=%d, dst=%d, srvdst=%d, child_src=%d, child_dst=%d, p=%d-%d-%d c=%d-%d, ce=%d cc=%d, at=%lld ct=%lld, src_addr=%s:%s, dst_addr=%s:%s, user=%s, valid=%d",
tctx->thridx, idx, ctx->fd, ctx->child_fd, ctx->dst_fd, ctx->srvdst_fd, ctx->child_src_fd, ctx->child_dst_fd,
log_finest_main_va("PARENT CONN: thr=%d, id=%llu, fd=%d, child_fd=%d, dst=%d, srvdst=%d, child_src=%d, child_dst=%d, p=%d-%d-%d c=%d-%d, ce=%d cc=%d, at=%lld ct=%lld, src_addr=%s:%s, dst_addr=%s:%s, user=%s, valid=%d",
tctx->id, ctx->id, ctx->fd, ctx->child_fd, ctx->dst_fd, ctx->srvdst_fd, ctx->child_src_fd, ctx->child_dst_fd,
ctx->src.closed, ctx->dst.closed, ctx->srvdst.closed, ctx->children ? ctx->children->src.closed : 0, ctx->children ? ctx->children->dst.closed : 0,
ctx->children ? 1:0, ctx->child_count, (long long)atime, (long long)ctime,
STRORDASH(ctx->srchost_str), STRORDASH(ctx->srcport_str), STRORDASH(ctx->dsthost_str), STRORDASH(ctx->dstport_str),
@ -226,8 +195,8 @@ pxy_thr_print_info(pxy_thr_ctx_t *tctx)
// @attention Report idle connections only, i.e. the conns which have been idle since the last time we checked for expired conns
if (atime >= (time_t)tctx->thrmgr->global->expired_conn_check_period) {
if (asprintf(&smsg, "IDLE: thr=%d, id=%u, ce=%d cc=%d, at=%lld ct=%lld, src_addr=%s:%s, dst_addr=%s:%s, user=%s, valid=%d\n",
tctx->thridx, idx, ctx->children ? 1:0, ctx->child_count, (long long)atime, (long long)ctime,
if (asprintf(&smsg, "IDLE: thr=%d, id=%llu, ce=%d cc=%d, at=%lld ct=%lld, src_addr=%s:%s, dst_addr=%s:%s, user=%s, valid=%d\n",
tctx->id, ctx->id, ctx->children ? 1:0, ctx->child_count, (long long)atime, (long long)ctime,
STRORDASH(ctx->srchost_str), STRORDASH(ctx->srcport_str), STRORDASH(ctx->dsthost_str), STRORDASH(ctx->dstport_str),
STRORDASH(ctx->user), ctx->protoctx->is_valid) < 0) {
return;
@ -247,24 +216,18 @@ pxy_thr_print_info(pxy_thr_ctx_t *tctx)
max_ctime = MAX(max_ctime, ctime);
if (ctx->children) {
max_fd = pxy_thr_print_children(ctx->children,
#ifdef DEBUG_PROXY
idx,
#endif /* DEBUG_PROXY */
max_fd);
max_fd = MAX(max_fd, pxy_thr_print_children(ctx->children));
}
idx++;
ctx = ctx->next;
}
}
log_finest_main_va("thr=%d, mld=%zu, mfd=%d, mat=%lld, mct=%lld, iib=%llu, iob=%llu, eib=%llu, eob=%llu, swm=%zu, uwm=%zu, to=%zu, err=%zu, si=%u",
tctx->thridx, tctx->max_load, tctx->max_fd, (long long)max_atime, (long long)max_ctime, tctx->intif_in_bytes, tctx->intif_out_bytes, tctx->extif_in_bytes, tctx->extif_out_bytes,
tctx->id, tctx->max_load, tctx->max_fd, (long long)max_atime, (long long)max_ctime, tctx->intif_in_bytes, tctx->intif_out_bytes, tctx->extif_in_bytes, tctx->extif_out_bytes,
tctx->set_watermarks, tctx->unset_watermarks, tctx->timedout_conns, tctx->errors, tctx->stats_id);
if (asprintf(&smsg, "STATS: thr=%d, mld=%zu, mfd=%d, mat=%lld, mct=%lld, iib=%llu, iob=%llu, eib=%llu, eob=%llu, swm=%zu, uwm=%zu, to=%zu, err=%zu, si=%u\n",
tctx->thridx, tctx->max_load, tctx->max_fd, (long long)max_atime, (long long)max_ctime, tctx->intif_in_bytes, tctx->intif_out_bytes, tctx->extif_in_bytes, tctx->extif_out_bytes,
tctx->id, tctx->max_load, tctx->max_fd, (long long)max_atime, (long long)max_ctime, tctx->intif_in_bytes, tctx->intif_out_bytes, tctx->extif_in_bytes, tctx->extif_out_bytes,
tctx->set_watermarks, tctx->unset_watermarks, tctx->timedout_conns, tctx->errors, tctx->stats_id) < 0) {
return;
}
@ -287,7 +250,7 @@ pxy_thr_print_info(pxy_thr_ctx_t *tctx)
// Reset these stats with the current values (do not reset to 0 directly, there may be active conns)
tctx->max_fd = max_fd;
tctx->max_load = pxy_thr_get_load(tctx);
tctx->max_load = tctx->load;
}
/*
@ -297,12 +260,12 @@ pxy_thr_print_info(pxy_thr_ctx_t *tctx)
static void
pxy_thr_timer_cb(UNUSED evutil_socket_t fd, UNUSED short what, UNUSED void *arg)
{
pxy_thr_ctx_t *ctx = arg;
pxy_thr_ctx_t *tctx = arg;
log_finest_main_va("thr=%d, load=%zu, to=%u", ctx->thridx, pxy_thr_get_load(ctx), ctx->timeout_count);
log_finest_main_va("thr=%d, load=%zu, to=%u", tctx->id, tctx->load, tctx->timeout_count);
pxy_conn_ctx_t *expired = NULL;
pxy_thr_get_expired_conns(ctx, &expired);
pxy_thr_get_expired_conns(tctx, &expired);
#ifdef DEBUG_PROXY
if (expired) {
@ -312,11 +275,11 @@ pxy_thr_timer_cb(UNUSED evutil_socket_t fd, UNUSED short what, UNUSED void *arg)
pxy_conn_ctx_t *next = expired->next_expired;
log_fine_main_va("Delete timed out conn thr=%d, fd=%d, child_fd=%d, at=%lld ct=%lld",
expired->thr->thridx, expired->fd, expired->child_fd, (long long)(now - expired->atime), (long long)(now - expired->ctime));
expired->thr->id, expired->fd, expired->child_fd, (long long)(now - expired->atime), (long long)(now - expired->ctime));
// @attention Do not call the term function here, free the conn directly
pxy_conn_free(expired, 1);
ctx->timedout_conns++;
tctx->timedout_conns++;
expired = next;
}
@ -325,11 +288,11 @@ pxy_thr_timer_cb(UNUSED evutil_socket_t fd, UNUSED short what, UNUSED void *arg)
#endif /* DEBUG_PROXY */
// @attention Print thread info only if stats logging is enabled, if disabled debug logs are not printed either
if (ctx->thrmgr->global->statslog) {
ctx->timeout_count++;
if (ctx->timeout_count >= ctx->thrmgr->global->stats_period) {
ctx->timeout_count = 0;
pxy_thr_print_info(ctx);
if (tctx->thrmgr->global->statslog) {
tctx->timeout_count++;
if (tctx->timeout_count >= tctx->thrmgr->global->stats_period) {
tctx->timeout_count = 0;
pxy_thr_print_info(tctx);
}
}
}
@ -341,16 +304,16 @@ pxy_thr_timer_cb(UNUSED evutil_socket_t fd, UNUSED short what, UNUSED void *arg)
void *
pxy_thr(void *arg)
{
pxy_thr_ctx_t *ctx = arg;
struct timeval timer_delay = {ctx->thrmgr->global->expired_conn_check_period, 0};
pxy_thr_ctx_t *tctx = arg;
struct timeval timer_delay = {tctx->thrmgr->global->expired_conn_check_period, 0};
struct event *ev;
ev = event_new(ctx->evbase, -1, EV_PERSIST, pxy_thr_timer_cb, ctx);
ev = event_new(tctx->evbase, -1, EV_PERSIST, pxy_thr_timer_cb, tctx);
if (!ev)
return NULL;
evtimer_add(ev, &timer_delay);
ctx->running = 1;
event_base_dispatch(ctx->evbase);
tctx->running = 1;
event_base_dispatch(tctx->evbase);
event_free(ev);
return NULL;

@ -43,19 +43,13 @@ typedef struct pxy_thrmgr_ctx pxy_thrmgr_ctx_t;
typedef struct pxy_thr_ctx {
pthread_t thr;
int thridx;
int id;
pxy_thrmgr_ctx_t *thrmgr;
size_t load;
struct event_base *evbase;
struct evdns_base *dnsbase;
int running;
// @todo Do we need a thr mutex?
// This mutex is for thread-safe access to thr.load. But thrmgr read-accesses thr.load, and write-accesses are by thr only.
// Per-thread locking is necessary during connection setup
// to prevent multithreading issues between thrmgr thread and conn handling threads
//pthread_mutex_t mutex;
// Statistics
evutil_socket_t max_fd;
size_t max_load;
@ -79,10 +73,6 @@ typedef struct pxy_thr_ctx {
struct sqlite3_stmt *get_user;
} pxy_thr_ctx_t;
size_t pxy_thr_get_load(pxy_thr_ctx_t *) NONNULL(1);
void pxy_thr_inc_load(pxy_thr_ctx_t *) NONNULL(1);
void pxy_thr_dec_load(pxy_thr_ctx_t *) NONNULL(1);
void pxy_thr_attach(pxy_conn_ctx_t *) NONNULL(1);
void pxy_thr_detach(pxy_conn_ctx_t *) NONNULL(1);

@ -70,7 +70,7 @@ pxy_thrmgr_new(global_t *global)
int
pxy_thrmgr_run(pxy_thrmgr_ctx_t *ctx)
{
int idx = -1, dns = 0;
int i = -1, dns = 0;
dns = global_has_dns_spec(ctx->global);
@ -80,85 +80,75 @@ pxy_thrmgr_run(pxy_thrmgr_ctx_t *ctx)
}
memset(ctx->thr, 0, ctx->num_thr * sizeof(pxy_thr_ctx_t*));
for (idx = 0; idx < ctx->num_thr; idx++) {
if (!(ctx->thr[idx] = malloc(sizeof(pxy_thr_ctx_t)))) {
for (i = 0; i < ctx->num_thr; i++) {
if (!(ctx->thr[i] = malloc(sizeof(pxy_thr_ctx_t)))) {
log_dbg_printf("Failed to allocate memory\n");
goto leave;
}
memset(ctx->thr[idx], 0, sizeof(pxy_thr_ctx_t));
ctx->thr[idx]->evbase = event_base_new();
if (!ctx->thr[idx]->evbase) {
log_dbg_printf("Failed to create evbase %d\n", idx);
memset(ctx->thr[i], 0, sizeof(pxy_thr_ctx_t));
ctx->thr[i]->evbase = event_base_new();
if (!ctx->thr[i]->evbase) {
log_dbg_printf("Failed to create evbase %d\n", i);
goto leave;
}
if (dns) {
/* only create dns base if we actually need it later */
ctx->thr[idx]->dnsbase = evdns_base_new(
ctx->thr[idx]->evbase, 1);
if (!ctx->thr[idx]->dnsbase) {
log_dbg_printf("Failed to create dnsbase %d\n",
idx);
ctx->thr[i]->dnsbase = evdns_base_new(ctx->thr[i]->evbase, 1);
if (!ctx->thr[i]->dnsbase) {
log_dbg_printf("Failed to create dnsbase %d\n", i);
goto leave;
}
}
ctx->thr[idx]->load = 0;
ctx->thr[idx]->running = 0;
ctx->thr[idx]->conns = NULL;
ctx->thr[idx]->thridx = idx;
ctx->thr[idx]->timeout_count = 0;
ctx->thr[idx]->thrmgr = ctx;
if ((ctx->global->opts->user_auth || global_has_userauth_spec(ctx->global)) && sqlite3_prepare_v2(ctx->global->userdb, "SELECT user,ether,atime,desc FROM users WHERE ip = ?1", 100, &ctx->thr[idx]->get_user, NULL)) {
ctx->thr[i]->load = 0;
ctx->thr[i]->running = 0;
ctx->thr[i]->conns = NULL;
ctx->thr[i]->id = i;
ctx->thr[i]->timeout_count = 0;
ctx->thr[i]->thrmgr = ctx;
if ((ctx->global->opts->user_auth || global_has_userauth_spec(ctx->global)) && sqlite3_prepare_v2(ctx->global->userdb, "SELECT user,ether,atime,desc FROM users WHERE ip = ?1", 100, &ctx->thr[i]->get_user, NULL)) {
log_err_level_printf(LOG_CRIT, "Error preparing get_user sql stmt: %s\n", sqlite3_errmsg(ctx->global->userdb));
goto leave;
}
//if (pthread_mutex_init(&ctx->thr[idx]->mutex, NULL)) {
// log_dbg_printf("Failed to initialize thr mutex\n");
// goto leave;
//}
}
log_dbg_printf("Initialized %d connection handling threads\n",
ctx->num_thr);
log_dbg_printf("Initialized %d connection handling threads\n", ctx->num_thr);
for (idx = 0; idx < ctx->num_thr; idx++) {
if (pthread_create(&ctx->thr[idx]->thr, NULL, pxy_thr, ctx->thr[idx]))
for (i = 0; i < ctx->num_thr; i++) {
if (pthread_create(&ctx->thr[i]->thr, NULL, pxy_thr, ctx->thr[i]))
goto leave_thr;
while (!ctx->thr[idx]->running) {
while (!ctx->thr[i]->running) {
sched_yield();
}
}
log_dbg_printf("Started %d connection handling threads\n",
ctx->num_thr);
log_dbg_printf("Started %d connection handling threads\n", ctx->num_thr);
return 0;
leave_thr:
idx--;
while (idx >= 0) {
pthread_cancel(ctx->thr[idx]->thr);
pthread_join(ctx->thr[idx]->thr, NULL);
idx--;
i--;
while (i >= 0) {
pthread_cancel(ctx->thr[i]->thr);
pthread_join(ctx->thr[i]->thr, NULL);
i--;
}
idx = ctx->num_thr - 1;
i = ctx->num_thr - 1;
leave:
while (idx >= 0) {
if (ctx->thr[idx]) {
if (ctx->thr[idx]->dnsbase) {
evdns_base_free(ctx->thr[idx]->dnsbase, 0);
while (i >= 0) {
if (ctx->thr[i]) {
if (ctx->thr[i]->dnsbase) {
evdns_base_free(ctx->thr[i]->dnsbase, 0);
}
if (ctx->thr[idx]->evbase) {
event_base_free(ctx->thr[idx]->evbase);
if (ctx->thr[i]->evbase) {
event_base_free(ctx->thr[i]->evbase);
}
if (ctx->global->opts->user_auth || global_has_userauth_spec(ctx->global)) {
sqlite3_finalize(ctx->thr[idx]->get_user);
sqlite3_finalize(ctx->thr[i]->get_user);
}
//pthread_mutex_destroy(&ctx->thr[idx]->mutex);
free(ctx->thr[idx]);
free(ctx->thr[i]);
}
idx--;
i--;
}
if (ctx->thr) {
free(ctx->thr);
@ -174,25 +164,24 @@ void
pxy_thrmgr_free(pxy_thrmgr_ctx_t *ctx)
{
if (ctx->thr) {
for (int idx = 0; idx < ctx->num_thr; idx++) {
event_base_loopbreak(ctx->thr[idx]->evbase);
for (int i = 0; i < ctx->num_thr; i++) {
event_base_loopbreak(ctx->thr[i]->evbase);
sched_yield();
}
for (int idx = 0; idx < ctx->num_thr; idx++) {
pthread_join(ctx->thr[idx]->thr, NULL);
for (int i = 0; i < ctx->num_thr; i++) {
pthread_join(ctx->thr[i]->thr, NULL);
}
for (int idx = 0; idx < ctx->num_thr; idx++) {
if (ctx->thr[idx]->dnsbase) {
evdns_base_free(ctx->thr[idx]->dnsbase, 0);
for (int i = 0; i < ctx->num_thr; i++) {
if (ctx->thr[i]->dnsbase) {
evdns_base_free(ctx->thr[i]->dnsbase, 0);
}
if (ctx->thr[idx]->evbase) {
event_base_free(ctx->thr[idx]->evbase);
if (ctx->thr[i]->evbase) {
event_base_free(ctx->thr[i]->evbase);
}
if (ctx->global->opts->user_auth || global_has_userauth_spec(ctx->global)) {
sqlite3_finalize(ctx->thr[idx]->get_user);
sqlite3_finalize(ctx->thr[i]->get_user);
}
//pthread_mutex_destroy(&ctx->thr[idx]->mutex);
free(ctx->thr[idx]);
free(ctx->thr[i]);
}
free(ctx->thr);
}
@ -202,8 +191,9 @@ pxy_thrmgr_free(pxy_thrmgr_ctx_t *ctx)
/*
* Assign a new connection to a thread. Chooses the thread with the fewest
* currently active connections, returns the appropriate event bases.
* No need to be so accurate about balancing thread loads, so uses
* thread-level mutexes, instead of a thrmgr level mutex.
* No need to be so accurate about balancing thread loads,
* so does not use mutexes, thread or thrmgr level.
* @todo Check if read accesses to thr load can cause any multithreading issues.
* Returns the index of the chosen thread.
* This function cannot fail.
*/
@ -213,30 +203,29 @@ pxy_thrmgr_assign_thr(pxy_conn_ctx_t *ctx)
log_finest("ENTER");
pxy_thrmgr_ctx_t *tmctx = ctx->thrmgr;
size_t minload = pxy_thr_get_load(tmctx->thr[0]);
size_t minload = tmctx->thr[0]->load;
#ifdef DEBUG_THREAD
log_dbg_printf("===> Proxy connection handler thread status:\nthr[0]: %zu\n", minload);
#endif /* DEBUG_THREAD */
int thridx = 0;
for (int idx = 1; idx < tmctx->num_thr; idx++) {
size_t thrload = pxy_thr_get_load(tmctx->thr[idx]);
int thrid = 0;
for (int i = 1; i < tmctx->num_thr; i++) {
size_t thrload = tmctx->thr[i]->load;
if (minload > thrload) {
minload = thrload;
thridx = idx;
thrid = i;
}
#ifdef DEBUG_THREAD
log_dbg_printf("thr[%d]: %zu\n", idx, thrload);
log_dbg_printf("thr[%d]: %zu\n", i, thrload);
#endif /* DEBUG_THREAD */
}
ctx->thr = tmctx->thr[thridx];
ctx->thr = tmctx->thr[thrid];
#ifdef DEBUG_THREAD
log_dbg_printf("thridx: %d\n", thridx);
log_dbg_printf("thrid: %d\n", thrid);
#endif /* DEBUG_THREAD */
}

@ -205,7 +205,7 @@ log_dbg_printf(const char *fmt, ...)
}
int
log_dbg_level_printf(int level, const char *function, int thridx, long long unsigned int id, evutil_socket_t fd, const char *fmt, ...)
log_dbg_level_printf(int level, const char *function, int thrid, long long unsigned int id, evutil_socket_t fd, const char *fmt, ...)
{
va_list ap;
char *buf;
@ -221,7 +221,7 @@ log_dbg_level_printf(int level, const char *function, int thridx, long long unsi
return -1;
char *logbuf;
if (asprintf(&logbuf, "[%s] [%d.%llu fd=%d] %s: %s\n", log_dbg_mode_names[level], thridx, id, fd, function, buf) < 0) {
if (asprintf(&logbuf, "[%s] [%d.%llu fd=%d] %s: %s\n", log_dbg_mode_names[level], thrid, id, fd, function, buf) < 0) {
free(buf);
return -1;
}

@ -63,17 +63,17 @@ void log_dbg_mode(int);
#define log_fine_main_va(format_str, ...) \
log_dbg_level_printf(LOG_DBG_MODE_FINE, __FUNCTION__, 0, 0, 0, (format_str), __VA_ARGS__)
#define log_fine(str) \
log_dbg_level_printf(LOG_DBG_MODE_FINE, __FUNCTION__, ctx->thr ? ctx->thr->thridx : 0, ctx->id, ctx->fd, (str))
log_dbg_level_printf(LOG_DBG_MODE_FINE, __FUNCTION__, ctx->thr ? ctx->thr->id : 0, ctx->id, ctx->fd, (str))
#define log_fine_va(format_str, ...) \
log_dbg_level_printf(LOG_DBG_MODE_FINE, __FUNCTION__, ctx->thr ? ctx->thr->thridx : 0, ctx->id, ctx->fd, (format_str), __VA_ARGS__)
log_dbg_level_printf(LOG_DBG_MODE_FINE, __FUNCTION__, ctx->thr ? ctx->thr->id : 0, ctx->id, ctx->fd, (format_str), __VA_ARGS__)
// FINER
#define log_finer_main_va(format_str, ...) \
log_dbg_level_printf(LOG_DBG_MODE_FINER, __FUNCTION__, 0, 0, 0, (format_str), __VA_ARGS__)
#define log_finer(str) \
log_dbg_level_printf(LOG_DBG_MODE_FINER, __FUNCTION__, ctx->thr ? ctx->thr->thridx : 0, ctx->id, ctx->fd, (str))
log_dbg_level_printf(LOG_DBG_MODE_FINER, __FUNCTION__, ctx->thr ? ctx->thr->id : 0, ctx->id, ctx->fd, (str))
#define log_finer_va(format_str, ...) \
log_dbg_level_printf(LOG_DBG_MODE_FINER, __FUNCTION__, ctx->thr ? ctx->thr->thridx : 0, ctx->id, ctx->fd, (format_str), __VA_ARGS__)
log_dbg_level_printf(LOG_DBG_MODE_FINER, __FUNCTION__, ctx->thr ? ctx->thr->id : 0, ctx->id, ctx->fd, (format_str), __VA_ARGS__)
// FINEST
#define log_finest_main(str) \
@ -81,9 +81,9 @@ void log_dbg_mode(int);
#define log_finest_main_va(format_str, ...) \
log_dbg_level_printf(LOG_DBG_MODE_FINEST, __FUNCTION__, 0, 0, 0, (format_str), __VA_ARGS__)
#define log_finest(str) \
log_dbg_level_printf(LOG_DBG_MODE_FINEST, __FUNCTION__, ctx->thr ? ctx->thr->thridx : 0, ctx->id, ctx->fd, (str))
log_dbg_level_printf(LOG_DBG_MODE_FINEST, __FUNCTION__, ctx->thr ? ctx->thr->id : 0, ctx->id, ctx->fd, (str))
#define log_finest_va(format_str, ...) \
log_dbg_level_printf(LOG_DBG_MODE_FINEST, __FUNCTION__, ctx->thr ? ctx->thr->thridx : 0, ctx->id, ctx->fd, (format_str), __VA_ARGS__)
log_dbg_level_printf(LOG_DBG_MODE_FINEST, __FUNCTION__, ctx->thr ? ctx->thr->id : 0, ctx->id, ctx->fd, (format_str), __VA_ARGS__)
#else /* !DEBUG_PROXY */
#define log_fine_main_va(format_str, ...) ((void)0)
#define log_fine(str) ((void)0)

@ -137,7 +137,7 @@ struct pxy_conn_ctx {
time_t ctime;
// Conn last access time, used to determine expired conns
// Updated on entry to callback functions, parent or child
// Updated on entry to callback functions
time_t atime;
// Per-thread conn list, used to determine idle and expired conns, and to close them

@ -83,9 +83,8 @@ pxy_thr_detach(pxy_conn_ctx_t *ctx)
static void
pxy_thr_print_thr_info(pxy_thr_ctx_t *tctx)
{
log_finest_main_va("thr=%d, load=%lu", tctx->thridx, tctx->load);
log_finest_main_va("thr=%d, load=%lu", tctx->id, tctx->load);
unsigned int idx = 1;
evutil_socket_t max_fd = 0;
time_t max_atime = 0;
time_t max_ctime = 0;
@ -100,25 +99,24 @@ pxy_thr_print_thr_info(pxy_thr_ctx_t *tctx)
time_t atime = now - ctx->atime;
time_t ctime = now - ctx->ctime;
log_finest_main_va("CONN: thr=%d, id=%u, fd=%d, dst=%d, p=%d-%d, at=%lld ct=%lld, src_addr=%s:%s, dst_addr=%s:%s",
tctx->thridx, idx, ctx->fd, ctx->dst_fd, ctx->src.closed, ctx->dst.closed, (long long)atime, (long long)ctime,
log_finest_main_va("CONN: thr=%d, id=%llu, fd=%d, dst=%d, p=%d-%d, at=%lld ct=%lld, src_addr=%s:%s, dst_addr=%s:%s",
tctx->id, ctx->id, ctx->fd, ctx->dst_fd, ctx->src.closed, ctx->dst.closed, (long long)atime, (long long)ctime,
STRORDASH(ctx->srchost_str), STRORDASH(ctx->srcport_str), STRORDASH(ctx->dsthost_str), STRORDASH(ctx->dstport_str));
max_fd = MAX(max_fd, MAX(ctx->fd, ctx->dst_fd));
max_atime = MAX(max_atime, atime);
max_ctime = MAX(max_ctime, ctime);
idx++;
ctx = ctx->next;
}
}
log_finest_main_va("STATS: thr=%d, mld=%zu, mfd=%d, mat=%lld, mct=%lld, iib=%llu, iob=%llu, eib=%llu, eob=%llu, swm=%zu, uwm=%zu, err=%zu, si=%u",
tctx->thridx, tctx->max_load, tctx->max_fd, (long long)max_atime, (long long)max_ctime, tctx->intif_in_bytes, tctx->intif_out_bytes, tctx->extif_in_bytes, tctx->extif_out_bytes,
tctx->id, tctx->max_load, tctx->max_fd, (long long)max_atime, (long long)max_ctime, tctx->intif_in_bytes, tctx->intif_out_bytes, tctx->extif_in_bytes, tctx->extif_out_bytes,
tctx->set_watermarks, tctx->unset_watermarks, tctx->errors, tctx->stats_id);
if (asprintf(&smsg, "STATS: thr=%d, mld=%zu, mfd=%d, mat=%lld, mct=%lld, iib=%llu, iob=%llu, eib=%llu, eob=%llu, swm=%zu, uwm=%zu, err=%zu, si=%u\n",
tctx->thridx, tctx->max_load, tctx->max_fd, (long long)max_atime, (long long)max_ctime, tctx->intif_in_bytes, tctx->intif_out_bytes, tctx->extif_in_bytes, tctx->extif_out_bytes,
tctx->id, tctx->max_load, tctx->max_fd, (long long)max_atime, (long long)max_ctime, tctx->intif_in_bytes, tctx->intif_out_bytes, tctx->extif_in_bytes, tctx->extif_out_bytes,
tctx->set_watermarks, tctx->unset_watermarks, tctx->errors, tctx->stats_id) < 0) {
return;
}
@ -151,16 +149,16 @@ pxy_thr_print_thr_info(pxy_thr_ctx_t *tctx)
static void
pxy_thr_timer_cb(UNUSED evutil_socket_t fd, UNUSED short what, UNUSED void *arg)
{
pxy_thr_ctx_t *ctx = arg;
pxy_thr_ctx_t *tctx = arg;
log_finest_main_va("thr=%d, load=%lu, to=%u", ctx->thridx, ctx->load, ctx->timeout_count);
log_finest_main_va("thr=%d, load=%lu, to=%u", tctx->id, tctx->load, tctx->timeout_count);
// @attention Print thread info only if stats logging is enabled, if disabled debug logs are not printed either
if (ctx->thrmgr->opts->statslog) {
ctx->timeout_count++;
if (ctx->timeout_count >= ctx->thrmgr->opts->stats_period) {
ctx->timeout_count = 0;
pxy_thr_print_thr_info(ctx);
if (tctx->thrmgr->opts->statslog) {
tctx->timeout_count++;
if (tctx->timeout_count >= tctx->thrmgr->opts->stats_period) {
tctx->timeout_count = 0;
pxy_thr_print_thr_info(tctx);
}
}
}
@ -172,16 +170,16 @@ pxy_thr_timer_cb(UNUSED evutil_socket_t fd, UNUSED short what, UNUSED void *arg)
void *
pxy_thr(void *arg)
{
pxy_thr_ctx_t *ctx = arg;
pxy_thr_ctx_t *tctx = arg;
struct timeval timer_delay = {10, 0};
struct event *ev;
ev = event_new(ctx->evbase, -1, EV_PERSIST, pxy_thr_timer_cb, ctx);
ev = event_new(tctx->evbase, -1, EV_PERSIST, pxy_thr_timer_cb, tctx);
if (!ev)
return NULL;
evtimer_add(ev, &timer_delay);
ctx->running = 1;
event_base_dispatch(ctx->evbase);
tctx->running = 1;
event_base_dispatch(tctx->evbase);
event_free(ev);
return NULL;

@ -43,7 +43,7 @@ typedef struct pxy_thrmgr_ctx pxy_thrmgr_ctx_t;
typedef struct pxy_thr_ctx {
pthread_t thr;
int thridx;
int id;
pxy_thrmgr_ctx_t *thrmgr;
size_t load;
struct event_base *evbase;

@ -70,7 +70,7 @@ pxy_thrmgr_new(opts_t *opts)
int
pxy_thrmgr_run(pxy_thrmgr_ctx_t *ctx)
{
int idx = -1;
int i = -1;
if (!(ctx->thr = malloc(ctx->num_thr * sizeof(pxy_thr_ctx_t*)))) {
log_dbg_printf("Failed to allocate memory\n");
@ -78,33 +78,33 @@ pxy_thrmgr_run(pxy_thrmgr_ctx_t *ctx)
}
memset(ctx->thr, 0, ctx->num_thr * sizeof(pxy_thr_ctx_t*));
for (idx = 0; idx < ctx->num_thr; idx++) {
if (!(ctx->thr[idx] = malloc(sizeof(pxy_thr_ctx_t)))) {
for (i = 0; i < ctx->num_thr; i++) {
if (!(ctx->thr[i] = malloc(sizeof(pxy_thr_ctx_t)))) {
log_dbg_printf("Failed to allocate memory\n");
goto leave;
}
memset(ctx->thr[idx], 0, sizeof(pxy_thr_ctx_t));
ctx->thr[idx]->evbase = event_base_new();
if (!ctx->thr[idx]->evbase) {
log_dbg_printf("Failed to create evbase %d\n", idx);
memset(ctx->thr[i], 0, sizeof(pxy_thr_ctx_t));
ctx->thr[i]->evbase = event_base_new();
if (!ctx->thr[i]->evbase) {
log_dbg_printf("Failed to create evbase %d\n", i);
goto leave;
}
ctx->thr[idx]->load = 0;
ctx->thr[idx]->running = 0;
ctx->thr[idx]->conns = NULL;
ctx->thr[idx]->thridx = idx;
ctx->thr[idx]->timeout_count = 0;
ctx->thr[idx]->thrmgr = ctx;
ctx->thr[i]->load = 0;
ctx->thr[i]->running = 0;
ctx->thr[i]->conns = NULL;
ctx->thr[i]->id = i;
ctx->thr[i]->timeout_count = 0;
ctx->thr[i]->thrmgr = ctx;
}
log_dbg_printf("Initialized %d connection handling threads\n",
ctx->num_thr);
for (idx = 0; idx < ctx->num_thr; idx++) {
if (pthread_create(&ctx->thr[idx]->thr, NULL,
pxy_thr, ctx->thr[idx]))
for (i = 0; i < ctx->num_thr; i++) {
if (pthread_create(&ctx->thr[i]->thr, NULL,
pxy_thr, ctx->thr[i]))
goto leave_thr;
while (!ctx->thr[idx]->running) {
while (!ctx->thr[i]->running) {
sched_yield();
}
}
@ -115,23 +115,23 @@ pxy_thrmgr_run(pxy_thrmgr_ctx_t *ctx)
return 0;
leave_thr:
idx--;
while (idx >= 0) {
pthread_cancel(ctx->thr[idx]->thr);
pthread_join(ctx->thr[idx]->thr, NULL);
idx--;
i--;
while (i >= 0) {
pthread_cancel(ctx->thr[i]->thr);
pthread_join(ctx->thr[i]->thr, NULL);
i--;
}
idx = ctx->num_thr - 1;
i = ctx->num_thr - 1;
leave:
while (idx >= 0) {
if (ctx->thr[idx]) {
if (ctx->thr[idx]->evbase) {
event_base_free(ctx->thr[idx]->evbase);
while (i >= 0) {
if (ctx->thr[i]) {
if (ctx->thr[i]->evbase) {
event_base_free(ctx->thr[i]->evbase);
}
free(ctx->thr[idx]);
free(ctx->thr[i]);
}
idx--;
i--;
}
if (ctx->thr) {
free(ctx->thr);
@ -147,18 +147,18 @@ void
pxy_thrmgr_free(pxy_thrmgr_ctx_t *ctx)
{
if (ctx->thr) {
for (int idx = 0; idx < ctx->num_thr; idx++) {
event_base_loopbreak(ctx->thr[idx]->evbase);
for (int i = 0; i < ctx->num_thr; i++) {
event_base_loopbreak(ctx->thr[i]->evbase);
sched_yield();
}
for (int idx = 0; idx < ctx->num_thr; idx++) {
pthread_join(ctx->thr[idx]->thr, NULL);
for (int i = 0; i < ctx->num_thr; i++) {
pthread_join(ctx->thr[i]->thr, NULL);
}
for (int idx = 0; idx < ctx->num_thr; idx++) {
if (ctx->thr[idx]->evbase) {
event_base_free(ctx->thr[idx]->evbase);
for (int i = 0; i < ctx->num_thr; i++) {
if (ctx->thr[i]->evbase) {
event_base_free(ctx->thr[i]->evbase);
}
free(ctx->thr[idx]);
free(ctx->thr[i]);
}
free(ctx->thr);
}
@ -185,24 +185,23 @@ pxy_thrmgr_assign_thr(pxy_conn_ctx_t *ctx)
log_dbg_printf("===> Proxy connection handler thread status:\nthr[0]: %zu\n", minload);
#endif /* DEBUG_THREAD */
int thridx = 0;
for (int idx = 1; idx < tmctx->num_thr; idx++) {
size_t thrload = tmctx->thr[idx]->load;
int thrid = 0;
for (int i = 1; i < tmctx->num_thr; i++) {
size_t thrload = tmctx->thr[i]->load;
if (minload > thrload) {
minload = thrload;
thridx = idx;
thrid = i;
}
#ifdef DEBUG_THREAD
log_dbg_printf("thr[%d]: %zu\n", idx, thrload);
log_dbg_printf("thr[%d]: %zu\n", i, thrload);
#endif /* DEBUG_THREAD */
}
ctx->thr = tmctx->thr[thridx];
ctx->thr = tmctx->thr[thrid];
#ifdef DEBUG_THREAD
log_dbg_printf("thridx: %d\n", thridx);
log_dbg_printf("thrid: %d\n", thrid);
#endif /* DEBUG_THREAD */
}

Loading…
Cancel
Save