diff --git a/write.c b/write.c index 773189d..0d4b3a5 100644 --- a/write.c +++ b/write.c @@ -24,6 +24,8 @@ struct io_block_t { #pragma mark GLOBALS +#define DEBUG 0 + static size_t gNumEncodeThreads = 0; static pthread_t *gEncodeThreads = NULL; static pthread_t gReadThread; @@ -45,6 +47,12 @@ static size_t gFileIndexBufPos = 0; #pragma mark FUNCTION DECLARATIONS +#if DEBUG + #define debug(str, ...) fprintf(stderr, str "\n", ##__VA_ARGS__) +#else + #define debug(...) +#endif + static void *read_thread(void *data); static void *encode_thread(void *data); static void block_queue_free(int type, void *p); @@ -70,6 +78,7 @@ static void write_file_index_buf(lzma_action action); #pragma mark FUNCTION DEFINITIONS int main(int argc, char **argv) { + debug("launch"); if (argc != 3) die("Need two arguments"); if (!(gInFile = fopen(argv[1], "r"))) @@ -104,9 +113,10 @@ int main(int argc, char **argv) { if (pthread_create(&gReadThread, NULL, &read_thread, NULL)) die("Error creating read thread"); for (int i = 0; i < gNumEncodeThreads; ++i) { - if (pthread_create(&gEncodeThreads[i], NULL, &encode_thread, NULL)) + if (pthread_create(&gEncodeThreads[i], NULL, &encode_thread, (void*)(uintptr_t)i)) die("Error creating encode thread"); } + debug("writer: start"); // pre-block setup: header, index if (!(gIndex = lzma_index_init(NULL, NULL))) @@ -122,6 +132,7 @@ int main(int argc, char **argv) { if (msg == MSG_STOP) break; + debug("writer: received %zu", ib->seq); write_blocks(&seq, &ibs, ib); } @@ -136,6 +147,7 @@ int main(int argc, char **argv) { fclose(gOutFile); // thread cleanup + debug("writer: cleaning up reader"); if (pthread_join(gReadThread, NULL)) die("Error joining read thread"); queue_free(gEncodeQ); @@ -143,6 +155,7 @@ int main(int argc, char **argv) { queue_free(gReadQ); free(gEncodeThreads); + debug("exit"); return 0; } @@ -150,6 +163,8 @@ int main(int argc, char **argv) { #pragma mark READING static void *read_thread(void *data) { + debug("reader: start"); + struct archive *ar = archive_read_new(); archive_read_support_compression_none(ar); archive_read_support_format_tar(ar); @@ -176,11 +191,13 @@ static void *read_thread(void *data) { // write last block, if necessary if (gReadBlock) { // if this block had only one read, and it was EOF, it's waste + debug("reader: handling last block %zu", gReadBlock->seq); queue_push(gReadBlock->insize ? gEncodeQ : gReadQ, MSG_BLOCK, gReadBlock); gReadBlock = NULL; } // stop the other threads + debug("reader: cleaning up encoders"); for (int i = 0; i < gNumEncodeThreads; ++i) { queue_push(gEncodeQ, MSG_STOP, NULL); } @@ -190,6 +207,7 @@ static void *read_thread(void *data) { } queue_push(gWriteQ, MSG_STOP, NULL); + debug("reader: end"); return NULL; } @@ -198,6 +216,7 @@ static ssize_t tar_read(struct archive *ar, void *ref, const void **bufp) { queue_pop(gReadQ, (void**)&gReadBlock); gReadBlock->insize = 0; gReadBlock->seq = gBlockNum++; + debug("reader: reading %zu", gReadBlock->seq); } size_t space = gBlockInSize - gReadBlock->insize; @@ -212,6 +231,7 @@ static ssize_t tar_read(struct archive *ar, void *ref, const void **bufp) { *bufp = buf; if (gReadBlock->insize == gBlockInSize) { + debug("reader: sending %zu", gReadBlock->seq); queue_push(gEncodeQ, MSG_BLOCK, gReadBlock); gReadBlock = NULL; } @@ -272,19 +292,23 @@ static void block_queue_free(int type, void *p) { #pragma mark ENCODING -static void *encode_thread(void *vp) { +static void *encode_thread(void *arg) { + int thnum = (uintptr_t)arg; + while (true) { io_block_t *ib; int msg = queue_pop(gEncodeQ, (void**)&ib); if (msg == MSG_STOP) break; + debug("encoder %d: received %zu", thnum, ib->seq); block_init(&ib->block); ib->outsize = 0; if (lzma_block_buffer_encode(&ib->block, NULL, ib->input, ib->insize, ib->output, &ib->outsize, gBlockOutSize) != LZMA_OK) die("Error encoding block"); + debug("encoder %d: sending %zu", thnum, ib->seq); queue_push(gWriteQ, MSG_BLOCK, ib); } @@ -321,6 +345,8 @@ static void stream_edge(lzma_vli backward_size) { } static void write_block(io_block_t *ib) { + debug("writer: writing %zu", ib->seq); + // Does it make sense to chunk this? size_t written = 0; while (ib->outsize > written) { @@ -336,6 +362,8 @@ static void write_block(io_block_t *ib) { lzma_block_unpadded_size(&ib->block), ib->block.uncompressed_size) != LZMA_OK) die("Error adding to index"); + + debug("writer: writing %zu complete", ib->seq); } static void write_blocks(size_t *seq, io_block_t **ibs, io_block_t *ib) {