[perf] try to overlap line buffer i/o

pull/824/merge
Timothy Stack 2 years ago
parent 09bd5396b2
commit 36e9433891

@ -48,6 +48,9 @@ lnav v0.10.2:
* Added a "glob" property to search tables defined in log formats
to constrain searches to log messages from files that have a
matching log_path value.
* Initial indexing of large files should be faster. Decompression
and searching for line-endings are now pipelined, so they happen
in a thread that is separate from the regular expression matcher.
Breaking Changes:
* Added a 'language' column to the lnav_view_filters table that
@ -77,7 +80,6 @@ lnav v0.10.2:
* Toggling enabled/disabled filters when there is a SQL expression
no longer causes a crash.
* Fix a crash related to long lines that are word wrapped.
*
lnav v0.10.1:
Features:

@ -156,8 +156,16 @@ while True:
for fname, gen in FILES:
for i in range(random.randrange(0, 4)):
with open(fname, "a+") as fp:
fp.write(next(gen))
#if random.uniform(0.0, 1.0) < 0.010:
if random.uniform(0.0, 1.0) < 0.01:
line = next(gen)
prefix = line[:50]
suffix = line[50:]
fp.write(prefix)
time.sleep(random.uniform(0.5, 0.6))
fp.write(suffix)
else:
fp.write(next(gen))
# if random.uniform(0.0, 1.0) < 0.010:
# fp.truncate(0)
time.sleep(random.uniform(0.01, 0.02))
#if random.uniform(0.0, 1.0) < 0.001:

@ -506,6 +506,8 @@ add_library(
third-party/intervaltree/IntervalTree.h
third-party/md4c/md4c.h
third-party/robin_hood/robin_hood.h
)
set(lnav_SRCS lnav.cc)

@ -355,6 +355,7 @@ THIRD_PARTY_SRCS = \
third-party/doctest-root/doctest/doctest.h \
third-party/intervaltree/IntervalTree.h \
third-party/md4c/md4c.c \
third-party/robin_hood/robin_hood.h \
third-party/sqlite/ext/dbdump.c \
third-party/sqlite/ext/series.c

@ -149,16 +149,29 @@ private:
class auto_buffer {
public:
static auto_buffer alloc(size_t size)
static auto_buffer alloc(size_t capacity)
{
return auto_buffer{(char*) malloc(size), size};
return auto_buffer{(char*) malloc(capacity), capacity};
}
static auto_buffer from(const char* mem, size_t size)
{
auto retval = alloc(size);
retval.resize(size);
memcpy(retval.in(), mem, size);
return retval;
}
auto_buffer(const auto_buffer&) = delete;
auto_buffer(auto_buffer&& other) noexcept
: ab_buffer(other.ab_buffer), ab_size(other.ab_size)
: ab_buffer(other.ab_buffer), ab_size(other.ab_size),
ab_capacity(other.ab_capacity)
{
other.ab_buffer = nullptr;
other.ab_size = 0;
other.ab_capacity = 0;
}
~auto_buffer()
@ -166,48 +179,128 @@ public:
free(this->ab_buffer);
this->ab_buffer = nullptr;
this->ab_size = 0;
this->ab_capacity = 0;
}
auto_buffer& operator=(auto_buffer&) = delete;
/**
 * Move-assign from another buffer, taking over its allocation.
 *
 * Any memory currently owned by this object is freed first, and
 * `other` is left empty (null pointer, zero size and capacity) so
 * that only one object ever frees the transferred block.
 *
 * @param other The buffer to steal the allocation from.
 * @return A reference to this object.
 */
auto_buffer& operator=(auto_buffer&& other) noexcept
{
    // Guard against `buf = std::move(buf)`, which would otherwise free
    // the block and then keep a dangling pointer to it.
    if (this != &other) {
        free(this->ab_buffer);
        this->ab_buffer = other.ab_buffer;
        this->ab_size = other.ab_size;
        this->ab_capacity = other.ab_capacity;
        // Detach the source so its destructor does not free the block
        // that this object now owns.
        other.ab_buffer = nullptr;
        other.ab_size = 0;
        other.ab_capacity = 0;
    }
    return *this;
}
/**
 * Exchange the allocation, size, and capacity of this buffer with
 * those of another.  No memory is allocated, copied, or freed.
 *
 * @param other The buffer to trade contents with.
 */
