From 8a90a6dce7ec812ff87e29b41259b5accc76acdc Mon Sep 17 00:00:00 2001 From: root Date: Fri, 22 Jan 2010 23:57:37 -0500 Subject: [PATCH] free queue contents properly; use input queue for memory limiting --- TODO | 2 +- common.c | 6 ++++-- pixz.h | 7 +++++-- write.c | 51 ++++++++++++++++++++++++++++++++++----------------- 4 files changed, 44 insertions(+), 22 deletions(-) diff --git a/TODO b/TODO index f7c22a7..690b4ce 100644 --- a/TODO +++ b/TODO @@ -15,8 +15,8 @@ CLEANUP BUGS * fast input or slow output -> blocks pile up, huge memory usage + * performance lags under IO? * slow input -> CPUs idle while waiting for input - * performance lags under IO? EFFICIENCY * should use ordered list in collator diff --git a/common.c b/common.c index d5c0534..2c8f38a 100644 --- a/common.c +++ b/common.c @@ -218,9 +218,10 @@ void *decode_block_start(off_t block_seek) { return bw; } -queue_t *queue_new(void) { +queue_t *queue_new(queue_free_t freer) { queue_t *q = malloc(sizeof(queue_t)); q->first = q->last = NULL; + q->freer = freer; pthread_mutex_init(&q->mutex, NULL); pthread_cond_init(&q->pop_cond, NULL); return q; @@ -229,7 +230,8 @@ queue_t *queue_new(void) { void queue_free(queue_t *q) { for (queue_item_t *i = q->first; i; ) { queue_item_t *tmp = i->next; - free(i->data); + if (q->freer) + q->freer(i->type, i->data); free(i); i = tmp; } diff --git a/pixz.h b/pixz.h index 2f687f8..fd4f3cd 100644 --- a/pixz.h +++ b/pixz.h @@ -36,12 +36,16 @@ struct queue_item_t { queue_item_t *next; }; +typedef void (*queue_free_t)(int type, void *p); + typedef struct { queue_item_t *first; queue_item_t *last; pthread_mutex_t mutex; pthread_cond_t pop_cond; + + queue_free_t freer; } queue_t; @@ -70,8 +74,7 @@ void read_file_index(void); void dump_file_index(void); void free_file_index(void); -queue_t *queue_new(void); +queue_t *queue_new(queue_free_t freer); void queue_free(queue_t *q); -// data should be on heap, if present void queue_push(queue_t *q, int type, void *data); int queue_pop(queue_t *q, void **datap); diff --git a/write.c b/write.c index 72aa758..d8f46ec 100644 --- a/write.c +++ b/write.c @@ -27,7 +27,7 @@ struct io_block_t { static size_t gNumEncodeThreads = 0; static pthread_t *gEncodeThreads = NULL; static pthread_t gReadThread; -static queue_t *gEncodeQ, *gWriteQ; +static queue_t *gReadQ, *gEncodeQ, *gWriteQ; static size_t gBlockInSize = 0, gBlockOutSize = 0; static off_t gMultiHeaderStart = 0; @@ -47,7 +47,7 @@ static size_t gFileIndexBufPos = 0; static void *read_thread(void *data); static void *encode_thread(void *data); - +static void block_queue_free(int type, void *p); static bool is_multi_header(const char *name); static void add_file(off_t offset, const char *name); @@ -90,8 +90,16 @@ int main(int argc, char **argv) { // thread setup gNumEncodeThreads = num_threads(); gEncodeThreads = malloc(gNumEncodeThreads * sizeof(pthread_t)); - gEncodeQ = queue_new(); - gWriteQ = queue_new(); + gReadQ = queue_new(block_queue_free); + gEncodeQ = queue_new(block_queue_free); + gWriteQ = queue_new(block_queue_free); + for (size_t i = 0; i < (int)(gNumEncodeThreads * 2 + 4); ++i) { + // create blocks, including a margin of error + io_block_t *ib = malloc(sizeof(io_block_t)); + ib->input = malloc(gBlockInSize); + ib->output = malloc(gBlockOutSize); + queue_push(gReadQ, MSG_BLOCK, ib); + } if (pthread_create(&gReadThread, NULL, &read_thread, NULL)) die("Error creating read thread"); for (int i = 0; i < gNumEncodeThreads; ++i) { @@ -133,6 +141,7 @@ int main(int argc, char **argv) { die("Error joining read thread"); queue_free(gEncodeQ); queue_free(gWriteQ); + queue_free(gReadQ); free(gEncodeThreads); return 0; @@ -167,13 +176,9 @@ static void *read_thread(void *data) { // write last block, if necessary if (gReadBlock) { - if (gReadBlock->insize) { - queue_push(gEncodeQ, MSG_BLOCK, gReadBlock); - } else { // if this block had only one read, and it was EOF - free(gReadBlock->input); - free(gReadBlock->output); - free(gReadBlock); - } + // if this block had only one read, and it was EOF, it's waste + queue_push(gReadBlock->insize ? gEncodeQ : gReadQ, MSG_BLOCK, gReadBlock); + gReadBlock = NULL; } // stop the other threads @@ -191,9 +196,7 @@ static void *read_thread(void *data) { static ssize_t tar_read(struct archive *ar, void *ref, const void **bufp) { if (!gReadBlock) { - gReadBlock = malloc(sizeof(io_block_t)); - gReadBlock->input = malloc(gBlockInSize); - gReadBlock->output = malloc(gBlockOutSize); + queue_pop(gReadQ, (void**)&gReadBlock); gReadBlock->insize = 0; gReadBlock->seq = gBlockNum++; } @@ -251,6 +254,22 @@ static void add_file(off_t offset, const char *name) { gLastFile = f; } +static void block_queue_free(int type, void *p) { + switch (type) { + case MSG_BLOCK: { + io_block_t *ib = (io_block_t*)p; + free(ib->input); + free(ib->output); + free(ib); + break; + } + case MSG_STOP: + break; + default: + die("Unknown msg type %d", type); + } +} + #pragma mark ENCODING @@ -324,9 +343,7 @@ static void write_blocks(io_block_t **ibs, size_t *seq) { } else { *ibs = ib->next; } - free(ib->input); - free(ib->output); - free(ib); + queue_push(gReadQ, MSG_BLOCK, ib); ++*seq; block_missing = false;