diff --git a/common.c b/common.c index 8d3ed60..c266acb 100644 --- a/common.c +++ b/common.c @@ -219,6 +219,12 @@ queue_t *queue_new(void) { } void queue_free(queue_t *q) { + for (queue_item_t *i = q->first; i; ) { + queue_item_t *tmp = i->next; + free(i->data); + free(i); + i = tmp; + } pthread_mutex_destroy(&q->mutex); pthread_cond_destroy(&q->pop_cond); free(q); diff --git a/pixz.h b/pixz.h index e7e893b..ef76bff 100644 --- a/pixz.h +++ b/pixz.h @@ -65,5 +65,6 @@ void free_file_index(void); queue_t *queue_new(void); void queue_free(queue_t *q); +// data should be on heap, if present void queue_push(queue_t *q, int type, void *data); int queue_pop(queue_t *q, void **datap); diff --git a/test.sh b/test.sh index 855621f..12c9b40 100755 --- a/test.sh +++ b/test.sh @@ -1,10 +1,10 @@ #!/bin/sh -base=lmnopuz -file=lmnopuz/CheckPUZ.app/Contents/Resources/script +#base=lmnopuz +#file=lmnopuz/CheckPUZ.app/Contents/Resources/script -#base=nicotine -#file=nicotine/museek+-0.1.13/doc/SConscript +base=nicotine +file=nicotine/museek+-0.1.13/doc/SConscript #base=simbl #file=Users/vasi/Desktop/SIMBL/keywurl/SIMBL.pkg/Contents/Info.plist diff --git a/write.c b/write.c index 220ed7c..020ffb8 100644 --- a/write.c +++ b/write.c @@ -6,6 +6,11 @@ #include +#pragma mark DEFINES + +#define ENCODE_THREADS 2 + + #pragma mark TYPES typedef enum { @@ -13,22 +18,28 @@ typedef enum { MSG_STOP, } msg_type_t; -typedef struct { +typedef struct io_block_t io_block_t; +struct io_block_t { + size_t seq; + io_block_t *next; + lzma_block block; uint8_t *input, *output; size_t insize, outsize; -} io_block_t; +}; #pragma mark GLOBALS -static pthread_t gReadThread, gEncodeThread; +static pthread_t gEncodeThreads[ENCODE_THREADS]; +static pthread_t gReadThread; static queue_t *gEncodeQ, *gWriteQ; static size_t gBlockOutSize = 0; static off_t gMultiHeaderStart = 0; static bool gMultiHeader = false; static off_t gTotalRead = 0; +static size_t gBlockNum = 0; static io_block_t *gReadBlock = NULL; static lzma_filter gFilters[LZMA_FILTERS_MAX + 1]; @@ -53,6 +64,7 @@ static archive_close_callback tar_ok; static void block_init(lzma_block *block); static void stream_edge(lzma_vli backward_size); +static void write_blocks(io_block_t **ibs, size_t *seq); static void encode_index(void); static void write_file_index(void); @@ -85,9 +97,10 @@ int main(int argc, char **argv) { gWriteQ = queue_new(); if (pthread_create(&gReadThread, NULL, &read_thread, NULL)) die("Error creating read thread"); - if (pthread_create(&gEncodeThread, NULL, &encode_thread, NULL)) - die("Error creating encode thread"); - + for (int i = 0; i < ENCODE_THREADS; ++i) { + if (pthread_create(&gEncodeThreads[i], NULL, &encode_thread, NULL)) + die("Error creating encode thread"); + } // pre-block setup: header, index if (!(gIndex = lzma_index_init(NULL, NULL))) @@ -95,23 +108,17 @@ int main(int argc, char **argv) { stream_edge(LZMA_VLI_UNKNOWN); // write blocks + size_t seq = 0; + io_block_t *ibs = NULL; while (true) { io_block_t *ib; int msg = queue_pop(gWriteQ, (void**)&ib); if (msg == MSG_STOP) break; - if (fwrite(ib->output, ib->outsize, 1, gOutFile) != 1) - die("Error writing block"); - - if (lzma_index_append(gIndex, NULL, - lzma_block_unpadded_size(&ib->block), - ib->block.uncompressed_size) != LZMA_OK) - die("Error adding to index"); - - free(ib->input); - free(ib->output); - free(ib); + ib->next = ibs; + ibs = ib; + write_blocks(&ibs, &seq); } // file index @@ -125,12 +132,10 @@ int main(int argc, char **argv) { fclose(gOutFile); // thread cleanup - queue_free(gEncodeQ); - queue_free(gWriteQ); if (pthread_join(gReadThread, NULL)) die("Error joining read thread"); - if (pthread_join(gEncodeThread, NULL)) - die("Error joining encode thread"); + queue_free(gEncodeQ); + queue_free(gWriteQ); return 0; } @@ -173,7 +178,16 @@ static void *read_thread(void *data) { } } - queue_push(gEncodeQ, MSG_STOP, NULL); + // stop the other threads + for (int i = 0; i < ENCODE_THREADS; ++i) { + queue_push(gEncodeQ, MSG_STOP, NULL); + } + for (int i = 0; i < ENCODE_THREADS; ++i) { + if (pthread_join(gEncodeThreads[i], NULL)) + die("Error joining encode thread"); + } + queue_push(gWriteQ, MSG_STOP, NULL); + return NULL; } @@ -183,6 +197,7 @@ static ssize_t tar_read(struct archive *ar, void *ref, const void **bufp) { gReadBlock->input = malloc(BLOCKSIZE); gReadBlock->output = malloc(gBlockOutSize); gReadBlock->insize = 0; + gReadBlock->seq = gBlockNum++; } size_t space = BLOCKSIZE - gReadBlock->insize; @@ -257,7 +272,6 @@ static void *encode_thread(void *vp) { queue_push(gWriteQ, MSG_BLOCK, ib); } - queue_push(gWriteQ, MSG_STOP, NULL); return NULL; } @@ -290,6 +304,42 @@ static void stream_edge(lzma_vli backward_size) { die("Error writing stream edge"); } +static void write_blocks(io_block_t **ibs, size_t *seq) { + // check if we can write anything + bool block_missing = false; + while (!block_missing) { + block_missing = true; // assume no match + + io_block_t *prev = NULL; + for (io_block_t *ib = *ibs; ib; ib = ib->next) { + if (ib->seq == *seq) { // we have the next block + if (fwrite(ib->output, ib->outsize, 1, gOutFile) != 1) + die("Error writing block data"); + if (lzma_index_append(gIndex, NULL, + lzma_block_unpadded_size(&ib->block), + ib->block.uncompressed_size) != LZMA_OK) + die("Error adding to index"); + + // remove the found block + if (prev) { + prev->next = ib->next; + } else { + *ibs = ib->next; + } + free(ib->input); + free(ib->output); + free(ib); + + ++*seq; + block_missing = false; + break; + } + + prev = ib; + } // for io_block_t + } // while !block_missing +} + static void encode_index(void) { if (lzma_index_encoder(&gStream, gIndex) != LZMA_OK) die("Error creating index encoder");