void swap(auto_buffer& other) noexcept
{
    // Only a pointer and two integers change hands, so this cannot
    // throw; saying so keeps it consistent with the noexcept moves.
    std::swap(this->ab_buffer, other.ab_buffer);
    std::swap(this->ab_size, other.ab_size);
    std::swap(this->ab_capacity, other.ab_capacity);
}
char* in() { return this->ab_buffer; }
char* at(size_t offset) { return &this->ab_buffer[offset]; }
const char* at(size_t offset) const { return &this->ab_buffer[offset]; }
char* begin() { return this->ab_buffer; }
const char* begin() const { return this->ab_buffer; }
std::reverse_iterator<char*> rbegin()
{
return std::reverse_iterator<char*>(this->end());
}
std::reverse_iterator<const char*> rbegin() const
{
return std::reverse_iterator<const char*>(this->end());
}
char* end() { return &this->ab_buffer[this->ab_size]; }
const char* end() const { return &this->ab_buffer[this->ab_size]; }
std::reverse_iterator<char*> rend()
{
return std::reverse_iterator<char*>(this->begin());
}
std::reverse_iterator<const char*> rend() const
{
return std::reverse_iterator<const char*>(this->begin());
}
std::pair<char*, size_t> release()
{
auto retval = std::make_pair(this->ab_buffer, this->ab_size);
this->ab_buffer = nullptr;
this->ab_size = 0;
this->ab_capacity = 0;
return retval;
}
size_t size() const { return this->ab_size; }
void expand_by(size_t amount)
bool empty() const { return this->ab_size == 0; }
bool full() const { return this->ab_size == this->ab_capacity; }
size_t capacity() const { return this->ab_capacity; }
size_t available() const { return this->ab_capacity - this->ab_size; }
void clear() { this->resize(0); }
auto_buffer& resize(size_t new_size)
{
if (amount == 0) {
assert(new_size <= this->ab_capacity);
this->ab_size = new_size;
return *this;
}
auto_buffer& resize_by(ssize_t amount)
{
return this->resize(this->ab_size + amount);
}
void expand_to(size_t new_capacity)
{
if (new_capacity <= this->ab_capacity) {
return;
}
auto new_size = this->ab_size + amount;
auto new_buffer = (char*) realloc(this->ab_buffer, new_size);
auto* new_buffer = (char*) realloc(this->ab_buffer, new_capacity);
if (new_buffer == nullptr) {
throw std::bad_alloc();
}
this->ab_buffer = new_buffer;
this->ab_size = new_size;
this->ab_capacity = new_capacity;
}
auto_buffer& shrink_to(size_t new_size)
void expand_by(size_t amount)
{
this->ab_size = new_size;
return *this;
if (amount == 0) {
return;
}
this->expand_to(this->ab_capacity + amount);
}
private:
auto_buffer(char* buffer, size_t size) : ab_buffer(buffer), ab_size(size) {}
auto_buffer(char* buffer, size_t capacity)
: ab_buffer(buffer), ab_capacity(capacity)
{
}
char* ab_buffer;
size_t ab_size;
size_t ab_size{0};
size_t ab_capacity;
};
struct text_auto_buffer {

@ -154,9 +154,25 @@ date_time_scanner::scan(const char* time_dest,
this->to_localtime(gmt, *tm_out);
}
tv_out.tv_sec = tm2sec(&tm_out->et_tm);
const auto& last_tm = this->dts_last_tm.et_tm;
if (last_tm.tm_year == tm_out->et_tm.tm_year
&& last_tm.tm_mon == tm_out->et_tm.tm_mon
&& last_tm.tm_mday == tm_out->et_tm.tm_mday
&& last_tm.tm_hour == tm_out->et_tm.tm_hour
&& last_tm.tm_min == tm_out->et_tm.tm_min)
{
const auto sec_diff = tm_out->et_tm.tm_sec - last_tm.tm_sec;
// log_debug("diff %d", sec_diff);
tv_out = this->dts_last_tv;
tv_out.tv_sec += sec_diff;
tm_out->et_tm.tm_wday = last_tm.tm_wday;
} else {
// log_debug("doing tm2sec");
tv_out.tv_sec = tm2sec(&tm_out->et_tm);
secs2wday(tv_out, &tm_out->et_tm);
}
tv_out.tv_usec = tm_out->et_nsec / 1000;
secs2wday(tv_out, &tm_out->et_tm);
this->dts_fmt_lock = curr_time_fmt;
this->dts_fmt_len = retval - time_dest;
@ -211,6 +227,11 @@ date_time_scanner::scan(const char* time_dest,
retval = nullptr;
}
if (retval != nullptr) {
this->dts_last_tm = *tm_out;
this->dts_last_tv = tv_out;
}
if (retval != nullptr && static_cast<size_t>(retval - time_dest) < time_len)
{
/* Try to pull out the milli/micro-second value. */

@ -55,6 +55,8 @@ struct date_time_scanner {
this->dts_base_tm = exttm{};
this->dts_fmt_lock = -1;
this->dts_fmt_len = -1;
this->dts_last_tv = timeval{};
this->dts_last_tm = exttm{};
}
/**
@ -70,6 +72,8 @@ struct date_time_scanner {
{
this->dts_base_time = base_time;
localtime_r(&base_time, &this->dts_base_tm.et_tm);
this->dts_last_tm = exttm{};
this->dts_last_tv = timeval{};
}
/**
@ -87,6 +91,8 @@ struct date_time_scanner {
struct exttm dts_base_tm;
int dts_fmt_lock{-1};
int dts_fmt_len{-1};
struct exttm dts_last_tm {};
struct timeval dts_last_tv {};
time_t dts_local_offset_cache{0};
time_t dts_local_offset_valid{0};
time_t dts_local_offset_expiry{0};

@ -333,6 +333,12 @@ struct string_fragment {
int sf_end;
};
/**
 * Equality comparison with the std::string on the left-hand side.
 *
 * Simply swaps the operands and forwards to the comparison that takes
 * the string_fragment on the left, so `str == frag` gives the same
 * answer as `frag == str`.
 */
inline bool
operator==(const std::string& left, const string_fragment& right)
{
return right == left;
}
inline bool
operator<(const char* left, const string_fragment& right)
{

@ -56,7 +56,7 @@ compress(const void* input, size_t len)
zs.opaque = Z_NULL;
zs.avail_in = (uInt) len;
zs.next_in = (Bytef*) input;
zs.avail_out = (uInt) retval.size();
zs.avail_out = (uInt) retval.capacity();
zs.next_out = (Bytef*) retval.in();
zs.total_out = 0;
@ -76,7 +76,7 @@ compress(const void* input, size_t len)
return Err(fmt::format(
FMT_STRING("unable to finalize compression -- {}"), zError(rc)));
}
return Ok(std::move(retval.shrink_to(zs.total_out)));
return Ok(std::move(retval.resize(zs.total_out)));
}
Result<auto_buffer, std::string>
@ -107,7 +107,7 @@ uncompress(const std::string& src, const void* buffer, size_t size)
}
strm.next_out = (Bytef*) (uncomp.in() + strm.total_out);
strm.avail_out = uncomp.size() - strm.total_out;
strm.avail_out = uncomp.capacity() - strm.total_out;
// Inflate another chunk.
err = inflate(&strm, Z_SYNC_FLUSH);
@ -127,7 +127,7 @@ uncompress(const std::string& src, const void* buffer, size_t size)
strm.msg ? strm.msg : zError(err)));
}
return Ok(std::move(uncomp.shrink_to(strm.total_out)));
return Ok(std::move(uncomp.resize(strm.total_out)));
}
} // namespace gzip

@ -53,3 +53,15 @@ TEST_CASE("lnav::gzip::uncompress")
== "unable to uncompress: garbage -- incorrect header check");
}
}
TEST_CASE("lnav::gzip::roundtrip")
{
    // sizeof() counts the trailing NUL, so the terminator is part of the
    // compressed payload and the restored bytes form a valid C string.
    const char payload[] = "Hello, World!";

    auto compress_res = lnav::gzip::compress(payload, sizeof(payload));
    auto compressed = compress_res.unwrap();

    auto uncompress_res = lnav::gzip::uncompress(
        "test", compressed.in(), compressed.size());
    auto restored = uncompress_res.unwrap();

    const std::string expected(payload);
    const std::string actual(restored.in());

    CHECK(expected == actual);
}

@ -114,8 +114,9 @@ secs2wday(const struct timeval& tv, struct tm* res)
}
/* compute day of week */
if ((res->tm_wday = ((EPOCH_WDAY + days) % DAYSPERWEEK)) < 0)
if ((res->tm_wday = ((EPOCH_WDAY + days) % DAYSPERWEEK)) < 0) {
res->tm_wday += DAYSPERWEEK;
}
}
struct tm*

@ -36,6 +36,7 @@
#include <limits>
#include <map>
#include <string>
#include <unordered_map>
#include <vector>
#include "base/lnav_log.hh"
@ -300,7 +301,7 @@ protected:
bool sbc_do_stacking{true};
unsigned long sbc_left{0}, sbc_right{0};
std::vector<struct chart_ident> sbc_idents;
std::map<T, unsigned int> sbc_ident_lookup;
std::unordered_map<T, unsigned int> sbc_ident_lookup;
show_state sbc_show_state{show_all()};
};

@ -47,7 +47,10 @@
# include "simdutf8check.h"
#endif
#include "base/injector.bind.hh"
#include "base/injector.hh"
#include "base/is_utf8.hh"
#include "base/isc.hh"
#include "base/math_util.hh"
#include "fmtlib/fmt/format.h"
#include "line_buffer.hh"
@ -56,6 +59,21 @@ static const ssize_t INITIAL_REQUEST_SIZE = 16 * 1024;
static const ssize_t DEFAULT_INCREMENT = 128 * 1024;
static const ssize_t MAX_COMPRESSED_BUFFER_SIZE = 32 * 1024 * 1024;
/*
 * Service type that the line_buffer posts its background
 * load_next_buffer() calls to (via isc::to<io_looper&, io_looper_tag>()).
 * It adds nothing on top of isc::service<>.
 */
class io_looper : public isc::service<io_looper> {};
/* Tag type used to identify the io_looper binding. */
struct io_looper_tag {};
/*
 * Registers io_looper, annotated with io_looper_tag, as a singleton among
 * the isc::service_base bindings.
 */
