|
|
@ -24,6 +24,8 @@ struct io_block_t {
|
|
|
|
|
|
|
|
|
|
|
|
#pragma mark GLOBALS
|
|
|
|
#pragma mark GLOBALS
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#define DEBUG 0
|
|
|
|
|
|
|
|
|
|
|
|
static size_t gNumEncodeThreads = 0;
|
|
|
|
static size_t gNumEncodeThreads = 0;
|
|
|
|
static pthread_t *gEncodeThreads = NULL;
|
|
|
|
static pthread_t *gEncodeThreads = NULL;
|
|
|
|
static pthread_t gReadThread;
|
|
|
|
static pthread_t gReadThread;
|
|
|
@ -45,6 +47,12 @@ static size_t gFileIndexBufPos = 0;
|
|
|
|
|
|
|
|
|
|
|
|
#pragma mark FUNCTION DECLARATIONS
|
|
|
|
#pragma mark FUNCTION DECLARATIONS
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#if DEBUG
|
|
|
|
|
|
|
|
#define debug(str, ...) fprintf(stderr, str "\n", ##__VA_ARGS__)
|
|
|
|
|
|
|
|
#else
|
|
|
|
|
|
|
|
#define debug(...)
|
|
|
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
|
|
static void *read_thread(void *data);
|
|
|
|
static void *read_thread(void *data);
|
|
|
|
static void *encode_thread(void *data);
|
|
|
|
static void *encode_thread(void *data);
|
|
|
|
static void block_queue_free(int type, void *p);
|
|
|
|
static void block_queue_free(int type, void *p);
|
|
|
@ -70,6 +78,7 @@ static void write_file_index_buf(lzma_action action);
|
|
|
|
#pragma mark FUNCTION DEFINITIONS
|
|
|
|
#pragma mark FUNCTION DEFINITIONS
|
|
|
|
|
|
|
|
|
|
|
|
int main(int argc, char **argv) {
|
|
|
|
int main(int argc, char **argv) {
|
|
|
|
|
|
|
|
debug("launch");
|
|
|
|
if (argc != 3)
|
|
|
|
if (argc != 3)
|
|
|
|
die("Need two arguments");
|
|
|
|
die("Need two arguments");
|
|
|
|
if (!(gInFile = fopen(argv[1], "r")))
|
|
|
|
if (!(gInFile = fopen(argv[1], "r")))
|
|
|
@ -104,9 +113,10 @@ int main(int argc, char **argv) {
|
|
|
|
if (pthread_create(&gReadThread, NULL, &read_thread, NULL))
|
|
|
|
if (pthread_create(&gReadThread, NULL, &read_thread, NULL))
|
|
|
|
die("Error creating read thread");
|
|
|
|
die("Error creating read thread");
|
|
|
|
for (int i = 0; i < gNumEncodeThreads; ++i) {
|
|
|
|
for (int i = 0; i < gNumEncodeThreads; ++i) {
|
|
|
|
if (pthread_create(&gEncodeThreads[i], NULL, &encode_thread, NULL))
|
|
|
|
if (pthread_create(&gEncodeThreads[i], NULL, &encode_thread, (void*)(uintptr_t)i))
|
|
|
|
die("Error creating encode thread");
|
|
|
|
die("Error creating encode thread");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
debug("writer: start");
|
|
|
|
|
|
|
|
|
|
|
|
// pre-block setup: header, index
|
|
|
|
// pre-block setup: header, index
|
|
|
|
if (!(gIndex = lzma_index_init(NULL, NULL)))
|
|
|
|
if (!(gIndex = lzma_index_init(NULL, NULL)))
|
|
|
@ -122,6 +132,7 @@ int main(int argc, char **argv) {
|
|
|
|
if (msg == MSG_STOP)
|
|
|
|
if (msg == MSG_STOP)
|
|
|
|
break;
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
debug("writer: received %zu", ib->seq);
|
|
|
|
write_blocks(&seq, &ibs, ib);
|
|
|
|
write_blocks(&seq, &ibs, ib);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -136,6 +147,7 @@ int main(int argc, char **argv) {
|
|
|
|
fclose(gOutFile);
|
|
|
|
fclose(gOutFile);
|
|
|
|
|
|
|
|
|
|
|
|
// thread cleanup
|
|
|
|
// thread cleanup
|
|
|
|
|
|
|
|
debug("writer: cleaning up reader");
|
|
|
|
if (pthread_join(gReadThread, NULL))
|
|
|
|
if (pthread_join(gReadThread, NULL))
|
|
|
|
die("Error joining read thread");
|
|
|
|
die("Error joining read thread");
|
|
|
|
queue_free(gEncodeQ);
|
|
|
|
queue_free(gEncodeQ);
|
|
|
@ -143,6 +155,7 @@ int main(int argc, char **argv) {
|
|
|
|
queue_free(gReadQ);
|
|
|
|
queue_free(gReadQ);
|
|
|
|
free(gEncodeThreads);
|
|
|
|
free(gEncodeThreads);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
debug("exit");
|
|
|
|
return 0;
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -150,6 +163,8 @@ int main(int argc, char **argv) {
|
|
|
|
#pragma mark READING
|
|
|
|
#pragma mark READING
|
|
|
|
|
|
|
|
|
|
|
|
static void *read_thread(void *data) {
|
|
|
|
static void *read_thread(void *data) {
|
|
|
|
|
|
|
|
debug("reader: start");
|
|
|
|
|
|
|
|
|
|
|
|
struct archive *ar = archive_read_new();
|
|
|
|
struct archive *ar = archive_read_new();
|
|
|
|
archive_read_support_compression_none(ar);
|
|
|
|
archive_read_support_compression_none(ar);
|
|
|
|
archive_read_support_format_tar(ar);
|
|
|
|
archive_read_support_format_tar(ar);
|
|
|
@ -176,11 +191,13 @@ static void *read_thread(void *data) {
|
|
|
|
// write last block, if necessary
|
|
|
|
// write last block, if necessary
|
|
|
|
if (gReadBlock) {
|
|
|
|
if (gReadBlock) {
|
|
|
|
// if this block had only one read, and it was EOF, it's waste
|
|
|
|
// if this block had only one read, and it was EOF, it's waste
|
|
|
|
|
|
|
|
debug("reader: handling last block %zu", gReadBlock->seq);
|
|
|
|
queue_push(gReadBlock->insize ? gEncodeQ : gReadQ, MSG_BLOCK, gReadBlock);
|
|
|
|
queue_push(gReadBlock->insize ? gEncodeQ : gReadQ, MSG_BLOCK, gReadBlock);
|
|
|
|
gReadBlock = NULL;
|
|
|
|
gReadBlock = NULL;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// stop the other threads
|
|
|
|
// stop the other threads
|
|
|
|
|
|
|
|
debug("reader: cleaning up encoders");
|
|
|
|
for (int i = 0; i < gNumEncodeThreads; ++i) {
|
|
|
|
for (int i = 0; i < gNumEncodeThreads; ++i) {
|
|
|
|
queue_push(gEncodeQ, MSG_STOP, NULL);
|
|
|
|
queue_push(gEncodeQ, MSG_STOP, NULL);
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -190,6 +207,7 @@ static void *read_thread(void *data) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
queue_push(gWriteQ, MSG_STOP, NULL);
|
|
|
|
queue_push(gWriteQ, MSG_STOP, NULL);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
debug("reader: end");
|
|
|
|
return NULL;
|
|
|
|
return NULL;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -198,6 +216,7 @@ static ssize_t tar_read(struct archive *ar, void *ref, const void **bufp) {
|
|
|
|
queue_pop(gReadQ, (void**)&gReadBlock);
|
|
|
|
queue_pop(gReadQ, (void**)&gReadBlock);
|
|
|
|
gReadBlock->insize = 0;
|
|
|
|
gReadBlock->insize = 0;
|
|
|
|
gReadBlock->seq = gBlockNum++;
|
|
|
|
gReadBlock->seq = gBlockNum++;
|
|
|
|
|
|
|
|
debug("reader: reading %zu", gReadBlock->seq);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
size_t space = gBlockInSize - gReadBlock->insize;
|
|
|
|
size_t space = gBlockInSize - gReadBlock->insize;
|
|
|
@ -212,6 +231,7 @@ static ssize_t tar_read(struct archive *ar, void *ref, const void **bufp) {
|
|
|
|
*bufp = buf;
|
|
|
|
*bufp = buf;
|
|
|
|
|
|
|
|
|
|
|
|
if (gReadBlock->insize == gBlockInSize) {
|
|
|
|
if (gReadBlock->insize == gBlockInSize) {
|
|
|
|
|
|
|
|
debug("reader: sending %zu", gReadBlock->seq);
|
|
|
|
queue_push(gEncodeQ, MSG_BLOCK, gReadBlock);
|
|
|
|
queue_push(gEncodeQ, MSG_BLOCK, gReadBlock);
|
|
|
|
gReadBlock = NULL;
|
|
|
|
gReadBlock = NULL;
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -272,19 +292,23 @@ static void block_queue_free(int type, void *p) {
|
|
|
|
|
|
|
|
|
|
|
|
#pragma mark ENCODING
|
|
|
|
#pragma mark ENCODING
|
|
|
|
|
|
|
|
|
|
|
|
static void *encode_thread(void *vp) {
|
|
|
|
static void *encode_thread(void *arg) {
|
|
|
|
|
|
|
|
int thnum = (uintptr_t)arg;
|
|
|
|
|
|
|
|
|
|
|
|
while (true) {
|
|
|
|
while (true) {
|
|
|
|
io_block_t *ib;
|
|
|
|
io_block_t *ib;
|
|
|
|
int msg = queue_pop(gEncodeQ, (void**)&ib);
|
|
|
|
int msg = queue_pop(gEncodeQ, (void**)&ib);
|
|
|
|
if (msg == MSG_STOP)
|
|
|
|
if (msg == MSG_STOP)
|
|
|
|
break;
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
debug("encoder %d: received %zu", thnum, ib->seq);
|
|
|
|
block_init(&ib->block);
|
|
|
|
block_init(&ib->block);
|
|
|
|
ib->outsize = 0;
|
|
|
|
ib->outsize = 0;
|
|
|
|
if (lzma_block_buffer_encode(&ib->block, NULL, ib->input, ib->insize,
|
|
|
|
if (lzma_block_buffer_encode(&ib->block, NULL, ib->input, ib->insize,
|
|
|
|
ib->output, &ib->outsize, gBlockOutSize) != LZMA_OK)
|
|
|
|
ib->output, &ib->outsize, gBlockOutSize) != LZMA_OK)
|
|
|
|
die("Error encoding block");
|
|
|
|
die("Error encoding block");
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
debug("encoder %d: sending %zu", thnum, ib->seq);
|
|
|
|
queue_push(gWriteQ, MSG_BLOCK, ib);
|
|
|
|
queue_push(gWriteQ, MSG_BLOCK, ib);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -321,6 +345,8 @@ static void stream_edge(lzma_vli backward_size) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
static void write_block(io_block_t *ib) {
|
|
|
|
static void write_block(io_block_t *ib) {
|
|
|
|
|
|
|
|
debug("writer: writing %zu", ib->seq);
|
|
|
|
|
|
|
|
|
|
|
|
// Does it make sense to chunk this?
|
|
|
|
// Does it make sense to chunk this?
|
|
|
|
size_t written = 0;
|
|
|
|
size_t written = 0;
|
|
|
|
while (ib->outsize > written) {
|
|
|
|
while (ib->outsize > written) {
|
|
|
@ -336,6 +362,8 @@ static void write_block(io_block_t *ib) {
|
|
|
|
lzma_block_unpadded_size(&ib->block),
|
|
|
|
lzma_block_unpadded_size(&ib->block),
|
|
|
|
ib->block.uncompressed_size) != LZMA_OK)
|
|
|
|
ib->block.uncompressed_size) != LZMA_OK)
|
|
|
|
die("Error adding to index");
|
|
|
|
die("Error adding to index");
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
debug("writer: writing %zu complete", ib->seq);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
static void write_blocks(size_t *seq, io_block_t **ibs, io_block_t *ib) {
|
|
|
|
static void write_blocks(size_t *seq, io_block_t **ibs, io_block_t *ib) {
|
|
|
|