[logfile_sub_source] try to avoid a full rebuild in some cases

This commit is contained in:
Timothy Stack 2021-04-24 14:38:26 -07:00
parent 2748171d2c
commit 94498878c8
14 changed files with 206 additions and 48 deletions

View File

@ -95,10 +95,12 @@ TEST_AGENTS = [
def access_log_msgs():
while True:
now = datetime.datetime.now()
ts = now - datetime.timedelta(seconds=1)
yield '%s - %s [%s +0000] "%s %s %s" %s %s "%s" "%s"\n' % (
random.choice(TEST_ADDRESSES),
random.choice(TEST_USERNAMES),
datetime.datetime.now().strftime(ACCESS_LOG_DATE_FMT),
ts.strftime(ACCESS_LOG_DATE_FMT),
random.choice(TEST_METHODS),
random.choice(TEST_URLS),
random.choice(TEST_VERSIONS),
@ -155,8 +157,8 @@ while True:
for i in range(random.randrange(0, 4)):
with open(fname, "a+") as fp:
fp.write(gen.next())
if random.uniform(0.0, 1.0) < 0.010:
fp.truncate(0)
time.sleep(random.uniform(0.00, 0.01))
if random.uniform(0.0, 1.0) < 0.001:
os.remove(fname)
#if random.uniform(0.0, 1.0) < 0.010:
# fp.truncate(0)
time.sleep(random.uniform(0.05, 0.10))
#if random.uniform(0.0, 1.0) < 0.001:
# os.remove(fname)

View File

@ -78,6 +78,12 @@ struct big_array {
return this->ba_size;
};
void shrink_to(size_t new_size) {
require(new_size <= this->ba_size);
this->ba_size = new_size;
}
bool empty() const {
return this->ba_size == 0;
};

View File

@ -111,6 +111,10 @@ void hist_source2::text_attrs_for_line(textview_curses &tc, int row,
void hist_source2::add_value(time_t row, hist_source2::hist_type_t htype,
double value)
{
if (row < this->hs_last_row) {
log_error("time mismatch %ld %ld", row, this->hs_last_row);
}
require(row >= this->hs_last_row);
row = rounddown(row, this->hs_time_slice);

View File

@ -263,6 +263,11 @@ template<>
void force_linking(services::curl_streamer_t anno)
{
}
template<>
void force_linking(services::remote_tailer_t anno)
{
}
}
bool setup_logline_table(exec_context &ec)

View File

@ -664,3 +664,16 @@ intern_string_t logfile::get_format_name() const
return {};
}
nonstd::optional<logfile::const_iterator>
logfile::find_from_time(const timeval &tv) const
{
auto retval = std::lower_bound(this->lf_index.begin(),
this->lf_index.end(),
tv);
if (retval == this->lf_index.end()) {
return nonstd::nullopt;
}
return retval;
}

View File

@ -244,6 +244,8 @@ public:
/** @return The number of lines in the index. */
size_t size() const { return this->lf_index.size(); }
nonstd::optional<const_iterator> find_from_time(const struct timeval& tv) const;
logline &operator[](int index) { return this->lf_index[index]; };
logline &front() {

View File

@ -120,11 +120,13 @@ shared_ptr<logfile> logfile_sub_source::find(const char *fn,
for (iter = this->lss_files.begin();
iter != this->lss_files.end() && retval == nullptr;
iter++) {
logfile_data &ld = *(*iter);
if (ld.get_file() == nullptr) {
auto &ld = *(*iter);
auto lf = ld.get_file_ptr();
if (lf == nullptr) {
continue;
}
if (strcmp(ld.get_file()->get_filename().c_str(), fn) == 0) {
if (strcmp(lf->get_filename().c_str(), fn) == 0) {
retval = ld.get_file();
}
else {
@ -583,28 +585,32 @@ logfile_sub_source::rebuild_result logfile_sub_source::rebuild_index()
int file_count = 0;
bool force = this->lss_force_rebuild;
rebuild_result retval = rebuild_result::rr_no_change;
nonstd::optional<struct timeval> lowest_tv = nonstd::nullopt;
vis_line_t search_start = 0_vl;
this->lss_force_rebuild = false;
if (force) {
log_debug("forced to full rebuild");
retval = rebuild_result::rr_full_rebuild;
}
for (iter = this->lss_files.begin();
iter != this->lss_files.end();
iter++) {
logfile_data &ld = *(*iter);
auto &ld = *(*iter);
auto lf = ld.get_file_ptr();
if (ld.get_file() == NULL) {
if (lf == nullptr) {
if (ld.ld_lines_indexed > 0) {
log_debug("%d: file closed, doing full rebuild",
ld.ld_file_index);
force = true;
retval = rebuild_result::rr_full_rebuild;
}
}
else {
logfile &lf = *ld.get_file();
if (!this->tss_view->is_paused()) {
switch (lf.rebuild_index()) {
switch (lf->rebuild_index()) {
case logfile::RR_NO_NEW_LINES:
// No changes
break;
@ -613,8 +619,8 @@ logfile_sub_source::rebuild_result logfile_sub_source::rebuild_index()
retval = rebuild_result::rr_appended_lines;
}
if (!this->lss_index.empty() &&
lf.size() > ld.ld_lines_indexed) {
logline &new_file_line = lf[ld.ld_lines_indexed];
lf->size() > ld.ld_lines_indexed) {
logline &new_file_line = (*lf)[ld.ld_lines_indexed];
content_line_t cl = this->lss_index.back();
logline *last_indexed_line = this->find_line(cl);
@ -623,25 +629,42 @@ logfile_sub_source::rebuild_result logfile_sub_source::rebuild_index()
if (last_indexed_line == nullptr ||
new_file_line <
last_indexed_line->get_timeval()) {
force = true;
retval = rebuild_result::rr_full_rebuild;
log_debug("%s:%ld: found older lines, full "
"rebuild: %p %lld < %lld",
lf->get_filename().c_str(),
ld.ld_lines_indexed,
last_indexed_line,
new_file_line.get_time_in_millis(),
last_indexed_line->get_time_in_millis());
if (retval <= rebuild_result::rr_partial_rebuild) {
retval = rebuild_result::rr_partial_rebuild;
if (!lowest_tv) {
lowest_tv = new_file_line.get_timeval();
} else if (new_file_line.get_timeval() <
lowest_tv.value()) {
lowest_tv = new_file_line.get_timeval();
}
}
}
}
break;
case logfile::RR_INVALID:
case logfile::RR_NEW_ORDER:
log_debug("%s: log file has a new order, full rebuild",
lf->get_filename().c_str());
retval = rebuild_result::rr_full_rebuild;
force = true;
break;
}
}
file_count += 1;
total_lines += (*iter)->get_file()->size();
total_lines += lf->size();
}
}
if (this->lss_index.reserve(total_lines)) {
force = true;
retval = rebuild_result::rr_full_rebuild;
}
if (force) {
@ -657,6 +680,66 @@ logfile_sub_source::rebuild_result logfile_sub_source::rebuild_index()
this->lss_longest_line = 0;
this->lss_basename_width = 0;
this->lss_filename_width = 0;
} else if (retval == rebuild_result::rr_partial_rebuild) {
size_t remaining = 0;
log_debug("partial rebuild with lowest time: %ld",
lowest_tv.value().tv_sec);
for (iter = this->lss_files.begin();
iter != this->lss_files.end();
iter++) {
logfile_data &ld = *(*iter);
auto lf = ld.get_file_ptr();
if (lf == nullptr) {
continue;
}
auto line_iter = lf->find_from_time(lowest_tv.value());
if (line_iter) {
log_debug("%s: lowest line time %ld; line %ld; size %ld",
lf->get_filename().c_str(),
line_iter.value()->get_timeval().tv_sec,
std::distance(lf->cbegin(), line_iter.value()),
lf->size());
}
ld.ld_lines_indexed = std::distance(
lf->cbegin(), line_iter.value_or(lf->cend()));
remaining += lf->size() - ld.ld_lines_indexed;
}
auto row_iter = lower_bound(this->lss_index.begin(),
this->lss_index.end(),
*lowest_tv,
logline_cmp(*this));
this->lss_index.shrink_to(std::distance(
this->lss_index.begin(), row_iter));
log_debug("new index size %ld/%ld; remain %ld",
this->lss_index.ba_size,
this->lss_index.ba_capacity,
remaining);
auto filt_row_iter = lower_bound(this->lss_filtered_index.begin(),
this->lss_filtered_index.end(),
*lowest_tv,
filtered_logline_cmp(*this));
this->lss_filtered_index.resize(std::distance(
this->lss_filtered_index.begin(), filt_row_iter));
search_start = vis_line_t(this->lss_filtered_index.size());
if (this->lss_index_delegate) {
this->lss_index_delegate->index_start(*this);
for (const auto row_in_full_index : this->lss_filtered_index) {
auto cl = this->lss_index[row_in_full_index];
uint64_t line_number;
auto ld_iter = this->find_data(cl, line_number);
auto& ld = *ld_iter;
auto line_iter = ld->get_file_ptr()->begin() + line_number;
this->lss_index_delegate->index_line(
*this, ld->get_file_ptr(), line_iter);
}
}
}
if (retval != rebuild_result::rr_no_change || force) {
@ -664,7 +747,7 @@ logfile_sub_source::rebuild_result logfile_sub_source::rebuild_index()
logline_cmp line_cmper(*this);
for (auto& ld : this->lss_files) {
std::shared_ptr<logfile> lf = ld->get_file();
auto lf = ld->get_file_ptr();
if (lf == nullptr) {
continue;
@ -679,7 +762,7 @@ logfile_sub_source::rebuild_result logfile_sub_source::rebuild_index()
if (full_sort) {
for (auto& ld : this->lss_files) {
shared_ptr<logfile> lf = ld->get_file();
auto lf = ld->get_file_ptr();
if (lf == nullptr) {
continue;
@ -704,7 +787,7 @@ logfile_sub_source::rebuild_result logfile_sub_source::rebuild_index()
iter != this->lss_files.end();
iter++) {
logfile_data *ld = iter->get();
shared_ptr<logfile> lf = ld->get_file();
auto lf = ld->get_file_ptr();
if (lf == nullptr) {
continue;
}
@ -725,7 +808,7 @@ logfile_sub_source::rebuild_result logfile_sub_source::rebuild_index()
}
int file_index = ld->ld_file_index;
int line_index = lf_iter - ld->get_file()->begin();
int line_index = lf_iter - ld->get_file_ptr()->begin();
content_line_t con_line(file_index * MAX_LINES_PER_FILE +
line_index);
@ -739,10 +822,13 @@ logfile_sub_source::rebuild_result logfile_sub_source::rebuild_index()
for (iter = this->lss_files.begin();
iter != this->lss_files.end();
iter++) {
if ((*iter)->get_file() == nullptr)
continue;
auto lf = (*iter)->get_file_ptr();
(*iter)->ld_lines_indexed = (*iter)->get_file()->size();
if (lf == nullptr) {
continue;
}
(*iter)->ld_lines_indexed = lf->size();
}
this->lss_filtered_index.reserve(this->lss_index.size());
@ -765,7 +851,8 @@ logfile_sub_source::rebuild_result logfile_sub_source::rebuild_index()
continue;
}
auto line_iter = (*ld)->get_file()->begin() + line_number;
auto lf = (*ld)->get_file_ptr();
auto line_iter = lf->begin() + line_number;
if (line_iter->is_ignored()) {
continue;
@ -777,9 +864,8 @@ logfile_sub_source::rebuild_result logfile_sub_source::rebuild_index()
this->check_extra_filters(ld, line_iter))) {
this->lss_filtered_index.push_back(index_index);
if (this->lss_index_delegate != nullptr) {
shared_ptr<logfile> lf = (*ld)->get_file();
this->lss_index_delegate->index_line(
*this, lf.get(), lf->begin() + line_number);
*this, lf, lf->begin() + line_number);
}
}
}
@ -793,8 +879,13 @@ logfile_sub_source::rebuild_result logfile_sub_source::rebuild_index()
case rebuild_result::rr_no_change:
break;
case rebuild_result::rr_full_rebuild:
log_debug("redoing search");
this->tss_view->redo_search();
break;
case rebuild_result::rr_partial_rebuild:
log_debug("redoing search from: %d", (int) search_start);
this->tss_view->search_new_data(search_start);
break;
case rebuild_result::rr_appended_lines:
this->tss_view->search_new_data();
break;
@ -889,7 +980,7 @@ log_accel::direction_t logfile_sub_source::get_line_accel_direction(
void logfile_sub_source::text_filters_changed()
{
for (auto& ld : *this) {
shared_ptr<logfile> lf = ld->get_file();
auto lf = ld->get_file_ptr();
if (lf != nullptr) {
ld->ld_filter_state.clear_deleted_filter_state();
@ -915,7 +1006,8 @@ void logfile_sub_source::text_filters_changed()
continue;
}
auto line_iter = (*ld)->get_file()->begin() + line_number;
auto lf = (*ld)->get_file_ptr();
auto line_iter = lf->begin() + line_number;
if (!this->tss_apply_filters ||
(!(*ld)->ld_filter_state.excluded(filtered_in_mask, filtered_out_mask,
@ -923,9 +1015,7 @@ void logfile_sub_source::text_filters_changed()
this->check_extra_filters(ld, line_iter))) {
this->lss_filtered_index.push_back(index_index);
if (this->lss_index_delegate != nullptr) {
shared_ptr<logfile> lf = (*ld)->get_file();
this->lss_index_delegate->index_line(
*this, lf.get(), lf->begin() + line_number);
this->lss_index_delegate->index_line(*this, lf, line_iter);
}
}
}
@ -1027,7 +1117,7 @@ void logfile_sub_source::set_sql_filter(std::string stmt_str, sqlite3_stmt *stmt
bool logfile_sub_source::eval_sql_filter(sqlite3_stmt *stmt, iterator ld, logfile::const_iterator ll)
{
auto lf = (*ld)->get_file();
auto lf = (*ld)->get_file_ptr();
char timestamp_buffer[64];
shared_buffer_ref sbr, raw_sbr;
lf->read_full_message(ll, sbr);

View File

@ -439,6 +439,7 @@ public:
enum class rebuild_result {
rr_no_change,
rr_appended_lines,
rr_partial_rebuild,
rr_full_rebuild,
};

View File

@ -440,13 +440,12 @@ static void rl_search_internal(readline_curses *rc, ln_mode_t mode, bool complet
return;
}
case LNM_PAGING:
case LNM_FILTER:
case LNM_FILES:
case LNM_EXEC:
case LNM_USER:
return;
default:
require(0);
break;
}
if (!complete) {

View File

@ -36,6 +36,7 @@ namespace services {
struct ui_t {};
struct curl_streamer_t {};
struct remote_tailer_t {};
}

View File

@ -1,4 +1,13 @@
include $(top_srcdir)/aminclude_static.am
AM_CPPFLAGS = \
$(CODE_COVERAGE_CPPFLAGS)
AM_LIBS = $(CODE_COVERAGE_LIBS)
AM_CFLAGS = $(CODE_COVERAGE_CFLAGS)
AM_CXXFLAGS = $(CODE_COVERAGE_CXXFLAGS)
noinst_LIBRARIES = libtailercommon.a libtailerpp.a
noinst_HEADERS = \

View File

@ -127,6 +127,16 @@ int main(int argc, char *const *argv)
return;
}
struct stat st;
if (fstat(fd, &st) == -1 || !S_ISREG(st.st_mode)) {
ghc::filesystem::remove_all(local_path);
send_packet(to_child.get(),
TPT_NEED_BLOCK,
TPPT_STRING, tob.tob_path.c_str(),
TPPT_DONE);
return;
}
auto_mem<char> buffer;
buffer = (char *) malloc(tob.tob_length);
@ -146,7 +156,7 @@ int main(int argc, char *const *argv)
return;
}
} else if (bytes_read == -1) {
ghc::filesystem::remove(local_path);
ghc::filesystem::remove_all(local_path);
}
send_packet(to_child.get(),
TPT_NEED_BLOCK,
@ -168,6 +178,7 @@ int main(int argc, char *const *argv)
if (fd == -1) {
perror("open");
} else {
ftruncate(fd, ttb.ptb_offset);
pwrite(fd, ttb.ttb_bits.data(), ttb.ttb_bits.size(), ttb.ptb_offset);
}
}

View File

@ -136,7 +136,7 @@ struct client_path_state *create_client_path_state(const char *path)
retval->cps_path = strdup(path);
retval->cps_last_path_state = PS_UNKNOWN;
memset(&retval->cps_last_stat, 0, sizeof(retval->cps_last_stat));
retval->cps_client_file_offset = 0;
retval->cps_client_file_offset = -1;
retval->cps_client_file_read_length = 0;
retval->cps_client_state = CS_INIT;
list_init(&retval->cps_children);
@ -198,7 +198,7 @@ void set_client_path_state_error(struct client_path_state *cps)
send_error(cps, "unable to open -- %s", strerror(errno));
}
cps->cps_last_path_state = PS_ERROR;
cps->cps_client_file_offset = 0;
cps->cps_client_file_offset = -1;
cps->cps_client_state = CS_INIT;
}
@ -306,6 +306,13 @@ int poll_paths(struct list *path_list)
int rc = lstat(curr->cps_path, &st);
if (rc == -1) {
memset(&st, 0, sizeof(st));
set_client_path_state_error(curr);
} else if (curr->cps_client_file_offset >= 0 &&
((curr->cps_last_stat.st_dev != st.st_dev &&
curr->cps_last_stat.st_ino != st.st_ino) ||
(st.st_size < curr->cps_last_stat.st_size))) {
send_error(curr, "replaced");
set_client_path_state_error(curr);
} else if (S_ISREG(st.st_mode)) {
switch (curr->cps_client_state) {
@ -318,9 +325,12 @@ int poll_paths(struct list *path_list)
set_client_path_state_error(curr);
} else {
char buffer[64 * 1024];
int64_t bytes_read = pread(fd,
buffer, sizeof(buffer),
curr->cps_client_file_offset);
int64_t bytes_read = pread(
fd,
buffer, sizeof(buffer),
curr->cps_client_file_offset < 0 ?
0 :
curr->cps_client_file_offset);
if (bytes_read == -1) {
set_client_path_state_error(curr);
@ -340,6 +350,10 @@ int poll_paths(struct list *path_list)
TPPT_DONE);
curr->cps_client_state = CS_OFFERED;
} else {
if (curr->cps_client_file_offset < 0) {
curr->cps_client_file_offset = 0;
}
send_packet(STDOUT_FILENO,
TPT_TAIL_BLOCK,
TPPT_STRING, curr->cps_path,
@ -407,6 +421,8 @@ int poll_paths(struct list *path_list)
}
}
curr->cps_last_stat = st;
curr = (struct client_path_state *) curr->cps_node.n_succ;
}
@ -503,7 +519,6 @@ int main(int argc, char *argv[])
timeout = 0;
} else {
timeout = 1000;
fprintf(stderr, "all synced!\n");
}
}
}

View File

@ -801,8 +801,8 @@ public:
}
}
void search_new_data() {
this->search_range(-1_vl);
void search_new_data(vis_line_t start = -1_vl) {
this->search_range(start);
if (this->tc_search_child) {
this->tc_search_child->get_grep_proc()->start();
}