static auto bound_io = injector::bind_multiple<isc::service_base>()
.add_singleton<io_looper, io_looper_tag>();
namespace injector {
/*
 * Explicit specialization for io_looper_tag with an empty body.
 * NOTE(review): presumably this only exists to keep this translation
 * unit's binding from being dropped at link time -- confirm against
 * base/injector.hh.
 */
template<>
void
force_linking(io_looper_tag anno)
{
}
} // namespace injector
/*
* XXX REMOVE ME
*
@ -284,6 +302,7 @@ int
line_buffer::gz_indexed::read(void* buf, size_t offset, size_t size)
{
if (offset != this->strm.total_out) {
// log_debug("doing seek! %d %d", offset, this->strm.total_out);
this->seek(offset);
}
@ -293,15 +312,7 @@ line_buffer::gz_indexed::read(void* buf, size_t offset, size_t size)
}
line_buffer::line_buffer()
: lb_bz_file(false), lb_compressed_offset(0), lb_file_size(-1),
lb_file_offset(0), lb_file_time(0), lb_buffer_size(0),
lb_buffer_max(DEFAULT_LINE_BUFFER_SIZE), lb_seekable(false),
lb_last_line_offset(-1)
{
if ((this->lb_buffer = (char*) malloc(this->lb_buffer_max)) == nullptr) {
throw std::bad_alloc();
}
ensure(this->invariant());
}
@ -319,8 +330,12 @@ line_buffer::set_fd(auto_fd& fd)
{
file_off_t newoff = 0;
if (this->lb_gz_file) {
this->lb_gz_file.close();
{
safe::WriteAccess<safe_gz_indexed> gi(this->lb_gz_file);
if (*gi) {
gi->close();
}
}
if (this->lb_bz_file) {
@ -350,7 +365,8 @@ line_buffer::set_fd(auto_fd& fd)
close(gzfd);
throw error(errno);
}
lb_gz_file.open(gzfd);
lb_gz_file.writeAccess()->open(gzfd);
this->lb_compressed = true;
this->lb_file_time
= read_le32((const unsigned char*) &gz_id[4]);
if (this->lb_file_time < 0) {
@ -366,6 +382,7 @@ line_buffer::set_fd(auto_fd& fd)
throw error(errno);
}
this->lb_bz_file = true;
this->lb_compressed = true;
/*
* Loading data from a bzip2 file is pretty slow, so we try
@ -380,8 +397,9 @@ line_buffer::set_fd(auto_fd& fd)
this->lb_seekable = true;
}
}
log_debug("newoff %d", newoff);
this->lb_file_offset = newoff;
this->lb_buffer_size = 0;
this->lb_buffer.clear();
this->lb_fd = std::move(fd);
ensure(this->invariant());
@ -390,24 +408,10 @@ line_buffer::set_fd(auto_fd& fd)
void
line_buffer::resize_buffer(size_t new_max)
{
require(this->lb_bz_file || this->lb_gz_file
|| new_max <= MAX_LINE_BUFFER_SIZE);
if (new_max > (size_t) this->lb_buffer_max) {
char *tmp, *old;
if (new_max > (size_t) this->lb_buffer.capacity()) {
/* Still need more space, try a realloc. */
old = this->lb_buffer.release();
this->lb_share_manager.invalidate_refs();
tmp = (char*) realloc(old, new_max);
if (tmp != NULL) {
this->lb_buffer = tmp;
this->lb_buffer_max = new_max;
} else {
this->lb_buffer = old;
throw error(ENOMEM);
}
this->lb_buffer.expand_to(new_max);
}
}
@ -416,7 +420,9 @@ line_buffer::ensure_available(file_off_t start, ssize_t max_length)
{
ssize_t prefill, available;
require(max_length <= MAX_LINE_BUFFER_SIZE);
require(this->lb_compressed || max_length <= MAX_LINE_BUFFER_SIZE);
// log_debug("ensure avail %d %d", start, max_length);
if (this->lb_file_size != -1) {
if (start + (file_off_t) max_length > this->lb_file_size) {
@ -429,7 +435,7 @@ line_buffer::ensure_available(file_off_t start, ssize_t max_length)
* after.
*/
if (start < this->lb_file_offset
|| start > (file_off_t) (this->lb_file_offset + this->lb_buffer_size))
|| start > (file_off_t) (this->lb_file_offset + this->lb_buffer.size()))
{
/*
* The request is outside the cached range, need to reload the
@ -437,20 +443,22 @@ line_buffer::ensure_available(file_off_t start, ssize_t max_length)
*/
this->lb_share_manager.invalidate_refs();
prefill = 0;
this->lb_buffer_size = 0;
this->lb_buffer.clear();
if ((this->lb_file_size != (ssize_t) -1)
&& (start + this->lb_buffer_max > this->lb_file_size))
&& (start + this->lb_buffer.capacity() > this->lb_file_size))
{
require(start < this->lb_file_size);
/*
* If the start is near the end of the file, move the offset back a
* bit so we can get more of the file in the cache.
*/
this->lb_file_offset = this->lb_file_size
- std::min(this->lb_file_size,
(file_ssize_t) this->lb_buffer_max);
(file_ssize_t) this->lb_buffer.capacity());
} else {
this->lb_file_offset = start;
}
log_debug("adjusted file offset %d %d", start, this->lb_file_offset);
} else {
/* The request is in the cached range. Record how much extra data is in
* the buffer before the requested range.
@ -458,29 +466,205 @@ line_buffer::ensure_available(file_off_t start, ssize_t max_length)
prefill = start - this->lb_file_offset;
}
require(this->lb_file_offset <= start);
require(prefill <= this->lb_buffer_size);
require(prefill <= this->lb_buffer.size());
available = this->lb_buffer_max - (start - this->lb_file_offset);
require(available <= this->lb_buffer_max);
available = this->lb_buffer.capacity() - (start - this->lb_file_offset);
require(available <= this->lb_buffer.capacity());
if (max_length > available) {
// log_debug("need more space!");
/*
* Need more space, move any existing data to the front of the
* buffer.
*/
this->lb_share_manager.invalidate_refs();
this->lb_buffer_size -= prefill;
this->lb_buffer.resize_by(-prefill);
this->lb_file_offset += prefill;
memmove(&this->lb_buffer[0],
&this->lb_buffer[prefill],
this->lb_buffer_size);
// log_debug("adjust file offset for prefill %d", this->lb_file_offset);
memmove(this->lb_buffer.at(0),
this->lb_buffer.at(prefill),
this->lb_buffer.size());
available = this->lb_buffer_max - (start - this->lb_file_offset);
available = this->lb_buffer.capacity() - (start - this->lb_file_offset);
if (max_length > available) {
this->resize_buffer(roundup_size(max_length, DEFAULT_INCREMENT));
}
}
this->lb_line_starts.clear();
this->lb_line_is_utf.clear();
}
/**
 * Read the next chunk of the file into the alternate buffer.
 *
 * New data is appended after whatever is already stored in lb_alt_buffer,
 * reading from file offset `lb_loader_file_offset + lb_alt_buffer.size()`.
 * Gzip, bzip2 (when built with HAVE_BZLIB_H), and plain files are each
 * read through their own path.  If the chunk starts past
 * lb_last_line_offset, the buffer is then scanned to record the offset and
 * UTF-8 validity of every line in lb_alt_line_starts / lb_alt_line_is_utf.
 *
 * This is what fill_range() posts to the io_looper service; the result is
 * delivered through the promise paired with lb_loader_future.
 *
 * @return True if data was read, or if nothing was read but the start
 * offset is still before the known end of the file.  False on EOF or on a
 * transient error (EINTR, EAGAIN, ENODATA).
 * @throws error For any other read failure, and std::bad_alloc if the
 * bzip2 stream cannot be opened and errno is not set.
 */
