Generalize pipeline

pull/2/head
Dave Vasilevsky 14 years ago
parent a1cd0e174e
commit 6da292fe53

@ -17,23 +17,15 @@ COMPILE = $(CC) $(CFLAGS) -c -o
LD = $(CC) $(LDFLAGS) -o
PROGS = write read list
COMMON = common.o endian.o cpu.o
all: $(PROGS)
%.o: %.c pixz.h
$(COMPILE) $@ $<
list: list.o common.o endian.o
$(LD) $@ $^ -llzma
write: write.o common.o endian.o cpu.o
$(LD) $@ $^ -larchive -llzma
read: read.o common.o endian.o
$(LD) $@ $^ -llzma
pread: pread.o common.o endian.o
$(LD) $@ $^ -llzma
$(PROGS): %: %.o $(COMMON)
$(LD) $@ $^ -llzma -larchive
clean:
rm -f *.o $(PROGS)

@ -3,40 +3,18 @@
#include <stdarg.h>
#pragma mark TYPES
#pragma mark UTILS
// Bundles an lzma_block with the filter array its `filters` member points at;
// decode_block_start() allocates one of these and sets block.filters to the
// embedded array, so both share a single allocation.
typedef struct {
lzma_block block;
// presumably the extra slot holds the filter-chain terminator -- confirm
// against the liblzma documentation.
lzma_filter filters[LZMA_FILTERS_MAX + 1];
} block_wrapper_t;
#pragma mark GLOBALS
FILE *gInFile = NULL;
lzma_stream gStream = LZMA_STREAM_INIT;
lzma_index *gIndex = NULL;
file_index_t *gFileIndex = NULL, *gLastFile = NULL;
static lzma_check gCheck = LZMA_CHECK_NONE;
static uint8_t *gFileIndexBuf = NULL;
static size_t gFIBSize = CHUNKSIZE, gFIBPos = 0;
static lzma_ret gFIBErr = LZMA_OK;
static uint8_t gFIBInputBuf[CHUNKSIZE];
static size_t gMoved = 0;
#pragma mark FUNCTION DECLARATIONS
static char *read_file_index_name(void);
static void read_file_index_make_space(void);
static void read_file_index_data(void);
#pragma mark FUNCTION DEFINITIONS
void die(const char *fmt, ...) {
va_list args;
@ -58,6 +36,48 @@ char *xstrdup(const char *s) {
return memcpy(r, s, len + 1);
}
void *decode_block_start(off_t block_seek) {
if (fseeko(gInFile, block_seek, SEEK_SET) == -1)
die("Error seeking to block");
block_wrapper_t *bw = malloc(sizeof(block_wrapper_t));
bw->block = (lzma_block){ .check = gCheck, .filters = bw->filters,
.version = 0 };
int b = fgetc(gInFile);
if (b == EOF || b == 0)
die("Error reading block size");
bw->block.header_size = lzma_block_header_size_decode(b);
uint8_t hdrbuf[bw->block.header_size];
hdrbuf[0] = (uint8_t)b;
if (fread(hdrbuf + 1, bw->block.header_size - 1, 1, gInFile) != 1)
die("Error reading block header");
if (lzma_block_header_decode(&bw->block, NULL, hdrbuf) != LZMA_OK)
die("Error decoding file index block header");
if (lzma_block_decoder(&gStream, &bw->block) != LZMA_OK)
die("Error initializing file index stream");
return bw;
}
#pragma mark INDEX
// State shared by the index-handling code.
// gIndex is the liblzma index handle (released with lzma_index_end elsewhere).
lzma_index *gIndex = NULL;
// Singly linked list of file entries (walked via ->next) and its last node.
file_index_t *gFileIndex = NULL, *gLastFile = NULL;
// NOTE(review): the gFIB* variables look like a growable buffer, its
// size/position, a sticky decoder status and a raw input chunk used while
// reading the file index -- their use is outside this view, confirm.
static uint8_t *gFileIndexBuf = NULL;
static size_t gFIBSize = CHUNKSIZE, gFIBPos = 0;
static lzma_ret gFIBErr = LZMA_OK;
static uint8_t gFIBInputBuf[CHUNKSIZE];
static size_t gMoved = 0;
// Forward declarations of file-local helpers defined later in the file.
static char *read_file_index_name(void);
static void read_file_index_make_space(void);
static void read_file_index_data(void);
void dump_file_index(void) {
for (file_index_t *f = gFileIndex; f != NULL; f = f->next) {
fprintf(stderr, "%10"PRIuMAX" %s\n", (uintmax_t)f->offset, f->name ? f->name : "");
@ -205,30 +225,8 @@ void decode_index(void) {
}
}
// Seek gInFile to `block_seek`, decode the block header stored there and set
// up gStream as a decoder for that block. Returns the newly allocated
// block_wrapper_t as void*; failures are reported through die().
void *decode_block_start(off_t block_seek) {
if (fseeko(gInFile, block_seek, SEEK_SET) == -1)
die("Error seeking to block");
// NOTE(review): the malloc result is used without a NULL check.
block_wrapper_t *bw = malloc(sizeof(block_wrapper_t));
bw->block = (lzma_block){ .check = gCheck, .filters = bw->filters,
.version = 0 };
// First byte of the header encodes its total size; 0 is rejected with EOF.
int b = fgetc(gInFile);
if (b == EOF || b == 0)
die("Error reading block size");
bw->block.header_size = lzma_block_header_size_decode(b);
// Variable-length array sized from the decoded header size.
uint8_t hdrbuf[bw->block.header_size];
hdrbuf[0] = (uint8_t)b;
// Read the rest of the header (everything after the size byte).
if (fread(hdrbuf + 1, bw->block.header_size - 1, 1, gInFile) != 1)
die("Error reading block header");
if (lzma_block_header_decode(&bw->block, NULL, hdrbuf) != LZMA_OK)
die("Error decoding file index block header");
if (lzma_block_decoder(&gStream, &bw->block) != LZMA_OK)
die("Error initializing file index stream");
return bw;
}
#pragma mark QUEUE
queue_t *queue_new(queue_free_t freer) {
queue_t *q = malloc(sizeof(queue_t));
@ -288,3 +286,138 @@ int queue_pop(queue_t *q, void **datap) {
pthread_mutex_unlock(&q->mutex);
return type;
}
#pragma mark PIPELINE
// Queues linking the pipeline stages:
//   start -> recycled/unused items waiting to be filled,
//   split -> filled items waiting for a processing thread,
//   merge -> processed items waiting to be re-ordered by pipeline_merged().
queue_t *gPipelineStartQ = NULL;
queue_t *gPipelineSplitQ = NULL;
queue_t *gPipelineMergeQ = NULL;

// Client callbacks registered through pipeline_create().
pipeline_data_free_t gPLFreer = NULL;
pipeline_split_t gPLSplit = NULL;
pipeline_process_t gPLProcess = NULL;

// Worker threads: gPLProcessCount processing threads plus one splitter.
size_t gPLProcessCount = 0;
pthread_t *gPLProcessThreads = NULL;
pthread_t gPLSplitThread;

// Next sequence number to hand out (split side) and to deliver (merge side).
ssize_t gPLSplitSeq = 0;
ssize_t gPLMergeSeq = 0;

// Items received out of order, kept sorted by ascending seq.
pipeline_item_t *gPLMergedItems = NULL;

// File-local helpers.
static void pipeline_qfree(int type, void *p);
static void *pipeline_thread_split(void *);
static void *pipeline_thread_process(void *arg);
// Set up the global pipeline and start its threads.
//
//   create   allocates the payload stored in each pipeline item
//   destroy  frees such a payload (used when the queues are freed)
//   split    body of the single splitter thread
//   process  body of each processing thread; receives its thread number
//
// One processing thread is started per num_threads(), plus one splitter
// thread. A pool of count * 1.5 + 2 items is pre-allocated and pushed onto
// gPipelineStartQ. Any allocation or thread-creation failure goes to die().
void pipeline_create(
        pipeline_data_create_t create,
        pipeline_data_free_t destroy,
        pipeline_split_t split,
        pipeline_process_t process) {
    gPLFreer = destroy;
    gPLSplit = split;
    gPLProcess = process;

    gPipelineStartQ = queue_new(pipeline_qfree);
    gPipelineSplitQ = queue_new(pipeline_qfree);
    gPipelineMergeQ = queue_new(pipeline_qfree);

    gPLSplitSeq = 0;
    gPLMergeSeq = 0;
    gPLMergedItems = NULL;

    gPLProcessCount = num_threads();
    gPLProcessThreads = malloc(gPLProcessCount * sizeof(pthread_t));
    // malloc(0) may legitimately return NULL, so only a non-empty request
    // counts as a failure.
    if (!gPLProcessThreads && gPLProcessCount > 0)
        die("Error allocating thread list");

    // create blocks, including a margin of error
    const size_t item_count = (size_t)(gPLProcessCount * 1.5 + 2);
    for (size_t i = 0; i < item_count; ++i) {
        pipeline_item_t *item = malloc(sizeof *item);
        if (!item)
            die("Error allocating pipeline item");
        // seq and next are assigned for real by pipeline_split(); start
        // them from defined values rather than leaving them indeterminate.
        item->seq = 0;
        item->next = NULL;
        item->data = create();
        queue_push(gPipelineStartQ, PIPELINE_ITEM, item);
    }

    if (pthread_create(&gPLSplitThread, NULL, &pipeline_thread_split, NULL))
        die("Error creating read thread");
    for (size_t i = 0; i < gPLProcessCount; ++i) {
        // The thread number travels in the void* argument.
        if (pthread_create(&gPLProcessThreads[i], NULL,
                &pipeline_thread_process, (void*)(uintptr_t)i))
            die("Error creating encode thread");
    }
}
// Queue element destructor handed to queue_new(): releases the payload of a
// PIPELINE_ITEM via the client's free callback, ignores PIPELINE_STOP
// markers, and reports any other message type through die().
static void pipeline_qfree(int type, void *p) {
    if (type == PIPELINE_STOP)
        return; // stop markers carry no data

    if (type != PIPELINE_ITEM) {
        die("Unknown msg type %d", type);
        return;
    }

    pipeline_item_t *item = (pipeline_item_t*)p;
    gPLFreer(item->data);
    free(item);
}
// pthread entry point of the splitter thread: runs the client's split
// callback once. The thread argument is unused.
static void *pipeline_thread_split(void *ignore) {
    (void)ignore;
    gPLSplit();
    return NULL;
}
// pthread entry point of a processing thread: recovers the thread number
// that pipeline_create() packed into the void* argument and passes it to the
// client's process callback.
static void *pipeline_thread_process(void *arg) {
    gPLProcess((size_t)(uintptr_t)arg);
    return NULL;
}
// Shut down the processing stage: queue one PIPELINE_STOP per processing
// thread on the split queue, wait for each of those threads to exit, then
// post a PIPELINE_STOP on the merge queue.
void pipeline_stop(void) {
    // ask the other threads to stop
    size_t pending = gPLProcessCount;
    while (pending-- > 0) {
        queue_push(gPipelineSplitQ, PIPELINE_STOP, NULL);
    }

    for (size_t t = 0; t < gPLProcessCount; ++t) {
        int rc = pthread_join(gPLProcessThreads[t], NULL);
        if (rc != 0)
            die("Error joining processing thread");
    }

    // every worker has exited, so nothing more can arrive for merging
    queue_push(gPipelineMergeQ, PIPELINE_STOP, NULL);
}
// Tear down the pipeline: wait for the splitter thread, free the three
// queues (their remaining elements go through pipeline_qfree) and the
// thread array. The globals are reset to NULL so that no dangling pointers
// survive the teardown.
void pipeline_destroy(void) {
    if (pthread_join(gPLSplitThread, NULL))
        die("Error joining splitter thread");

    queue_free(gPipelineStartQ);
    queue_free(gPipelineSplitQ);
    queue_free(gPipelineMergeQ);
    gPipelineStartQ = NULL;
    gPipelineSplitQ = NULL;
    gPipelineMergeQ = NULL;

    free(gPLProcessThreads);
    gPLProcessThreads = NULL;
}
// Hand a filled item to the processing stage: stamp it with the next
// sequence number, detach it from any list, and push it on the split queue.
void pipeline_split(pipeline_item_t *item) {
    item->next = NULL;
    item->seq = gPLSplitSeq;
    ++gPLSplitSeq;
    queue_push(gPipelineSplitQ, PIPELINE_ITEM, item);
}
// Return processed items in sequence order.
//
// Items may arrive on the merge queue out of order; those that arrive early
// are parked in gPLMergedItems (sorted by ascending seq) until the item with
// seq == gPLMergeSeq is at the head. Returns NULL once a PIPELINE_STOP is
// popped from the merge queue.
pipeline_item_t *pipeline_merged() {
    for (;;) {
        pipeline_item_t *head = gPLMergedItems;
        if (head && head->seq == gPLMergeSeq) {
            // Got the next item
            gPLMergedItems = head->next;
            ++gPLMergeSeq;
            return head;
        }

        // We don't have the next item, wait for a new one
        pipeline_item_t *incoming;
        pipeline_tag_t tag = queue_pop(gPipelineMergeQ, (void**)&incoming);
        if (tag == PIPELINE_STOP)
            return NULL; // Done processing items

        // Insert the item into the parked list, keeping it ordered by seq
        pipeline_item_t **slot = &gPLMergedItems;
        while (*slot != NULL && (*slot)->seq < incoming->seq)
            slot = &(*slot)->next;
        incoming->next = *slot;
        *slot = incoming;
    }
}

@ -21,7 +21,25 @@
#define CHUNKSIZE 4096
#pragma mark TYPES
#pragma mark UTILS
// Shared state. These are declarations only: a header must not define
// objects, otherwise every translation unit including it gets its own
// tentative definition and linking fails under -fno-common. The single
// definitions live in the implementation file.
extern FILE *gInFile;
extern lzma_stream gStream;
extern lzma_index *gIndex;

// Report a printf-style formatted error.
// NOTE(review): presumably terminates the program -- body not visible here.
void die(const char *fmt, ...);
// Return a newly allocated copy of the NUL-terminated string `s`.
char *xstrdup(const char *s);
// NOTE(review): look like 64-bit little-endian decode/encode helpers --
// confirm against their definitions.
uint64_t xle64dec(const uint8_t *d);
void xle64enc(uint8_t *d, uint64_t n);
// Number of processing threads the pipeline should start.
size_t num_threads(void);

// Seek gInFile to `block_seek`, decode the block header there and prepare
// gStream to decode that block; returns a heap-allocated wrapper object.
void *decode_block_start(off_t block_seek);
#pragma mark INDEX
typedef struct file_index_t file_index_t;
struct file_index_t {
@ -30,6 +48,16 @@ struct file_index_t {
file_index_t *next;
};
extern file_index_t *gFileIndex, *gLastFile;
void decode_index(void);
bool read_file_index(void);
void dump_file_index(void);
void free_file_index(void);
#pragma mark QUEUE
typedef struct queue_item_t queue_item_t;
struct queue_item_t {
@ -51,32 +79,41 @@ typedef struct {
} queue_t;
#pragma mark GLOBALS
// Declarations of shared state defined in the implementation file. `extern`
// keeps the header from creating a tentative definition in every
// translation unit that includes it.
extern FILE *gInFile;
extern lzma_stream gStream;
extern lzma_index *gIndex;
extern file_index_t *gFileIndex, *gLastFile;
queue_t *queue_new(queue_free_t freer);
void queue_free(queue_t *q);
void queue_push(queue_t *q, int type, void *data);
int queue_pop(queue_t *q, void **datap);
#pragma mark FUNCTION DECLARATIONS
#pragma mark PIPELINE
void die(const char *fmt, ...);
char *xstrdup(const char *s);
extern queue_t *gPipelineStartQ, *gPipelineSplitQ, *gPipelineMergeQ;
uint64_t xle64dec(const uint8_t *d);
void xle64enc(uint8_t *d, uint64_t n);
size_t num_threads(void);
// Message types carried on the pipeline queues.
typedef enum {
// The message's data pointer is a pipeline_item_t.
PIPELINE_ITEM,
// End-of-work marker; pushed with a NULL data pointer.
PIPELINE_STOP
} pipeline_tag_t;
void decode_index(void);
void *decode_block_start(off_t block_seek);
bool read_file_index(void);
void dump_file_index(void);
void free_file_index(void);
// Unit of work that travels through the pipeline queues.
typedef struct pipeline_item_t pipeline_item_t;
struct pipeline_item_t {
// Sequence number assigned by pipeline_split(); pipeline_merged() uses it
// to hand items back in order.
size_t seq;
// Link used by pipeline_merged() for its list of out-of-order items.
pipeline_item_t *next;
// Client payload, produced by the `create` callback of pipeline_create().
void *data;
};
queue_t *queue_new(queue_free_t freer);
void queue_free(queue_t *q);
void queue_push(queue_t *q, int type, void *data);
int queue_pop(queue_t *q, void **datap);
// Client callbacks used by the pipeline.
typedef void* (*pipeline_data_create_t)(void);  // allocate one item payload
typedef void (*pipeline_data_free_t)(void*);    // free an item payload
typedef void (*pipeline_split_t)(void);         // body of the splitter thread
typedef void (*pipeline_process_t)(size_t);     // body of processing thread N

// Create the queues, pre-allocate the item pool with `create`, and start the
// splitter thread plus one processing thread per num_threads().
void pipeline_create(
    pipeline_data_create_t create,
    pipeline_data_free_t destroy,
    pipeline_split_t split,
    pipeline_process_t process);

// Tell the processing threads to stop, join them, then signal the merger.
void pipeline_stop(void);
// Join the splitter thread and free the queues and thread array.
void pipeline_destroy(void);

// Stamp `item` with the next sequence number and queue it for processing.
void pipeline_split(pipeline_item_t *item);
// Return the next processed item in sequence order, or NULL when the
// pipeline has been stopped. Declared with (void) so this is a real
// prototype and calls are argument-checked.
pipeline_item_t *pipeline_merged(void);

@ -14,9 +14,6 @@ typedef enum {
typedef struct io_block_t io_block_t;
struct io_block_t {
size_t seq;
io_block_t *next;
lzma_block block;
uint8_t *input, *output;
size_t insize, outsize;
@ -30,17 +27,15 @@ struct io_block_t {
static bool gTar = true;
static uint32_t gPreset = LZMA_PRESET_DEFAULT;
static size_t gNumEncodeThreads = 0;
static pthread_t *gEncodeThreads = NULL;
static pthread_t gReadThread;
static queue_t *gReadQ, *gEncodeQ, *gWriteQ;
static size_t gBlockInSize = 0, gBlockOutSize = 0;
static off_t gMultiHeaderStart = 0;
static bool gMultiHeader = false;
static off_t gTotalRead = 0;
static size_t gBlockNum = 0;
static pipeline_item_t *gReadItem = NULL;
static io_block_t *gReadBlock = NULL;
static size_t gReadItemCount = 0;
static lzma_filter gFilters[LZMA_FILTERS_MAX + 1];
@ -57,9 +52,10 @@ static size_t gFileIndexBufPos = 0;
#define debug(...)
#endif
static void *read_thread(void *data);
static void *encode_thread(void *data);
static void block_queue_free(int type, void *p);
static void read_thread();
static void encode_thread(size_t thnum);
static void *block_create();
static void block_free(void *data);
static bool is_multi_header(const char *name);
static void add_file(off_t offset, const char *name);
@ -70,8 +66,7 @@ static archive_close_callback tar_ok;
static void block_init(lzma_block *block);
static void stream_edge(lzma_vli backward_size);
static void write_blocks(size_t *seq, io_block_t **ibs, io_block_t *ib);
static void write_block(io_block_t *ib);
static void write_block(pipeline_item_t *pi);
static void encode_index(void);
static void write_file_index(void);
@ -126,25 +121,7 @@ int main(int argc, char **argv) {
gBlockInSize = lzma_opts.dict_size * 1.0;
gBlockOutSize = lzma_block_buffer_bound(gBlockInSize);
// thread setup
gNumEncodeThreads = num_threads();
gEncodeThreads = malloc(gNumEncodeThreads * sizeof(pthread_t));
gReadQ = queue_new(block_queue_free);
gEncodeQ = queue_new(block_queue_free);
gWriteQ = queue_new(block_queue_free);
for (size_t i = 0; i < (int)(gNumEncodeThreads * 2 + 4); ++i) {
// create blocks, including a margin of error
io_block_t *ib = malloc(sizeof(io_block_t));
ib->input = malloc(gBlockInSize);
ib->output = malloc(gBlockOutSize);
queue_push(gReadQ, MSG_BLOCK, ib);
}
if (pthread_create(&gReadThread, NULL, &read_thread, NULL))
die("Error creating read thread");
for (int i = 0; i < gNumEncodeThreads; ++i) {
if (pthread_create(&gEncodeThreads[i], NULL, &encode_thread, (void*)(uintptr_t)i))
die("Error creating encode thread");
}
pipeline_create(block_create, block_free, read_thread, encode_thread);
debug("writer: start");
// pre-block setup: header, index
@ -153,16 +130,14 @@ int main(int argc, char **argv) {
stream_edge(LZMA_VLI_UNKNOWN);
// write blocks
size_t seq = 0;
io_block_t *ibs = NULL;
while (true) {
io_block_t *ib;
int msg = queue_pop(gWriteQ, (void**)&ib);
if (msg == MSG_STOP)
pipeline_item_t *pi = pipeline_merged();
if (!pi)
break;
debug("writer: received %zu", ib->seq);
write_blocks(&seq, &ibs, ib);
debug("writer: received %zu", pi->seq);
write_block(pi);
queue_push(gPipelineStartQ, PIPELINE_ITEM, pi);
}
// file index
@ -177,14 +152,8 @@ int main(int argc, char **argv) {
lzma_index_end(gIndex, NULL);
fclose(gOutFile);
// thread cleanup
debug("writer: cleaning up reader");
if (pthread_join(gReadThread, NULL))
die("Error joining read thread");
queue_free(gEncodeQ);
queue_free(gWriteQ);
queue_free(gReadQ);
free(gEncodeThreads);
pipeline_destroy();
debug("exit");
return 0;
@ -193,7 +162,7 @@ int main(int argc, char **argv) {
#pragma mark READING
static void *read_thread(void *data) {
static void read_thread() {
debug("reader: start");
struct archive *ar = archive_read_new();
@ -227,34 +196,28 @@ static void *read_thread(void *data) {
add_file(gTotalRead, NULL);
// write last block, if necessary
if (gReadBlock) {
if (gReadItem) {
// if this block had only one read, and it was EOF, it's waste
debug("reader: handling last block %zu", gReadBlock->seq);
queue_push(gReadBlock->insize ? gEncodeQ : gReadQ, MSG_BLOCK, gReadBlock);
gReadBlock = NULL;
debug("reader: handling last block %zu", gReadItemCount);
if (gReadBlock->insize)
pipeline_split(gReadItem);
else
queue_push(gPipelineStartQ, MSG_BLOCK, gReadItem);
gReadItem = NULL;
}
// stop the other threads
debug("reader: cleaning up encoders");
for (int i = 0; i < gNumEncodeThreads; ++i) {
queue_push(gEncodeQ, MSG_STOP, NULL);
}
for (int i = 0; i < gNumEncodeThreads; ++i) {
if (pthread_join(gEncodeThreads[i], NULL))
die("Error joining encode thread");
}
queue_push(gWriteQ, MSG_STOP, NULL);
pipeline_stop();
debug("reader: end");
return NULL;
}
static ssize_t tar_read(struct archive *ar, void *ref, const void **bufp) {
if (!gReadBlock) {
queue_pop(gReadQ, (void**)&gReadBlock);
if (!gReadItem) {
queue_pop(gPipelineStartQ, (void**)&gReadItem);
gReadBlock = (io_block_t*)(gReadItem->data);
gReadBlock->insize = 0;
gReadBlock->seq = gBlockNum++;
debug("reader: reading %zu", gReadBlock->seq);
debug("reader: reading %zu", gReadItemCount);
}
size_t space = gBlockInSize - gReadBlock->insize;
@ -269,9 +232,10 @@ static ssize_t tar_read(struct archive *ar, void *ref, const void **bufp) {
*bufp = buf;
if (gReadBlock->insize == gBlockInSize) {
debug("reader: sending %zu", gReadBlock->seq);
queue_push(gEncodeQ, MSG_BLOCK, gReadBlock);
gReadBlock = NULL;
debug("reader: sending %zu", gReadItemCount);
pipeline_split(gReadItem);
++gReadItemCount;
gReadItem = NULL;
}
return rd;
@ -311,36 +275,33 @@ static void add_file(off_t offset, const char *name) {
gLastFile = f;
}
static void block_queue_free(int type, void *p) {
switch (type) {
case MSG_BLOCK: {
io_block_t *ib = (io_block_t*)p;
free(ib->input);
free(ib->output);
free(ib);
break;
}
case MSG_STOP:
break;
default:
die("Unknown msg type %d", type);
}
// Pipeline free callback: releases an io_block_t together with its input
// and output buffers.
static void block_free(void *data) {
    io_block_t *blk = data;
    free(blk->input);
    free(blk->output);
    free(blk);
}
static void *block_create() {
io_block_t *ib = malloc(sizeof(io_block_t));
ib->input = malloc(gBlockInSize);
ib->output = malloc(gBlockOutSize);
return ib;
}
#pragma mark ENCODING
static void *encode_thread(void *arg) {
int __attribute__((unused)) thnum = (uintptr_t)arg;
lzma_stream stream = LZMA_STREAM_INIT;
static void encode_thread(size_t thnum) {
lzma_stream stream = LZMA_STREAM_INIT;
while (true) {
io_block_t *ib;
int msg = queue_pop(gEncodeQ, (void**)&ib);
pipeline_item_t *pi;
int msg = queue_pop(gPipelineSplitQ, (void**)&pi);
if (msg == MSG_STOP)
break;
debug("encoder %d: received %zu", thnum, ib->seq);
debug("encoder %zu: received %zu", thnum, pi->seq);
io_block_t *ib = (io_block_t*)(pi->data);
block_init(&ib->block);
if (lzma_block_header_encode(&ib->block, ib->output) != LZMA_OK)
@ -362,12 +323,11 @@ static void *encode_thread(void *arg) {
}
ib->outsize = stream.next_out - ib->output;
debug("encoder %d: sending %zu", thnum, ib->seq);
queue_push(gWriteQ, MSG_BLOCK, ib);
debug("encoder %zu: sending %zu", thnum, pi->seq);
queue_push(gPipelineMergeQ, PIPELINE_ITEM, pi);
}
lzma_end(&stream);
return NULL;
}
@ -399,9 +359,10 @@ static void stream_edge(lzma_vli backward_size) {
die("Error writing stream edge");
}
static void write_block(io_block_t *ib) {
debug("writer: writing %zu", ib->seq);
static void write_block(pipeline_item_t *pi) {
debug("writer: writing %zu", pi->seq);
io_block_t *ib = (io_block_t*)(pi->data);
// Does it make sense to chunk this?
size_t written = 0;
while (ib->outsize > written) {
@ -418,28 +379,7 @@ static void write_block(io_block_t *ib) {
ib->block.uncompressed_size) != LZMA_OK)
die("Error adding to index");
debug("writer: writing %zu complete", ib->seq);
}
static void write_blocks(size_t *seq, io_block_t **ibs, io_block_t *ib) {
// insert it into the queue, in order
io_block_t **prev = ibs, *post = *ibs;
while (post && post->seq < ib->seq) {
prev = &post->next;
post = post->next;
}
ib->next = post;
*prev = ib;
// write the blocks that we can
io_block_t *cur = *ibs;
while (cur && cur->seq == *seq) {
write_block(cur);
queue_push(gReadQ, MSG_BLOCK, cur);
++*seq;
cur = cur->next;
}
*ibs = cur;
debug("writer: writing %zu complete", pi->seq);
}
static void encode_index(void) {

Loading…
Cancel
Save