diff --git a/common.c b/common.c index 0740d30..99e018f 100644 --- a/common.c +++ b/common.c @@ -1,6 +1,7 @@ #include "pixz.h" #include +#include #pragma mark UTILS @@ -418,6 +419,7 @@ queue_t *gPipelineStartQ = NULL, *gPipelineMergeQ = NULL; size_t gPipelineProcessMax = 0; +size_t gPipelineQSize = 0; pipeline_data_free_t gPLFreer = NULL; pipeline_split_t gPLSplit = NULL; @@ -457,7 +459,13 @@ void pipeline_create( gPLProcessCount = gPipelineProcessMax; gPLProcessThreads = malloc(gPLProcessCount * sizeof(pthread_t)); - for (size_t i = 0; i < (int)(gPLProcessCount * 2 + 3); ++i) { + int qsize = gPipelineQSize ? gPipelineQSize + : ceil(gPLProcessCount * 1.3 + 1); + if (qsize < gPLProcessCount) { + fprintf(stderr, "Warning: queue size is less than thread count, " + "performance will suffer!\n"); + } + for (size_t i = 0; i < qsize; ++i) { // create blocks, including a margin of error pipeline_item_t *item = malloc(sizeof(pipeline_item_t)); item->data = create(); diff --git a/pixz.c b/pixz.c index bfa6c73..186f666 100644 --- a/pixz.c +++ b/pixz.c @@ -60,7 +60,7 @@ int main(int argc, char **argv) { char *optend; long optint; double optdbl; - while ((ch = getopt(argc, argv, "dxli:o:tvhp:0123456789f:")) != -1) { + while ((ch = getopt(argc, argv, "dxli:o:tvhp:0123456789f:q:")) != -1) { switch (ch) { case 'd': op = OP_READ; break; case 'x': op = OP_EXTRACT; break; @@ -81,6 +81,12 @@ int main(int argc, char **argv) { usage("Need a non-negative integer argument to -p"); gPipelineProcessMax = optint; break; + case 'q': + optint = strtol(optarg, &optend, 10); + if (optint <= 0 || *optend) + usage("Need a positive integer argument to -q"); + gPipelineQSize = optint; + break; default: if (ch >= '0' && ch <= '9') { level = ch - '0'; @@ -91,7 +97,7 @@ int main(int argc, char **argv) { } argc -= optind; argv += optind; - + gInFile = stdin; gOutFile = stdout; bool iremove = false; diff --git a/pixz.h b/pixz.h index 978ab95..36f53b3 100644 --- a/pixz.h +++ b/pixz.h @@ -104,6 +104,7 @@ int queue_pop(queue_t *q, void **datap); #pragma mark PIPELINE +extern size_t gPipelineQSize; extern size_t gPipelineProcessMax; extern queue_t *gPipelineStartQ, *gPipelineSplitQ, *gPipelineMergeQ;