Defer e2 setup until after parent is connected, to prevent multithreading issues

Refactoring, improvements
pull/13/head
Soner Tari 7 years ago
parent 4d88906d24
commit 6975175117

@ -101,7 +101,8 @@ proxy_listener_ctx_free(proxy_listener_ctx_t *ctx)
/*
* Callback for error events on the socket listener bufferevent.
*/
static void
//static void
void
proxy_listener_errorcb(struct evconnlistener *listener, UNUSED void *ctx)
{
proxy_conn_meta_ctx_t *mctx = ctx;
@ -117,7 +118,8 @@ proxy_listener_errorcb(struct evconnlistener *listener, UNUSED void *ctx)
/*
* Callback for accept events on the socket listener bufferevent.
*/
static void
//static void
void
proxy_listener_acceptcb_e2(UNUSED struct evconnlistener *listener,
evutil_socket_t fd,
struct sockaddr *peeraddr, int peeraddrlen,
@ -225,22 +227,21 @@ proxy_listener_acceptcb(UNUSED struct evconnlistener *listener,
pxy_conn_ctx_t *parent_ctx = pxy_conn_setup(fd, peeraddr, peeraddrlen, mctx);
mctx->parent_ctx = parent_ctx;
// @attention Use the evbase of the mctx thread, otherwise we get multithreading issues
struct evconnlistener *evcl2 = evconnlistener_new(mctx->thr->evbase, proxy_listener_acceptcb_e2, mctx, LEV_OPT_CLOSE_ON_FREE, 1024, fd2);
if (!evcl2) {
log_err_printf("Error creating evconnlistener e2: %s, fd=%d, fd2=%d <<<<<<\n", strerror(errno), fd, fd2);
// @attention Do not call proxy_listener_ctx_free() on evcl2, evcl2 does not have any next listener
// @todo Create a new struct for evcl2 and related functions
//proxy_listener_ctx_free(evcl2);
evconnlistener_free(evcl2);
evutil_closesocket(fd2);
return;
}
mctx->evcl2 = evcl2;
evconnlistener_set_error_cb(evcl2, proxy_listener_errorcb);
log_dbg_level_printf(LOG_DBG_MODE_FINER, ">>>>> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! proxy_listener_acceptcb: FINISHED SETTING UP E2 SUCCESS, parent fd=%d, NEW fd2=%d\n", fd, fd2);
// // @attention Use the evbase of the mctx thread, otherwise we get multithreading issues
// struct evconnlistener *evcl2 = evconnlistener_new(mctx->thr->evbase, proxy_listener_acceptcb_e2, mctx, LEV_OPT_CLOSE_ON_FREE, 1024, fd2);
// if (!evcl2) {
// log_err_printf("Error creating evconnlistener e2: %s, fd=%d, fd2=%d <<<<<<\n", strerror(errno), fd, fd2);
// // @attention Cannot call proxy_listener_ctx_free() on evcl2, evcl2 does not have any ctx with next listener
// // @todo Create a new struct for evcl2 and related functions
// //proxy_listener_ctx_free(lctxe2);
// evutil_closesocket(fd2);
// return;
// }
// mctx->evcl2 = evcl2;
//
// evconnlistener_set_error_cb(evcl2, proxy_listener_errorcb);
//
// log_dbg_level_printf(LOG_DBG_MODE_FINER, ">>>>> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! proxy_listener_acceptcb: FINISHED SETTING UP E2 SUCCESS, parent fd=%d, NEW fd2=%d\n", fd, fd2);
}
/*

@ -69,6 +69,8 @@ typedef struct proxy_conn_meta_ctx {
evutil_socket_t fd2;
struct evconnlistener *evcl2;
char *pxy_dst;
pxy_conn_ctx_t *child_ctx;
pxy_conn_child_info_t *child_info;
@ -98,6 +100,13 @@ proxy_ctx_t * proxy_new(opts_t *, int) NONNULL(1) MALLOC;
void proxy_run(proxy_ctx_t *) NONNULL(1);
void proxy_loopbreak(proxy_ctx_t *) NONNULL(1);
void proxy_free(proxy_ctx_t *) NONNULL(1);
void
proxy_listener_errorcb(struct evconnlistener *listener, UNUSED void *ctx);
void
proxy_listener_acceptcb_e2(struct evconnlistener *listener,
evutil_socket_t fd,
struct sockaddr *peeraddr, int peeraddrlen,
void *arg);
#endif /* !PROXY_H */

@ -380,6 +380,9 @@ pxy_conn_meta_ctx_free(proxy_conn_meta_ctx_t *mctx)
if (mctx->sni) {
free(mctx->sni);
}
if (mctx->pxy_dst) {
free(mctx->pxy_dst);
}
if (mctx->child_info) {
pxy_conn_child_info_free(mctx->child_info);
}
@ -2045,8 +2048,10 @@ pxy_conn_free_e2(pxy_conn_ctx_t *ctx, int free)
if (!ctx->mctx->parent_ctx && !ctx->mctx->child_ctx) {
log_dbg_level_printf(LOG_DBG_MODE_FINE, ">############################# pxy_conn_free_e2: FREEING evcl2, pfd=%d, fd2=%d, cfd=%d\n", pfd, ctx->mctx->fd2, fd);
if (ctx->mctx->evcl2) {
evconnlistener_free(ctx->mctx->evcl2);
}
evutil_closesocket(ctx->mctx->fd2);
evconnlistener_free(ctx->mctx->evcl2);
log_dbg_level_printf(LOG_DBG_MODE_FINER, ">############################# pxy_conn_free_e2: RELEASING META CTX, fd=%d, parent fd=%d\n", fd, pfd);
rv = 2;
@ -2063,7 +2068,8 @@ pxy_conn_free_e2(pxy_conn_ctx_t *ctx, int free)
// @attention Free the parent ctx asap, we need its fds
if (parent_ctx) {
log_dbg_level_printf(LOG_DBG_MODE_FINER, ">############################# pxy_conn_free_e2: RETRY freeing parent, fd=%d, parent fd=%d\n", fd, pfd);
if (rv = pxy_conn_free(parent_ctx)) {
rv = pxy_conn_free(parent_ctx);
if (rv) {
log_dbg_level_printf(LOG_DBG_MODE_FINEST, ">############################# pxy_conn_free_e2: FREE parent SUCCESS, fd=%d, parent fd=%d\n", fd, pfd);
}
}
@ -2127,8 +2133,10 @@ pxy_conn_free(pxy_conn_ctx_t *ctx)
ctx->mctx->parent_ctx = NULL;
if (!ctx->mctx->child_ctx) {
log_dbg_level_printf(LOG_DBG_MODE_FINE, ">############################# pxy_conn_free: FREEING evcl2, pfd=%d, fd2=%d, cfd=%d\n", fd, ctx->mctx->fd2, cfd);
if (ctx->mctx->evcl2) {
evconnlistener_free(ctx->mctx->evcl2);
}
evutil_closesocket(ctx->mctx->fd2);
evconnlistener_free(ctx->mctx->evcl2);
log_dbg_level_printf(LOG_DBG_MODE_FINEST, ">############################# pxy_conn_free: RELEASING META CTX, fd=%d, child fd=%d\n", fd, cfd);
rv = 2;
@ -2191,8 +2199,10 @@ pxy_child_conn_free(pxy_conn_ctx_t *ctx)
if (!ctx->mctx->parent_ctx && !ctx->mctx->child_ctx) {
log_dbg_level_printf(LOG_DBG_MODE_FINE, ">############################# pxy_conn_free_e2: FREEING evcl2, pfd=%d, fd2=%d, cfd=%d\n", pfd, ctx->mctx->fd2, fd);
if (ctx->mctx->evcl2) {
evconnlistener_free(ctx->mctx->evcl2);
}
evutil_closesocket(ctx->mctx->fd2);
evconnlistener_free(ctx->mctx->evcl2);
log_dbg_level_printf(LOG_DBG_MODE_FINER, ">############################# pxy_conn_free_e2: RELEASING META CTX, fd=%d, parent fd=%d\n", fd, pfd);
} else {
@ -2240,8 +2250,10 @@ pxy_parent_conn_free(pxy_conn_ctx_t *ctx)
ctx->mctx->parent_ctx = NULL;
if (!ctx->mctx->child_ctx) {
log_dbg_level_printf(LOG_DBG_MODE_FINE, ">############################# pxy_parent_conn_free: FREEING evcl2, pfd=%d, fd2=%d, cfd=%d\n", fd, ctx->mctx->fd2, cfd);
if (ctx->mctx->evcl2) {
evconnlistener_free(ctx->mctx->evcl2);
}
evutil_closesocket(ctx->mctx->fd2);
evconnlistener_free(ctx->mctx->evcl2);
log_dbg_level_printf(LOG_DBG_MODE_FINEST, ">############################# pxy_parent_conn_free: RELEASING META CTX, fd=%d, child fd=%d\n", fd, cfd);
} else {
@ -2271,6 +2283,33 @@ pxy_all_conn_free(proxy_conn_meta_ctx_t *mctx)
//pxy_conn_meta_ctx_free(mctx);
}
char *bev_names[] = {
"src",
"dst",
"e2src",
"e2dst",
"NULL",
"UNKWN"
};
char *
pxy_get_event_name(struct bufferevent *bev, pxy_conn_ctx_t *ctx)
{
if (bev == ctx->src.bev) {
return bev_names[0];
} else if (bev == ctx->dst.bev) {
return bev_names[1];
} else if (bev == ctx->e2src.bev) {
return bev_names[2];
} else if (bev == ctx->e2dst.bev) {
return bev_names[3];
} else if (bev == NULL) {
return bev_names[4];
} else {
return bev_names[5];
}
}
/*
* Callback for read events on the up- and downstream connection bufferevents.
* Called when there is data ready in the input evbuffer.
@ -2290,21 +2329,23 @@ pxy_bev_readcb(struct bufferevent *bev, void *arg)
ctx->mctx->access_time = time(NULL);
char event_name[6] = "\0\0\0\0\0\0";
if (bev == ctx->src.bev) {
strcpy(event_name, "src");
} else if (bev == ctx->dst.bev) {
strcpy(event_name, "dst");
} else if (bev == ctx->e2src.bev) {
strcpy(event_name, "e2src");
} else if (bev == ctx->e2dst.bev) {
strcpy(event_name, "e2dst");
} else if (bev == NULL) {
strcpy(event_name, "NULL");
} else {
strcpy(event_name, "UNKWN");
}
// char event_name[6] = "\0\0\0\0\0\0";
// if (bev == ctx->src.bev) {
// strcpy(event_name, "src");
// } else if (bev == ctx->dst.bev) {
// strcpy(event_name, "dst");
// } else if (bev == ctx->e2src.bev) {
// strcpy(event_name, "e2src");
// } else if (bev == ctx->e2dst.bev) {
// strcpy(event_name, "e2dst");
// } else if (bev == NULL) {
// strcpy(event_name, "NULL");
// } else {
// strcpy(event_name, "UNKWN");
// }
char *event_name = pxy_get_event_name(bev, ctx);
log_dbg_level_printf(LOG_DBG_MODE_FINEST, ">>>>>,,,,,,,,,,,,,,,,,,,,,,, pxy_bev_readcb: %s, fd=%d\n", event_name, ctx->fd);
if (bev == ctx->src.bev) {
@ -2318,32 +2359,37 @@ pxy_bev_readcb(struct bufferevent *bev, void *arg)
if (ctx->e2src.bev) {
struct evbuffer *inbuf = bufferevent_get_input(bev);
struct sockaddr_in e2listener_addr;
socklen_t e2listener_len;
e2listener_len = sizeof(e2listener_addr);
// @todo Check if the fd is the same for all children
if (getsockname(ctx->mctx->fd2, &e2listener_addr, &e2listener_len) < 0) {
perror("getsockname");
log_dbg_level_printf(LOG_DBG_MODE_FINE, ">>>>>,,,,,,,,,,,,,,,,,,,,,,, pxy_bev_readcb: %s, getsockname ERROR= %s, fd=%d ,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,, fd2=%d\n", event_name, strerror(errno), ctx->fd, ctx->mctx->fd2);
// @todo If getsockname() fails, terminate the connection instead?
// Leaving the packet in the buffer will eventually time out and drop the connection
if (ctx->e2src_eof) {
evbuffer_drain(inbuf, evbuffer_get_length(inbuf));
goto leave;
}
char *addr = inet_ntoa(e2listener_addr.sin_addr);
int addr_len = strlen(addr) + 5 + 3 + 1;
char *pxy_dst = malloc(addr_len);
snprintf(pxy_dst, addr_len, "[%s]:%d", addr, (int) ntohs(e2listener_addr.sin_port));
// struct sockaddr_in e2listener_addr;
// socklen_t e2listener_len;
//
// e2listener_len = sizeof(e2listener_addr);
//
// // @todo Check if the fd is the same for all children
// if (getsockname(ctx->mctx->fd2, &e2listener_addr, &e2listener_len) < 0) {
// perror("getsockname");
// log_dbg_level_printf(LOG_DBG_MODE_FINE, ">>>>>,,,,,,,,,,,,,,,,,,,,,,, pxy_bev_readcb: %s, getsockname ERROR= %s, fd=%d ,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,, fd2=%d\n", event_name, strerror(errno), ctx->fd, ctx->mctx->fd2);
// // @todo If getsockname() fails, terminate the connection instead?
// // Leaving the packet in the buffer will eventually time out and drop the connection
// goto leave;
// }
//
// char *addr = inet_ntoa(e2listener_addr.sin_addr);
// int addr_len = strlen(addr) + 5 + 3 + 1;
//
// char *pxy_dst = malloc(addr_len);
// snprintf(pxy_dst, addr_len, "[%s]:%d", addr, (int) ntohs(e2listener_addr.sin_port));
char *custom_key = "\r\nSSLproxy-Addr: ";
size_t custom_field_len = strlen(custom_key) + strlen(pxy_dst) + 1;
size_t custom_field_len = strlen(custom_key) + strlen(ctx->mctx->pxy_dst) + 1;
char *custom_field = malloc(custom_field_len);
snprintf(custom_field, custom_field_len, "%s%s", custom_key, pxy_dst);
free(pxy_dst);
snprintf(custom_field, custom_field_len, "%s%s", custom_key, ctx->mctx->pxy_dst);
// free(pxy_dst);
log_dbg_level_printf(LOG_DBG_MODE_FINER, ">>>>>,,,,,,,,,,,,,,,,,,,,,,, pxy_bev_readcb: custom_field= %s\n", custom_field);
@ -2351,6 +2397,7 @@ pxy_bev_readcb(struct bufferevent *bev, void *arg)
char *packet = malloc(packet_size + custom_field_len);
if (!packet) {
ctx->enomem = 1;
free(custom_field);
goto leave;
}
@ -2393,14 +2440,22 @@ pxy_bev_readcb(struct bufferevent *bev, void *arg)
free(custom_field);
struct evbuffer *e2outbuf = bufferevent_get_output(ctx->e2src.bev);
struct evbuffer *outbuf = bufferevent_get_output(ctx->e2src.bev);
// Decrement packet_size to avoid copying the null termination
int add_result = evbuffer_add(e2outbuf, packet, packet_size - 1);
int add_result = evbuffer_add(outbuf, packet, packet_size - 1);
if (add_result < 0) {
log_err_printf("ERROR: evbuffer_add failed\n");
}
if (evbuffer_get_length(outbuf) >= OUTBUF_LIMIT) {
/* temporarily disable data source;
* set an appropriate watermark. */
log_dbg_level_printf(LOG_DBG_MODE_FINE, ">>>>>,,,,,,,,,,,,,,,,,,,,,,, pxy_bev_readcb: setwatermark for e2src w, disable src r <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< WATERMARK");
bufferevent_setwatermark(ctx->e2src.bev, EV_WRITE, OUTBUF_LIMIT/2, OUTBUF_LIMIT);
bufferevent_disable(ctx->src.bev, EV_READ);
}
log_dbg_level_printf(LOG_DBG_MODE_FINEST, ">>>>>,,,,,,,,,,,,,,,,,,,,,,, pxy_bev_readcb: src packet (size = %d), fd=%d:\n%.*s\n",
(int) packet_size, ctx->fd, (int) packet_size, packet);
// log_dbg_level_printf(LOG_DBG_MODE_FINEST, ">>>>>,,,,,,,,,,,,,,,,,,,,,,, pxy_bev_readcb: src packet (size = %d)\n", (int) packet_size);
@ -2414,6 +2469,11 @@ pxy_bev_readcb(struct bufferevent *bev, void *arg)
if (ctx->src.bev) {
struct evbuffer *inbuf = bufferevent_get_input(bev);
if (ctx->src_eof) {
evbuffer_drain(inbuf, evbuffer_get_length(inbuf));
goto leave;
}
size_t packet_size = evbuffer_get_length(inbuf);
char *packet = malloc(packet_size);
if (!packet) {
@ -2433,6 +2493,14 @@ pxy_bev_readcb(struct bufferevent *bev, void *arg)
log_err_printf("ERROR: evbuffer_add failed\n");
}
if (evbuffer_get_length(outbuf) >= OUTBUF_LIMIT) {
/* temporarily disable data source;
* set an appropriate watermark. */
log_dbg_level_printf(LOG_DBG_MODE_FINE, ">>>>>,,,,,,,,,,,,,,,,,,,,,,, pxy_bev_readcb: setwatermark for src w, disable e2src r <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< WATERMARK");
bufferevent_setwatermark(ctx->src.bev, EV_WRITE, OUTBUF_LIMIT/2, OUTBUF_LIMIT);
bufferevent_disable(ctx->e2src.bev, EV_READ);
}
// log_dbg_level_printf(LOG_DBG_MODE_FINEST, ">>>>>,,,,,,,,,,,,,,,,,,,,,,, pxy_bev_readcb: e2src packet (size = %d):\n%.*s\n",
// (int) packet_size, (int) packet_size, packet);
log_dbg_level_printf(LOG_DBG_MODE_FINEST, ">>>>>,,,,,,,,,,,,,,,,,,,,,,, pxy_bev_readcb: e2src packet (size = %d)\n", (int) packet_size);
@ -2463,20 +2531,21 @@ pxy_bev_readcb_e2(struct bufferevent *bev, void *arg)
evutil_socket_t pfd = ctx->mctx->parent_ctx ? ctx->mctx->parent_ctx->fd : -1;
char event_name[6] = "\0\0\0\0\0\0";
if (bev == ctx->src.bev) {
strcpy(event_name, "src");
} else if (bev == ctx->dst.bev) {
strcpy(event_name, "dst");
} else if (bev == ctx->e2src.bev) {
strcpy(event_name, "e2src");
} else if (bev == ctx->e2dst.bev) {
strcpy(event_name, "e2dst");
} else if (bev == NULL) {
strcpy(event_name, "NULL");
} else {
strcpy(event_name, "UNKWN");
}
// char event_name[6] = "\0\0\0\0\0\0";
// if (bev == ctx->src.bev) {
// strcpy(event_name, "src");
// } else if (bev == ctx->dst.bev) {
// strcpy(event_name, "dst");
// } else if (bev == ctx->e2src.bev) {
// strcpy(event_name, "e2src");
// } else if (bev == ctx->e2dst.bev) {
// strcpy(event_name, "e2dst");
// } else if (bev == NULL) {
// strcpy(event_name, "NULL");
// } else {
// strcpy(event_name, "UNKWN");
// }
char *event_name = pxy_get_event_name(bev, ctx);
log_dbg_level_printf(LOG_DBG_MODE_FINEST, ">>>>>....................... pxy_bev_readcb_e2: %s, fd=%d\n", event_name, ctx->fd);
@ -2491,17 +2560,22 @@ pxy_bev_readcb_e2(struct bufferevent *bev, void *arg)
log_dbg_level_printf(LOG_DBG_MODE_FINEST, ">>>>>.................................................................................... pxy_bev_readcb_e2: PEER [%s]:%d <<<<< fd=%d, parent fd=%d\n", inet_ntoa(peeraddr.sin_addr), (int) ntohs(peeraddr.sin_port), ctx->fd, pfd);
struct evbuffer *e2outbuf = bufferevent_get_input(ctx->e2dst.bev);
struct evbuffer *inbuf = bufferevent_get_input(ctx->e2dst.bev);
if (ctx->dst_eof) {
evbuffer_drain(inbuf, evbuffer_get_length(inbuf));
goto leave;
}
char *custom_key = "SSLproxy-Addr: ";
struct evbuffer_ptr ebp = evbuffer_search(e2outbuf, custom_key, strlen(custom_key), NULL);
struct evbuffer_ptr ebp = evbuffer_search(inbuf, custom_key, strlen(custom_key), NULL);
if (ebp.pos != -1) {
log_dbg_level_printf(LOG_DBG_MODE_FINEST, ">>>>>....................... pxy_bev_readcb_e2: evbuffer_search FOUND SSLproxy-Addr at %ld\n", ebp.pos);
} else {
log_dbg_level_printf(LOG_DBG_MODE_FINE, ">>>>>....................... pxy_bev_readcb_e2: evbuffer_search FAILED\n");
}
size_t packet_size = evbuffer_get_length(e2outbuf);
size_t packet_size = evbuffer_get_length(inbuf);
// ATTENTION: +1 is for null termination
char *packet = malloc(packet_size + 1);
if (!packet) {
@ -2512,7 +2586,7 @@ pxy_bev_readcb_e2(struct bufferevent *bev, void *arg)
log_dbg_level_printf(LOG_DBG_MODE_FINEST, ">>>>>....................... pxy_bev_readcb_e2: packet_size\n");
if (packet_size > 0) {
int bytes_read = evbuffer_remove(e2outbuf, packet, packet_size);
int bytes_read = evbuffer_remove(inbuf, packet, packet_size);
if (bytes_read < 0) {
log_err_printf("ERROR: evbuffer_remove cannot drain the buffer\n");
}
@ -2557,6 +2631,14 @@ pxy_bev_readcb_e2(struct bufferevent *bev, void *arg)
if (add_result < 0) {
log_err_printf("ERROR: evbuffer_add failed\n");
}
if (evbuffer_get_length(outbuf) >= OUTBUF_LIMIT) {
/* temporarily disable data source;
* set an appropriate watermark. */
log_dbg_level_printf(LOG_DBG_MODE_FINE, ">>>>>....................... pxy_bev_readcb_e2: setwatermark for dst w, disable e2dst r <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< WATERMARK");
bufferevent_setwatermark(ctx->dst.bev, EV_WRITE, OUTBUF_LIMIT/2, OUTBUF_LIMIT);
bufferevent_disable(ctx->e2dst.bev, EV_READ);
}
log_dbg_level_printf(LOG_DBG_MODE_FINEST, ">>>>>....................... pxy_bev_readcb_e2: e2dst packet (size = %d), fd=%d, parent fd=%d:\n%.*s\n",
(int) packet_size, ctx->fd, pfd, (int) packet_size, packet);
@ -2571,6 +2653,11 @@ pxy_bev_readcb_e2(struct bufferevent *bev, void *arg)
if (ctx->e2dst.bev) {
struct evbuffer *inbuf = bufferevent_get_input(bev);
if (ctx->e2dst_eof) {
evbuffer_drain(inbuf, evbuffer_get_length(inbuf));
goto leave;
}
size_t packet_size = evbuffer_get_length(inbuf);
char *packet = malloc(packet_size);
if (!packet) {
@ -2584,21 +2671,29 @@ pxy_bev_readcb_e2(struct bufferevent *bev, void *arg)
log_err_printf("ERROR: evbuffer_remove cannot drain the buffer\n");
}
struct evbuffer *e2outbuf = bufferevent_get_output(ctx->e2dst.bev);
struct evbuffer *outbuf = bufferevent_get_output(ctx->e2dst.bev);
int add_result = evbuffer_add(e2outbuf, packet, packet_size);
int add_result = evbuffer_add(outbuf, packet, packet_size);
if (add_result < 0) {
log_err_printf("ERROR: evbuffer_add failed\n");
}
if (evbuffer_get_length(outbuf) >= OUTBUF_LIMIT) {
/* temporarily disable data source;
* set an appropriate watermark. */
log_dbg_level_printf(LOG_DBG_MODE_FINE, ">>>>>....................... pxy_bev_readcb_e2: setwatermark for e2dst w, disable dst r <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< WATERMARK");
bufferevent_setwatermark(ctx->e2dst.bev, EV_WRITE, OUTBUF_LIMIT/2, OUTBUF_LIMIT);
bufferevent_disable(ctx->dst.bev, EV_READ);
}
// @todo Use a hexcode dump to print the packet?
// log_dbg_level_printf(LOG_DBG_MODE_FINEST, ">>>>>,,,,,,,,,,,,,,,,,,,,,,, pxy_bev_readcb_e2: dst packet (size = %d):\n%.*s\n",
// log_dbg_level_printf(LOG_DBG_MODE_FINEST, ">>>>>....................... pxy_bev_readcb_e2: dst packet (size = %d):\n%.*s\n",
// (int) packet_size, (int) packet_size, packet);
log_dbg_level_printf(LOG_DBG_MODE_FINEST, ">>>>>,,,,,,,,,,,,,,,,,,,,,,, pxy_bev_readcb_e2: dst packet (size = %d)\n", (int) packet_size);
log_dbg_level_printf(LOG_DBG_MODE_FINEST, ">>>>>....................... pxy_bev_readcb_e2: dst packet (size = %d)\n", (int) packet_size);
free(packet);
} else {
log_dbg_level_printf(LOG_DBG_MODE_FINE, ">>>>>,,,,,,,,,,,,,,,,,,,,,,, pxy_bev_readcb_e2: dst ctx->e2dst.bev NULL\n");
log_dbg_level_printf(LOG_DBG_MODE_FINE, ">>>>>....................... pxy_bev_readcb_e2: dst ctx->e2dst.bev NULL\n");
}
}
@ -2702,6 +2797,53 @@ pxy_connected_enable(struct bufferevent *bev, pxy_conn_ctx_t *ctx, char *event_n
dst->bev = NULL;
}
// @attention Defer evcl2 creation until parent init is complete, otherwise (1) causes multithreading issues (proxy_listener_acceptcb running on a different
// thread from the conn, and we only have thrmgr mutex), and (2) we need to clean up less upon errors.
// evcl2 uses the evbase of the mctx thread, otherwise we would get multithreading issues.
struct evconnlistener *evcl2 = evconnlistener_new(ctx->mctx->thr->evbase, proxy_listener_acceptcb_e2, ctx->mctx, LEV_OPT_CLOSE_ON_FREE, 1024, ctx->mctx->fd2);
if (!evcl2) {
log_err_printf("Error creating evconnlistener e2: %s, fd=%d, fd2=%d <<<<<<\n", strerror(errno), ctx->mctx->fd, ctx->mctx->fd2);
// @attention Cannot call proxy_listener_ctx_free() on evcl2, evcl2 does not have any ctx with next listener
// @todo Create a new struct for evcl2 and related functions
//proxy_listener_ctx_free(lctxe2);
evutil_closesocket(ctx->mctx->fd2);
return 0;
}
ctx->mctx->evcl2 = evcl2;
evconnlistener_set_error_cb(evcl2, proxy_listener_errorcb);
log_dbg_level_printf(LOG_DBG_MODE_FINER, ">>>>>=================================== pxy_connected_enable: FINISHED SETTING UP E2 SUCCESS, parent fd=%d, NEW fd2=%d\n", ctx->mctx->fd, ctx->mctx->fd2);
struct sockaddr_in e2listener_addr;
socklen_t e2listener_len;
e2listener_len = sizeof(e2listener_addr);
// @todo Check if the fd is the same for all children
if (getsockname(ctx->mctx->fd2, &e2listener_addr, &e2listener_len) < 0) {
perror("getsockname");
log_dbg_level_printf(LOG_DBG_MODE_FINE, ">>>>>=================================== pxy_connected_enable: %s, getsockname ERROR= %s, fd=%d ,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,, fd2=%d\n", event_name, strerror(errno), ctx->fd, ctx->mctx->fd2);
// @todo If getsockname() fails, terminate the connection instead?
// Leaving the packet in the buffer will eventually time out and drop the connection
return 0;
}
char *addr = inet_ntoa(e2listener_addr.sin_addr);
int addr_len = strlen(addr) + 5 + 3 + 1;
ctx->mctx->pxy_dst = malloc(addr_len);
snprintf(ctx->mctx->pxy_dst, addr_len, "[%s]:%d", addr, (int) ntohs(e2listener_addr.sin_port));
// char *custom_key = "\r\nSSLproxy-Addr: ";
// size_t custom_field_len = strlen(custom_key) + strlen(ctx->mctx->pxy_dst) + 1;
//
// char *custom_field = malloc(custom_field_len);
// snprintf(custom_field, custom_field_len, "%s%s", custom_key, ctx->mctx->pxy_dst);
// free(ctx->mctx->pxy_dst);
log_dbg_level_printf(LOG_DBG_MODE_FINER, ">>>>>=================================== pxy_connected_enable: pxy_dst= %s, fd=%d, fd2=%d\n", ctx->mctx->pxy_dst, ctx->mctx->fd, ctx->mctx->fd2);
// Now open the gates
bufferevent_enable(ctx->src.bev, EV_READ|EV_WRITE);
}
return 1;
@ -2744,35 +2886,47 @@ pxy_bev_writecb(struct bufferevent *bev, void *arg)
ctx->mctx->access_time = time(NULL);
char event_name[6] = "\0\0\0\0\0\0";
if (bev == ctx->src.bev) {
strcpy(event_name, "src");
} else if (bev == ctx->dst.bev) {
strcpy(event_name, "dst");
} else if (bev == ctx->e2src.bev) {
strcpy(event_name, "e2src");
} else if (bev == ctx->e2dst.bev) {
strcpy(event_name, "e2dst");
} else if (bev == NULL) {
strcpy(event_name, "NULL");
} else {
strcpy(event_name, "UNKWN");
log_dbg_level_printf(LOG_DBG_MODE_FINE, ">>>>>+++++++++++++++++++++++++++++++++++ pxy_bev_writecb: event_name == UNKWN <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< NOT INIT\n");
goto leave;
}
// char event_name[6] = "\0\0\0\0\0\0";
// if (bev == ctx->src.bev) {
// strcpy(event_name, "src");
// } else if (bev == ctx->dst.bev) {
// strcpy(event_name, "dst");
// } else if (bev == ctx->e2src.bev) {
// strcpy(event_name, "e2src");
// } else if (bev == ctx->e2dst.bev) {
// strcpy(event_name, "e2dst");
// } else if (bev == NULL) {
// strcpy(event_name, "NULL");
// } else {
// strcpy(event_name, "UNKWN");
// log_dbg_level_printf(LOG_DBG_MODE_FINE, ">>>>>+++++++++++++++++++++++++++++++++++ pxy_bev_writecb: event_name == UNKWN <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< NOT INIT\n");
// goto leave;
// }
char *event_name = pxy_get_event_name(bev, ctx);
log_dbg_level_printf(LOG_DBG_MODE_FINEST, ">>>>>+++++++++++++++++++++++++++++++++++ pxy_bev_writecb: %s, %d\n", event_name, ctx->fd);
// @todo Remove this
// XXX: For Squid's Zero Sized Reply
if ((bev == ctx->dst.bev) && !ctx->dst_connected) {
// @attention Do not call pxy_bev_eventcb() instead, that would cause deadlock? We don't use locks anymore.
//pxy_bev_eventcb(bev, BEV_EVENT_CONNECTED, ctx);
pxy_connected_enable(bev, ctx, event_name);
// // @todo Remove this
// // XXX: For Squid's Zero Sized Reply
// if ((bev == ctx->dst.bev) && !ctx->dst_connected) {
// // @attention Do not call pxy_bev_eventcb() instead, that would cause deadlock? We don't use locks anymore.
// //pxy_bev_eventcb(bev, BEV_EVENT_CONNECTED, ctx);
// pxy_connected_enable(bev, ctx, event_name);
// }
if ((bev==ctx->src.bev) || (bev==ctx->e2src.bev)) {
pxy_conn_desc_t *other = (bev==ctx->src.bev) ? &ctx->e2src : &ctx->src;
if (other->bev && !(bufferevent_get_enabled(other->bev) & EV_READ)) {
/* data source temporarily disabled;
* re-enable and reset watermark to 0. */
log_dbg_level_printf(LOG_DBG_MODE_FINE, ">>>>>+++++++++++++++++++++++++++++++++++ pxy_bev_writecb: remove watermark for w, disable r <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< WATERMARK");
bufferevent_setwatermark(bev, EV_WRITE, 0, 0);
bufferevent_enable(other->bev, EV_READ);
}
}
if (ctx->src_eof || ctx->e2src_eof) {
log_dbg_level_printf(LOG_DBG_MODE_FINEST, ">>>>>+++++++++++++++++++++++++++++++++++ pxy_bev_writecb(): TRY CLOSING PARENT fd=%d\n", ctx->fd);
log_dbg_level_printf(LOG_DBG_MODE_FINEST, ">>>>>+++++++++++++++++++++++++++++++++++ pxy_bev_writecb: TRY CLOSING PARENT fd=%d\n", ctx->fd);
rv = pxy_conn_free(ctx);
}
@ -2805,20 +2959,21 @@ pxy_bev_writecb_e2(struct bufferevent *bev, void *arg)
pxy_conn_ctx_t *parent_ctx = ctx->mctx->parent_ctx;
char event_name[6] = "\0\0\0\0\0\0";
if (bev == ctx->src.bev) {
strcpy(event_name, "src");
} else if (bev == ctx->dst.bev) {
strcpy(event_name, "dst");
} else if (bev == ctx->e2src.bev) {
strcpy(event_name, "e2src");
} else if (bev == ctx->e2dst.bev) {
strcpy(event_name, "e2dst");
} else if (bev == NULL) {
strcpy(event_name, "NULL");
} else {
strcpy(event_name, "UNKWN");
}
// char event_name[6] = "\0\0\0\0\0\0";
// if (bev == ctx->src.bev) {
// strcpy(event_name, "src");
// } else if (bev == ctx->dst.bev) {
// strcpy(event_name, "dst");
// } else if (bev == ctx->e2src.bev) {
// strcpy(event_name, "e2src");
// } else if (bev == ctx->e2dst.bev) {
// strcpy(event_name, "e2dst");
// } else if (bev == NULL) {
// strcpy(event_name, "NULL");
// } else {
// strcpy(event_name, "UNKWN");
// }
char *event_name = pxy_get_event_name(bev, ctx);
log_dbg_level_printf(LOG_DBG_MODE_FINEST, ">>>>>??????????????????????????? pxy_bev_writecb_e2: %s, %d\n", event_name, ctx->fd);
@ -2835,6 +2990,15 @@ pxy_bev_writecb_e2(struct bufferevent *bev, void *arg)
log_dbg_level_printf(LOG_DBG_MODE_FINEST, ">>>>>??????????????????????????? pxy_bev_writecb_e2: ctx->parent_ctx NULL %s, %d\n", event_name, fd);
}
pxy_conn_desc_t *other = (bev==ctx->e2dst.bev) ? &ctx->dst : &ctx->e2dst;
if (other->bev && !(bufferevent_get_enabled(other->bev) & EV_READ)) {
/* data source temporarily disabled;
* re-enable and reset watermark to 0. */
log_dbg_level_printf(LOG_DBG_MODE_FINE, ">>>>>??????????????????????????? pxy_bev_writecb_e2: remove watermark for w, disable r <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< WATERMARK");
bufferevent_setwatermark(bev, EV_WRITE, 0, 0);
bufferevent_enable(other->bev, EV_READ);
}
if (ctx->e2dst_eof || ctx->dst_eof || !parent_ctx) {
log_dbg_level_printf(LOG_DBG_MODE_FINER, ">>>>>??????????????????????????? pxy_bev_writecb_e2: TRY CLOSING CHILD fd=%d\n", fd);
rv = pxy_conn_free_e2(ctx, 0);
@ -2875,22 +3039,23 @@ pxy_bev_eventcb(struct bufferevent *bev, short events, void *arg)
log_dbg_level_printf(LOG_DBG_MODE_FINEST, ">>>>>=================================== pxy_bev_eventcb ENTER fd=%d\n", ctx->fd);
char event_name[6] = "\0\0\0\0\0\0";
if (bev == ctx->src.bev) {
strcpy(event_name, "src");
} else if (bev == ctx->dst.bev) {
strcpy(event_name, "dst");
} else if (bev == ctx->e2src.bev) {
strcpy(event_name, "e2src");
} else if (bev == ctx->e2dst.bev) {
strcpy(event_name, "e2dst");
} else if (bev == NULL) {
strcpy(event_name, "NULL");
} else {
strcpy(event_name, "UNKWN");
log_dbg_level_printf(LOG_DBG_MODE_FINE, ">>>>>=================================== pxy_bev_eventcb: event_name == UNKWN <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< NOT INIT\n");
goto leave;
}
// char event_name[6] = "\0\0\0\0\0\0";
// if (bev == ctx->src.bev) {
// strcpy(event_name, "src");
// } else if (bev == ctx->dst.bev) {
// strcpy(event_name, "dst");
// } else if (bev == ctx->e2src.bev) {
// strcpy(event_name, "e2src");
// } else if (bev == ctx->e2dst.bev) {
// strcpy(event_name, "e2dst");
// } else if (bev == NULL) {
// strcpy(event_name, "NULL");
// } else {
// strcpy(event_name, "UNKWN");
// log_dbg_level_printf(LOG_DBG_MODE_FINE, ">>>>>=================================== pxy_bev_eventcb: event_name == UNKWN <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< NOT INIT\n");
// goto leave;
// }
char *event_name = pxy_get_event_name(bev, ctx);
if (events & BEV_EVENT_CONNECTED) {
if (!pxy_connected_enable(bev, ctx, event_name)) {
@ -3088,20 +3253,21 @@ pxy_bev_eventcb_e2(struct bufferevent *bev, short events, void *arg)
ctx->mctx->access_time = time(NULL);
char event_name[6] = "\0\0\0\0\0\0";
if (bev == ctx->src.bev) {
strcpy(event_name, "src");
} else if (bev == ctx->dst.bev) {
strcpy(event_name, "dst");
} else if (bev == ctx->e2src.bev) {
strcpy(event_name, "e2src");
} else if (bev == ctx->e2dst.bev) {
strcpy(event_name, "e2dst");
} else if (bev == NULL) {
strcpy(event_name, "NULL");
} else {
strcpy(event_name, "UNKWN");
}
// char event_name[6] = "\0\0\0\0\0\0";
// if (bev == ctx->src.bev) {
// strcpy(event_name, "src");
// } else if (bev == ctx->dst.bev) {
// strcpy(event_name, "dst");
// } else if (bev == ctx->e2src.bev) {
// strcpy(event_name, "e2src");
// } else if (bev == ctx->e2dst.bev) {
// strcpy(event_name, "e2dst");
// } else if (bev == NULL) {
// strcpy(event_name, "NULL");
// } else {
// strcpy(event_name, "UNKWN");
// }
char *event_name = pxy_get_event_name(bev, ctx);
log_dbg_level_printf(LOG_DBG_MODE_FINEST, ">>>>>--------------------- pxy_bev_eventcb_e2: ENTER %s fd=%d\n", event_name, ctx->fd);
@ -3338,7 +3504,7 @@ pxy_conn_connect_e2(pxy_conn_ctx_t *ctx)
ctx->e2dst.bev = pxy_bufferevent_setup_e2(ctx, fd, ctx->e2dst.ssl);
// @attention Do not enable e2dst events here yet, they will be enabled after dst connects
// @todo Do we need a watermark?
// @todo Do we need a watermark for the header line of SSL proxy address?
//bufferevent_setwatermark(ctx->e2dst.bev, EV_READ, 200, OUTBUF_LIMIT);
/* create server-side socket and eventbuffer */

Loading…
Cancel
Save