ncsubproc: unify thread implementations #553

This commit is contained in:
nick black 2020-05-02 09:29:39 -04:00
parent 8feb01c7cc
commit 5132b5b2b5
No known key found for this signature in database
GPG Key ID: 5F43400C21CBFACC

View File

@ -17,11 +17,23 @@ ncfdplane_destroy_inner(ncfdplane* n){
return ret; return ret;
} }
static void * // if pidfd is < 0, it won't be used in the poll()
ncfdplane_thread(void* vncfp){ static void
ncfdplane* ncfp = vncfp; fdthread(ncfdplane* ncfp, int pidfd){
struct pollfd pfds[2];
memset(pfds, 0, sizeof(pfds));
char* buf = malloc(BUFSIZ); char* buf = malloc(BUFSIZ);
ssize_t r; int pevents;
pfds[0].fd = ncfp->fd;
pfds[0].events = POLLIN;
const int fdcount = pidfd < 0 ? 1 : 2;
if(fdcount > 1){
pfds[1].fd = pidfd;
pfds[1].events = POLLIN;
}
ssize_t r = 0;
while((pevents = poll(pfds, fdcount, -1)) >= 0 || errno == EINTR){
if(pfds[0].revents & POLLIN){
while((r = read(ncfp->fd, buf, BUFSIZ)) >= 0){ while((r = read(ncfp->fd, buf, BUFSIZ)) >= 0){
if(r == 0){ if(r == 0){
break; break;
@ -29,15 +41,25 @@ ncfdplane_thread(void* vncfp){
if( (r = ncfp->cb(ncfp, buf, r, ncfp->curry)) ){ if( (r = ncfp->cb(ncfp, buf, r, ncfp->curry)) ){
break; break;
} }
if(ncfp->destroyed){
break;
} }
// FIXME need to continue reading on pipe/socket }
if(r <= 0){ }
if(fdcount > 1 && pfds[1].revents & POLLIN){
r = 0;
break;
}
}
if(r <= 0 && !ncfp->destroyed){
ncfp->donecb(ncfp, r == 0 ? 0 : errno, ncfp->curry); ncfp->donecb(ncfp, r == 0 ? 0 : errno, ncfp->curry);
} }
free(buf); free(buf);
if(ncfp->destroyed){
ncfdplane_destroy_inner(ncfp);
} }
static void *
ncfdplane_thread(void* vncfp){
fdthread(vncfp, -1);
return NULL; return NULL;
} }
@ -150,38 +172,7 @@ kill_and_wait_subproc(int pidfd){
static void * static void *
ncsubproc_thread(void* vncsp){ ncsubproc_thread(void* vncsp){
ncsubproc* ncsp = vncsp; ncsubproc* ncsp = vncsp;
struct pollfd pfds[2]; fdthread(ncsp->nfp, ncsp->pidfd);
memset(pfds, 0, sizeof(pfds));
char* buf = malloc(BUFSIZ);
int pevents;
pfds[0].fd = ncsp->nfp->fd;
pfds[1].fd = ncsp->pidfd;
pfds[0].events = POLLIN;
pfds[1].events = POLLIN;
ssize_t r = 0;
while((pevents = poll(pfds, sizeof(pfds) / sizeof(*pfds), -1)) >= 0 || errno == EINTR){
if(pfds[0].revents & POLLIN){
while((r = read(ncsp->nfp->fd, buf, BUFSIZ)) >= 0){
if(r == 0){
break;
}
if( (r = ncsp->nfp->cb(ncsp->nfp, buf, r, ncsp->nfp->curry)) ){
break;
}
if(ncsp->nfp->destroyed){
break;
}
}
}
if(pfds[1].revents & POLLIN){
r = 0;
break;
}
}
if(r <= 0 && !ncsp->nfp->destroyed){
ncsp->nfp->donecb(ncsp->nfp, r == 0 ? 0 : errno, ncsp->nfp->curry);
}
free(buf);
kill_and_wait_subproc(ncsp->pidfd); kill_and_wait_subproc(ncsp->pidfd);
if(ncsp->nfp->destroyed){ if(ncsp->nfp->destroyed){
ncfdplane_destroy_inner(ncsp->nfp); ncfdplane_destroy_inner(ncsp->nfp);