multi-threaded encoding

pull/2/head
Dave Vasilevsky 15 years ago
parent 42302434a7
commit 804124e82b

@ -219,6 +219,12 @@ queue_t *queue_new(void) {
} }
void queue_free(queue_t *q) { void queue_free(queue_t *q) {
for (queue_item_t *i = q->first; i; ) {
queue_item_t *tmp = i->next;
free(i->data);
free(i);
i = tmp;
}
pthread_mutex_destroy(&q->mutex); pthread_mutex_destroy(&q->mutex);
pthread_cond_destroy(&q->pop_cond); pthread_cond_destroy(&q->pop_cond);
free(q); free(q);

@ -65,5 +65,6 @@ void free_file_index(void);
queue_t *queue_new(void); queue_t *queue_new(void);
void queue_free(queue_t *q); void queue_free(queue_t *q);
// data should be on heap, if present
void queue_push(queue_t *q, int type, void *data); void queue_push(queue_t *q, int type, void *data);
int queue_pop(queue_t *q, void **datap); int queue_pop(queue_t *q, void **datap);

@ -1,10 +1,10 @@
#!/bin/sh #!/bin/sh
base=lmnopuz #base=lmnopuz
file=lmnopuz/CheckPUZ.app/Contents/Resources/script #file=lmnopuz/CheckPUZ.app/Contents/Resources/script
#base=nicotine base=nicotine
#file=nicotine/museek+-0.1.13/doc/SConscript file=nicotine/museek+-0.1.13/doc/SConscript
#base=simbl #base=simbl
#file=Users/vasi/Desktop/SIMBL/keywurl/SIMBL.pkg/Contents/Info.plist #file=Users/vasi/Desktop/SIMBL/keywurl/SIMBL.pkg/Contents/Info.plist

@ -6,6 +6,11 @@
#include <libkern/OSByteOrder.h> #include <libkern/OSByteOrder.h>
#pragma mark DEFINES
#define ENCODE_THREADS 2
#pragma mark TYPES #pragma mark TYPES
typedef enum { typedef enum {
@ -13,22 +18,28 @@ typedef enum {
MSG_STOP, MSG_STOP,
} msg_type_t; } msg_type_t;
typedef struct { typedef struct io_block_t io_block_t;
struct io_block_t {
size_t seq;
io_block_t *next;
lzma_block block; lzma_block block;
uint8_t *input, *output; uint8_t *input, *output;
size_t insize, outsize; size_t insize, outsize;
} io_block_t; };
#pragma mark GLOBALS #pragma mark GLOBALS
static pthread_t gReadThread, gEncodeThread; static pthread_t gEncodeThreads[ENCODE_THREADS];
static pthread_t gReadThread;
static queue_t *gEncodeQ, *gWriteQ; static queue_t *gEncodeQ, *gWriteQ;
static size_t gBlockOutSize = 0; static size_t gBlockOutSize = 0;
static off_t gMultiHeaderStart = 0; static off_t gMultiHeaderStart = 0;
static bool gMultiHeader = false; static bool gMultiHeader = false;
static off_t gTotalRead = 0; static off_t gTotalRead = 0;
static size_t gBlockNum = 0;
static io_block_t *gReadBlock = NULL; static io_block_t *gReadBlock = NULL;
static lzma_filter gFilters[LZMA_FILTERS_MAX + 1]; static lzma_filter gFilters[LZMA_FILTERS_MAX + 1];
@ -53,6 +64,7 @@ static archive_close_callback tar_ok;
static void block_init(lzma_block *block); static void block_init(lzma_block *block);
static void stream_edge(lzma_vli backward_size); static void stream_edge(lzma_vli backward_size);
static void write_blocks(io_block_t **ibs, size_t *seq);
static void encode_index(void); static void encode_index(void);
static void write_file_index(void); static void write_file_index(void);
@ -85,9 +97,10 @@ int main(int argc, char **argv) {
gWriteQ = queue_new(); gWriteQ = queue_new();
if (pthread_create(&gReadThread, NULL, &read_thread, NULL)) if (pthread_create(&gReadThread, NULL, &read_thread, NULL))
die("Error creating read thread"); die("Error creating read thread");
if (pthread_create(&gEncodeThread, NULL, &encode_thread, NULL)) for (int i = 0; i < ENCODE_THREADS; ++i) {
die("Error creating encode thread"); if (pthread_create(&gEncodeThreads[i], NULL, &encode_thread, NULL))
die("Error creating encode thread");
}
// pre-block setup: header, index // pre-block setup: header, index
if (!(gIndex = lzma_index_init(NULL, NULL))) if (!(gIndex = lzma_index_init(NULL, NULL)))
@ -95,23 +108,17 @@ int main(int argc, char **argv) {
stream_edge(LZMA_VLI_UNKNOWN); stream_edge(LZMA_VLI_UNKNOWN);
// write blocks // write blocks
size_t seq = 0;
io_block_t *ibs = NULL;
while (true) { while (true) {
io_block_t *ib; io_block_t *ib;
int msg = queue_pop(gWriteQ, (void**)&ib); int msg = queue_pop(gWriteQ, (void**)&ib);
if (msg == MSG_STOP) if (msg == MSG_STOP)
break; break;
if (fwrite(ib->output, ib->outsize, 1, gOutFile) != 1) ib->next = ibs;
die("Error writing block"); ibs = ib;
write_blocks(&ibs, &seq);
if (lzma_index_append(gIndex, NULL,
lzma_block_unpadded_size(&ib->block),
ib->block.uncompressed_size) != LZMA_OK)
die("Error adding to index");
free(ib->input);
free(ib->output);
free(ib);
} }
// file index // file index
@ -125,12 +132,10 @@ int main(int argc, char **argv) {
fclose(gOutFile); fclose(gOutFile);
// thread cleanup // thread cleanup
queue_free(gEncodeQ);
queue_free(gWriteQ);
if (pthread_join(gReadThread, NULL)) if (pthread_join(gReadThread, NULL))
die("Error joining read thread"); die("Error joining read thread");
if (pthread_join(gEncodeThread, NULL)) queue_free(gEncodeQ);
die("Error joining encode thread"); queue_free(gWriteQ);
return 0; return 0;
} }
@ -173,7 +178,16 @@ static void *read_thread(void *data) {
} }
} }
queue_push(gEncodeQ, MSG_STOP, NULL); // stop the other threads
for (int i = 0; i < ENCODE_THREADS; ++i) {
queue_push(gEncodeQ, MSG_STOP, NULL);
}
for (int i = 0; i < ENCODE_THREADS; ++i) {
if (pthread_join(gEncodeThreads[i], NULL))
die("Error joining encode thread");
}
queue_push(gWriteQ, MSG_STOP, NULL);
return NULL; return NULL;
} }
@ -183,6 +197,7 @@ static ssize_t tar_read(struct archive *ar, void *ref, const void **bufp) {
gReadBlock->input = malloc(BLOCKSIZE); gReadBlock->input = malloc(BLOCKSIZE);
gReadBlock->output = malloc(gBlockOutSize); gReadBlock->output = malloc(gBlockOutSize);
gReadBlock->insize = 0; gReadBlock->insize = 0;
gReadBlock->seq = gBlockNum++;
} }
size_t space = BLOCKSIZE - gReadBlock->insize; size_t space = BLOCKSIZE - gReadBlock->insize;
@ -257,7 +272,6 @@ static void *encode_thread(void *vp) {
queue_push(gWriteQ, MSG_BLOCK, ib); queue_push(gWriteQ, MSG_BLOCK, ib);
} }
queue_push(gWriteQ, MSG_STOP, NULL);
return NULL; return NULL;
} }
@ -290,6 +304,42 @@ static void stream_edge(lzma_vli backward_size) {
die("Error writing stream edge"); die("Error writing stream edge");
} }
static void write_blocks(io_block_t **ibs, size_t *seq) {
// check if we can write anything
bool block_missing = false;
while (!block_missing) {
block_missing = true; // assume no match
io_block_t *prev = NULL;
for (io_block_t *ib = *ibs; ib; ib = ib->next) {
if (ib->seq == *seq) { // we have the next block
if (fwrite(ib->output, ib->outsize, 1, gOutFile) != 1)
die("Error writing block data");
if (lzma_index_append(gIndex, NULL,
lzma_block_unpadded_size(&ib->block),
ib->block.uncompressed_size) != LZMA_OK)
die("Error adding to index");
// remove the found block
if (prev) {
prev->next = ib->next;
} else {
*ibs = ib->next;
}
free(ib->input);
free(ib->output);
free(ib);
++*seq;
block_missing = false;
break;
}
prev = ib;
} // for io_block_t
} // while !block_missing
}
static void encode_index(void) { static void encode_index(void) {
if (lzma_index_encoder(&gStream, gIndex) != LZMA_OK) if (lzma_index_encoder(&gStream, gIndex) != LZMA_OK)
die("Error creating index encoder"); die("Error creating index encoder");

Loading…
Cancel
Save