Use the read buffer

stream
Dave Vasilevsky 12 years ago
parent 9694d22dcd
commit 70a3c58520

@ -437,10 +437,14 @@ void pipeline_destroy(void) {
free(gPLProcessThreads); free(gPLProcessThreads);
} }
void pipeline_split(pipeline_item_t *item) { void pipeline_dispatch(pipeline_item_t *item, queue_t *q) {
item->seq = gPLSplitSeq++; item->seq = gPLSplitSeq++;
item->next = NULL; item->next = NULL;
queue_push(gPipelineSplitQ, PIPELINE_ITEM, item); queue_push(q, PIPELINE_ITEM, item);
}
void pipeline_split(pipeline_item_t *item) {
pipeline_dispatch(item, gPipelineSplitQ);
} }
pipeline_item_t *pipeline_merged() { pipeline_item_t *pipeline_merged() {

@ -135,5 +135,6 @@ void pipeline_create(
void pipeline_stop(void); void pipeline_stop(void);
void pipeline_destroy(void); void pipeline_destroy(void);
void pipeline_dispatch(pipeline_item_t *item, queue_t *q);
void pipeline_split(pipeline_item_t *item); void pipeline_split(pipeline_item_t *item);
pipeline_item_t *pipeline_merged(); pipeline_item_t *pipeline_merged();

@ -57,11 +57,14 @@ static pipeline_item_t *gRbufPI = NULL;
static io_block_t *gRbuf = NULL; static io_block_t *gRbuf = NULL;
static void block_capacity(io_block_t *ib, size_t incap, size_t outcap); static void block_capacity(io_block_t *ib, size_t incap, size_t outcap);
static void stream_write(pipeline_item_t *pi);
static ssize_t rbuf_read(size_t bytes); typedef enum {
RBUF_ERR, RBUF_EOF, RBUF_PART, RBUF_FULL
} rbuf_read_status;
static rbuf_read_status rbuf_read(size_t bytes);
static void rbuf_consume(size_t bytes); static void rbuf_consume(size_t bytes);
static void rbuf_dispatch(); static void rbuf_dispatch(void);
#pragma mark DECLARE UTILS #pragma mark DECLARE UTILS
@ -250,6 +253,40 @@ static void block_capacity(io_block_t *ib, size_t incap, size_t outcap) {
} }
} }
// Ensure at least this many bytes available
// Return 1 on success, zero on EOF, -1 on error
static rbuf_read_status rbuf_read(size_t bytes) {
if (!gRbufPI) {
queue_pop(gPipelineStartQ, (void**)&gRbufPI);
gRbuf = (io_block_t*)(gRbufPI->data);
gRbuf->insize = gRbuf->outsize = 0;
}
if (gRbuf->insize >= bytes)
return RBUF_FULL;
block_capacity(gRbuf, bytes, 0);
size_t r = fread(gRbuf->input + gRbuf->insize, 1, bytes - gRbuf->insize,
gInFile);
gRbuf->insize += r;
if (r)
return (gRbuf->insize == bytes) ? RBUF_FULL : RBUF_PART;
return feof(gInFile) ? RBUF_EOF : RBUF_ERR;
}
static void rbuf_consume(size_t bytes) {
if (bytes < gRbuf->insize)
memmove(gRbuf->input, gRbuf->input + bytes, gRbuf->insize - bytes);
gRbuf->insize -= bytes;
}
static void rbuf_dispatch(void) {
pipeline_split(gRbufPI);
gRbufPI = NULL;
gRbuf = NULL;
}
static void read_thread_noindex(void) { static void read_thread_noindex(void) {
size_t bytes; size_t bytes;
lzma_ret err; lzma_ret err;
@ -270,40 +307,30 @@ static void read_thread_noindex(void) {
lzma_filter filters[LZMA_FILTERS_MAX + 1]; lzma_filter filters[LZMA_FILTERS_MAX + 1];
lzma_block block = { .filters = filters, .check = gCheck, .version = 0 }; lzma_block block = { .filters = filters, .check = gCheck, .version = 0 };
while (true) { while (true) {
// Get pipeline item
pipeline_item_t *pi;
queue_pop(gPipelineStartQ, (void**)&pi);
io_block_t *ib = (io_block_t*)(pi->data);
block_capacity(ib, LZMA_BLOCK_HEADER_SIZE_MAX, 0);
// Check for index // Check for index
if (ib->insize < 1 && fread(ib->input, 1, 1, gInFile) != 1) if (rbuf_read(1) != RBUF_FULL)
die("Error reading block header size"); die("Error reading block header size");
if (ib->input[0] == 0) if (gRbuf->input[0] == 0)
break; // Found the index break; // Found the index. FIXME: multi-stream?
// Decode header // Decode header
block.header_size = lzma_block_header_size_decode(ib->input[0]); block.header_size = lzma_block_header_size_decode(gRbuf->input[0]);
if (block.header_size > LZMA_BLOCK_HEADER_SIZE_MAX) if (block.header_size > LZMA_BLOCK_HEADER_SIZE_MAX)
die("Block header size too large"); die("Block header size too large");
size_t rest = block.header_size - 1; if (rbuf_read(block.header_size) != RBUF_FULL)
if (fread(ib->input + 1, 1, rest, gInFile) != rest)
die("Error reading block header"); die("Error reading block header");
if (lzma_block_header_decode(&block, NULL, ib->input) != LZMA_OK) if (lzma_block_header_decode(&block, NULL, gRbuf->input) != LZMA_OK)
die("Error decoding block header"); die("Error decoding block header");
lzma_vli comp = block.compressed_size; size_t comp = block.compressed_size, outsize = block.uncompressed_size;
ib->insize = lzma_block_total_size(&block); if (comp == LZMA_VLI_UNKNOWN || outsize == LZMA_VLI_UNKNOWN)
ib->outsize = block.uncompressed_size;
if (comp == LZMA_VLI_UNKNOWN || ib->outsize == LZMA_VLI_UNKNOWN)
die("No sizes in header!!!"); // FIXME: streaming; file index die("No sizes in header!!!"); // FIXME: streaming; file index
block_capacity(ib, ib->insize, ib->outsize); block_capacity(gRbuf, 0, outsize);
gRbuf->outsize = outsize;
rest = ib->insize - block.header_size; if (rbuf_read(lzma_block_total_size(&block)) != RBUF_FULL)
bytes = fread(ib->input + block.header_size, 1, rest, gInFile);
if (bytes != rest)
die("Error reading block contents"); die("Error reading block contents");
pipeline_split(pi); rbuf_dispatch();
} }
pipeline_stop(); pipeline_stop();

Loading…
Cancel
Save