@ -77,14 +77,45 @@ pxy_thrmgr_thr(void *arg)
return NULL ;
}
/*
* Create new thread manager but do not start any threads yet .
* This gets called before forking to background .
*/
pxy_thrmgr_ctx_t *
pxy_thrmgr_new ( opts_t * opts )
{
pxy_thrmgr_ctx_t * ctx ;
if ( ! ( ctx = malloc ( sizeof ( pxy_thrmgr_ctx_t ) ) ) )
return NULL ;
memset ( ctx , 0 , sizeof ( pxy_thrmgr_ctx_t ) ) ;
ctx - > opts = opts ;
ctx - > num_thr = 2 * sys_get_cpu_cores ( ) ;
// ctx->num_thr = 1;
return ctx ;
}
/*
* Start the thread manager and associated threads .
* This must be called after forking .
*
* Returns - 1 on failure , 0 on success .
*/
int
pxy_thrmgr_init ( pxy_thrmgr_ctx_t * ctx )
pxy_thrmgr_ run ( pxy_thrmgr_ctx_t * ctx )
{
int idx = - 1 , dns = 0 ;
dns = opts_has_dns_spec ( ctx - > opts ) ;
// pthread_mutex_init(&ctx->mutex, NULL);
// pthread_mutexattr_t *attr;
// pthread_mutexattr_init(attr);
// pthread_mutexattr_settype(attr, PTHREAD_MUTEX_RECURSIVE);
//// pthread_mutexattr_settype(attr, PTHREAD_MUTEX_ERRORCHECK);
pthread_mutex_init ( & ctx - > mutex , NULL ) ;
// pthread_mutex_init(&ctx->mutex, attr);
if ( ! ( ctx - > thr = malloc ( ctx - > num_thr * sizeof ( pxy_thr_ctx_t * ) ) ) ) {
log_dbg_printf ( " Failed to allocate memory \n " ) ;
@ -121,107 +152,6 @@ pxy_thrmgr_init(pxy_thrmgr_ctx_t *ctx)
log_dbg_printf ( " Initialized %d connection handling threads \n " ,
ctx - > num_thr ) ;
return 0 ;
leave :
while ( idx > = 0 ) {
if ( ctx - > thr [ idx ] ) {
if ( ctx - > thr [ idx ] - > dnsbase ) {
evdns_base_free ( ctx - > thr [ idx ] - > dnsbase , 0 ) ;
}
if ( ctx - > thr [ idx ] - > evbase ) {
event_base_free ( ctx - > thr [ idx ] - > evbase ) ;
}
free ( ctx - > thr [ idx ] ) ;
}
idx - - ;
}
pthread_mutex_destroy ( & ctx - > mutex ) ;
if ( ctx - > thr ) {
free ( ctx - > thr ) ;
ctx - > thr = NULL ;
}
return - 1 ;
}
/*
* Create new thread manager but do not start any threads yet .
* This gets called before forking to background .
*/
pxy_thrmgr_ctx_t *
pxy_thrmgr_new ( opts_t * opts )
{
pxy_thrmgr_ctx_t * ctx ;
if ( ! ( ctx = malloc ( sizeof ( pxy_thrmgr_ctx_t ) ) ) )
return NULL ;
memset ( ctx , 0 , sizeof ( pxy_thrmgr_ctx_t ) ) ;
ctx - > opts = opts ;
ctx - > num_thr = 2 * sys_get_cpu_cores ( ) ;
// ctx->num_thr = 1;
// pxy_thrmgr_init(ctx);
return ctx ;
}
/*
* Start the thread manager and associated threads .
* This must be called after forking .
*
* Returns - 1 on failure , 0 on success .
*/
int
pxy_thrmgr_run ( pxy_thrmgr_ctx_t * ctx )
{
int idx = - 1 , dns = 0 ;
// dns = opts_has_dns_spec(ctx->opts);
// pthread_mutexattr_t *attr;
// pthread_mutexattr_init(attr);
// pthread_mutexattr_settype(attr, PTHREAD_MUTEX_RECURSIVE);
//// pthread_mutexattr_settype(attr, PTHREAD_MUTEX_ERRORCHECK);
pthread_mutex_init ( & ctx - > mutex , NULL ) ;
// pthread_mutex_init(&ctx->mutex, attr);
// if (!(ctx->thr = malloc(ctx->num_thr * sizeof(pxy_thr_ctx_t*)))) {
// log_dbg_printf("Failed to allocate memory\n");
// goto leave;
// }
// memset(ctx->thr, 0, ctx->num_thr * sizeof(pxy_thr_ctx_t*));
//
// for (idx = 0; idx < ctx->num_thr; idx++) {
// if (!(ctx->thr[idx] = malloc(sizeof(pxy_thr_ctx_t)))) {
// log_dbg_printf("Failed to allocate memory\n");
// goto leave;
// }
// memset(ctx->thr[idx], 0, sizeof(pxy_thr_ctx_t));
// ctx->thr[idx]->evbase = event_base_new();
// if (!ctx->thr[idx]->evbase) {
// log_dbg_printf("Failed to create evbase %d\n", idx);
// goto leave;
// }
// if (dns) {
// /* only create dns base if we actually need it later */
// ctx->thr[idx]->dnsbase = evdns_base_new(
// ctx->thr[idx]->evbase, 1);
// if (!ctx->thr[idx]->dnsbase) {
// log_dbg_printf("Failed to create dnsbase %d\n",
// idx);
// goto leave;
// }
// }
// ctx->thr[idx]->load = 0;
// ctx->thr[idx]->running = 0;
// }
//
// log_dbg_printf("Initialized %d connection handling threads\n",
// ctx->num_thr);
pxy_thrmgr_init ( ctx ) ;
for ( idx = 0 ; idx < ctx - > num_thr ; idx + + ) {
if ( pthread_create ( & ctx - > thr [ idx ] - > thr , NULL ,
pxy_thrmgr_thr , ctx - > thr [ idx ] ) )
@ -336,25 +266,13 @@ pxy_thrmgr_attach(pxy_thrmgr_ctx_t *ctx, struct event_base **evbase,
int thridx ;
size_t minload ;
int err = pthread_mutex_lock ( & ctx - > mutex ) ;
log_dbg_level_printf ( LOG_DBG_MODE_FINEST , " >>>>> load pxy_thrmgr_attach() err=%d \n " , err ) ;
thridx = 0 ;
if ( ! ctx - > thr ) {
thridx = - 1 ;
log_dbg_level_printf ( LOG_DBG_MODE_FINEST , " >>>>> pxy_thrmgr_attach() goto exit_attach \n " ) ;
goto exit_attach ;
}
pthread_mutex_lock ( & ctx - > mutex ) ;
minload = ctx - > thr [ thridx ] - > load ;
# ifdef DEBUG_THREAD
log_dbg_printf ( " ===> Proxy connection handler thread status: \n "
" thr[%d]: %zu \n " , thridx , minload ) ;
# endif /* DEBUG_THREAD */
log_dbg_level_printf ( LOG_DBG_MODE_FINEST , " >>>>> for pxy_thrmgr_attach() \n " ) ;
for ( int idx = 1 ; idx < ctx - > num_thr ; idx + + ) {
# ifdef DEBUG_THREAD
log_dbg_printf ( " thr[%d]: %zu \n " , idx , ctx - > thr [ idx ] - > load ) ;
@ -364,9 +282,7 @@ pxy_thrmgr_attach(pxy_thrmgr_ctx_t *ctx, struct event_base **evbase,
thridx = idx ;
}
}
log_dbg_level_printf ( LOG_DBG_MODE_FINEST , " >>>>> evbase pxy_thrmgr_attach() \n " ) ;
* evbase = ctx - > thr [ thridx ] - > evbase ;
log_dbg_level_printf ( LOG_DBG_MODE_FINEST , " >>>>> dnsbase pxy_thrmgr_attach() \n " ) ;
* dnsbase = ctx - > thr [ thridx ] - > dnsbase ;
ctx - > thr [ thridx ] - > load + + ;
@ -375,15 +291,15 @@ pxy_thrmgr_attach(pxy_thrmgr_ctx_t *ctx, struct event_base **evbase,
mctx - > next = ctx - > thr [ thridx ] - > mctx ;
ctx - > thr [ thridx ] - > mctx = mctx ;
# ifdef DEBUG_THREAD
log_dbg_printf ( " thridx: %d \n " , thridx ) ;
# endif /* DEBUG_THREAD */
pxy_thrmgr_print_thr_info ( ctx ) ;
exit_attach :
log_dbg_level_printf ( LOG_DBG_MODE_FINEST , " >>>>> EXIT pxy_thrmgr_attach() \n " ) ;
pthread_mutex_unlock ( & ctx - > mutex ) ;
# ifdef DEBUG_THREAD
log_dbg_printf ( " thridx: %d \n " , thridx ) ;
# endif /* DEBUG_THREAD */
return thridx ;
}
@ -403,7 +319,7 @@ pxy_thrmgr_detach(pxy_thrmgr_ctx_t *ctx, int thridx, proxy_conn_meta_ctx_t *mctx
pxy_thrmgr_remove_node ( mctx , & ctx - > thr [ thridx ] - > mctx ) ;
ctx - > thr [ thridx ] - > load - - ;
log_dbg_level_printf ( LOG_DBG_MODE_FINE ST , " >>>>> pxy_thrmgr_detach(): AFTER pxy_thrmgr_remove_node \n " ) ;
log_dbg_level_printf ( LOG_DBG_MODE_FINE R , " >>>>> pxy_thrmgr_detach(): AFTER pxy_thrmgr_remove_node \n " ) ;
pxy_thrmgr_print_thr_info ( ctx ) ;
pthread_mutex_unlock ( & ctx - > mutex ) ;
@ -419,7 +335,7 @@ pxy_thrmgr_print_thr_info(pxy_thrmgr_ctx_t *ctx)
time_t now = time ( NULL ) ;
for ( int i = 0 ; i < ctx - > num_thr ; i + + ) {
log_dbg_level_printf ( LOG_DBG_MODE_FINEST , " >>> pxy_thrmgr_print_thr_info(): thr=%d, load=% d \n " , i , ctx - > thr [ i ] - > load ) ;
log_dbg_level_printf ( LOG_DBG_MODE_FINEST , " >>> pxy_thrmgr_print_thr_info(): thr=%d, load=% lu \n " , i , ctx - > thr [ i ] - > load ) ;
proxy_conn_meta_ctx_t * current = ctx - > thr [ i ] - > mctx ;
int count = 0 ;
@ -439,11 +355,11 @@ pxy_thrmgr_print_thr_info(pxy_thrmgr_ctx_t *ctx)
char * host , * port ;
if ( sys_sockaddr_str ( ( struct sockaddr * ) & current - > addr , current - > addrlen , & host , & port ) ! = 0 ) {
log_dbg_level_printf ( LOG_DBG_MODE_FINEST , " >>> pxy_thrmgr_print_thr_info(): sys_sockaddr_str FAILED \n " ) ;
log_dbg_level_printf ( LOG_DBG_MODE_FINEST , " >>> pxy_thrmgr_print_thr_info(): thr=%d, cont=%d, fd=%d, fd2=%d, src=%d, e2src=%d, dst=%d, e2dst=%d, dst2=%d, p=%d-%d-%d c=%d-%d, init=%d, cc=%d, time=% d\n " ,
i , count , current - > fd , current - > fd2 , src_fd , e2src_fd , dst_fd , e2dst_fd , dst2_fd , current - > src_eof , current - > e2src_eof , current - > dst_eof , current - > e2dst_eof , current - > dst2_eof , current - > initialized , current - > child_count , now - current - > access_time ) ;
log_dbg_level_printf ( LOG_DBG_MODE_FINEST , " >>> pxy_thrmgr_print_thr_info(): thr=%d, cont=%d, fd=%d, fd2=%d, src=%d, e2src=%d, dst=%d, e2dst=%d, dst2=%d, p=%d-%d-%d c=%d-%d, init=%d, cc=%d, time=% ll d\n " ,
i , count , current - > fd , current - > fd2 , src_fd , e2src_fd , dst_fd , e2dst_fd , dst2_fd , current - > src_eof , current - > e2src_eof , current - > dst_eof , current - > e2dst_eof , current - > dst2_eof , current - > initialized , current - > child_count , ( long int ) now - current - > access_time ) ;
} else {
log_dbg_level_printf ( LOG_DBG_MODE_FINEST , " >>> pxy_thrmgr_print_thr_info(): thr=%d, cont=%d, fd=%d, fd2=%d, src=%d, e2src=%d, dst=%d, e2dst=%d, dst2=%d, p=%d-%d-%d c=%d-%d, init=%d, cc=%d, time=% d, addr=%s:%s\n " ,
i , count , current - > fd , current - > fd2 , src_fd , e2src_fd , dst_fd , e2dst_fd , dst2_fd , current - > src_eof , current - > e2src_eof , current - > dst_eof , current - > e2dst_eof , current - > dst2_eof , current - > initialized , current - > child_count , now - current - > access_time , host ? host : " ? " , port ? port : " ? " ) ;
log_dbg_level_printf ( LOG_DBG_MODE_FINEST , " >>> pxy_thrmgr_print_thr_info(): thr=%d, cont=%d, fd=%d, fd2=%d, src=%d, e2src=%d, dst=%d, e2dst=%d, dst2=%d, p=%d-%d-%d c=%d-%d, init=%d, cc=%d, time=% ll d, addr=%s:%s\n " ,
i , count , current - > fd , current - > fd2 , src_fd , e2src_fd , dst_fd , e2dst_fd , dst2_fd , current - > src_eof , current - > e2src_eof , current - > dst_eof , current - > e2dst_eof , current - > dst2_eof , current - > initialized , current - > child_count , ( long int ) now - current - > access_time , host ? host : " ? " , port ? port : " ? " ) ;
free ( host ) ;
free ( port ) ;
}
@ -481,8 +397,8 @@ pxy_thrmgr_get_expired_conns(pxy_thrmgr_ctx_t *ctx, proxy_conn_meta_ctx_t **dele
proxy_conn_meta_ctx_t * delete = * delete_list ;
while ( delete ) {
proxy_conn_meta_ctx_t * next = delete - > delete ;
log_dbg_level_printf ( LOG_DBG_MODE_FINEST , " >>> pxy_thrmgr_get_expired_conns(): thr=%d, fd=%d, fd2=%d, time=% d\n " ,
delete - > thridx , delete - > fd , delete - > fd2 , now - delete - > access_time ) ;
log_dbg_level_printf ( LOG_DBG_MODE_FINEST , " >>> pxy_thrmgr_get_expired_conns(): thr=%d, fd=%d, fd2=%d, time=% ll d\n " ,
delete - > thridx , delete - > fd , delete - > fd2 , ( long int ) now - delete - > access_time ) ;
delete = next ;
}