bool
line_buffer::load_next_buffer()
{
    auto retval = false;
    auto start = this->lb_loader_file_offset.value();
    ssize_t rc = 0;
    safe::WriteAccess<safe_gz_indexed> gi(this->lb_gz_file);
    // The optional is not reassigned in this function, so bind it once
    // instead of unwrapping it for every access.
    auto& alt_buf = this->lb_alt_buffer.value();

    /* ... read in the new data. */
    if (*gi) {
        if (this->lb_file_size != (ssize_t) -1 && this->in_range(start)
            && this->in_range(this->lb_file_size - 1))
        {
            rc = 0;
        } else {
            rc = gi->read(alt_buf.end(),
                          start + alt_buf.size(),
                          alt_buf.available());
            this->lb_compressed_offset = gi->get_source_offset();
            // A short read means the end of the data was reached.
            if (rc != -1 && (rc < alt_buf.available())
                && (start + alt_buf.size() + rc > this->lb_file_size))
            {
                this->lb_file_size = (start + alt_buf.size() + rc);
            }
        }
    }
#ifdef HAVE_BZLIB_H
    else if (this->lb_bz_file)
    {
        if (this->lb_file_size != (ssize_t) -1
            && (((ssize_t) start >= this->lb_file_size)
                || (this->in_range(start)
                    && this->in_range(this->lb_file_size - 1))))
        {
            rc = 0;
        } else {
            lock_hack::guard guard;
            char scratch[32 * 1024];
            BZFILE* bz_file;
            file_off_t seek_to;
            int bzfd;

            /*
             * Unfortunately, there is no bzseek, so we need to reopen the
             * file every time we want to do a read.
             */
            bzfd = dup(this->lb_fd);
            if (lseek(this->lb_fd, 0, SEEK_SET) < 0) {
                close(bzfd);
                throw error(errno);
            }
            if ((bz_file = BZ2_bzdopen(bzfd, "r")) == nullptr) {
                close(bzfd);
                if (errno == 0) {
                    throw std::bad_alloc();
                } else {
                    throw error(errno);
                }
            }

            /* Decompress and discard everything before the read offset. */
            seek_to = start + alt_buf.size();
            while (seek_to > 0) {
                const int count
                    = BZ2_bzread(bz_file,
                                 scratch,
                                 std::min((size_t) seek_to, sizeof(scratch)));

                // Stop on EOF or error.  Without this, a zero return
                // would spin forever and a negative one would move the
                // target further away.
                if (count <= 0) {
                    break;
                }
                seek_to -= count;
            }
            rc = BZ2_bzread(bz_file, alt_buf.end(), alt_buf.available());
            this->lb_compressed_offset = lseek(bzfd, 0, SEEK_SET);
            BZ2_bzclose(bz_file);

            // A short read means the end of the data was reached.
            if (rc != -1 && (rc < (alt_buf.available()))
                && (start + alt_buf.size() + rc > this->lb_file_size))
            {
                this->lb_file_size = (start + alt_buf.size() + rc);
            }
        }
    }
#endif
    else
    {
        rc = pread(this->lb_fd,
                   alt_buf.end(),
                   alt_buf.available(),
                   start + alt_buf.size());
    }

    // XXX For some reason, cygwin gives us a bogus return value when
    // reading up to the end of the file.
    //
    // The comparison must be signed: compared against the unsigned
    // available() directly, an rc of -1 converts to SIZE_MAX, which made
    // this branch fire on every genuine read error and replace the real
    // errno with ENODATA/EAGAIN.
    if (rc > static_cast<ssize_t>(alt_buf.available())) {
        rc = -1;
#ifdef ENODATA
        errno = ENODATA;
#else
        errno = EAGAIN;
#endif
    }
    switch (rc) {
        case 0:
            if (start < (file_off_t) this->lb_file_size) {
                retval = true;
            }
            break;

        case (ssize_t) -1:
            switch (errno) {
#ifdef ENODATA
                /* Cygwin seems to return this when pread reaches the end of
                 * the file. */
                case ENODATA:
#endif
                case EINTR:
                case EAGAIN:
                    break;

                default:
                    throw error(errno);
            }
            break;

        default:
            alt_buf.resize_by(rc);
            retval = true;
            break;
    }

    if (start > this->lb_last_line_offset) {
        /* Record where each line starts and whether it is valid UTF-8. */
        auto* line_start = alt_buf.begin();

        do {
            const char* msg = nullptr;
            int faulty_bytes = 0;
            bool valid_utf = true;
            char* lf = nullptr;

            auto before = line_start - alt_buf.begin();
            auto remaining = alt_buf.size() - before;
            auto utf8_end = is_utf8(
                (unsigned char*) line_start, remaining, &msg, &faulty_bytes);
            if (msg != nullptr) {
                /* Not valid UTF-8, fall back to a plain newline search. */
                lf = (char*) memchr(line_start, '\n', remaining);
                // memchr() returns null when the rest of the buffer has
                // no newline; do not do pointer arithmetic on it.
                utf8_end = (lf != nullptr) ? lf - line_start : -1;
                valid_utf = false;
            }
            if (utf8_end >= 0) {
                lf = line_start + utf8_end;
            }
            this->lb_alt_line_starts.emplace_back(before);
            this->lb_alt_line_is_utf.emplace_back(valid_utf);

            if (lf != nullptr) {
                line_start = lf + 1;
            } else {
                line_start = nullptr;
            }
        } while (line_start != nullptr && line_start < alt_buf.end());
    }

    return retval;
}
bool
@ -490,34 +674,110 @@ line_buffer::fill_range(file_off_t start, ssize_t max_length)
require(start >= 0);
// log_debug("fill range %d %d", start, max_length);
#if 0
log_debug("(%p) fill range %d %d (%d) %d",
this,
start,
max_length,
this->lb_file_offset,
this->lb_buffer.size());
#endif
if (this->lb_loader_future.valid()
&& start >= this->lb_loader_file_offset.value())
{
#if 0
log_debug("getting preload! %d %d",
start,
this->lb_loader_file_offset.value());
#endif
nonstd::optional<std::chrono::system_clock::time_point> wait_start;
if (this->lb_loader_future.wait_for(std::chrono::seconds(0))
!= std::future_status::ready)
{
wait_start
= nonstd::make_optional(std::chrono::system_clock::now());
}
retval = this->lb_loader_future.get();
if (wait_start) {
auto diff = std::chrono::system_clock::now() - wait_start.value();
log_debug("wait done! %d", diff.count());
}
// log_debug("got preload");
this->lb_loader_future = {};
this->lb_file_offset = this->lb_loader_file_offset.value();
this->lb_loader_file_offset = nonstd::nullopt;
this->lb_buffer.swap(this->lb_alt_buffer.value());
this->lb_alt_buffer.value().clear();
this->lb_line_starts = std::move(this->lb_alt_line_starts);
this->lb_alt_line_starts.clear();
this->lb_line_is_utf = std::move(this->lb_alt_line_is_utf);
this->lb_alt_line_is_utf.clear();
}
if (this->in_range(start) && this->in_range(start + max_length - 1)) {
/* Cache already has the data, nothing to do. */
retval = true;
if (this->lb_seekable && this->lb_buffer.full()
&& !this->lb_loader_file_offset) {
// log_debug("loader available start=%d", start);
auto last_lf_iter = std::find(
this->lb_buffer.rbegin(), this->lb_buffer.rend(), '\n');
if (last_lf_iter != this->lb_buffer.rend()) {
auto usable_size
= std::distance(last_lf_iter, this->lb_buffer.rend());
// log_debug("found linefeed %d", usable_size);
if (!this->lb_alt_buffer) {
// log_debug("allocating new buffer!");
this->lb_alt_buffer
= auto_buffer::alloc(this->lb_buffer.capacity());
}
this->lb_alt_buffer->resize(this->lb_buffer.size()
- usable_size);
memcpy(this->lb_alt_buffer.value().begin(),
this->lb_buffer.at(usable_size),
this->lb_alt_buffer->size());
this->lb_loader_file_offset
= this->lb_file_offset + usable_size;
#if 0
log_debug("load offset %d",
this->lb_loader_file_offset.value());
log_debug("launch loader");
#endif
auto prom = std::make_shared<std::promise<bool>>();
this->lb_loader_future = prom->get_future();
isc::to<io_looper&, io_looper_tag>().send(
[this, prom](auto& ioloop) mutable {
prom->set_value(this->load_next_buffer());
});
}
}
} else if (this->lb_fd != -1) {
ssize_t rc;
/* Make sure there is enough space, then */
this->ensure_available(start, max_length);
safe::WriteAccess<safe_gz_indexed> gi(this->lb_gz_file);
/* ... read in the new data. */
if (this->lb_gz_file) {
if (*gi) {
// log_debug("old decomp start");
if (this->lb_file_size != (ssize_t) -1 && this->in_range(start)
&& this->in_range(this->lb_file_size - 1))
{
rc = 0;
} else {
rc = this->lb_gz_file.read(
&this->lb_buffer[this->lb_buffer_size],
this->lb_file_offset + this->lb_buffer_size,
this->lb_buffer_max - this->lb_buffer_size);
this->lb_compressed_offset
= this->lb_gz_file.get_source_offset();
if (rc != -1
&& (rc < (this->lb_buffer_max - this->lb_buffer_size))) {
rc = gi->read(this->lb_buffer.end(),
this->lb_file_offset + this->lb_buffer.size(),
this->lb_buffer.available());
this->lb_compressed_offset = gi->get_source_offset();
if (rc != -1 && (rc < this->lb_buffer.available())) {
this->lb_file_size
= (this->lb_file_offset + this->lb_buffer_size + rc);
= (this->lb_file_offset + this->lb_buffer.size() + rc);
}
}
// log_debug("old decomp end");
}
#ifdef HAVE_BZLIB_H
else if (this->lb_bz_file)
@ -553,7 +813,7 @@ line_buffer::fill_range(file_off_t start, ssize_t max_length)
}
}
seek_to = this->lb_file_offset + this->lb_buffer_size;
seek_to = this->lb_file_offset + this->lb_buffer.size();
while (seek_to > 0) {
int count;
@ -564,33 +824,39 @@ line_buffer::fill_range(file_off_t start, ssize_t max_length)
seek_to -= count;
}
rc = BZ2_bzread(bz_file,
&this->lb_buffer[this->lb_buffer_size],
this->lb_buffer_max - this->lb_buffer_size);
this->lb_buffer.end(),
this->lb_buffer.available());
this->lb_compressed_offset = lseek(bzfd, 0, SEEK_SET);
BZ2_bzclose(bz_file);
if (rc != -1
&& (rc < (this->lb_buffer_max - this->lb_buffer_size))) {
if (rc != -1 && (rc < (this->lb_buffer.available()))) {
this->lb_file_size
= (this->lb_file_offset + this->lb_buffer_size + rc);
= (this->lb_file_offset + this->lb_buffer.size() + rc);
}
}
}
#endif
else if (this->lb_seekable)
{
#if 1
log_debug("doing pread file_off=%d read_off=%d count=%d",
this->lb_file_offset,
this->lb_file_offset + this->lb_buffer.size(),
this->lb_buffer.available());
#endif
rc = pread(this->lb_fd,
&this->lb_buffer[this->lb_buffer_size],
this->lb_buffer_max - this->lb_buffer_size,
this->lb_file_offset + this->lb_buffer_size);
this->lb_buffer.end(),
this->lb_buffer.available(),
this->lb_file_offset + this->lb_buffer.size());
// log_debug("pread rc %d", rc);
} else {
rc = read(this->lb_fd,
&this->lb_buffer[this->lb_buffer_size],
this->lb_buffer_max - this->lb_buffer_size);
this->lb_buffer.end(),
this->lb_buffer.available());
}
// XXX For some reason, cygwin is giving us a bogus return value when
// up to the end of the file.
if (rc > (this->lb_buffer_max - this->lb_buffer_size)) {
if (rc > (this->lb_buffer.available())) {
rc = -1;
#ifdef ENODATA
errno = ENODATA;
@ -602,13 +868,13 @@ line_buffer::fill_range(file_off_t start, ssize_t max_length)
case 0:
if (!this->lb_seekable) {
this->lb_file_size
= this->lb_file_offset + this->lb_buffer_size;
= this->lb_file_offset + this->lb_buffer.size();
}
if (start < (file_off_t) this->lb_file_size) {
retval = true;
}
if (this->lb_gz_file || this->lb_bz_file) {
if (this->lb_compressed) {
/*
* For compressed files, increase the buffer size so we
* don't have to spend as much time uncompressing the data.
@ -635,12 +901,46 @@ line_buffer::fill_range(file_off_t start, ssize_t max_length)
break;
default:
this->lb_buffer_size += rc;
this->lb_buffer.resize_by(rc);
retval = true;
break;
}
ensure(this->lb_buffer_size <= this->lb_buffer_max);
if (this->lb_seekable && this->lb_buffer.full()
&& !this->lb_loader_file_offset) {
// log_debug("loader available2 start=%d", start);
auto last_lf_iter = std::find(
this->lb_buffer.rbegin(), this->lb_buffer.rend(), '\n');
if (last_lf_iter != this->lb_buffer.rend()) {
auto usable_size
= std::distance(last_lf_iter, this->lb_buffer.rend());
// log_debug("found linefeed %d", usable_size);
if (!this->lb_alt_buffer) {
// log_debug("allocating new buffer!");
this->lb_alt_buffer
= auto_buffer::alloc(this->lb_buffer.capacity());
}
this->lb_alt_buffer->resize(this->lb_buffer.size()
- usable_size);
memcpy(this->lb_alt_buffer->begin(),
this->lb_buffer.at(usable_size),
this->lb_alt_buffer->size());
this->lb_loader_file_offset
= this->lb_file_offset + usable_size;
#if 0
log_debug("load offset %d",
this->lb_loader_file_offset.value());
log_debug("launch loader");
#endif
auto prom = std::make_shared<std::promise<bool>>();
this->lb_loader_future = prom->get_future();
isc::to<io_looper&, io_looper_tag>().send(
[this, prom](auto& ioloop) mutable {
prom->set_value(this->load_next_buffer());
});
}
}
ensure(this->lb_buffer.size() <= this->lb_buffer.capacity());
}
return retval;
@ -655,26 +955,54 @@ line_buffer::load_next_line(file_range prev_line)
require(this->lb_fd != -1);
auto offset = prev_line.next_offset();
ssize_t request_size = this->lb_buffer_size == 0 ? DEFAULT_INCREMENT
: INITIAL_REQUEST_SIZE;
ssize_t request_size = INITIAL_REQUEST_SIZE;
retval.li_file_range.fr_offset = offset;
if (this->lb_buffer.empty()) {
this->fill_range(offset, this->lb_buffer.capacity());
} else if (offset == this->lb_file_offset + this->lb_buffer.size()) {
if (!this->fill_range(offset, INITIAL_REQUEST_SIZE)) {
retval.li_file_range.fr_offset = offset;
retval.li_file_range.fr_size = 0;
if (this->is_pipe()) {
retval.li_partial = !this->is_pipe_closed();
} else {
retval.li_partial = true;
}
return Ok(retval);
}
}
while (!done) {
auto old_retval_size = retval.li_file_range.fr_size;
char *line_start, *lf;
this->fill_range(offset, request_size);
/* Find the data in the cache and */
line_start = this->get_range(offset, retval.li_file_range.fr_size);
/* ... look for the end-of-line or end-of-file. */
ssize_t utf8_end = -1;
#ifdef HAVE_X86INTRIN_H
if (!validate_utf8_fast(
line_start, retval.li_file_range.fr_size, &utf8_end)) {
retval.li_valid_utf = false;
bool found_in_cache = false;
if (!this->lb_line_starts.empty()) {
auto buffer_offset = offset - this->lb_file_offset;
auto start_iter = std::lower_bound(this->lb_line_starts.begin(),
this->lb_line_starts.end(),
buffer_offset);
if (start_iter != this->lb_line_starts.end()) {
auto next_line_iter = start_iter + 1;
// log_debug("found offset %d %d", buffer_offset, *start_iter);
if (next_line_iter != this->lb_line_starts.end()) {
utf8_end = *next_line_iter - 1 - *start_iter;
found_in_cache = true;
} else {
// log_debug("no next iter");
}
} else {
// log_debug("no buffer_offset found");
}
}
#else
{
if (!found_in_cache) {
const char* msg;
int faulty_bytes;
@ -689,18 +1017,24 @@ line_buffer::load_next_line(file_range prev_line)
retval.li_valid_utf = false;
}
}
#endif
if (utf8_end >= 0) {
lf = line_start + utf8_end;
} else {
lf = nullptr;
}
auto got_new_data = old_retval_size != retval.li_file_range.fr_size;
#if 0
log_debug("load next loop %p reqsize %d lsize %d",
lf,
request_size,
retval.li_file_range.fr_size);
#endif
if (lf != nullptr
|| (retval.li_file_range.fr_size >= MAX_LINE_BUFFER_SIZE)
|| (request_size == MAX_LINE_BUFFER_SIZE)
|| ((request_size > retval.li_file_range.fr_size)
&& (retval.li_file_range.fr_size > 0)
|| (!got_new_data
&& (!this->is_pipe() || request_size > DEFAULT_INCREMENT)))
{
if ((lf != nullptr)
@ -756,14 +1090,22 @@ line_buffer::load_next_line(file_range prev_line)
request_size += DEFAULT_INCREMENT;
}
if (!done && !this->fill_range(offset, request_size)) {
if (!done
&& !this->fill_range(
offset,
std::max(request_size, (ssize_t) this->lb_buffer.available())))
{
break;
}
}
ensure(retval.li_file_range.fr_size <= this->lb_buffer_size);
ensure(retval.li_file_range.fr_size <= this->lb_buffer.size());
ensure(this->invariant());
#if 0
log_debug("got line part %d %d",
retval.li_file_range.fr_offset,
(int) retval.li_partial);
#endif
return Ok(retval);
}
@ -783,8 +1125,12 @@ line_buffer::read_range(const file_range fr)
return Err(std::string("out-of-bounds"));
}
if (!this->fill_range(fr.fr_offset, fr.fr_size)) {
return Err(std::string("unable to read file"));
if (!(this->in_range(fr.fr_offset)
&& this->in_range(fr.fr_offset + fr.fr_size - 1)))
{
if (!this->fill_range(fr.fr_offset, fr.fr_size)) {
return Err(std::string("unable to read file"));
}
}
line_start = this->get_range(fr.fr_offset, avail);
@ -794,13 +1140,14 @@ line_buffer::read_range(const file_range fr)
}
retval.share(this->lb_share_manager, line_start, fr.fr_size);
return Ok(retval);
return Ok(std::move(retval));
}
file_range
line_buffer::get_available()
{
return {this->lb_file_offset, this->lb_buffer_size};
return {this->lb_file_offset,
static_cast<file_ssize_t>(this->lb_buffer.size())};
}
line_buffer::gz_indexed::indexDict::indexDict(const z_stream& s,

@ -33,6 +33,7 @@
#define line_buffer_hh
#include <exception>
#include <future>
#include <vector>
#include <errno.h>
@ -45,6 +46,7 @@
#include "base/file_range.hh"
#include "base/lnav_log.hh"
#include "base/result.h"
#include "safe/safe.h"
#include "shared_buffer.hh"
struct line_info {
@ -139,7 +141,7 @@ public:
/** Construct an empty line_buffer. */
line_buffer();
line_buffer(line_buffer&& other) = default;
line_buffer(line_buffer&& other) = delete;
virtual ~line_buffer();
@ -177,16 +179,15 @@ public:
bool is_compressed() const
{
return this->lb_gz_file || this->lb_bz_file;
return this->lb_compressed;
}
file_off_t get_read_offset(file_off_t off) const
{
if (this->is_compressed()) {
return this->lb_compressed_offset;
} else {
return off;
}
return off;
}
bool is_data_available(file_off_t off, file_off_t stat_size) const
@ -212,9 +213,13 @@ public:
bool is_likely_to_flush(file_range prev_line);
void clear()
void flush_at(file_off_t off)
{
this->lb_buffer_size = 0;
if (this->in_range(off)) {
this->lb_buffer.resize(off - this->lb_file_offset);
} else {
this->lb_buffer.clear();
}
}
/** Release any resources held by this object. */
@ -224,15 +229,14 @@ public:
this->lb_file_offset = 0;
this->lb_file_size = (ssize_t) -1;
this->lb_buffer_size = 0;
this->lb_buffer.resize(0);
this->lb_last_line_offset = -1;
}
/** Check the invariants for this object. */
bool invariant() const
{
require(this->lb_buffer != nullptr);
require(this->lb_buffer_size <= this->lb_buffer_max);
require(this->lb_buffer.size() <= this->lb_buffer.capacity());
return true;
}
@ -245,7 +249,7 @@ private:
bool in_range(file_off_t off) const
{
return this->lb_file_offset <= off
&& off < (this->lb_file_offset + this->lb_buffer_size);
&& off < (this->lb_file_offset + this->lb_buffer.size());
}
void resize_buffer(size_t new_max);
@ -284,43 +288,56 @@ private:
* @return A pointer to the start of the cached data in the internal
* buffer.
*/
char* get_range(file_off_t start, file_ssize_t& avail_out) const
char* get_range(file_off_t start, file_ssize_t& avail_out)
{
auto buffer_offset = start - this->lb_file_offset;
char* retval;
require(buffer_offset >= 0);
require(this->lb_buffer_size >= buffer_offset);
require(this->lb_buffer.size() >= buffer_offset);
retval = &this->lb_buffer[buffer_offset];
avail_out = this->lb_buffer_size - buffer_offset;
retval = this->lb_buffer.at(buffer_offset);
avail_out = this->lb_buffer.size() - buffer_offset;
return retval;
}
bool load_next_buffer();
using safe_gz_indexed = safe::Safe<gz_indexed>;
shared_buffer lb_share_manager;
auto_fd lb_fd; /*< The file to read data from. */
gz_indexed lb_gz_file; /*< File reader for gzipped files. */
bool lb_bz_file; /*< Flag set for bzip2 compressed files. */
file_off_t lb_compressed_offset; /*< The offset into the compressed file. */
auto_mem<char> lb_buffer; /*< The internal buffer where data is cached */
file_ssize_t lb_file_size; /*<
* The size of the file. When lb_fd refers to
* a pipe, this is set to the amount of data
* read from the pipe when EOF is reached.
*/
file_off_t lb_file_offset; /*<
* Data cached in the buffer comes from this
* offset in the file.
*/
time_t lb_file_time;
ssize_t lb_buffer_size; /*< The amount of cached data in the buffer. */
ssize_t lb_buffer_max; /*< The amount of allocated memory for the
* buffer. */
bool lb_seekable; /*< Flag set for seekable file descriptors. */
file_off_t lb_last_line_offset; /*< */
safe_gz_indexed lb_gz_file; /*< File reader for gzipped files. */
bool lb_bz_file{false}; /*< Flag set for bzip2 compressed files. */
auto_buffer lb_buffer{auto_buffer::alloc(DEFAULT_LINE_BUFFER_SIZE)};
nonstd::optional<auto_buffer> lb_alt_buffer;
std::vector<uint32_t> lb_alt_line_starts;
std::vector<bool> lb_alt_line_is_utf;
std::future<bool> lb_loader_future;
nonstd::optional<file_off_t> lb_loader_file_offset;
file_off_t lb_compressed_offset{
0}; /*< The offset into the compressed file. */
file_ssize_t lb_file_size{
-1}; /*<
* The size of the file. When lb_fd refers to
* a pipe, this is set to the amount of data
* read from the pipe when EOF is reached.
*/
file_off_t lb_file_offset{0}; /*<
* Data cached in the buffer comes from this
* offset in the file.
*/
time_t lb_file_time{0};
bool lb_seekable{false}; /*< Flag set for seekable file descriptors. */
bool lb_compressed{false};
file_off_t lb_last_line_offset{-1}; /*< */
std::vector<uint32_t> lb_line_starts;
std::vector<bool> lb_line_is_utf;
};
#endif

@ -1365,8 +1365,9 @@ com_save_to(exec_context& ec,
lnav_data.ld_preview_status_source.get_description().set_value(
"First lines of file: %s", split_args[0].c_str());
} else {
retval = "info: Wrote " + std::to_string(line_count) + " rows to "
+ split_args[0];
retval = fmt::format(FMT_STRING("info: Wrote {:L} rows to {}"),
line_count,
split_args[0]);
}
if (toclose != nullptr) {
closer(toclose);

@ -409,7 +409,10 @@ log_format::check_for_new_year(std::vector<logline>& dst,
* XXX This needs some cleanup.
*/
struct json_log_userdata {
json_log_userdata(shared_buffer_ref& sbr) : jlu_shared_buffer(sbr) {}
json_log_userdata(shared_buffer_ref& sbr, scan_batch_context* sbc)
: jlu_shared_buffer(sbr), jlu_batch_context(sbc)
{
}
external_log_format* jlu_format{nullptr};
const logline* jlu_line{nullptr};
@ -420,6 +423,7 @@ struct json_log_userdata {
size_t jlu_line_size{0};
size_t jlu_sub_start{0};
shared_buffer_ref& jlu_shared_buffer;
scan_batch_context* jlu_batch_context;
};
static int read_json_field(yajlpp_parse_context* ypc,
@ -472,8 +476,8 @@ read_json_int(yajlpp_parse_context* ypc, long long val)
pcre_input pi(level_buf);
pcre_context::capture_t level_cap = {0, (int) strlen(level_buf)};
jlu->jlu_base_line->set_level(
jlu->jlu_format->convert_level(pi, &level_cap));
jlu->jlu_base_line->set_level(jlu->jlu_format->convert_level(
pi, &level_cap, jlu->jlu_batch_context));
} else {
std::vector<std::pair<int64_t, log_level_t>>::iterator iter;
@ -690,7 +694,7 @@ external_log_format::scan(logfile& lf,
yajlpp_parse_context& ypc = *(this->jlf_parse_context);
logline ll(li.li_file_range.fr_offset, 0, 0, LEVEL_INFO);
yajl_handle handle = this->jlf_yajl_handle.get();
json_log_userdata jlu(sbr);
json_log_userdata jlu(sbr, &sbc);
if (!this->lf_specialized && dst.size() >= 3) {
return log_format::SCAN_NO_MATCH;
@ -842,7 +846,7 @@ external_log_format::scan(logfile& lf,
}
}
auto level = this->convert_level(pi, level_cap);
auto level = this->convert_level(pi, level_cap, &sbc);
this->lf_timestamp_flags = log_time_tm.et_flags;
@ -901,11 +905,13 @@ external_log_format::scan(logfile& lf,
pattern& mod_pat
= *mod_elf->elf_pattern_order[mod_pat_index];
if (mod_pat.p_pcre->match(mod_pc, mod_pi)) {
if (mod_pat.p_pcre->match(
mod_pc, mod_pi, PCRE_NO_UTF8_CHECK)) {
auto* mod_level_cap
= mod_pc[mod_pat.p_level_field_index];
level = mod_elf->convert_level(mod_pi, mod_level_cap);
level = mod_elf->convert_level(
mod_pi, mod_level_cap, &sbc);
}
}
}
@ -974,6 +980,7 @@ external_log_format::scan(logfile& lf,
if (this->lf_specialized && !this->lf_multiline) {
auto& last_line = dst.back();
log_debug("invalid line %d %d", dst.size(), li.li_file_range.fr_offset);
dst.emplace_back(li.li_file_range.fr_offset,
last_line.get_timeval(),
log_level_t::LEVEL_INVALID);
@ -1246,19 +1253,20 @@ read_json_field(yajlpp_parse_context* ypc, const unsigned char* str, size_t len)
pcre_context_static<30> pc;
pcre_input pi(field_name);
if (jlu->jlu_format->elf_level_pointer.match(pc, pi)) {
if (jlu->jlu_format->elf_level_pointer.match(
pc, pi, PCRE_NO_UTF8_CHECK)) {
pcre_input pi_level((const char*) str, 0, len);
pcre_context::capture_t level_cap = {0, (int) len};
jlu->jlu_base_line->set_level(
jlu->jlu_format->convert_level(pi_level, &level_cap));
jlu->jlu_base_line->set_level(jlu->jlu_format->convert_level(
pi_level, &level_cap, jlu->jlu_batch_context));
}
} else if (jlu->jlu_format->elf_level_field == field_name) {
pcre_input pi((const char*) str, 0, len);
pcre_context::capture_t level_cap = {0, (int) len};
jlu->jlu_base_line->set_level(
jlu->jlu_format->convert_level(pi, &level_cap));
jlu->jlu_base_line->set_level(jlu->jlu_format->convert_level(
pi, &level_cap, jlu->jlu_batch_context));
} else if (jlu->jlu_format->elf_opid_field == field_name) {
uint8_t opid = hash_str((const char*) str, len);
jlu->jlu_base_line->set_opid(opid);
@ -1359,7 +1367,7 @@ external_log_format::get_subline(const logline& ll,
{
yajlpp_parse_context& ypc = *(this->jlf_parse_context);
yajl_handle handle = this->jlf_yajl_handle.get();
json_log_userdata jlu(sbr);
json_log_userdata jlu(sbr, nullptr);
this->jlf_share_manager.invalidate_refs();
this->jlf_cached_line.clear();
@ -2025,7 +2033,7 @@ external_log_format::build(std::vector<lnav::console::user_message>& errors)
.append(" property")));
}
log_level_t level = this->convert_level(pi, level_cap);
log_level_t level = this->convert_level(pi, level_cap, nullptr);
if (elf_sample.s_level != LEVEL_UNKNOWN
&& elf_sample.s_level != level) {
@ -2565,12 +2573,37 @@ external_log_format::value_line_count(const intern_string_t ist,
}
log_level_t
external_log_format::convert_level(
const pcre_input& pi, const pcre_context::capture_t* level_cap) const
external_log_format::convert_level(const pcre_input& pi,
const pcre_context::capture_t* level_cap,
scan_batch_context* sbc) const
{
log_level_t retval = LEVEL_INFO;
if (level_cap != nullptr && level_cap->is_valid()) {
if (sbc != nullptr && sbc->sbc_cached_level_count > 0) {
auto sf = pi.get_string_fragment(level_cap);
auto cached_level_iter
= std::find(std::begin(sbc->sbc_cached_level_strings),
std::begin(sbc->sbc_cached_level_strings)
+ sbc->sbc_cached_level_count,
sf);
if (cached_level_iter
!= std::begin(sbc->sbc_cached_level_strings)
+ sbc->sbc_cached_level_count)
{
auto cache_index
= std::distance(std::begin(sbc->sbc_cached_level_strings),
cached_level_iter);
if (cache_index != 0) {
std::swap(sbc->sbc_cached_level_strings[cache_index],
sbc->sbc_cached_level_strings[0]);
std::swap(sbc->sbc_cached_level_values[cache_index],
sbc->sbc_cached_level_values[0]);
}
return sbc->sbc_cached_level_values[0];
}
}
pcre_context_static<128> pc_level;
pcre_input pi_level(
pi.get_substr_start(level_cap), 0, level_cap->length());
@ -2579,13 +2612,28 @@ external_log_format::convert_level(
retval = string2level(pi_level.get_string(), level_cap->length());
} else {
for (const auto& elf_level_pattern : this->elf_level_patterns) {
if (elf_level_pattern.second.lp_pcre->match(pc_level, pi_level))
if (elf_level_pattern.second.lp_pcre->match(
pc_level, pi_level, PCRE_NO_UTF8_CHECK))
{
retval = elf_level_pattern.first;
break;
}
}
}
if (sbc != nullptr && level_cap->length() < 10) {
size_t cache_index;
if (sbc->sbc_cached_level_count == 4) {
cache_index = sbc->sbc_cached_level_count - 1;
} else {
cache_index = sbc->sbc_cached_level_count;
sbc->sbc_cached_level_count += 1;
}
sbc->sbc_cached_level_strings[cache_index]
= std::string(pi_level.get_string(), pi_level.pi_length);
sbc->sbc_cached_level_values[cache_index] = retval;
}
}
return retval;

@ -314,7 +314,8 @@ public:
}
log_level_t convert_level(const pcre_input& pi,
const pcre_context::capture_t* level_cap) const;
const pcre_context::capture_t* level_cap,
scan_batch_context* sbc) const;
using mod_map_t = std::map<intern_string_t, module_format>;
static mod_map_t MODULE_FORMATS;

@ -32,8 +32,6 @@
#ifndef lnav_log_format_fwd_hh
#define lnav_log_format_fwd_hh
#include <unordered_map>
#include <sys/types.h>
#include "ArenaAlloc/arenaalloc.h"
@ -42,6 +40,7 @@
#include "byte_array.hh"
#include "log_level.hh"
#include "ptimec.hh"
#include "robin_hood/robin_hood.h"
class log_format;
@ -50,16 +49,17 @@ struct opid_time_range {
struct timeval otr_end;
};
using log_opid_map = std::unordered_map<
string_fragment,
opid_time_range,
frag_hasher,
std::equal_to<string_fragment>,
ArenaAlloc::Alloc<std::pair<const string_fragment, opid_time_range>>>;
using log_opid_map = robin_hood::unordered_map<string_fragment,
opid_time_range,
frag_hasher,
std::equal_to<string_fragment>>;
/**
 * State carried across the lines scanned in a single batch.
 *
 * Besides the allocator and the opid map, this holds a four-slot cache of
 * level strings and their converted values that
 * external_log_format::convert_level() checks before running its level
 * patterns.
 */
struct scan_batch_context {
/** Arena allocator, held by reference and supplied by the creator. */
ArenaAlloc::Alloc<char>& sbc_allocator;
/** Map from opid string_fragment to its opid_time_range for this batch. */
log_opid_map sbc_opids;
/**
 * Cached level strings.  Only the first sbc_cached_level_count slots are
 * searched; convert_level() swaps a hit into slot 0 and, when all four
 * slots are in use, writes a newly converted string over the last slot.
 */
std::string sbc_cached_level_strings[4];
/** Converted level for the string at the same index in the array above. */
log_level_t sbc_cached_level_values[4];
/** Number of cache slots currently populated (at most 4). */
size_t sbc_cached_level_count{0};
};
/**

@ -127,6 +127,7 @@ logfile::open(std::string filename, logfile_open_options& loo)
/**
 * Construct a logfile object for the given path and options.
 *
 * @param filename The name stored in lf_filename.
 * @param loo The open options.  NOTE(review): this lvalue reference is
 * passed through std::move() into lf_options, so the caller's object may be
 * left in a moved-from state -- confirm callers do not reuse it.
 */
logfile::logfile(std::string filename, logfile_open_options& loo)
: lf_filename(std::move(filename)), lf_options(std::move(loo))
{
// Reserve room for 64 entries in the opid map up front.
this->lf_opids.writeAccess()->reserve(64);
}
logfile::~logfile() {}
@ -414,12 +415,13 @@ logfile::rebuild_index(nonstd::optional<ui_clock::time_point> deadline)
this->lf_index.pop_back();
rollback_size += 1;
this->lf_line_buffer.clear();
if (!this->lf_index.empty()) {
auto last_line = this->lf_index.end();
--last_line;
auto check_line_off = last_line->get_offset();
auto last_length = ssize_t(this->line_length(last_line, false));
log_debug("flushing at %d", check_line_off);
this->lf_line_buffer.flush_at(check_line_off);
auto read_result = this->lf_line_buffer.read_range(
{check_line_off, last_length});
@ -463,6 +465,7 @@ logfile::rebuild_index(nonstd::optional<ui_clock::time_point> deadline)
"loading file... %s:%d", this->lf_filename.c_str(), begin_size);
}
scan_batch_context sbc{this->lf_allocator};
sbc.sbc_opids.reserve(32);
auto prev_range = file_range{off};
while (limit > 0) {
auto load_result = this->lf_line_buffer.load_next_line(prev_range);
@ -525,7 +528,8 @@ logfile::rebuild_index(nonstd::optional<ui_clock::time_point> deadline)
return rebuild_result_t::INVALID;
}
auto sbr = read_result.unwrap().rtrim(is_line_ending);
auto sbr = read_result.unwrap();
sbr.rtrim(is_line_ending);
this->lf_longest_line
= std::max(this->lf_longest_line, sbr.length());
this->lf_partial_line = li.li_partial;
@ -622,6 +626,7 @@ logfile::rebuild_index(nonstd::optional<ui_clock::time_point> deadline)
this->lf_index_size = prev_range.next_offset();
this->lf_stat = st;
log_debug("batch opid count %d", sbc.sbc_opids.size());
{
safe::WriteAccess<logfile::safe_opid_map> writable_opid_map(
this->lf_opids);

@ -34,7 +34,6 @@
#define logfile_hh
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
@ -420,7 +419,7 @@ private:
safe_notes lf_notes;
safe_opid_map lf_opids;
size_t lf_watch_count{0};
ArenaAlloc::Alloc<char> lf_allocator;
ArenaAlloc::Alloc<char> lf_allocator{64 * 1024};
nonstd::optional<std::pair<file_off_t, size_t>> lf_next_line_cache;
};

@ -63,10 +63,12 @@ int main(int argc, char *argv[])
const char* arg = argv[lpc];
printf(
"// %s\n"
"bool ptime_f%d(struct exttm *dst, const char *str, off_t &off, "
"ssize_t len) {\n"
" dst->et_flags = 0;\n"
" // log_debug(\"ptime_f%d\");\n",
arg,
lpc,
lpc);
for (int index = 0; arg[index]; arg++) {

@ -93,11 +93,13 @@ shared_buffer_ref::shared_buffer_ref(shared_buffer_ref&& other) noexcept
this->sb_data = nullptr;
this->sb_length = 0;
} else if (other.sb_owner != nullptr) {
other.sb_owner->add_ref(*this);
this->sb_owner = other.sb_owner;
this->sb_data = other.sb_data;
this->sb_length = other.sb_length;
other.disown();
auto owner_ref_iter = std::find(other.sb_owner->sb_refs.begin(),
other.sb_owner->sb_refs.end(),
&other);
*owner_ref_iter = this;
this->sb_owner = std::exchange(other.sb_owner, nullptr);
this->sb_data = std::exchange(other.sb_data, nullptr);
this->sb_length = std::exchange(other.sb_length, 0);
} else {
this->sb_owner = nullptr;
this->sb_data = other.sb_data;

@ -387,10 +387,8 @@ sql_gunzip(sqlite3_value* val)
auto len = sqlite3_value_bytes(val);
if (!lnav::gzip::is_gzipped((const char*) buffer, len)) {
auto retval = auto_buffer::alloc(len);
memcpy(retval.in(), buffer, len);
return blob_auto_buffer{std::move(retval)};
return blob_auto_buffer{
auto_buffer::from((const char*) buffer, len)};
}
auto res = lnav::gzip::uncompress("", buffer, len);

@ -31,6 +31,7 @@
#define textfile_sub_source_hh
#include <deque>
#include <unordered_map>
#include "filter_observer.hh"
#include "logfile.hh"

File diff suppressed because it is too large Load Diff

@ -139,7 +139,7 @@ timeslice(sqlite3_value* time_in, nonstd::optional<const char*> slice_in_opt)
auto actual_length
= sql_strftime(ts.in(), ts.size(), win_start.to_timeval());
ts.shrink_to(actual_length);
ts.resize(actual_length);
return text_auto_buffer{std::move(ts)};
}

@ -27,6 +27,8 @@
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <algorithm>
#include <assert.h>
#include <stdlib.h>
@ -81,5 +83,20 @@ main(int argc, char* argv[])
assert(free_count == 3);
assert(last_free == &md1_val);
{
static const char* msg = "Hello, World!\nGoodbye, World!\nTest";
auto buf = auto_buffer::from(msg, strlen(msg));
auto first_lf = std::find(buf.begin(), buf.end(), '\n');
auto last_lf = std::find(buf.rbegin(), buf.rend(), '\n');
assert(std::distance(buf.begin(), first_lf) == 13);
assert(*first_lf == '\n');
assert(*last_lf == '\n');
auto last_lf_index = std::distance(last_lf, buf.rend()) - 1;
auto* last_lf_rchr = strrchr(msg, '\n');
assert(last_lf_index == (last_lf_rchr - msg));
}
return retval;
}

@ -79,7 +79,7 @@ main(int argc, char* argv[])
auto fd = auto_fd(mkstemp(fn_template));
remove(fn_template);
auto lb = line_buffer();
line_buffer lb;
write(fd, TEST_DATA, strlen(TEST_DATA));
lseek(fd, SEEK_SET, 0);

Loading…
Cancel
Save