Merge branch 'stream'

Support streaming input, still decompressing in parallel. Extract mode is not
supported while streaming. Heuristics are used to skip the file index when in
streaming mode.

This also ensures pixz properly reads files with bizarre block sizes, or
with multiple streams.
pull/16/head
Dave Vasilevsky 12 years ago
commit 154dc842a1

@ -2,22 +2,23 @@ ifneq ($(shell gcc -v 2>&1 | grep 'Apple Inc'),)
APPLE=1
endif
ifdef APPLE
ifeq ($(CC),gcc)
LDFLAGS += -search_paths_first
endif
endif
OPT = -g -O0
CFLAGS = $(patsubst %,-I%/include,$(LIBPREFIX)) $(OPT) -std=gnu99 \
MYCFLAGS = $(patsubst %,-I%/include,$(LIBPREFIX)) $(OPT) -std=gnu99 \
-Wall -Wno-unknown-pragmas
LDFLAGS = $(patsubst %,-L%/lib,$(LIBPREFIX)) $(OPT) -Wall
MYLDFLAGS = $(patsubst %,-L%/lib,$(LIBPREFIX)) $(OPT) -Wall
THREADS = -lpthread
LIBADD = $(THREADS) -llzma -larchive
CC = gcc
COMPILE = $(CC) $(CFLAGS) -c -o
LD = $(CC) $(LDFLAGS) -o
COMPILE = $(CC) $(MYCFLAGS) $(CFLAGS) -c -o
LD = $(CC) $(MYLDFLAGS) $(LDFLAGS) -o
ifdef APPLE
ifeq ($(CC),gcc)
MYLDFLAGS += -search_paths_first
endif
endif
PROGS = pixz
COMMON = common.o endian.o cpu.o read.o write.o list.o

@ -15,7 +15,13 @@ BUGS
* performance lags under IO?
* slow input -> CPUs idle while waiting for input
* safe extraction
* abort if block size exceeded
* sanity checks, from spec:
- CRCs are already tested, i think?
- backward size should match file
- reserved flags must be zero
- header vs footer flags
- uncompressed size field vs actual uncompressed size
- index vs actual blocks
EFFICIENCY
* more efficient indexing: ranges? sorted? mtree?

