diff --git a/common.c b/common.c index a0e0b30..2f0f051 100644 --- a/common.c +++ b/common.c @@ -437,10 +437,14 @@ void pipeline_destroy(void) { free(gPLProcessThreads); } -void pipeline_split(pipeline_item_t *item) { +void pipeline_dispatch(pipeline_item_t *item, queue_t *q) { item->seq = gPLSplitSeq++; item->next = NULL; - queue_push(gPipelineSplitQ, PIPELINE_ITEM, item); + queue_push(q, PIPELINE_ITEM, item); +} + +void pipeline_split(pipeline_item_t *item) { + pipeline_dispatch(item, gPipelineSplitQ); } pipeline_item_t *pipeline_merged() { diff --git a/pixz.h b/pixz.h index ee45a2e..7e7b5fc 100644 --- a/pixz.h +++ b/pixz.h @@ -135,5 +135,6 @@ void pipeline_create( void pipeline_stop(void); void pipeline_destroy(void); +void pipeline_dispatch(pipeline_item_t *item, queue_t *q); void pipeline_split(pipeline_item_t *item); pipeline_item_t *pipeline_merged(); diff --git a/read.c b/read.c index dd143df..9d71005 100644 --- a/read.c +++ b/read.c @@ -57,11 +57,14 @@ static pipeline_item_t *gRbufPI = NULL; static io_block_t *gRbuf = NULL; static void block_capacity(io_block_t *ib, size_t incap, size_t outcap); -static void stream_write(pipeline_item_t *pi); -static ssize_t rbuf_read(size_t bytes); +typedef enum { + RBUF_ERR, RBUF_EOF, RBUF_PART, RBUF_FULL +} rbuf_read_status; + +static rbuf_read_status rbuf_read(size_t bytes); static void rbuf_consume(size_t bytes); -static void rbuf_dispatch(); +static void rbuf_dispatch(void); #pragma mark DECLARE UTILS @@ -250,6 +253,40 @@ static void block_capacity(io_block_t *ib, size_t incap, size_t outcap) { } } +// Ensure at least this many bytes available +// Return 1 on success, zero on EOF, -1 on error +static rbuf_read_status rbuf_read(size_t bytes) { + if (!gRbufPI) { + queue_pop(gPipelineStartQ, (void**)&gRbufPI); + gRbuf = (io_block_t*)(gRbufPI->data); + gRbuf->insize = gRbuf->outsize = 0; + } + + if (gRbuf->insize >= bytes) + return RBUF_FULL; + + block_capacity(gRbuf, bytes, 0); + size_t r = fread(gRbuf->input + gRbuf->insize, 1, bytes - gRbuf->insize, + gInFile); + gRbuf->insize += r; + + if (r) + return (gRbuf->insize == bytes) ? RBUF_FULL : RBUF_PART; + return feof(gInFile) ? RBUF_EOF : RBUF_ERR; +} + +static void rbuf_consume(size_t bytes) { + if (bytes < gRbuf->insize) + memmove(gRbuf->input, gRbuf->input + bytes, gRbuf->insize - bytes); + gRbuf->insize -= bytes; +} + +static void rbuf_dispatch(void) { + pipeline_split(gRbufPI); + gRbufPI = NULL; + gRbuf = NULL; +} + static void read_thread_noindex(void) { size_t bytes; lzma_ret err; @@ -270,40 +307,30 @@ static void read_thread_noindex(void) { lzma_filter filters[LZMA_FILTERS_MAX + 1]; lzma_block block = { .filters = filters, .check = gCheck, .version = 0 }; while (true) { - // Get pipeline item - pipeline_item_t *pi; - queue_pop(gPipelineStartQ, (void**)&pi); - io_block_t *ib = (io_block_t*)(pi->data); - block_capacity(ib, LZMA_BLOCK_HEADER_SIZE_MAX, 0); - // Check for index - if (ib->insize < 1 && fread(ib->input, 1, 1, gInFile) != 1) + if (rbuf_read(1) != RBUF_FULL) die("Error reading block header size"); - if (ib->input[0] == 0) - break; // Found the index - + if (gRbuf->input[0] == 0) + break; // Found the index. FIXME: multi-stream? + // Decode header - block.header_size = lzma_block_header_size_decode(ib->input[0]); + block.header_size = lzma_block_header_size_decode(gRbuf->input[0]); if (block.header_size > LZMA_BLOCK_HEADER_SIZE_MAX) die("Block header size too large"); - size_t rest = block.header_size - 1; - if (fread(ib->input + 1, 1, rest, gInFile) != rest) + if (rbuf_read(block.header_size) != RBUF_FULL) die("Error reading block header"); - if (lzma_block_header_decode(&block, NULL, ib->input) != LZMA_OK) + if (lzma_block_header_decode(&block, NULL, gRbuf->input) != LZMA_OK) die("Error decoding block header"); - lzma_vli comp = block.compressed_size; - ib->insize = lzma_block_total_size(&block); - ib->outsize = block.uncompressed_size; - if (comp == LZMA_VLI_UNKNOWN || ib->outsize == LZMA_VLI_UNKNOWN) + size_t comp = block.compressed_size, outsize = block.uncompressed_size; + if (comp == LZMA_VLI_UNKNOWN || outsize == LZMA_VLI_UNKNOWN) die("No sizes in header!!!"); // FIXME: streaming; file index - block_capacity(ib, ib->insize, ib->outsize); + block_capacity(gRbuf, 0, outsize); + gRbuf->outsize = outsize; - rest = ib->insize - block.header_size; - bytes = fread(ib->input + block.header_size, 1, rest, gInFile); - if (bytes != rest) + if (rbuf_read(lzma_block_total_size(&block)) != RBUF_FULL) die("Error reading block contents"); - pipeline_split(pi); + rbuf_dispatch(); } pipeline_stop();