From 42302434a7da85083ee2294743c09fc411dc3adf Mon Sep 17 00:00:00 2001 From: Dave Vasilevsky Date: Wed, 13 Jan 2010 00:08:30 -0500 Subject: [PATCH] Use threads and queues--still single-threaded --- common.c | 51 +++++++++ pixz.h | 27 ++++- write.c | 330 ++++++++++++++++++++++++++++++++++--------------------- 3 files changed, 279 insertions(+), 129 deletions(-) diff --git a/common.c b/common.c index 2dbbb83..8d3ed60 100644 --- a/common.c +++ b/common.c @@ -209,3 +209,54 @@ void *decode_block_start(off_t block_seek) { return bw; } + +queue_t *queue_new(void) { + queue_t *q = malloc(sizeof(queue_t)); + q->first = q->last = NULL; + pthread_mutex_init(&q->mutex, NULL); + pthread_cond_init(&q->pop_cond, NULL); + return q; +} + +void queue_free(queue_t *q) { + pthread_mutex_destroy(&q->mutex); + pthread_cond_destroy(&q->pop_cond); + free(q); +} + +void queue_push(queue_t *q, int type, void *data) { + pthread_mutex_lock(&q->mutex); + + queue_item_t *i = malloc(sizeof(queue_item_t)); + i->type = type; + i->data = data; + i->next = NULL; + + if (q->last) { + q->last->next = i; + } else { + q->first = i; + } + q->last = i; + + pthread_cond_signal(&q->pop_cond); + pthread_mutex_unlock(&q->mutex); +} + +int queue_pop(queue_t *q, void **datap) { + pthread_mutex_lock(&q->mutex); + while (!q->first) + pthread_cond_wait(&q->pop_cond, &q->mutex); + + queue_item_t *i = q->first; + q->first = i->next; + if (!q->first) + q->last = NULL; + + *datap = i->data; + int type = i->type; + free(i); + + pthread_mutex_unlock(&q->mutex); + return type; +} diff --git a/pixz.h b/pixz.h index fc80712..e7e893b 100644 --- a/pixz.h +++ b/pixz.h @@ -5,6 +5,8 @@ #include #include +#include + #pragma mark DEFINES @@ -17,12 +19,28 @@ #pragma mark TYPES +typedef struct file_index_t file_index_t; struct file_index_t { char *name; off_t offset; - struct file_index_t *next; + file_index_t *next; }; -typedef struct file_index_t file_index_t; + + +typedef struct queue_item_t queue_item_t; +struct queue_item_t { + int type; + void *data; + queue_item_t *next; +}; + +typedef struct { + queue_item_t *first; + queue_item_t *last; + + pthread_mutex_t mutex; + pthread_cond_t pop_cond; +} queue_t; #pragma mark GLOBALS @@ -44,3 +62,8 @@ void *decode_block_start(off_t block_seek); void read_file_index(void); void dump_file_index(void); void free_file_index(void); + +queue_t *queue_new(void); +void queue_free(queue_t *q); +void queue_push(queue_t *q, int type, void *data); +int queue_pop(queue_t *q, void **datap); diff --git a/write.c b/write.c index d2dd259..220ed7c 100644 --- a/write.c +++ b/write.c @@ -6,39 +6,58 @@ #include -#pragma mark GLOBALS +#pragma mark TYPES -static FILE *gOutFile = NULL; -static off_t gTotalRead = 0; +typedef enum { + MSG_BLOCK, + MSG_STOP, +} msg_type_t; + +typedef struct { + lzma_block block; + uint8_t *input, *output; + size_t insize, outsize; +} io_block_t; -static uint8_t gBlockBuf[BLOCKSIZE]; -static size_t gBlockSize = 0; -static lzma_filter gFilters[LZMA_FILTERS_MAX + 1]; + +#pragma mark GLOBALS + +static pthread_t gReadThread, gEncodeThread; +static queue_t *gEncodeQ, *gWriteQ; +static size_t gBlockOutSize = 0; static off_t gMultiHeaderStart = 0; static bool gMultiHeader = false; +static off_t gTotalRead = 0; +static io_block_t *gReadBlock = NULL; +static lzma_filter gFilters[LZMA_FILTERS_MAX + 1]; + +static FILE *gOutFile = NULL; static uint8_t gFileIndexBuf[CHUNKSIZE]; static size_t gFileIndexBufPos = 0; #pragma mark FUNCTION DECLARATIONS -void stream_edge(lzma_vli backward_size); -void write_block(void); -void write_block_header(lzma_block *block); -void encode_index(void); +static void *read_thread(void *data); +static void *encode_thread(void *data); + -bool is_multi_header(const char *name); -void add_file(off_t offset, const char *name); +static bool is_multi_header(const char *name); +static void add_file(off_t offset, const char *name); -void write_file_index(void); -void write_file_index_bytes(size_t size, uint8_t *buf); -void write_file_index_buf(lzma_action action); +static archive_read_callback tar_read; +static archive_open_callback tar_ok; +static archive_close_callback tar_ok; -archive_read_callback tar_read; -archive_open_callback tar_ok; -archive_close_callback tar_ok; +static void block_init(lzma_block *block); +static void stream_edge(lzma_vli backward_size); +static void encode_index(void); + +static void write_file_index(void); +static void write_file_index_bytes(size_t size, uint8_t *buf); +static void write_file_index_buf(lzma_action action); #pragma mark FUNCTION DEFINITIONS @@ -59,12 +78,67 @@ int main(int argc, char **argv) { .options = &lzma_opts }; gFilters[1] = (lzma_filter){ .id = LZMA_VLI_UNKNOWN, .options = NULL }; - // xz setup (index, header) + gBlockOutSize = lzma_block_buffer_bound(BLOCKSIZE); + + // thread setup + gEncodeQ = queue_new(); + gWriteQ = queue_new(); + if (pthread_create(&gReadThread, NULL, &read_thread, NULL)) + die("Error creating read thread"); + if (pthread_create(&gEncodeThread, NULL, &encode_thread, NULL)) + die("Error creating encode thread"); + + + // pre-block setup: header, index if (!(gIndex = lzma_index_init(NULL, NULL))) die("Error creating index"); stream_edge(LZMA_VLI_UNKNOWN); - // read archive + // write blocks + while (true) { + io_block_t *ib; + int msg = queue_pop(gWriteQ, (void**)&ib); + if (msg == MSG_STOP) + break; + + if (fwrite(ib->output, ib->outsize, 1, gOutFile) != 1) + die("Error writing block"); + + if (lzma_index_append(gIndex, NULL, + lzma_block_unpadded_size(&ib->block), + ib->block.uncompressed_size) != LZMA_OK) + die("Error adding to index"); + + free(ib->input); + free(ib->output); + free(ib); + } + + // file index + write_file_index(); + free_file_index(); + + // post-block cleanup: index, footer + encode_index(); + stream_edge(lzma_index_size(gIndex)); + lzma_index_end(gIndex, NULL); + fclose(gOutFile); + + // thread cleanup + queue_free(gEncodeQ); + queue_free(gWriteQ); + if (pthread_join(gReadThread, NULL)) + die("Error joining read thread"); + if (pthread_join(gEncodeThread, NULL)) + die("Error joining encode thread"); + + return 0; +} + + +#pragma mark READING + +static void *read_thread(void *data) { struct archive *ar = archive_read_new(); archive_read_support_compression_none(ar); archive_read_support_format_tar(ar); @@ -86,63 +160,111 @@ int main(int argc, char **argv) { } archive_read_finish(ar); fclose(gInFile); - - write_block(); // write last block, if necessary - add_file(gTotalRead, NULL); - write_file_index(); - free_file_index(); - // xz cleanup (index, footer) - encode_index(); - stream_edge(lzma_index_size(gIndex)); - lzma_index_end(gIndex, NULL); - lzma_end(&gStream); - fclose(gOutFile); + // write last block, if necessary + if (gReadBlock) { + if (gReadBlock->insize) { + queue_push(gEncodeQ, MSG_BLOCK, gReadBlock); + } else { // if this block had only one read, and it was EOF + free(gReadBlock->input); + free(gReadBlock->output); + free(gReadBlock); + } + } - return 0; + queue_push(gEncodeQ, MSG_STOP, NULL); + return NULL; } -ssize_t tar_read(struct archive *ar, void *ref, const void **bufp) { - size_t space = BLOCKSIZE - gBlockSize; - if (space > CHUNKSIZE) - space = CHUNKSIZE; - if (space == 0) { - write_block(); - space = CHUNKSIZE; +static ssize_t tar_read(struct archive *ar, void *ref, const void **bufp) { + if (!gReadBlock) { + gReadBlock = malloc(sizeof(io_block_t)); + gReadBlock->input = malloc(BLOCKSIZE); + gReadBlock->output = malloc(gBlockOutSize); + gReadBlock->insize = 0; } - uint8_t *buf = gBlockBuf + gBlockSize; + size_t space = BLOCKSIZE - gReadBlock->insize; + if (space > CHUNKSIZE) + space = CHUNKSIZE; + uint8_t *buf = gReadBlock->input + gReadBlock->insize; size_t rd = fread(buf, 1, space, gInFile); - if (rd == 0 && ferror(gInFile)) + if (ferror(gInFile)) die("Error reading input file"); - gBlockSize += rd; + gReadBlock->insize += rd; gTotalRead += rd; *bufp = buf; + + if (gReadBlock->insize == BLOCKSIZE) { + queue_push(gEncodeQ, MSG_BLOCK, gReadBlock); + gReadBlock = NULL; + } + return rd; } -int tar_ok(struct archive *ar, void *ref) { +static int tar_ok(struct archive *ar, void *ref) { return ARCHIVE_OK; } -void stream_edge(lzma_vli backward_size) { - lzma_stream_flags flags = { .version = 0, .check = CHECK, - .backward_size = backward_size }; - uint8_t buf[LZMA_STREAM_HEADER_SIZE]; +static bool is_multi_header(const char *name) { + size_t i = strlen(name); + while (i != 0 && name[i - 1] != '/') + --i; - lzma_ret (*encoder)(const lzma_stream_flags *flags, uint8_t *buf); - encoder = backward_size == LZMA_VLI_UNKNOWN - ? &lzma_stream_header_encode - : &lzma_stream_footer_encode; - if ((*encoder)(&flags, buf) != LZMA_OK) - die("Error encoding stream edge"); + return strncmp(name + i, "._", 2) == 0; +} + +static void add_file(off_t offset, const char *name) { + if (name && is_multi_header(name)) { + if (!gMultiHeader) + gMultiHeaderStart = offset; + gMultiHeader = true; + return; + } - if (fwrite(buf, LZMA_STREAM_HEADER_SIZE, 1, gOutFile) != 1) - die("Error writing stream edge"); + file_index_t *f = malloc(sizeof(file_index_t)); + f->offset = gMultiHeader ? gMultiHeaderStart : offset; + gMultiHeader = false; + f->name = name ? strdup(name) : NULL; + f->next = NULL; + + if (gLastFile) { + gLastFile->next = f; + } else { // new index + gFileIndex = f; + } + gLastFile = f; } -void write_block_header(lzma_block *block) { + +#pragma mark ENCODING + +static void *encode_thread(void *vp) { + while (true) { + io_block_t *ib; + int msg = queue_pop(gEncodeQ, (void**)&ib); + if (msg == MSG_STOP) + break; + + block_init(&ib->block); + ib->outsize = 0; + if (lzma_block_buffer_encode(&ib->block, NULL, ib->input, ib->insize, + ib->output, &ib->outsize, gBlockOutSize) != LZMA_OK) + die("Error encoding block"); + + queue_push(gWriteQ, MSG_BLOCK, ib); + } + + queue_push(gWriteQ, MSG_STOP, NULL); + return NULL; +} + + +#pragma mark WRITING + +static void block_init(lzma_block *block) { block->version = 0; block->check = CHECK; block->filters = gFilters; @@ -150,49 +272,25 @@ void write_block_header(lzma_block *block) { if (lzma_block_header_size(block) != LZMA_OK) die("Error getting block header size"); - - uint8_t buf[block->header_size]; - if (lzma_block_header_encode(block, buf) != LZMA_OK) - die("Error encoding block header"); - if (fwrite(buf, block->header_size, 1, gOutFile) != 1) - die("Error writing block header"); } -void write_block(void) { - if (gBlockSize == 0) - return; - - lzma_block block; - write_block_header(&block); - - if (lzma_block_encoder(&gStream, &block) != LZMA_OK) - die("Error creating block encoder"); - gStream.next_in = gBlockBuf; - gStream.avail_in = gBlockSize; - uint8_t obuf[CHUNKSIZE]; - lzma_ret err = LZMA_OK; - while (err != LZMA_STREAM_END) { - gStream.next_out = obuf; - gStream.avail_out = CHUNKSIZE; - - err = lzma_code(&gStream, gStream.avail_in ? LZMA_RUN : LZMA_FINISH); - if (err != LZMA_OK && err != LZMA_STREAM_END) - die("Error encoding block"); - - if (gStream.avail_out != CHUNKSIZE) { - if (fwrite(obuf, CHUNKSIZE - gStream.avail_out, 1, gOutFile) != 1) - die("Error writing block data"); - } - } +static void stream_edge(lzma_vli backward_size) { + lzma_stream_flags flags = { .version = 0, .check = CHECK, + .backward_size = backward_size }; + uint8_t buf[LZMA_STREAM_HEADER_SIZE]; - if (lzma_index_append(gIndex, NULL, lzma_block_unpadded_size(&block), - block.uncompressed_size) != LZMA_OK) - die("Error adding to index"); + lzma_ret (*encoder)(const lzma_stream_flags *flags, uint8_t *buf); + encoder = backward_size == LZMA_VLI_UNKNOWN + ? &lzma_stream_header_encode + : &lzma_stream_footer_encode; + if ((*encoder)(&flags, buf) != LZMA_OK) + die("Error encoding stream edge"); - gBlockSize = 0; + if (fwrite(buf, LZMA_STREAM_HEADER_SIZE, 1, gOutFile) != 1) + die("Error writing stream edge"); } -void encode_index(void) { +static void encode_index(void) { if (lzma_index_encoder(&gStream, gIndex) != LZMA_OK) die("Error creating index encoder"); uint8_t obuf[CHUNKSIZE]; @@ -208,41 +306,18 @@ void encode_index(void) { die("Error writing index data"); } } + lzma_end(&gStream); } -void add_file(off_t offset, const char *name) { - if (name && is_multi_header(name)) { - if (!gMultiHeader) - gMultiHeaderStart = offset; - gMultiHeader = true; - return; - } - - file_index_t *f = malloc(sizeof(file_index_t)); - f->offset = gMultiHeader ? gMultiHeaderStart : offset; - gMultiHeader = false; - f->name = name ? strdup(name) : NULL; - f->next = NULL; - - if (gLastFile) { - gLastFile->next = f; - } else { // new index - gFileIndex = f; - } - gLastFile = f; -} - -bool is_multi_header(const char *name) { - size_t i = strlen(name); - while (i != 0 && name[i - 1] != '/') - --i; - - return strncmp(name + i, "._", 2) == 0; -} - -void write_file_index(void) { +static void write_file_index(void) { lzma_block block; - write_block_header(&block); + block_init(&block); + uint8_t hdrbuf[block.header_size]; + if (lzma_block_header_encode(&block, hdrbuf) != LZMA_OK) + die("Error encoding file index header"); + if (fwrite(hdrbuf, block.header_size, 1, gOutFile) != 1) + die("Error writing file index header"); + if (lzma_block_encoder(&gStream, &block) != LZMA_OK) die("Error creating file index encoder"); @@ -259,9 +334,10 @@ void write_file_index(void) { if (lzma_index_append(gIndex, NULL, lzma_block_unpadded_size(&block), block.uncompressed_size) != LZMA_OK) die("Error adding file-index to index"); + lzma_end(&gStream); } -void write_file_index_bytes(size_t size, uint8_t *buf) { +static void write_file_index_bytes(size_t size, uint8_t *buf) { size_t bufpos = 0; while (bufpos < size) { size_t len = size - bufpos; @@ -279,7 +355,7 @@ void write_file_index_bytes(size_t size, uint8_t *buf) { } } -void write_file_index_buf(lzma_action action) { +static void write_file_index_buf(lzma_action action) { uint8_t obuf[CHUNKSIZE]; gStream.avail_in = gFileIndexBufPos; gStream.next_in = gFileIndexBuf;