From 5513deeade0f5c938f19ba0cb8fe4b9b3ca36053 Mon Sep 17 00:00:00 2001 From: Phil Hord Date: Tue, 30 Jul 2019 17:00:24 -0700 Subject: [PATCH] Add a gzip indexing class for faster gz navigation The gzread function is slow. Every time you seek to a new location, the whole file up to that position has to be decompressed again. This causes massive lags when trying to do simple things in lnav on a large .gz file. Use the zlib inflate* functions instead and record the dictionary periodically while processing the file the first time. Then use inflateSetDictionary to restore the dictionary to a convenient location when trying to seek into the file again in the future. Use a default period of 1MB of compressed data for syncpoints. Each syncpoint uses 32KB. This is a ratio of 3.2%. For example, a 1GB .gz file (compressed size) will require us to keep 32MB of index data in memory. A better method may be to use a fixed number of syncpoints and divide the file appropriately. This would keep the memory bounded at the cost of slower file navigation on large .gz files. Use pread to read the data for the stream decompressor and remove the lock_hack previously employed. NB. The documentation on these zlib functions is sparse. I followed the example in zlib/examples/zran.c, but I used the z_stream total_in and total_out variables instead of keeping my own separately as zran.c does. Maybe this is incompatible with some very old zlib versions. I haven't looked. --- src/line_buffer.cc | 199 +++++++++++++++++++++++++++++++++----- src/line_buffer.hh | 90 ++++++++++++++++- test/drive_line_buffer.cc | 12 ++- test/test_line_buffer.sh | 20 ++++ 4 files changed, 292 insertions(+), 29 deletions(-) diff --git a/src/line_buffer.cc b/src/line_buffer.cc index 4ece0c5c..7c4bdb19 100644 --- a/src/line_buffer.cc +++ b/src/line_buffer.cc @@ -60,9 +60,9 @@ static const ssize_t MAX_COMPRESSED_BUFFER_SIZE = 32 * 1024 * 1024; /* * XXX REMOVE ME * - * The stock gzipped file code does not use pread, so we need to use a lock to + * The stock bzipped file code does not use pread, so we need to use a lock to * get exclusive access to the file. In the future, we should just rewrite - * the gzipped file code to use pread. + * the bzipped file code to use pread. */ class lock_hack { public: @@ -116,9 +116,173 @@ private: }; /* XXX END */ + +#define Z_BUFSIZE 65536U +#define SYNCPOINT_SIZE (1024 * 1024) +line_buffer::gz_indexed::gz_indexed() +{ + if ((this->inbuf = (Bytef *)malloc(Z_BUFSIZE)) == NULL) { + throw bad_alloc(); + } +} + +void line_buffer::gz_indexed::close() +{ + // Release old stream, if we were open + if (*this) { + inflateEnd(&this->strm); + ::close(this->gz_fd); + this->syncpoints.clear(); + this->gz_fd = -1; + } +} + +void line_buffer::gz_indexed::init_stream() +{ + if (*this) { + inflateEnd(&this->strm); + } + + // initialize inflate struct + this->strm.zalloc = Z_NULL; + this->strm.zfree = Z_NULL; + this->strm.opaque = Z_NULL; + this->strm.avail_in = 0; + this->strm.next_in = Z_NULL; + this->strm.avail_out = 0; + int rc = inflateInit2(&strm, GZ_HEADER_MODE); + if (rc != Z_OK) { + throw(rc); // FIXME: exception wrapper + } +} + +void line_buffer::gz_indexed::continue_stream() +{ + // Save our position and output buffer + auto total_in = this->strm.total_in; + auto total_out = this->strm.total_out; + auto avail_out = this->strm.avail_out; + auto next_out = this->strm.next_out; + + init_stream(); + + // Restore position and output buffer + this->strm.total_in = total_in; + this->strm.total_out = total_out; + this->strm.avail_out = avail_out; + this->strm.next_out = next_out; +} + +void line_buffer::gz_indexed::open(int fd) +{ + this->close(); + this->init_stream(); + this->gz_fd = fd; +} + +int line_buffer::gz_indexed::stream_data(void * buf, size_t size) +{ + this->strm.avail_out = size; + this->strm.next_out = (unsigned char *) buf; + + int last = this->syncpoints.empty() ? 0 : + this->syncpoints.back().in; + while (this->strm.avail_out) { + if (!this->strm.avail_in) { + int rc = ::pread(this->gz_fd, + &this->inbuf[0], + Z_BUFSIZE, + this->strm.total_in); + if (rc < 0) { + return rc; + } + this->strm.next_in = this->inbuf; + this->strm.avail_in = rc; + } + if (this->strm.avail_in) { + int flush = last > this->strm.total_in + ? Z_SYNC_FLUSH : Z_BLOCK; + auto err = inflate(&this->strm, flush); + if (err == Z_STREAM_END) { + // Reached end of stream; re-init for a possible subsequent stream + continue_stream(); + } else if (err != Z_OK) { + log_error(" inflate-error: %d", (int)err); + throw error(err); // FIXME: exception wrapper + } + + if (this->strm.total_in >= last + SYNCPOINT_SIZE && + size > this->strm.avail_out + GZ_WINSIZE && + (this->strm.data_type & GZ_END_OF_BLOCK_MASK) && + !(this->strm.data_type & GZ_END_OF_FILE_MASK)) + { + this->syncpoints.emplace_back(this->strm, size); + last = this->strm.total_out; + } + } else if (this->strm.avail_out) { + // Processed all the gz file data but didn't fill + // the output buffer. We're done, even though we + // produced fewer bytes than requested. + break; + } + } + return size - this->strm.avail_out; +} + +void line_buffer::gz_indexed::seek(size_t offset) +{ + if (offset == this->strm.total_out) { + return; + } + + indexDict * dict = NULL; + // Find highest syncpoint not past offset + // FIXME: Make this a binary-tree search + for (auto &d : this->syncpoints) { + if (d.out <= offset) { + dict = &d; + } else { + break; + } + } + + // Choose highest available syncpoint, or keep current offset if it's ok + if (offset < this->strm.total_out || + (dict && this->strm.total_out < dict->out)) { + // Release the old z_stream + inflateEnd(&this->strm); + if (dict) { + dict->apply(&this->strm); + } else { + init_stream(); + } + } + + // Stream from compressed file until we reach our offset + unsigned char dummy[Z_BUFSIZE]; + while ( offset > this->strm.total_out) { + size_t to_copy = std::min(static_cast(Z_BUFSIZE), + offset - this->strm.total_out); + auto bytes = stream_data(dummy, to_copy); + if (bytes <= 0) { + break; + } + } +} + +int line_buffer::gz_indexed::read(void * buf, size_t offset, size_t size) +{ + if (offset != this->strm.total_out) { + this->seek(offset); + } + + int bytes = stream_data(buf, size); + + return bytes; +} + line_buffer::line_buffer() - : lb_gz_file(NULL), - lb_bz_file(false), + : lb_bz_file(false), lb_compressed_offset(0), lb_file_size(-1), lb_file_offset(0), @@ -149,8 +313,7 @@ void line_buffer::set_fd(auto_fd &fd) off_t newoff = 0; if (this->lb_gz_file) { - gzclose(this->lb_gz_file); - this->lb_gz_file = NULL; + this->lb_gz_file.close(); } if (this->lb_bz_file) { @@ -181,15 +344,7 @@ void line_buffer::set_fd(auto_fd &fd) close(gzfd); throw error(errno); } - if ((this->lb_gz_file = gzdopen(gzfd, "r")) == NULL) { - close(gzfd); - if (errno == 0) { - throw bad_alloc(); - } - else{ - throw error(errno); - } - } + lb_gz_file.open(gzfd); this->lb_file_time = read_le32( (const unsigned char *)&gz_id[4]); if (this->lb_file_time < 0) { @@ -343,16 +498,10 @@ bool line_buffer::fill_range(off_t start, ssize_t max_length) rc = 0; } else { - lock_hack::guard guard; - - lseek(this->lb_fd, this->lb_compressed_offset, SEEK_SET); - gzseek(this->lb_gz_file, - this->lb_file_offset + this->lb_buffer_size, - SEEK_SET); - rc = gzread(this->lb_gz_file, - &this->lb_buffer[this->lb_buffer_size], - this->lb_buffer_max - this->lb_buffer_size); - this->lb_compressed_offset = lseek(this->lb_fd, 0, SEEK_CUR); + rc = this->lb_gz_file.read(&this->lb_buffer[this->lb_buffer_size], + this->lb_file_offset + this->lb_buffer_size, + this->lb_buffer_max - this->lb_buffer_size); + this->lb_compressed_offset = this->lb_gz_file.get_source_offset(); if (rc != -1 && ( rc < (this->lb_buffer_max - this->lb_buffer_size))) { this->lb_file_size = ( diff --git a/src/line_buffer.hh b/src/line_buffer.hh index e5e323c5..4cc9002c 100644 --- a/src/line_buffer.hh +++ b/src/line_buffer.hh @@ -38,6 +38,7 @@ #include #include +#include #include "base/lnav_log.hh" #include "base/file_range.hh" @@ -74,6 +75,91 @@ public: int e_err; }; +#define GZ_WINSIZE 32768U /*> gzip's max supported dictionary is 15-bits */ +#define GZ_RAW_MODE (-15) /*> Raw inflate data mode */ +#define GZ_HEADER_MODE (15 + 32) /*> Automatic zstd or gzip decoding */ +#define GZ_BORROW_BITS_MASK 7 /*> Bits (0-7) consumed in previous block */ +#define GZ_END_OF_BLOCK_MASK 128 /*> Stopped because reached end-of-block */ +#define GZ_END_OF_FILE_MASK 64 /*> Stopped because reached end-of-file */ + + /** + * A memoized gzip file reader that can do random file access faster than + * gzseek/gzread alone. + */ + class gz_indexed { + public: + gz_indexed(); + gz_indexed(gz_indexed &&other) = default; + ~gz_indexed() { + this->close(); + } + + inline operator bool() const { + return this->gz_fd != -1; + } + + uLong get_source_offset() { + return !!*this ? this->strm.total_in + this->strm.avail_in : 0; + } + + void close(); + void init_stream(); + void continue_stream(); + void open(int fd); + int stream_data(void * buf, size_t size); + void seek(size_t offset); + + /** + * Decompress bytes from the gz file returning at most `size` bytes. + * offset is the byte-offset in the decompressed data stream. + */ + int read(void * buf, size_t offset, size_t size); + + struct indexDict { + off_t in = 0; + off_t out = 0; + unsigned char bits = 0; + unsigned char in_bits = 0; + Bytef index[GZ_WINSIZE]; + indexDict(z_stream const & s, const off_t size) { + assert((s.data_type & GZ_END_OF_BLOCK_MASK)); + assert(!(s.data_type & GZ_END_OF_FILE_MASK)); + assert(size >= s.avail_out + GZ_WINSIZE); + this->bits = s.data_type & GZ_BORROW_BITS_MASK; + this->in = s.total_in; + this->out = s.total_out; + auto last_byte_in = s.next_in[-1]; + this->in_bits = last_byte_in >> (8 - this->bits); + // Copy the last 32k uncompressed data (sliding window) to our index + memcpy(this->index, s.next_out - GZ_WINSIZE, GZ_WINSIZE); + } + + int apply(z_streamp s) { + s->zalloc = Z_NULL; + s->zfree = Z_NULL; + s->opaque = Z_NULL; + s->avail_in = 0; + s->next_in = Z_NULL; + auto ret = inflateInit2(s, GZ_RAW_MODE); + if (ret != Z_OK) { + return ret; + } + if (this->bits) { + inflatePrime(s, this->bits, this->in_bits); + } + s->total_in = this->in; + s->total_out = this->out; + inflateSetDictionary(s, this->index, GZ_WINSIZE); + return ret; + } + }; + private: + z_stream strm; /*< gzip streams structure */ + std::vector syncpoints; /*< indexed dictionaries as discovered */ + auto_mem inbuf; /*< Compressed data buffer */ + int gz_fd = -1; /*< The file to read data from. */ + }; + /** Construct an empty line_buffer. */ line_buffer(); @@ -103,7 +189,7 @@ public: }; bool is_compressed() const { - return this->lb_gz_file != NULL || this->lb_bz_file; + return this->lb_gz_file || this->lb_bz_file; }; off_t get_read_offset(off_t off) const @@ -235,7 +321,7 @@ private: shared_buffer lb_share_manager; auto_fd lb_fd; /*< The file to read data from. */ - gzFile lb_gz_file; /*< File handle for gzipped files. */ + gz_indexed lb_gz_file; /*< File reader for gzipped files. */ bool lb_bz_file; /*< Flag set for bzip2 compressed files. */ off_t lb_compressed_offset; /*< The offset into the compressed file. */ diff --git a/test/drive_line_buffer.cc b/test/drive_line_buffer.cc index 9dc8dafb..3e837ecf 100644 --- a/test/drive_line_buffer.cc +++ b/test/drive_line_buffer.cc @@ -56,7 +56,7 @@ int main(int argc, char *argv[]) { int c, rnd_iters = 5, retval = EXIT_SUCCESS; vector > index; - auto_fd fd = STDIN_FILENO; + auto_fd fd = STDIN_FILENO, fd_cmp; int offseti = 0; off_t offset = 0; int count = 1000; @@ -135,12 +135,20 @@ int main(int argc, char *argv[]) } else if ((argc > 0) && (fstat(fd, &st) == -1)) { perror("fstat"); retval = EXIT_FAILURE; + } else if ((argc > 1) && (fd_cmp = open(argv[1], O_RDONLY)) == -1) { + perror("open-cmp"); + retval = EXIT_FAILURE; + } else if ((argc > 1) && (fstat(fd_cmp, &st) == -1)) { + perror("fstat-cmp"); + retval = EXIT_FAILURE; } else { try { file_range last_range{offset}; line_buffer lb; char *maddr; + int fd2 = (argc > 1) ? fd_cmp.get() : fd.get(); + assert(fd2 >= 0); lb.set_fd(fd); if (index.size() == 0) { while (count) { @@ -179,7 +187,7 @@ int main(int argc, char *argv[]) st.st_size, PROT_READ, MAP_FILE | MAP_PRIVATE, - lb.get_fd(), + fd2, 0)) == MAP_FAILED) { perror("mmap"); retval = EXIT_FAILURE; diff --git a/test/test_line_buffer.sh b/test/test_line_buffer.sh index 18b798c3..5ee41175 100644 --- a/test/test_line_buffer.sh +++ b/test/test_line_buffer.sh @@ -56,3 +56,23 @@ run_test ./drive_line_buffer -i lb.index -n 10 lb-2.dat check_output "Random reads don't match input?" < lb-double.gz +gzip -c ${test_dir}/logfile_access_log.1 >> lb-double.gz +run_test ${lnav_test} -n lb-double.gz + +gzip -dc lb-double.gz | \ + check_output "concatenated gzip files don't parse correctly" + +> lb-3.gz +while test $(stat -c"%s" lb-3.gz) -le 5000000 ; do + cat lb-2.dat +done | gzip -c -1 > lb-3.gz +gzip -dc lb-3.gz > lb-3.dat +grep -b '$' lb-3.dat | cut -f 1 -d : > lb-3.index + +run_test ./drive_line_buffer -i lb-3.index -n 10 lb-3.gz lb-3.dat + +check_output "Random gzipped reads don't match input" <