@ -5,16 +5,9 @@
#pragma mark UTILS
typedef struct {
lzma_block block;
lzma_filter filters[LZMA_FILTERS_MAX + 1];
} block_wrapper_t;
FILE *gInFile = NULL;
lzma_stream gStream = LZMA_STREAM_INIT;
lzma_check gCheck = LZMA_CHECK_NONE;
void die(const char *fmt, ...) {
va_list args;
@ -36,32 +29,6 @@ char *xstrdup(const char *s) {
return memcpy(r, s, len + 1);
}
void *decode_block_start(off_t block_seek) {
if (fseeko(gInFile, block_seek, SEEK_SET) == -1)
die("Error seeking to block");
// Some memory in which to keep the discovered filters safe
block_wrapper_t *bw = malloc(sizeof(block_wrapper_t));
bw->block = (lzma_block){ .check = gCheck, .filters = bw->filters,
.version = 0 };
int b = fgetc(gInFile);
if (b == EOF || b == 0)
die("Error reading block size");
bw->block.header_size = lzma_block_header_size_decode(b);
uint8_t hdrbuf[bw->block.header_size];
hdrbuf[0] = (uint8_t)b;
if (fread(hdrbuf + 1, bw->block.header_size - 1, 1, gInFile) != 1)
die("Error reading block header");
if (lzma_block_header_decode(&bw->block, NULL, hdrbuf) != LZMA_OK)
die("Error decoding file index block header");
if (lzma_block_decoder(&gStream, &bw->block) != LZMA_OK)
die("Error initializing file index stream");
return bw;
}
bool is_multi_header(const char *name) {
size_t i = strlen(name);
while (i != 0 && name[i - 1] != '/')
@ -82,6 +49,9 @@ static lzma_ret gFIBErr = LZMA_OK;
static uint8_t gFIBInputBuf[CHUNKSIZE];
static size_t gMoved = 0;
static void *decode_file_index_start(off_t block_seek, lzma_check check);
static lzma_vli find_file_index(void **bdatap);
static char *read_file_index_name(void);
static void read_file_index_make_space(void);
static void read_file_index_data(void);
@ -109,7 +79,38 @@ void free_file_index(void) {
gFileIndex = gLastFile = NULL;
}
lzma_vli find_file_index(void **bdatap) {
typedef struct {
lzma_block block;
lzma_filter filters[LZMA_FILTERS_MAX + 1];
} block_wrapper_t;
static void *decode_file_index_start(off_t block_seek, lzma_check check) {
if (fseeko(gInFile, block_seek, SEEK_SET) == -1)
die("Error seeking to block");
// Some memory in which to keep the discovered filters safe
block_wrapper_t *bw = malloc(sizeof(block_wrapper_t));
bw->block = (lzma_block){ .check = check, .filters = bw->filters,
.version = 0 };
int b = fgetc(gInFile);
if (b == EOF || b == 0)
die("Error reading block size");
bw->block.header_size = lzma_block_header_size_decode(b);
uint8_t hdrbuf[bw->block.header_size];
hdrbuf[0] = (uint8_t)b;
if (fread(hdrbuf + 1, bw->block.header_size - 1, 1, gInFile) != 1)
die("Error reading block header");
if (lzma_block_header_decode(&bw->block, NULL, hdrbuf) != LZMA_OK)
die("Error decoding file index block header");
if (lzma_block_decoder(&gStream, &bw->block) != LZMA_OK)
die("Error initializing file index stream");
return bw;
}
static lzma_vli find_file_index(void **bdatap) {
if (!gIndex)
decode_index();
@ -119,7 +120,11 @@ lzma_vli find_file_index(void **bdatap) {
lzma_vli loc = lzma_index_uncompressed_size(gIndex) - 1;
if (lzma_index_iter_locate(&iter, loc))
die("Can't locate file index block");
void *bdata = decode_block_start(iter.block.compressed_file_offset);
if (iter.stream.number != 1)
return 0; // Too many streams for one file index
void *bdata = decode_file_index_start(iter.block.compressed_file_offset,
iter.stream.flags->check);
gFileIndexBuf = malloc(gFIBSize);
gStream.avail_out = gFIBSize;
@ -146,10 +151,9 @@ lzma_vli find_file_index(void **bdatap) {
return ret;
}
lzma_vli read_file_index(lzma_vli offset) {
lzma_vli read_file_index() {
void *bdata = NULL;
if (!offset)
offset = find_file_index(&bdata);
lzma_vli offset = find_file_index(&bdata);
if (!offset)
return 0;
@ -230,38 +234,118 @@ static void read_file_index_data(void) {
}
}
void decode_index(void) {
if (fseek(gInFile, -LZMA_STREAM_HEADER_SIZE, SEEK_END) == -1)
die("Error seeking to stream footer");
uint8_t hdrbuf[LZMA_STREAM_HEADER_SIZE];
if (fread(hdrbuf, LZMA_STREAM_HEADER_SIZE, 1, gInFile) != 1)
die("Error reading stream footer");
lzma_stream_flags flags;
if (lzma_stream_footer_decode(&flags, hdrbuf) != LZMA_OK)
#define BWCHUNK 512
typedef struct {
uint8_t buf[BWCHUNK];
off_t pos;
size_t size;
} bw;
static uint32_t *bw_read(bw *b) {
size_t sz = sizeof(uint32_t);
if (b->size < sz) {
if (b->pos < sz)
return NULL; // EOF
b->size = (b->pos > BWCHUNK) ? BWCHUNK : b->pos;
b->pos -= b->size;
if (fseeko(gInFile, b->pos, SEEK_SET) == -1)
return NULL;
if (fread(b->buf, b->size, 1, gInFile) != 1)
return NULL;
}
b->size -= sz;
return &((uint32_t*)b->buf)[b->size / sz];
}
static off_t stream_padding(bw *b, off_t pos) {
b->pos = pos;
b->size = 0;
for (off_t pad = 0; true; pad += sizeof(uint32_t)) {
uint32_t *i = bw_read(b);
if (!i)
die("Error reading stream padding");
if (*i != 0) {
b->size += sizeof(uint32_t);
return pad;
}
}
}
static void stream_footer(bw *b, lzma_stream_flags *flags) {
uint8_t ftr[LZMA_STREAM_HEADER_SIZE];
for (int i = sizeof(ftr) / sizeof(uint32_t) - 1; i >= 0; --i) {
uint32_t *p = bw_read(b);
if (!p)
die("Error reading stream footer");
*((uint32_t*)ftr + i) = *p;
}
if (lzma_stream_footer_decode(flags, ftr) != LZMA_OK)
die("Error decoding stream footer");
gCheck = flags.check;
size_t index_seek = -LZMA_STREAM_HEADER_SIZE - flags.backward_size;
if (fseek(gInFile, index_seek, SEEK_CUR) == -1)
}
static lzma_index *next_index(off_t *pos) {
bw b;
off_t pad = stream_padding(&b, *pos);
off_t eos = *pos - pad;
lzma_stream_flags flags;
stream_footer(&b, &flags);
*pos = eos - LZMA_STREAM_HEADER_SIZE - flags.backward_size;
if (fseeko(gInFile, *pos, SEEK_SET) == -1)
die("Error seeking to index");
if (lzma_index_decoder(&gStream, &gIndex, MEMLIMIT) != LZMA_OK)
lzma_stream strm = LZMA_STREAM_INIT;
lzma_index *index;
if (lzma_index_decoder(&strm, &index, MEMLIMIT) != LZMA_OK)
die("Error creating index decoder");
uint8_t ibuf[CHUNKSIZE];
gStream.avail_in = 0;
strm.avail_in = 0;
lzma_ret err = LZMA_OK;
while (err != LZMA_STREAM_END) {
if (gStream.avail_in == 0) {
gStream.avail_in = fread(ibuf, 1, CHUNKSIZE, gInFile);
if (strm.avail_in == 0) {
strm.avail_in = fread(ibuf, 1, CHUNKSIZE, gInFile);
if (ferror(gInFile))
die("Error reading index");
gStream.next_in = ibuf;
strm.next_in = ibuf;
}
err = lzma_code(&gStream, LZMA_RUN);
err = lzma_code(&strm, LZMA_RUN);
if (err != LZMA_OK && err != LZMA_STREAM_END)
die("Error decoding index");
}
*pos = eos - lzma_index_stream_size(index);
if (fseeko(gInFile, *pos, SEEK_SET) == -1)
die("Error seeking to beginning of stream");
if (lzma_index_stream_flags(index, &flags) != LZMA_OK)
die("Error setting stream flags");
if (lzma_index_stream_padding(index, pad) != LZMA_OK)
die("Error setting stream padding");
return index;
}
bool decode_index(void) {
if (fseeko(gInFile, 0, SEEK_END) == -1)
return false; // not seekable
off_t pos = ftello(gInFile);
gIndex = NULL;
while (pos > 0) {
lzma_index *index = next_index(&pos);
if (gIndex && lzma_index_cat(index, gIndex, NULL) != LZMA_OK)
die("Error concatenating indices");
gIndex = index;
}
return (gIndex != NULL);
}
@ -436,10 +520,14 @@ void pipeline_destroy(void) {
free(gPLProcessThreads);
}
void pipeline_split(pipeline_item_t *item) {
void pipeline_dispatch(pipeline_item_t *item, queue_t *q) {
item->seq = gPLSplitSeq++;
item->next = NULL;
queue_push(gPipelineSplitQ, PIPELINE_ITEM, item);
queue_push(q, PIPELINE_ITEM, item);
}
void pipeline_split(pipeline_item_t *item) {
pipeline_dispatch(item, gPipelineSplitQ);
}
pipeline_item_t *pipeline_merged() {

@ -3,11 +3,13 @@
#pragma mark FUNCTION DEFINITIONS
void pixz_list(bool tar) {
decode_index();
if (!decode_index())
die("Can't list non-seekable input");
lzma_index_iter iter;
lzma_index_iter_init(&iter, gIndex);
if (tar && read_file_index(0)) {
if (tar && read_file_index()) {
dump_file_index(stdout, false);
free_file_index();
} else {

@ -110,11 +110,13 @@ int main(int argc, char **argv) {
die("Can't open input file");
if (opath && !(gOutFile = fopen(opath, "w")))
die("Can't open output file");
if (op != OP_LIST && isatty(fileno(gOutFile)) == 1)
usage("Refusing to output to a TTY");
switch (op) {
case OP_WRITE: pixz_write(tar, level); break;
case OP_WRITE:
if (isatty(fileno(gOutFile)) == -1)
usage("Refusing to output to a TTY");
pixz_write(tar, level);
break;
case OP_READ: pixz_read(tar, 0, NULL); break;
case OP_EXTRACT: pixz_read(tar, argc, argv); break;
case OP_LIST: pixz_list(tar);

@ -20,7 +20,9 @@
#define CHUNKSIZE 4096
#define DEBUG 0
#ifndef DEBUG
#define DEBUG 0
#endif
#if DEBUG
#define debug(str, ...) fprintf(stderr, str "\n", ##__VA_ARGS__)
#else
@ -50,8 +52,6 @@ uint64_t xle64dec(const uint8_t *d);
void xle64enc(uint8_t *d, uint64_t n);
size_t num_threads(void);
void *decode_block_start(off_t block_seek);
#pragma mark INDEX
@ -64,14 +64,10 @@ struct file_index_t {
extern file_index_t *gFileIndex, *gLastFile;
// As discovered from footer
extern lzma_check gCheck;
bool is_multi_header(const char *name);
void decode_index(void);
bool decode_index(void); // true on success
lzma_vli find_file_index(void **bdatap);
lzma_vli read_file_index(lzma_vli offset);
lzma_vli read_file_index(void);
void dump_file_index(FILE *out, bool verbose);
void free_file_index(void);
@ -135,5 +131,6 @@ void pipeline_create(
void pipeline_stop(void);
void pipeline_destroy(void);
void pipeline_dispatch(pipeline_item_t *item, queue_t *q);
void pipeline_split(pipeline_item_t *item);
pipeline_item_t *pipeline_merged();

377
read.c

@ -23,15 +23,22 @@ static void wanted_free(wanted_t *w);
#pragma mark DECLARE PIPELINE
typedef enum { BLOCK_SIZED, BLOCK_UNSIZED, BLOCK_CONTINUATION } block_type;
typedef struct {
uint8_t *input, *output;
size_t incap, outcap;
size_t insize, outsize;
off_t uoffset; // uncompressed offset
lzma_check check;
block_type btype;
} io_block_t;
static void *block_create(void);
static void block_free(void *data);
static void read_thread(void);
static void read_thread_noindex(void);
static void decode_thread(size_t thnum);
@ -42,6 +49,7 @@ static off_t gArLastOffset;
static size_t gArLastSize;
static wanted_t *gArWanted = NULL;
static bool gArNextItem = false;
static bool gExplicitFiles = false;
static int tar_ok(struct archive *ar, void *ref);
static ssize_t tar_read(struct archive *ar, void *ref, const void **bufp);
@ -49,29 +57,57 @@ static bool tar_next_block(void);
static void tar_write_last(void);
#pragma mark DECLARE READ BUFFER
#define STREAMSIZE (1024 * 1024)
#define MAXSPLITSIZE (64 * 1024 * 1024) // xz -9 blocksize
static pipeline_item_t *gRbufPI = NULL;
static io_block_t *gRbuf = NULL;
static void block_capacity(io_block_t *ib, size_t incap, size_t outcap);
typedef enum {
RBUF_ERR, RBUF_EOF, RBUF_PART, RBUF_FULL
} rbuf_read_status;
static rbuf_read_status rbuf_read(size_t bytes);
static bool rbuf_cycle(lzma_stream *stream, bool start, size_t skip);
static void rbuf_consume(size_t bytes);
static void rbuf_dispatch(void);
static bool read_header(lzma_check *check);
static bool read_block(bool force_stream, lzma_check check);
static void read_streaming(lzma_block *block);
static void read_index(void);
static void read_footer(void);
#pragma mark DECLARE UTILS
static lzma_vli gFileIndexOffset = 0;
static size_t gBlockInSize = 0, gBlockOutSize = 0;
static void set_block_sizes(void);
static bool taste_tar(io_block_t *ib);
static bool taste_file_index(io_block_t *ib);
#pragma mark MAIN
void pixz_read(bool verify, size_t nspecs, char **specs) {
decode_index();
if (verify)
gFileIndexOffset = read_file_index(0);
wanted_files(nspecs, specs);
set_block_sizes();
if (decode_index()) {
if (verify)
gFileIndexOffset = read_file_index();
wanted_files(nspecs, specs);
gExplicitFiles = nspecs;
}
#if DEBUG
for (wanted_t *w = gWantedFiles; w; w = w->next)
debug("want: %s", w->name);
#endif
pipeline_create(block_create, block_free, read_thread, decode_thread);
pipeline_create(block_create, block_free,
gIndex ? read_thread : read_thread_noindex, decode_thread);
if (verify && gFileIndexOffset) {
gArWanted = gWantedFiles;
wanted_t *w = gWantedFiles, *wlast = NULL;
@ -114,14 +150,39 @@ void pixz_read(bool verify, size_t nspecs, char **specs) {
wlast = w;
w = w->next;
}
archive_read_finish(ar);
if (w && w->name)
die("File %s missing in archive", w->name);
tar_write_last(); // write whatever's left
} else {
pipeline_item_t *pi;
}
if (!gExplicitFiles) {
/* Heuristics for detecting pixz file index:
* - Input must be streaming (otherwise read_thread does this)
* - Data must look tar-like
* - Must have all sized blocks, followed by unsized file index */
bool start = !gIndex && verify,
tar = false, all_sized = true, skipping = false;
pipeline_item_t *pi;
while ((pi = pipeline_merged())) {
io_block_t *ib = (io_block_t*)(pi->data);
fwrite(ib->output, ib->outsize, 1, gOutFile);
if (skipping && ib->btype != BLOCK_CONTINUATION) {
fprintf(stderr,
"Warning: File index heuristic failed, use -t flag.\n");
skipping = false;
}
if (!skipping && tar && !start && all_sized
&& ib->btype == BLOCK_UNSIZED && taste_file_index(ib))
skipping = true;
if (start) {
tar = taste_tar(ib);
start = false;
}
if (ib->btype != BLOCK_SIZED)
all_sized = false;
if (!skipping)
fwrite(ib->output, ib->outsize, 1, gOutFile);
queue_push(gPipelineStartQ, PIPELINE_ITEM, pi);
}
}
@ -135,8 +196,8 @@ void pixz_read(bool verify, size_t nspecs, char **specs) {
static void *block_create(void) {
io_block_t *ib = malloc(sizeof(io_block_t));
ib->input = malloc(gBlockInSize);
ib->output = malloc(gBlockOutSize);
ib->incap = ib->outcap = 0;
ib->input = ib->output = NULL;
return ib;
}
@ -150,25 +211,6 @@ static void block_free(void* data) {
#pragma mark SETUP
static void set_block_sizes() {
lzma_index_iter iter;
lzma_index_iter_init(&iter, gIndex);
while (!lzma_index_iter_next(&iter, LZMA_INDEX_ITER_BLOCK)) {
// exclude the file index block
lzma_vli off = iter.block.compressed_file_offset;
if (gFileIndexOffset && off == gFileIndexOffset)
continue;
size_t in = iter.block.total_size,
out = iter.block.uncompressed_size;
if (out > gBlockOutSize)
gBlockOutSize = out;
if (in > gBlockInSize)
gBlockInSize = in;
}
}
static void wanted_free(wanted_t *w) {
for (wanted_t *w = gWantedFiles; w; ) {
wanted_t *tmp = w->next;
@ -177,6 +219,7 @@ static void wanted_free(wanted_t *w) {
}
}
static bool spec_match(char *spec, char *name) {
bool match = true;
for (; *spec; ++spec, ++name) {
@ -242,7 +285,214 @@ static void wanted_files(size_t count, char **specs) {
}
#pragma mark THREADS
#pragma mark READ
static void block_capacity(io_block_t *ib, size_t incap, size_t outcap) {
if (incap > ib->incap) {
ib->incap = incap;
ib->input = realloc(ib->input, incap);
}
if (outcap > ib->outcap) {
ib->outcap = outcap;
ib->output = malloc(outcap);
}
}
// Ensure at least this many bytes available
// Return 1 on success, zero on EOF, -1 on error
static rbuf_read_status rbuf_read(size_t bytes) {
if (!gRbufPI) {
queue_pop(gPipelineStartQ, (void**)&gRbufPI);
gRbuf = (io_block_t*)(gRbufPI->data);
gRbuf->insize = gRbuf->outsize = 0;
}
if (gRbuf->insize >= bytes)
return RBUF_FULL;
block_capacity(gRbuf, bytes, 0);
size_t r = fread(gRbuf->input + gRbuf->insize, 1, bytes - gRbuf->insize,
gInFile);
gRbuf->insize += r;
if (r)
return (gRbuf->insize == bytes) ? RBUF_FULL : RBUF_PART;
return feof(gInFile) ? RBUF_EOF : RBUF_ERR;
}
static bool rbuf_cycle(lzma_stream *stream, bool start, size_t skip) {
if (!start) {
rbuf_consume(gRbuf->insize);
if (rbuf_read(CHUNKSIZE) < RBUF_PART)
return false;
}
stream->next_in = gRbuf->input + skip;
stream->avail_in = gRbuf->insize - skip;
return true;
}
static void rbuf_consume(size_t bytes) {
if (bytes < gRbuf->insize)
memmove(gRbuf->input, gRbuf->input + bytes, gRbuf->insize - bytes);
gRbuf->insize -= bytes;
}
static void rbuf_dispatch(void) {
pipeline_split(gRbufPI);
gRbufPI = NULL;
gRbuf = NULL;
}
static bool read_header(lzma_check *check) {
lzma_stream_flags stream_flags;
rbuf_read_status st = rbuf_read(LZMA_STREAM_HEADER_SIZE);
if (st == RBUF_EOF)
return false;
else if (st != RBUF_FULL)
die("Error reading stream header");
lzma_ret err = lzma_stream_header_decode(&stream_flags, gRbuf->input);
if (err == LZMA_FORMAT_ERROR)
die("Not an XZ file");
else if (err != LZMA_OK)
die("Error decoding XZ header");
*check = stream_flags.check;
rbuf_consume(LZMA_STREAM_HEADER_SIZE);
return true;
}
static bool read_block(bool force_stream, lzma_check check) {
lzma_filter filters[LZMA_FILTERS_MAX + 1];
lzma_block block = { .filters = filters, .check = check, .version = 0 };
if (rbuf_read(1) != RBUF_FULL)
die("Error reading block header size");
if (gRbuf->input[0] == 0)
return false;
block.header_size = lzma_block_header_size_decode(gRbuf->input[0]);
if (block.header_size > LZMA_BLOCK_HEADER_SIZE_MAX)
die("Block header size too large");
if (rbuf_read(block.header_size) != RBUF_FULL)
die("Error reading block header");
if (lzma_block_header_decode(&block, NULL, gRbuf->input) != LZMA_OK)
die("Error decoding block header");
size_t comp = block.compressed_size, outsize = block.uncompressed_size;
if (force_stream || comp == LZMA_VLI_UNKNOWN
|| outsize == LZMA_VLI_UNKNOWN
|| outsize > MAXSPLITSIZE) {
read_streaming(&block);
} else {
block_capacity(gRbuf, 0, outsize);
gRbuf->outsize = outsize;
gRbuf->check = check;
gRbuf->btype = BLOCK_SIZED;
if (rbuf_read(lzma_block_total_size(&block)) != RBUF_FULL)
die("Error reading block contents");
rbuf_dispatch();
}
return true;
}
static void read_streaming(lzma_block *block) {
lzma_stream stream = LZMA_STREAM_INIT;
if (lzma_block_decoder(&stream, block) != LZMA_OK)
die("Error initializing streaming block decode");
rbuf_cycle(&stream, true, block->header_size);
stream.avail_out = 0;
bool first = true;
pipeline_item_t *pi = NULL;
io_block_t *ib = NULL;
lzma_ret err = LZMA_OK;
while (err != LZMA_STREAM_END) {
if (err != LZMA_OK)
die("Error decoding streaming block");
if (stream.avail_out == 0) {
if (ib) {
ib->outsize = ib->outcap;
pipeline_dispatch(pi, gPipelineMergeQ);
first = false;
}
queue_pop(gPipelineStartQ, (void**)&pi);
ib = (io_block_t*)pi->data;
ib->btype = (first ? BLOCK_UNSIZED : BLOCK_CONTINUATION);
block_capacity(ib, 0, STREAMSIZE);
stream.next_out = ib->output;
stream.avail_out = ib->outcap;
}
if (stream.avail_in == 0 && !rbuf_cycle(&stream, false, 0))
die("Error reading streaming block");
err = lzma_code(&stream, LZMA_RUN);
}
if (ib && stream.avail_out != ib->outcap) {
ib->outsize = ib->outcap - stream.avail_out;
pipeline_dispatch(pi, gPipelineMergeQ);
}
rbuf_consume(gRbuf->insize - stream.avail_in);
lzma_end(&stream);
}
static void read_index(void) {
lzma_stream stream = LZMA_STREAM_INIT;
lzma_index *index;
if (lzma_index_decoder(&stream, &index, MEMLIMIT) != LZMA_OK)
die("Error initializing index decoder");
rbuf_cycle(&stream, true, 0);
lzma_ret err = LZMA_OK;
while (err != LZMA_STREAM_END) {
if (err != LZMA_OK)
die("Error decoding index");
if (stream.avail_in == 0 && !rbuf_cycle(&stream, false, 0))
die("Error reading index");
err = lzma_code(&stream, LZMA_RUN);
}
rbuf_consume(gRbuf->insize - stream.avail_in);
lzma_end(&stream);
}
static void read_footer(void) {
lzma_stream_flags stream_flags;
if (rbuf_read(LZMA_STREAM_HEADER_SIZE) != RBUF_FULL)
die("Error reading stream footer");
if (lzma_stream_footer_decode(&stream_flags, gRbuf->input) != LZMA_OK)
die("Error decoding XZ footer");
rbuf_consume(LZMA_STREAM_HEADER_SIZE);
char zeros[4] = "\0\0\0\0";
while (true) {
rbuf_read_status st = rbuf_read(4);
if (st == RBUF_EOF)
return;
if (st != RBUF_FULL)
die("Footer must be multiple of four bytes");
if (memcmp(zeros, gRbuf->input, 4) != 0)
return;
rbuf_consume(4);
}
}
static void read_thread_noindex(void) {
bool empty = true;
lzma_check check = LZMA_CHECK_NONE;
while (read_header(&check)) {
empty = false;
while (read_block(false, check))
; // pass
read_index();
read_footer();
}
if (empty)
die("Empty input");
pipeline_stop();
}
static void read_thread(void) {
off_t offset = ftello(gInFile);
@ -258,7 +508,7 @@ static void read_thread(void) {
continue;
// Do we need this block?
if (gWantedFiles) {
if (gWantedFiles && gExplicitFiles) {
off_t uend = iter.block.uncompressed_file_offset +
iter.block.uncompressed_size;
if (!w || w->start >= uend) {
@ -273,28 +523,42 @@ static void read_thread(void) {
pipeline_item_t *pi;
queue_pop(gPipelineStartQ, (void**)&pi);
io_block_t *ib = (io_block_t*)(pi->data);
block_capacity(ib, iter.block.unpadded_size,
iter.block.uncompressed_size);
// Seek if needed, and get the data
if (offset != boffset) {
fseeko(gInFile, boffset, SEEK_SET);
offset = boffset;
}
ib->insize = fread(ib->input, 1, bsize, gInFile);
if (ib->insize < bsize)
die("Error reading block contents");
offset += bsize;
ib->uoffset = iter.block.uncompressed_file_offset;
pipeline_split(pi);
}
if (iter.block.uncompressed_size > MAXSPLITSIZE) { // must stream
if (gRbuf)
rbuf_consume(gRbuf->insize); // clear
read_block(true, iter.stream.flags->check);
} else {
ib->insize = fread(ib->input, 1, bsize, gInFile);
if (ib->insize < bsize)
die("Error reading block contents");
offset += bsize;
ib->uoffset = iter.block.uncompressed_file_offset;
ib->check = iter.stream.flags->check;
ib->btype = BLOCK_SIZED; // Indexed blocks always sized
pipeline_split(pi);
}
}
pipeline_stop();
}
#pragma mark DECODE
static void decode_thread(size_t thnum) {
lzma_stream stream = LZMA_STREAM_INIT;
lzma_filter filters[LZMA_FILTERS_MAX + 1];
lzma_block block = { .filters = filters, .check = gCheck, .version = 0 };
lzma_block block = { .filters = filters, .check = LZMA_CHECK_NONE,
.version = 0 };
pipeline_item_t *pi;
io_block_t *ib;
@ -303,14 +567,15 @@ static void decode_thread(size_t thnum) {
ib = (io_block_t*)(pi->data);
block.header_size = lzma_block_header_size_decode(*(ib->input));
if (lzma_block_header_decode(&block, NULL, ib->input) != LZMA_OK)
block.check = ib->check;
if (lzma_block_header_decode(&block, NULL, ib->input) != LZMA_OK)
die("Error decoding block header");
if (lzma_block_decoder(&stream, &block) != LZMA_OK)
die("Error initializing block decode");
stream.avail_in = ib->insize - block.header_size;
stream.next_in = ib->input + block.header_size;
stream.avail_out = gBlockOutSize;
stream.avail_out = ib->outcap;
stream.next_out = ib->output;
lzma_ret err = LZMA_OK;
@ -334,7 +599,7 @@ static int tar_ok(struct archive *ar, void *ref) {
}
static bool tar_next_block(void) {
if (gArItem && !gArNextItem && gArWanted) {
if (gArItem && !gArNextItem && gArWanted && gExplicitFiles) {
io_block_t *ib = (io_block_t*)(gArItem->data);
if (gArWanted->start < ib->uoffset + ib->outsize)
return true; // No need
@ -367,7 +632,7 @@ static ssize_t tar_read(struct archive *ar, void *ref, const void **bufp) {
off_t off;
size_t size;
io_block_t *ib = (io_block_t*)(gArItem->data);
if (gWantedFiles) {
if (gWantedFiles && gExplicitFiles) {
debug("tar want: %s", gArWanted->name);
off = gArWanted->start - ib->uoffset;
size = gArWanted->size;
@ -393,3 +658,21 @@ static ssize_t tar_read(struct archive *ar, void *ref, const void **bufp) {
*bufp = ib->output + off;
return size;
}
#pragma mark UTILS
static bool taste_tar(io_block_t *ib) {
struct archive *ar = archive_read_new();
archive_read_support_compression_none(ar);
archive_read_support_format_tar(ar);
archive_read_open_memory(ar, ib->output, ib->outsize);
struct archive_entry *entry;
bool ok = (archive_read_next_header(ar, &entry) == ARCHIVE_OK);
archive_read_finish(ar);
return ok;
}
static bool taste_file_index(io_block_t *ib) {
return xle64dec(ib->output) == PIXZ_INDEX_MAGIC;
}

@ -93,10 +93,9 @@ void pixz_write(bool tar, uint32_t level) {
}
// file index
if (gTar) {
if (gTar)
write_file_index();
free_file_index();
}
free_file_index();
// post-block cleanup: index, footer
encode_index();
@ -126,7 +125,6 @@ static void read_thread() {
while (true) {
int aerr = archive_read_next_header(ar, &entry);
if (aerr == ARCHIVE_EOF) {
// TODO
break;
} else if (aerr != ARCHIVE_OK && aerr != ARCHIVE_WARN) {
// Some charset translations warn spuriously

Loading…
Cancel
Save