diff --git a/src/line_buffer.cc b/src/line_buffer.cc index 4ece0c5c..7c4bdb19 100644 --- a/src/line_buffer.cc +++ b/src/line_buffer.cc @@ -60,9 +60,9 @@ static const ssize_t MAX_COMPRESSED_BUFFER_SIZE = 32 * 1024 * 1024; /* * XXX REMOVE ME * - * The stock gzipped file code does not use pread, so we need to use a lock to + * The stock bzipped file code does not use pread, so we need to use a lock to * get exclusive access to the file. In the future, we should just rewrite - * the gzipped file code to use pread. + * the bzipped file code to use pread. */ class lock_hack { public: @@ -116,9 +116,173 @@ private: }; /* XXX END */ + +#define Z_BUFSIZE 65536U +#define SYNCPOINT_SIZE (1024 * 1024) +line_buffer::gz_indexed::gz_indexed() +{ + if ((this->inbuf = (Bytef *)malloc(Z_BUFSIZE)) == NULL) { + throw bad_alloc(); + } +} + +void line_buffer::gz_indexed::close() +{ + // Release old stream, if we were open + if (*this) { + inflateEnd(&this->strm); + ::close(this->gz_fd); + this->syncpoints.clear(); + this->gz_fd = -1; + } +} + +void line_buffer::gz_indexed::init_stream() +{ + if (*this) { + inflateEnd(&this->strm); + } + + // initialize inflate struct + this->strm.zalloc = Z_NULL; + this->strm.zfree = Z_NULL; + this->strm.opaque = Z_NULL; + this->strm.avail_in = 0; + this->strm.next_in = Z_NULL; + this->strm.avail_out = 0; + int rc = inflateInit2(&strm, GZ_HEADER_MODE); + if (rc != Z_OK) { + throw(rc); // FIXME: exception wrapper + } +} + +void line_buffer::gz_indexed::continue_stream() +{ + // Save our position and output buffer + auto total_in = this->strm.total_in; + auto total_out = this->strm.total_out; + auto avail_out = this->strm.avail_out; + auto next_out = this->strm.next_out; + + init_stream(); + + // Restore position and output buffer + this->strm.total_in = total_in; + this->strm.total_out = total_out; + this->strm.avail_out = avail_out; + this->strm.next_out = next_out; +} + +void line_buffer::gz_indexed::open(int fd) +{ + this->close(); + this->init_stream(); + this->gz_fd = fd; +} + +int line_buffer::gz_indexed::stream_data(void * buf, size_t size) +{ + this->strm.avail_out = size; + this->strm.next_out = (unsigned char *) buf; + + int last = this->syncpoints.empty() ? 0 : + this->syncpoints.back().in; + while (this->strm.avail_out) { + if (!this->strm.avail_in) { + int rc = ::pread(this->gz_fd, + &this->inbuf[0], + Z_BUFSIZE, + this->strm.total_in); + if (rc < 0) { + return rc; + } + this->strm.next_in = this->inbuf; + this->strm.avail_in = rc; + } + if (this->strm.avail_in) { + int flush = last > this->strm.total_in + ? Z_SYNC_FLUSH : Z_BLOCK; + auto err = inflate(&this->strm, flush); + if (err == Z_STREAM_END) { + // Reached end of stream; re-init for a possible subsequent stream + continue_stream(); + } else if (err != Z_OK) { + log_error(" inflate-error: %d", (int)err); + throw error(err); // FIXME: exception wrapper + } + + if (this->strm.total_in >= last + SYNCPOINT_SIZE && + size > this->strm.avail_out + GZ_WINSIZE && + (this->strm.data_type & GZ_END_OF_BLOCK_MASK) && + !(this->strm.data_type & GZ_END_OF_FILE_MASK)) + { + this->syncpoints.emplace_back(this->strm, size); + last = this->strm.total_out; + } + } else if (this->strm.avail_out) { + // Processed all the gz file data but didn't fill + // the output buffer. We're done, even though we + // produced fewer bytes than requested. + break; + } + } + return size - this->strm.avail_out; +} + +void line_buffer::gz_indexed::seek(size_t offset) +{ + if (offset == this->strm.total_out) { + return; + } + + indexDict * dict = NULL; + // Find highest syncpoint not past offset + // FIXME: Make this a binary-tree search + for (auto &d : this->syncpoints) { + if (d.out <= offset) { + dict = &d; + } else { + break; + } + } + + // Choose highest available syncpoint, or keep current offset if it's ok + if (offset < this->strm.total_out || + (dict && this->strm.total_out < dict->out)) { + // Release the old z_stream + inflateEnd(&this->strm); + if (dict) { + dict->apply(&this->strm); + } else { + init_stream(); + } + } + + // Stream from compressed file until we reach our offset + unsigned char dummy[Z_BUFSIZE]; + while ( offset > this->strm.total_out) { + size_t to_copy = std::min(static_cast(Z_BUFSIZE), + offset - this->strm.total_out); + auto bytes = stream_data(dummy, to_copy); + if (bytes <= 0) { + break; + } + } +} + +int line_buffer::gz_indexed::read(void * buf, size_t offset, size_t size) +{ + if (offset != this->strm.total_out) { + this->seek(offset); + } + + int bytes = stream_data(buf, size); + + return bytes; +} + line_buffer::line_buffer() - : lb_gz_file(NULL), - lb_bz_file(false), + : lb_bz_file(false), lb_compressed_offset(0), lb_file_size(-1), lb_file_offset(0), @@ -149,8 +313,7 @@ void line_buffer::set_fd(auto_fd &fd) off_t newoff = 0; if (this->lb_gz_file) { - gzclose(this->lb_gz_file); - this->lb_gz_file = NULL; + this->lb_gz_file.close(); } if (this->lb_bz_file) { @@ -181,15 +344,7 @@ void line_buffer::set_fd(auto_fd &fd) close(gzfd); throw error(errno); } - if ((this->lb_gz_file = gzdopen(gzfd, "r")) == NULL) { - close(gzfd); - if (errno == 0) { - throw bad_alloc(); - } - else{ - throw error(errno); - } - } + lb_gz_file.open(gzfd); this->lb_file_time = read_le32( (const unsigned char *)&gz_id[4]); if (this->lb_file_time < 0) { @@ -343,16 +498,10 @@ bool line_buffer::fill_range(off_t start, ssize_t max_length) rc = 0; } else { - lock_hack::guard guard; - - lseek(this->lb_fd, this->lb_compressed_offset, SEEK_SET); - gzseek(this->lb_gz_file, - this->lb_file_offset + this->lb_buffer_size, - SEEK_SET); - rc = gzread(this->lb_gz_file, - &this->lb_buffer[this->lb_buffer_size], - this->lb_buffer_max - this->lb_buffer_size); - this->lb_compressed_offset = lseek(this->lb_fd, 0, SEEK_CUR); + rc = this->lb_gz_file.read(&this->lb_buffer[this->lb_buffer_size], + this->lb_file_offset + this->lb_buffer_size, + this->lb_buffer_max - this->lb_buffer_size); + this->lb_compressed_offset = this->lb_gz_file.get_source_offset(); if (rc != -1 && ( rc < (this->lb_buffer_max - this->lb_buffer_size))) { this->lb_file_size = ( diff --git a/src/line_buffer.hh b/src/line_buffer.hh index e5e323c5..4cc9002c 100644 --- a/src/line_buffer.hh +++ b/src/line_buffer.hh @@ -38,6 +38,7 @@ #include #include +#include #include "base/lnav_log.hh" #include "base/file_range.hh" @@ -74,6 +75,91 @@ public: int e_err; }; +#define GZ_WINSIZE 32768U /*> gzip's max supported dictionary is 15-bits */ +#define GZ_RAW_MODE (-15) /*> Raw inflate data mode */ +#define GZ_HEADER_MODE (15 + 32) /*> Automatic zstd or gzip decoding */ +#define GZ_BORROW_BITS_MASK 7 /*> Bits (0-7) consumed in previous block */ +#define GZ_END_OF_BLOCK_MASK 128 /*> Stopped because reached end-of-block */ +#define GZ_END_OF_FILE_MASK 64 /*> Stopped because reached end-of-file */ + + /** + * A memoized gzip file reader that can do random file access faster than + * gzseek/gzread alone. + */ + class gz_indexed { + public: + gz_indexed(); + gz_indexed(gz_indexed &&other) = default; + ~gz_indexed() { + this->close(); + } + + inline operator bool() const { + return this->gz_fd != -1; + } + + uLong get_source_offset() { + return !!*this ? this->strm.total_in + this->strm.avail_in : 0; + } + + void close(); + void init_stream(); + void continue_stream(); + void open(int fd); + int stream_data(void * buf, size_t size); + void seek(size_t offset); + + /** + * Decompress bytes from the gz file returning at most `size` bytes. + * offset is the byte-offset in the decompressed data stream. + */ + int read(void * buf, size_t offset, size_t size); + + struct indexDict { + off_t in = 0; + off_t out = 0; + unsigned char bits = 0; + unsigned char in_bits = 0; + Bytef index[GZ_WINSIZE]; + indexDict(z_stream const & s, const off_t size) { + assert((s.data_type & GZ_END_OF_BLOCK_MASK)); + assert(!(s.data_type & GZ_END_OF_FILE_MASK)); + assert(size >= s.avail_out + GZ_WINSIZE); + this->bits = s.data_type & GZ_BORROW_BITS_MASK; + this->in = s.total_in; + this->out = s.total_out; + auto last_byte_in = s.next_in[-1]; + this->in_bits = last_byte_in >> (8 - this->bits); + // Copy the last 32k uncompressed data (sliding window) to our index + memcpy(this->index, s.next_out - GZ_WINSIZE, GZ_WINSIZE); + } + + int apply(z_streamp s) { + s->zalloc = Z_NULL; + s->zfree = Z_NULL; + s->opaque = Z_NULL; + s->avail_in = 0; + s->next_in = Z_NULL; + auto ret = inflateInit2(s, GZ_RAW_MODE); + if (ret != Z_OK) { + return ret; + } + if (this->bits) { + inflatePrime(s, this->bits, this->in_bits); + } + s->total_in = this->in; + s->total_out = this->out; + inflateSetDictionary(s, this->index, GZ_WINSIZE); + return ret; + } + }; + private: + z_stream strm; /*< gzip streams structure */ + std::vector syncpoints; /*< indexed dictionaries as discovered */ + auto_mem inbuf; /*< Compressed data buffer */ + int gz_fd = -1; /*< The file to read data from. */ + }; + /** Construct an empty line_buffer. */ line_buffer(); @@ -103,7 +189,7 @@ public: }; bool is_compressed() const { - return this->lb_gz_file != NULL || this->lb_bz_file; + return this->lb_gz_file || this->lb_bz_file; }; off_t get_read_offset(off_t off) const @@ -235,7 +321,7 @@ private: shared_buffer lb_share_manager; auto_fd lb_fd; /*< The file to read data from. */ - gzFile lb_gz_file; /*< File handle for gzipped files. */ + gz_indexed lb_gz_file; /*< File reader for gzipped files. */ bool lb_bz_file; /*< Flag set for bzip2 compressed files. */ off_t lb_compressed_offset; /*< The offset into the compressed file. */ diff --git a/test/drive_line_buffer.cc b/test/drive_line_buffer.cc index 9dc8dafb..3e837ecf 100644 --- a/test/drive_line_buffer.cc +++ b/test/drive_line_buffer.cc @@ -56,7 +56,7 @@ int main(int argc, char *argv[]) { int c, rnd_iters = 5, retval = EXIT_SUCCESS; vector > index; - auto_fd fd = STDIN_FILENO; + auto_fd fd = STDIN_FILENO, fd_cmp; int offseti = 0; off_t offset = 0; int count = 1000; @@ -135,12 +135,20 @@ int main(int argc, char *argv[]) } else if ((argc > 0) && (fstat(fd, &st) == -1)) { perror("fstat"); retval = EXIT_FAILURE; + } else if ((argc > 1) && (fd_cmp = open(argv[1], O_RDONLY)) == -1) { + perror("open-cmp"); + retval = EXIT_FAILURE; + } else if ((argc > 1) && (fstat(fd_cmp, &st) == -1)) { + perror("fstat-cmp"); + retval = EXIT_FAILURE; } else { try { file_range last_range{offset}; line_buffer lb; char *maddr; + int fd2 = (argc > 1) ? fd_cmp.get() : fd.get(); + assert(fd2 >= 0); lb.set_fd(fd); if (index.size() == 0) { while (count) { @@ -179,7 +187,7 @@ int main(int argc, char *argv[]) st.st_size, PROT_READ, MAP_FILE | MAP_PRIVATE, - lb.get_fd(), + fd2, 0)) == MAP_FAILED) { perror("mmap"); retval = EXIT_FAILURE; diff --git a/test/test_line_buffer.sh b/test/test_line_buffer.sh index 18b798c3..5ee41175 100644 --- a/test/test_line_buffer.sh +++ b/test/test_line_buffer.sh @@ -56,3 +56,23 @@ run_test ./drive_line_buffer -i lb.index -n 10 lb-2.dat check_output "Random reads don't match input?" < lb-double.gz +gzip -c ${test_dir}/logfile_access_log.1 >> lb-double.gz +run_test ${lnav_test} -n lb-double.gz + +gzip -dc lb-double.gz | \ + check_output "concatenated gzip files don't parse correctly" + +> lb-3.gz +while test $(stat -c"%s" lb-3.gz) -le 5000000 ; do + cat lb-2.dat +done | gzip -c -1 > lb-3.gz +gzip -dc lb-3.gz > lb-3.dat +grep -b '$' lb-3.dat | cut -f 1 -d : > lb-3.index + +run_test ./drive_line_buffer -i lb-3.index -n 10 lb-3.gz lb-3.dat + +check_output "Random gzipped reads don't match input" <