diff --git a/read.c b/read.c index 9d71005..b17a5fc 100644 --- a/read.c +++ b/read.c @@ -53,6 +53,8 @@ static void tar_write_last(void); #pragma mark DECLARE READ BUFFER +#define STREAMSIZE (1024 * 1024) + static pipeline_item_t *gRbufPI = NULL; static io_block_t *gRbuf = NULL; @@ -66,6 +68,8 @@ static rbuf_read_status rbuf_read(size_t bytes); static void rbuf_consume(size_t bytes); static void rbuf_dispatch(void); +static void read_streaming(lzma_block *block); + #pragma mark DECLARE UTILS @@ -287,11 +291,57 @@ static void rbuf_dispatch(void) { gRbuf = NULL; } +static void read_streaming(lzma_block *block) { + lzma_stream stream = LZMA_STREAM_INIT; + if (lzma_block_decoder(&stream, block) != LZMA_OK) + die("Error initializing streaming block decode"); + stream.next_in = gRbuf->input + block->header_size; + stream.avail_in = gRbuf->insize - block->header_size; + stream.avail_out = 0; + + pipeline_item_t *pi = NULL; + io_block_t *ib = NULL; + + lzma_ret err = LZMA_OK; + while (err != LZMA_STREAM_END) { + if (err != LZMA_OK) + die("Error decoding streaming block"); + + if (stream.avail_out == 0) { + if (ib) { + ib->outsize = ib->outcap; + pipeline_dispatch(pi, gPipelineMergeQ); + } + queue_pop(gPipelineStartQ, (void**)&pi); + ib = (io_block_t*)pi->data; + block_capacity(ib, 0, STREAMSIZE); + stream.next_out = ib->output; + stream.avail_out = ib->outcap; + } + if (stream.avail_in == 0) { + rbuf_consume(gRbuf->insize); + if (rbuf_read(CHUNKSIZE) < RBUF_PART) + die("Error reading streaming block contents"); + stream.next_in = gRbuf->input; + stream.avail_in = gRbuf->insize; + } + + err = lzma_code(&stream, LZMA_RUN); + } + + if (ib && stream.avail_out != ib->outcap) { + ib->outsize = ib->outcap - stream.avail_out; + pipeline_dispatch(pi, gPipelineMergeQ); + } + rbuf_consume(gRbuf->insize - stream.avail_in); + lzma_end(&stream); +} + static void read_thread_noindex(void) { size_t bytes; lzma_ret err; - // Read the header + // Stream header uint8_t stream_header[LZMA_STREAM_HEADER_SIZE]; bytes = fread(stream_header, 1, LZMA_STREAM_HEADER_SIZE, gInFile); if (bytes != LZMA_STREAM_HEADER_SIZE) @@ -307,13 +357,11 @@ static void read_thread_noindex(void) { lzma_filter filters[LZMA_FILTERS_MAX + 1]; lzma_block block = { .filters = filters, .check = gCheck, .version = 0 }; while (true) { - // Check for index if (rbuf_read(1) != RBUF_FULL) die("Error reading block header size"); if (gRbuf->input[0] == 0) break; // Found the index. FIXME: multi-stream? - // Decode header block.header_size = lzma_block_header_size_decode(gRbuf->input[0]); if (block.header_size > LZMA_BLOCK_HEADER_SIZE_MAX) die("Block header size too large"); @@ -323,14 +371,16 @@ static void read_thread_noindex(void) { die("Error decoding block header"); size_t comp = block.compressed_size, outsize = block.uncompressed_size; - if (comp == LZMA_VLI_UNKNOWN || outsize == LZMA_VLI_UNKNOWN) - die("No sizes in header!!!"); // FIXME: streaming; file index - block_capacity(gRbuf, 0, outsize); - gRbuf->outsize = outsize; + if (comp == LZMA_VLI_UNKNOWN || outsize == LZMA_VLI_UNKNOWN) { + read_streaming(&block); + } else { + block_capacity(gRbuf, 0, outsize); + gRbuf->outsize = outsize; - if (rbuf_read(lzma_block_total_size(&block)) != RBUF_FULL) - die("Error reading block contents"); - rbuf_dispatch(); + if (rbuf_read(lzma_block_total_size(&block)) != RBUF_FULL) + die("Error reading block contents"); + rbuf_dispatch(); + } } pipeline_stop();