From c3dc668b6943353d028cb4a4984a082341752d3b Mon Sep 17 00:00:00 2001 From: Timothy Stack Date: Wed, 19 May 2021 22:05:21 -0700 Subject: [PATCH] [remote] add some config options and remove the copied tailer binary Also fix time offset issue --- src/file_collection.cc | 4 +- src/file_collection.hh | 1 + src/internals/config-v1.schema.json | 53 +++++++ src/lnav.cc | 25 ++- src/lnav_commands.cc | 2 +- src/lnav_config.cc | 60 ++++++- src/lnav_config.hh | 2 + src/logfile.cc | 8 + src/logfile_fwd.hh | 7 + src/logfile_sub_source.cc | 9 +- src/ptimec.hh | 1 - src/ptimec_rt.cc | 8 +- src/root-config.json | 9 ++ src/tailer/CMakeLists.txt | 1 + src/tailer/Makefile.am | 1 + src/tailer/drive_tailer.cc | 3 + src/tailer/tailer.h | 1 + src/tailer/tailer.looper.cc | 232 ++++++++++++++++++++++++---- src/tailer/tailer.looper.cfg.hh | 47 ++++++ src/tailer/tailer.looper.hh | 27 +++- src/tailer/tailer.main.c | 65 ++++++-- src/tailer/tailerpp.cc | 11 ++ src/tailer/tailerpp.hh | 10 +- src/yajlpp/yajlpp.cc | 8 +- src/yajlpp/yajlpp.hh | 2 +- src/yajlpp/yajlpp_def.hh | 82 ++++++++++ test/Makefile.am | 12 ++ test/logfile_block.1 | 2 + test/logfile_block.2 | 2 + test/test_cmds.sh | 8 +- test/test_config.sh | 13 ++ test/test_remote.sh | 26 ++++ test/test_sql.sh | 12 ++ 33 files changed, 684 insertions(+), 70 deletions(-) create mode 100644 src/tailer/tailer.looper.cfg.hh create mode 100644 test/logfile_block.1 create mode 100644 test/logfile_block.2 create mode 100644 test/test_remote.sh diff --git a/src/file_collection.cc b/src/file_collection.cc index 0a9ce479..e44905b0 100644 --- a/src/file_collection.cc +++ b/src/file_collection.cc @@ -119,6 +119,8 @@ void file_collection::merge(const file_collection &other) this->fc_recursive = other.fc_recursive; this->fc_rotated = other.fc_rotated; + this->fc_synced_files.insert(other.fc_synced_files.begin(), + other.fc_synced_files.end()); this->fc_name_to_errors.insert(other.fc_name_to_errors.begin(), other.fc_name_to_errors.end()); this->fc_file_names.insert(other.fc_file_names.begin(), @@ -390,7 +392,7 @@ void file_collection::expand_filename(lnav::futures::future_queue() .send([=](auto &tlooper) { - tlooper.add_remote(rp); + tlooper.add_remote(rp, loo); }); retval.fc_other_files[path] = file_format_t::FF_REMOTE; diff --git a/src/file_collection.hh b/src/file_collection.hh index 9dbab7e1..1b2d2df8 100644 --- a/src/file_collection.hh +++ b/src/file_collection.hh @@ -62,6 +62,7 @@ struct file_collection { fc_renamed_files; std::set fc_closed_files; std::map fc_other_files; + std::set fc_synced_files; std::shared_ptr fc_progress; std::vector fc_new_stats; size_t fc_largest_path_length{0}; diff --git a/src/internals/config-v1.schema.json b/src/internals/config-v1.schema.json index 7e7e3288..aeba8d2c 100644 --- a/src/internals/config-v1.schema.json +++ b/src/internals/config-v1.schema.json @@ -62,6 +62,58 @@ } }, "additionalProperties": false + }, + "remote": { + "description": "Settings related to remote file support", + "title": "/tuning/remote", + "type": "object", + "properties": { + "ssh": { + "description": "Settings related to the ssh command used to contact remote machines", + "title": "/tuning/remote/ssh", + "type": "object", + "properties": { + "command": { + "title": "/tuning/remote/ssh/command", + "description": "The SSH command to execute", + "type": "string" + }, + "flags": { + "title": "/tuning/remote/ssh/flags", + "description": "The flags to pass to the SSH command", + "type": "string" + }, + "options": { + "description": "The options to pass to the SSH command", + "title": "/tuning/remote/ssh/options", + "type": "object", + "patternProperties": { + "(\\w+)": { + "title": "/tuning/remote/ssh/options/", + "description": "Set an option to be passed to the SSH command", + "type": "string" + } + }, + "additionalProperties": false + }, + "config": { + "description": "The ssh_config options to pass to SSH with the -o option", + "title": "/tuning/remote/ssh/config", + "type": "object", + "patternProperties": { + "(\\w+)": { + "title": "/tuning/remote/ssh/config/", + "description": "Set an SSH configuration value", + "type": "string" + } + }, + "additionalProperties": false + } + }, + "additionalProperties": false + } + }, + "additionalProperties": false } }, "additionalProperties": false @@ -105,6 +157,7 @@ "type": "object", "patternProperties": { "([\\w\\-]+)": { + "description": "Theme definitions", "title": "/ui/theme-defs/", "type": "object", "properties": { diff --git a/src/lnav.cc b/src/lnav.cc index ca0401a5..9a691154 100644 --- a/src/lnav.cc +++ b/src/lnav.cc @@ -904,13 +904,32 @@ bool update_active_files(const file_collection& new_files) bool rescan_files(bool req) { + auto& mlooper = injector::get(); bool done = false; + auto delay = 0ms; do { auto fc = lnav_data.ld_active_files.rescan_files(req); + bool all_synced = true; update_active_files(fc); - done = fc.fc_file_names.empty(); + mlooper.get_port().process_for(delay); + if (lnav_data.ld_flags & LNF_HEADLESS) { + for (const auto& pair : lnav_data.ld_active_files.fc_other_files) { + if (pair.second != file_format_t::FF_REMOTE) { + continue; + } + + if (lnav_data.ld_active_files.fc_synced_files + .count(pair.first) == 0) { + all_synced = false; + } + } + if (!all_synced) { + delay = 30ms; + } + } + done = fc.fc_file_names.empty() && all_synced; } while (!done); return true; } @@ -2470,8 +2489,8 @@ SELECT tbl_name FROM sqlite_master WHERE sql LIKE 'CREATE VIRTUAL TABLE%' } #endif else if (is_glob(argv[lpc]) || strchr(argv[lpc], ':') != nullptr) { - lnav_data.ld_active_files.fc_file_names - .emplace(argv[lpc], logfile_open_options()); + lnav_data.ld_active_files.fc_file_names[argv[lpc]] + .with_tail(!(lnav_data.ld_flags & LNF_HEADLESS)); } else if (stat(argv[lpc], &st) == -1) { fprintf(stderr, diff --git a/src/lnav_commands.cc b/src/lnav_commands.cc index 9c545575..6d279b09 100644 --- a/src/lnav_commands.cc +++ b/src/lnav_commands.cc @@ -3861,7 +3861,7 @@ static Result com_config(exec_context &ec, string cmdline, vecto retval = help_text; } else { - retval = "info: " + option + " = " + trim(old_value); + retval = fmt::format("{} = {}", option, trim(old_value)); } } else { diff --git a/src/lnav_config.cc b/src/lnav_config.cc index 0ed28dfa..f5f58012 100644 --- a/src/lnav_config.cc +++ b/src/lnav_config.cc @@ -87,6 +87,10 @@ static auto lc = injector::bind::to_instance(+[]() { return &lnav_config.lc_logfile; }); +static auto tc = injector::bind::to_instance(+[]() { + return &lnav_config.lc_tailer; +}); + bool check_experimental(const char *feature_name) { const char *env_value = getenv("LNAV_EXP"); @@ -445,7 +449,8 @@ static struct json_path_container global_var_handlers = { paths_out.emplace_back(iter.first); } }) - .FOR_FIELD(_lnav_config, lc_global_vars)}; + .FOR_FIELD(_lnav_config, lc_global_vars) +}; static struct json_path_container style_config_handlers = json_path_container{ @@ -801,6 +806,7 @@ static struct json_path_container theme_def_handlers = { static struct json_path_container theme_defs_handlers = { yajlpp::pattern_property_handler("(?[\\w\\-]+)") + .with_description("Theme definitions") .with_obj_provider([](const yajlpp_provider_context &ypc, _lnav_config *root) { lnav_theme < = root->lc_ui_theme_defs[ypc.ypc_extractor.get_substr("theme_name")]; @@ -891,6 +897,55 @@ static struct json_path_container logfile_handlers = { &lnav::logfile::config::lc_max_unrecognized_lines), }; +static struct json_path_container ssh_config_handlers = { + yajlpp::pattern_property_handler("(?\\w+)") + .with_synopsis("name") + .with_description("Set an SSH configuration value") + .with_path_provider<_lnav_config>([]( + auto *m, std::vector &paths_out) { + for (const auto& pair : m->lc_tailer.c_ssh_config) { + paths_out.emplace_back(pair.first); + } + }) + .for_field(&_lnav_config::lc_tailer, + &tailer::config::c_ssh_config), +}; + +static struct json_path_container ssh_option_handlers = { + yajlpp::pattern_property_handler("(?\\w+)") + .with_synopsis("name") + .with_description("Set an option to be passed to the SSH command") + .for_field(&_lnav_config::lc_tailer, + &tailer::config::c_ssh_options), +}; + +static struct json_path_container ssh_handlers = { + yajlpp::property_handler("command") + .with_synopsis("ssh-command") + .with_description("The SSH command to execute") + .for_field(&_lnav_config::lc_tailer, + &tailer::config::c_ssh_cmd), + yajlpp::property_handler("flags") + .with_description("The flags to pass to the SSH command") + .for_field(&_lnav_config::lc_tailer, + &tailer::config::c_ssh_flags), + yajlpp::property_handler("options") + .with_description("The options to pass to the SSH command") + .with_children(ssh_option_handlers), + yajlpp::property_handler("config") + .with_description( + "The ssh_config options to pass to SSH with the -o option") + .with_children(ssh_config_handlers), +}; + +static struct json_path_container remote_handlers = { + yajlpp::property_handler("ssh") + .with_description( + "Settings related to the ssh command used to contact remote " + "machines") + .with_children(ssh_handlers), +}; + static struct json_path_container tuning_handlers = { yajlpp::property_handler("archive-manager") .with_description("Settings related to opening archive files") @@ -901,6 +956,9 @@ static struct json_path_container tuning_handlers = { yajlpp::property_handler("logfile") .with_description("Settings related to log files") .with_children(logfile_handlers), + yajlpp::property_handler("remote") + .with_description("Settings related to remote file support") + .with_children(remote_handlers), }; static set SUPPORTED_CONFIG_SCHEMAS = { diff --git a/src/lnav_config.hh b/src/lnav_config.hh index a1420921..4a03e8e1 100644 --- a/src/lnav_config.hh +++ b/src/lnav_config.hh @@ -49,6 +49,7 @@ #include "archive_manager.cfg.hh" #include "file_vtab.cfg.hh" #include "logfile.cfg.hh" +#include "tailer/tailer.looper.cfg.hh" /** * Check if an experimental feature should be enabled by @@ -97,6 +98,7 @@ struct _lnav_config { archive_manager::config lc_archive_manager; file_vtab::config lc_file_vtab; lnav::logfile::config lc_logfile; + tailer::config lc_tailer; }; extern struct _lnav_config lnav_config; diff --git a/src/logfile.cc b/src/logfile.cc index ffc15d5c..1a70d0a3 100644 --- a/src/logfile.cc +++ b/src/logfile.cc @@ -293,6 +293,10 @@ bool logfile::process_prefix(shared_buffer_ref &sbr, const line_info &li) logfile::rebuild_result_t logfile::rebuild_index() { if (!this->lf_indexing) { + if (this->lf_sort_needed) { + this->lf_sort_needed = false; + return rebuild_result_t::RR_NEW_ORDER; + } return logfile::rebuild_result_t::RR_NO_NEW_LINES; } @@ -507,6 +511,10 @@ logfile::rebuild_result_t logfile::rebuild_index() retval = RR_NEW_LINES; } } + else if (this->lf_sort_needed) { + retval = RR_NEW_ORDER; + this->lf_sort_needed = false; + } this->lf_index_time = this->lf_line_buffer.get_file_time(); if (!this->lf_index_time) { diff --git a/src/logfile_fwd.hh b/src/logfile_fwd.hh index e99edc3f..48c6a508 100644 --- a/src/logfile_fwd.hh +++ b/src/logfile_fwd.hh @@ -94,6 +94,12 @@ struct logfile_open_options { return *this; } + logfile_open_options &with_tail(bool val) { + this->loo_tail = val; + + return *this; + } + std::string loo_filename; auto_fd loo_fd; logfile_name_source loo_source{logfile_name_source::USER}; @@ -102,6 +108,7 @@ struct logfile_open_options { bool loo_is_visible{true}; bool loo_non_utf_is_visible{true}; ssize_t loo_visible_size_limit{-1}; + bool loo_tail{true}; }; #endif diff --git a/src/logfile_sub_source.cc b/src/logfile_sub_source.cc index 20556f74..bc67d774 100644 --- a/src/logfile_sub_source.cc +++ b/src/logfile_sub_source.cc @@ -263,15 +263,8 @@ void logfile_sub_source::text_value_for_line(textview_curses &tc, adjusted_tm); } - if (len > time_range.length()) { - ssize_t padding = len - time_range.length(); - - value_out.insert(time_range.lr_start, - padding, - ' '); - } value_out.replace(time_range.lr_start, - len, + time_range.length(), buffer, len); this->lss_token_shift_start = time_range.lr_start; diff --git a/src/ptimec.hh b/src/ptimec.hh index 44d9f0ac..d4be2263 100644 --- a/src/ptimec.hh +++ b/src/ptimec.hh @@ -197,7 +197,6 @@ inline void ftime_a(char *dst, off_t &off_inout, ssize_t len, const struct exttm inline void ftime_Z(char *dst, off_t &off_inout, ssize_t len, const struct exttm &tm) { - } inline void ftime_b(char *dst, off_t &off_inout, ssize_t len, const struct exttm &tm) diff --git a/src/ptimec_rt.cc b/src/ptimec_rt.cc index b2d4ec8b..6eab65d5 100644 --- a/src/ptimec_rt.cc +++ b/src/ptimec_rt.cc @@ -65,11 +65,15 @@ bool ptime_fmt(const char *fmt, struct exttm *dst, const char *str, off_t &off, case 'a': case 'Z': if (fmt[lpc + 2]) { - if (!ptime_upto(fmt[lpc + 2], str, off, len)) return false; + if (!ptime_upto(fmt[lpc + 2], str, off, len)) { + return false; + } lpc += 1; } else { - if (!ptime_upto_end(str, off, len)) return false; + if (!ptime_upto_end(str, off, len)) { + return false; + } lpc += 1; } break; diff --git a/src/root-config.json b/src/root-config.json index 51b3027d..88afdff7 100644 --- a/src/root-config.json +++ b/src/root-config.json @@ -11,6 +11,15 @@ "archive-manager": { "min-free-space": 33554432, "cache-ttl": "2d" + }, + "remote": { + "ssh": { + "command": "ssh", + "config": { + "BatchMode": "yes", + "StrictHostKeyChecking": "no" + } + } } } } diff --git a/src/tailer/CMakeLists.txt b/src/tailer/CMakeLists.txt index d78327ee..480da34f 100644 --- a/src/tailer/CMakeLists.txt +++ b/src/tailer/CMakeLists.txt @@ -36,6 +36,7 @@ add_library( tailer.looper.hh tailer.looper.cc + tailer.looper.cfg.hh tailerbin.h tailerbin.cc ) diff --git a/src/tailer/Makefile.am b/src/tailer/Makefile.am index da856144..5c556121 100644 --- a/src/tailer/Makefile.am +++ b/src/tailer/Makefile.am @@ -24,6 +24,7 @@ noinst_HEADERS = \ sha-256.h \ tailer.h \ tailer.looper.hh \ + tailer.looper.cfg.hh \ tailerpp.hh libtailercommon_a_SOURCES = \ diff --git a/src/tailer/drive_tailer.cc b/src/tailer/drive_tailer.cc index a173d91c..2c53ba72 100644 --- a/src/tailer/drive_tailer.cc +++ b/src/tailer/drive_tailer.cc @@ -200,6 +200,9 @@ int main(int argc, char *const *argv) ftruncate(fd, ptb.ptb_offset); pwrite(fd, ptb.ptb_bits.data(), ptb.ptb_bits.size(), ptb.ptb_offset); } + }, + [&](const tailer::packet_synced &ps) { + }, [&](const tailer::packet_link &pl) { diff --git a/src/tailer/tailer.h b/src/tailer/tailer.h index e3e47bdd..e1c54de2 100644 --- a/src/tailer/tailer.h +++ b/src/tailer/tailer.h @@ -49,6 +49,7 @@ typedef enum { TPT_ACK_BLOCK, TPT_TAIL_BLOCK, TPT_LINK_BLOCK, + TPT_SYNCED, TPT_LOG, TPT_LOAD_PREVIEW, TPT_PREVIEW_ERROR, diff --git a/src/tailer/tailer.looper.cc b/src/tailer/tailer.looper.cc index 2f54bf7f..88272fe5 100644 --- a/src/tailer/tailer.looper.cc +++ b/src/tailer/tailer.looper.cc @@ -35,6 +35,7 @@ #include "base/lnav_log.hh" #include "base/paths.hh" #include "tailer.looper.hh" +#include "tailer.looper.cfg.hh" #include "tailer.h" #include "tailerpp.hh" #include "lnav.hh" @@ -85,6 +86,7 @@ static void read_err_pipe(const std::string &netloc, auto_fd &err, void tailer::looper::loop_body() { auto now = std::chrono::steady_clock::now(); + std::vector to_erase; for (auto& qpair : this->l_netlocs_to_paths) { auto& netloc = qpair.first; @@ -97,8 +99,17 @@ void tailer::looper::loop_body() auto create_res = host_tailer::for_host(netloc); if (create_res.isErr()) { - this->report_error(netloc, create_res.unwrapErr()); - rpq.rpq_next_attempt_time = now + HOST_RETRY_DELAY; + report_error(netloc, create_res.unwrapErr()); + if (std::any_of(rpq.rpq_new_paths.begin(), + rpq.rpq_new_paths.end(), + [](const auto& pair) { + return !pair.second.loo_tail; + })) { + rpq.send_synced_to_main(netloc); + to_erase.push_back(netloc); + } else { + rpq.rpq_next_attempt_time = now + HOST_RETRY_DELAY; + } continue; } @@ -114,12 +125,13 @@ void tailer::looper::loop_body() if (!rpq.rpq_new_paths.empty()) { log_debug("%s: new paths to monitor -- %s", netloc.c_str(), - rpq.rpq_new_paths.begin()->c_str()); + rpq.rpq_new_paths.begin()->first.c_str()); this->l_remotes[netloc]->send( [paths = rpq.rpq_new_paths](auto &ht) { - for (const auto &path : paths) { - log_debug("adding path to tailer -- %s", path.c_str()); - ht.open_remote_path(path); + for (const auto &pair : paths) { + log_debug("adding path to tailer -- %s", + pair.first.c_str()); + ht.open_remote_path(pair.first, pair.second); } }); @@ -128,12 +140,18 @@ void tailer::looper::loop_body() rpq.rpq_new_paths.clear(); } } + + for (const auto& netloc : to_erase) { + this->l_netlocs_to_paths.erase(netloc); + } } -void tailer::looper::add_remote(const network::path& path) +void tailer::looper::add_remote(const network::path &path, + logfile_open_options options) { auto netloc_str = fmt::format("{}", path.home()); - this->l_netlocs_to_paths[netloc_str].rpq_new_paths.insert(path.p_path); + this->l_netlocs_to_paths[netloc_str].rpq_new_paths[path.p_path] = + std::move(options); } void tailer::looper::load_preview(int64_t id, const network::path& path) @@ -192,11 +210,47 @@ void tailer::looper::complete_path(const network::path& path) }); } +static std::vector +create_ssh_args_from_config(const std::string& dest) +{ + auto& cfg = injector::get(); + std::vector retval; + + retval.emplace_back(cfg.c_ssh_cmd); + if (!cfg.c_ssh_flags.empty()) { + if (startswith(cfg.c_ssh_flags, "-")) { + retval.emplace_back(cfg.c_ssh_flags); + } else { + retval.emplace_back(fmt::format("-{}", cfg.c_ssh_flags)); + } + } + for (const auto& pair : cfg.c_ssh_options) { + if (pair.second.empty()) { + continue; + } + retval.emplace_back(fmt::format("-{}", pair.first)); + retval.emplace_back(pair.second); + } + for (const auto& pair : cfg.c_ssh_config) { + if (pair.second.empty()) { + continue; + } + retval.emplace_back(fmt::format( + "-o{}={}", pair.first, pair.second)); + } + retval.emplace_back(dest); + + return retval; +} + Result, std::string> tailer::looper::host_tailer::for_host(const std::string& netloc) { log_debug("tailer(%s): transferring tailer to remote", netloc.c_str()); + auto& cfg = injector::get(); + auto tailer_bin_name = fmt::format("tailer.bin.{}", getpid()); + auto rp = humanize::network::path::from_str(netloc).value(); auto ssh_dest = rp.p_locality.l_hostname; if (rp.p_locality.l_username.has_value()) { @@ -216,12 +270,21 @@ tailer::looper::host_tailer::for_host(const std::string& netloc) err_pipe.after_fork(child.in()); if (child.in_child()) { - execlp("ssh", "ssh", - "-oStrictHostKeyChecking=no", - "-oBatchMode=yes", - ssh_dest.c_str(), - "cat > tailer.bin && chmod ugo+rx ./tailer.bin", - nullptr); + auto arg_strs = create_ssh_args_from_config(ssh_dest); + std::vector args; + + arg_strs.emplace_back(fmt::format( + "cat > {} && chmod ugo+rx ./{}", tailer_bin_name, tailer_bin_name)); + + fmt::print(stderr, "tailer({}): executing -- {}\n", + netloc, + fmt::join(arg_strs, " ")); + for (const auto& arg : arg_strs) { + args.push_back((char *) arg.data()); + } + args.push_back(nullptr); + + execvp(cfg.c_ssh_cmd.c_str(), args.data()); exit(EXIT_FAILURE); } @@ -283,13 +346,21 @@ tailer::looper::host_tailer::for_host(const std::string& netloc) err_pipe.after_fork(child.in()); if (child.in_child()) { - execlp("ssh", "ssh", - // "-q", - "-oStrictHostKeyChecking=no", - "-oBatchMode=yes", - ssh_dest.c_str(), - "./tailer.bin", - nullptr); + auto arg_strs = create_ssh_args_from_config(ssh_dest); + std::vector args; + + arg_strs.emplace_back(fmt::format( + "./{}", tailer_bin_name, tailer_bin_name)); + + fmt::print(stderr, "tailer({}): executing -- {}\n", + netloc, + fmt::join(arg_strs, " ")); + for (const auto& arg : arg_strs) { + args.push_back((char *) arg.data()); + } + args.push_back(nullptr); + + execvp(cfg.c_ssh_cmd.c_str(), args.data()); exit(EXIT_FAILURE); } @@ -346,11 +417,12 @@ tailer::looper::host_tailer::host_tailer(const std::string &netloc, { } -void tailer::looper::host_tailer::open_remote_path(const std::string& path) +void tailer::looper::host_tailer::open_remote_path(const std::string& path, + logfile_open_options loo) { this->ht_state.match( [&](connected& conn) { - conn.c_desired_paths.insert(path); + conn.c_desired_paths[path] = std::move(loo); send_packet(conn.ht_to_child.get(), TPT_OPEN_PATH, TPPT_STRING, path.c_str(), @@ -359,6 +431,9 @@ void tailer::looper::host_tailer::open_remote_path(const std::string& path) [&](const disconnected& d) { log_warning("disconnected from host, cannot tail: %s", path.c_str()); + }, + [&](const synced& s) { + log_warning("synced with host, not tailing: %s", path.c_str()); } ); } @@ -387,6 +462,9 @@ void tailer::looper::host_tailer::load_preview(int64_t id, const std::string &pa .set_cylon(false) .set_value(msg); }); + }, + [&](const synced& s) { + require(false); } ); } @@ -403,6 +481,9 @@ void tailer::looper::host_tailer::complete_path(const std::string &path) [&](const disconnected& d) { log_warning("disconnected from host, cannot preview: %s", path.c_str()); + }, + [&](const synced& s) { + require(false); } ); } @@ -469,6 +550,35 @@ void tailer::looper::host_tailer::loop_body() log_debug("Got an offer: %s %lld - %lld", pob.pob_path.c_str(), pob.pob_offset, pob.pob_length); + logfile_open_options loo; + if (pob.pob_path == pob.pob_root_path) { + auto root_iter = conn.c_desired_paths.find(pob.pob_path); + + if (root_iter == conn.c_desired_paths.end()) { + log_warning("ignoring unknown root: %s", + pob.pob_root_path.c_str()); + return std::move(this->ht_state); + } + + loo = std::move(root_iter->second); + } else { + auto child_iter = conn.c_child_paths.find(pob.pob_path); + if (child_iter == conn.c_child_paths.end()) { + auto root_iter = conn.c_desired_paths.find(pob.pob_root_path); + + if (root_iter == conn.c_desired_paths.end()) { + log_warning("ignoring child of unknown root: %s", + pob.pob_root_path.c_str()); + return std::move(this->ht_state); + } + + conn.c_child_paths[pob.pob_path] = std::move(root_iter->second); + child_iter = conn.c_child_paths.find(pob.pob_path); + } + + loo = std::move(child_iter->second); + } + auto remote_path = ghc::filesystem::absolute( ghc::filesystem::path(pob.pob_path)).relative_path(); auto local_path = this->ht_local_path / remote_path; @@ -479,7 +589,7 @@ void tailer::looper::host_tailer::loop_body() auto custom_name = this->get_display_path(pob.pob_path); isc::to() - .send([local_path, custom_name](auto &mlooper) { + .send([local_path, custom_name, loo](auto &mlooper) { auto &active_fc = lnav_data.ld_active_files; auto lpath_str = local_path.string(); @@ -496,7 +606,8 @@ void tailer::looper::host_tailer::loop_body() fc.fc_file_names[lpath_str] .with_filename(custom_name) - .with_source(logfile_name_source::REMOTE); + .with_source(logfile_name_source::REMOTE) + .with_tail(loo.loo_tail); update_active_files(fc); }); } @@ -575,6 +686,39 @@ void tailer::looper::host_tailer::loop_body() } return std::move(this->ht_state); }, + [&](const tailer::packet_synced &ps) { + if (ps.ps_root_path == ps.ps_path) { + auto iter = conn.c_desired_paths.find(ps.ps_path); + + if (iter != conn.c_desired_paths.end()) { + if (!iter->second.loo_tail) { + log_info("synced desired path: %s", + iter->first.c_str()); + conn.c_desired_paths.erase(iter); + } + + } + } else { + auto iter = conn.c_child_paths.find(ps.ps_path); + + if (iter != conn.c_child_paths.end()) { + if (!iter->second.loo_tail) { + log_info("synced child path: %s", + iter->first.c_str()); + conn.c_child_paths.erase(iter); + } + } + } + + if (conn.c_desired_paths.empty() && + conn.c_child_paths.empty()) { + log_info("tailer(%s): all desired paths synced", + this->ht_netloc.c_str()); + return state_v{synced{}}; + } + + return std::move(this->ht_state); + }, [&](const tailer::packet_link &pl) { auto remote_path = ghc::filesystem::absolute( ghc::filesystem::path(pl.pl_path)).relative_path(); @@ -657,7 +801,7 @@ void tailer::looper::host_tailer::loop_body() } ); - if (this->ht_state.is()) { + if (!this->ht_state.is()) { this->s_looping = false; } } @@ -671,7 +815,7 @@ tailer::looper::host_tailer::compute_timeout(mstime_t current_time) const void tailer::looper::host_tailer::stopped() { - if (!this->ht_state.is()) { + if (this->ht_state.is()) { this->ht_state = disconnected(); } if (this->ht_error_reader.joinable()) { @@ -712,11 +856,45 @@ tailer::looper::child_finished(std::shared_ptr child) continue; } + if (child_tailer->is_synced()) { + log_info("synced with netloc '%s', removing", iter->first.c_str()); + auto netloc_iter = this->l_netlocs_to_paths.find(iter->first); + + if (netloc_iter != this->l_netlocs_to_paths.end()) { + netloc_iter->second.send_synced_to_main(netloc_iter->first); + this->l_netlocs_to_paths.erase(netloc_iter); + } + } this->l_remotes.erase(iter); return; } } +void +tailer::looper::remote_path_queue::send_synced_to_main(const std::string& netloc) +{ + std::set synced_files; + + for (const auto& pair : this->rpq_new_paths) { + if (!pair.second.loo_tail) { + synced_files.emplace(fmt::format("{}{}", netloc, pair.first)); + } + } + for (const auto& pair : this->rpq_existing_paths) { + if (!pair.second.loo_tail) { + synced_files.emplace(fmt::format("{}{}", netloc, pair.first)); + } + } + + isc::to() + .send([file_set = std::move(synced_files)](auto& mlooper) { + file_collection fc; + + fc.fc_synced_files = file_set; + update_active_files(fc); + }); +} + void tailer::looper::report_error(std::string path, std::string msg) { isc::to() diff --git a/src/tailer/tailer.looper.cfg.hh b/src/tailer/tailer.looper.cfg.hh new file mode 100644 index 00000000..ed0125a3 --- /dev/null +++ b/src/tailer/tailer.looper.cfg.hh @@ -0,0 +1,47 @@ +/** + * Copyright (c) 2021, Timothy Stack + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * * Neither the name of Timothy Stack nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef lnav_tailer_looper_cfg_hh +#define lnav_tailer_looper_cfg_hh + +namespace tailer { + +struct config { + std::string c_ssh_cmd{"ssh"}; + std::string c_ssh_flags{}; + std::map c_ssh_options{}; + std::map c_ssh_config{ + {"BatchMode", "yes"}, + {"StrictHostKeyChecking", "yes"}, + }; +}; + +} + +#endif diff --git a/src/tailer/tailer.looper.hh b/src/tailer/tailer.looper.hh index e7b47257..c2d8b26d 100644 --- a/src/tailer/tailer.looper.hh +++ b/src/tailer/tailer.looper.hh @@ -31,6 +31,7 @@ #define lnav_tailer_looper_hh #include +#include #include "base/isc.hh" #include "base/auto_pid.hh" @@ -43,13 +44,17 @@ namespace tailer { class looper : public isc::service { public: - void add_remote(const network::path& path); + void add_remote(const network::path &path, logfile_open_options options); void load_preview(int64_t id, const network::path& path); void complete_path(const network::path& path); - std::set active_netlocs() { + bool empty() const { + return this->l_netlocs_to_paths.empty(); + } + + std::set active_netlocs() const { std::set retval; for (const auto& pair : this->l_remotes) { @@ -75,12 +80,16 @@ private: auto_fd to_child, auto_fd from_child, auto_fd err_from_child); - void open_remote_path(const std::string& path); + void open_remote_path(const std::string& path, logfile_open_options loo); void load_preview(int64_t id, const std::string& path); void complete_path(const std::string& path); + bool is_synced() const { + return this->ht_state.is(); + } + protected: void *run() override; @@ -100,14 +109,16 @@ private: auto_pid ht_child; auto_fd ht_to_child; auto_fd ht_from_child; - std::set c_desired_paths; + std::map c_desired_paths; + std::map c_child_paths; auto_pid close() &&; }; struct disconnected {}; + struct synced {}; - using state_v = mapbox::util::variant; + using state_v = mapbox::util::variant; const std::string ht_netloc; const ghc::filesystem::path ht_local_path; @@ -123,8 +134,10 @@ private: struct remote_path_queue { attempt_time_point rpq_next_attempt_time{std::chrono::steady_clock::now()}; - std::set rpq_new_paths; - std::set rpq_existing_paths; + std::map rpq_new_paths; + std::map rpq_existing_paths; + + void send_synced_to_main(const std::string& netloc); }; std::map l_netlocs_to_paths; diff --git a/src/tailer/tailer.main.c b/src/tailer/tailer.main.c index fbf5dfce..ae49c9a8 100644 --- a/src/tailer/tailer.main.c +++ b/src/tailer/tailer.main.c @@ -385,13 +385,19 @@ void send_preview_data(int64_t id, const char *path, int32_t len, const char *bi TPPT_DONE); } -int poll_paths(struct list *path_list) +int poll_paths(struct list *path_list, struct client_path_state *root_cps) { struct client_path_state *curr = (struct client_path_state *) path_list->l_head; + int is_top = root_cps == NULL; int retval = 0; while (curr->cps_node.n_succ != NULL) { + if (is_top) { + root_cps = curr; + } + if (is_glob(curr->cps_path)) { + int changes = 0; glob_t gl; memset(&gl, 0, sizeof(gl)); @@ -407,6 +413,7 @@ int poll_paths(struct list *path_list) if ((child = find_client_path_state( &prev_children, gl.gl_pathv[lpc])) == NULL) { child = create_client_path_state(gl.gl_pathv[lpc]); + changes += 1; } else { list_remove(&child->cps_node); } @@ -420,9 +427,21 @@ int poll_paths(struct list *path_list) &prev_children)) != NULL) { send_error(child, "deleted"); delete_client_path_state(child); + changes += 1; } - retval += poll_paths(&curr->cps_children); + retval += poll_paths(&curr->cps_children, root_cps); + } + + if (changes) { + curr->cps_client_state = CS_INIT; + } else if (curr->cps_client_state != CS_SYNCED) { + send_packet(STDOUT_FILENO, + TPT_SYNCED, + TPPT_STRING, root_cps->cps_path, + TPPT_STRING, curr->cps_path, + TPPT_DONE); + curr->cps_client_state = CS_SYNCED; } curr = (struct client_path_state *) curr->cps_node.n_succ; @@ -454,6 +473,7 @@ int poll_paths(struct list *path_list) buffer[link_len] = '\0'; send_packet(STDOUT_FILENO, TPT_LINK_BLOCK, + TPPT_STRING, root_cps->cps_path, TPPT_STRING, curr->cps_path, TPPT_STRING, buffer, TPPT_DONE); @@ -482,13 +502,14 @@ int poll_paths(struct list *path_list) break; } - retval += poll_paths(&curr->cps_children); + retval += poll_paths(&curr->cps_children, root_cps); curr->cps_last_path_state = PS_OK; } else if (S_ISREG(st.st_mode)) { switch (curr->cps_client_state) { case CS_INIT: - case CS_TAILING: { + case CS_TAILING: + case CS_SYNCED: { if (curr->cps_client_file_offset < st.st_size) { int fd = open(curr->cps_path, O_RDONLY); @@ -515,6 +536,7 @@ int poll_paths(struct list *path_list) curr->cps_client_file_read_length = bytes_read; send_packet(STDOUT_FILENO, TPT_OFFER_BLOCK, + TPPT_STRING, root_cps->cps_path, TPPT_STRING, curr->cps_path, TPPT_INT64, file_offset, TPPT_INT64, bytes_read, @@ -528,16 +550,25 @@ int poll_paths(struct list *path_list) send_packet(STDOUT_FILENO, TPT_TAIL_BLOCK, + TPPT_STRING, root_cps->cps_path, TPPT_STRING, curr->cps_path, TPPT_INT64, curr->cps_client_file_offset, TPPT_BITS, bytes_read, buffer, TPPT_DONE); curr->cps_client_file_offset += bytes_read; + curr->cps_client_state = CS_TAILING; } close(fd); retval = 1; } + } else if (curr->cps_client_state != CS_SYNCED) { + send_packet(STDOUT_FILENO, + TPT_SYNCED, + TPPT_STRING, root_cps->cps_path, + TPPT_STRING, curr->cps_path, + TPPT_DONE); + curr->cps_client_state = CS_SYNCED; } break; } @@ -545,11 +576,6 @@ int poll_paths(struct list *path_list) // Still waiting for the client ack break; } - case CS_SYNCED: { - fprintf(stderr, "internal-error: got synced for %s\n", - curr->cps_path); - break; - } } curr->cps_last_path_state = PS_OK; @@ -561,6 +587,7 @@ int poll_paths(struct list *path_list) } else { struct list prev_children; struct dirent *entry; + int changes = 0; list_move(&prev_children, &curr->cps_children); while ((entry = readdir(dir)) != NULL) { @@ -584,6 +611,7 @@ int poll_paths(struct list *path_list) // new file fprintf(stderr, "info: monitoring child path: %s\n", full_path); child = create_client_path_state(full_path); + changes += 1; } else { list_remove(&child->cps_node); } @@ -597,9 +625,21 @@ int poll_paths(struct list *path_list) &prev_children)) != NULL) { send_error(child, "deleted"); delete_client_path_state(child); + changes += 1; } - retval += poll_paths(&curr->cps_children); + retval += poll_paths(&curr->cps_children, root_cps); + + if (changes) { + curr->cps_client_state = CS_INIT; + } else if (curr->cps_client_state != CS_SYNCED) { + send_packet(STDOUT_FILENO, + TPT_SYNCED, + TPPT_STRING, root_cps->cps_path, + TPPT_STRING, curr->cps_path, + TPPT_DONE); + curr->cps_client_state = CS_SYNCED; + } } curr->cps_last_path_state = PS_OK; @@ -805,6 +845,9 @@ int main(int argc, char *argv[]) int done = 0, timeout = 0; recv_state_t rstate = RS_PACKET_TYPE; + // No need to leave ourselves around + unlink(argv[0]); + list_init(&client_path_list); while (!done) { @@ -912,7 +955,7 @@ int main(int argc, char *argv[]) } if (!done) { - if (poll_paths(&client_path_list)) { + if (poll_paths(&client_path_list, NULL)) { timeout = 0; } else { timeout = 1000; diff --git a/src/tailer/tailerpp.cc b/src/tailer/tailerpp.cc index 6c1ca5df..0952fcfa 100644 --- a/src/tailer/tailerpp.cc +++ b/src/tailer/tailerpp.cc @@ -77,6 +77,7 @@ Result read_packet(int fd) packet_offer_block pob; TRY(read_payloads_into(fd, + pob.pob_root_path, pob.pob_path, pob.pob_offset, pob.pob_length, @@ -87,15 +88,25 @@ Result read_packet(int fd) packet_tail_block ptb; TRY(read_payloads_into(fd, + ptb.ptb_root_path, ptb.ptb_path, ptb.ptb_offset, ptb.ptb_bits)); return Ok(packet{ptb}); } + case TPT_SYNCED: { + packet_synced ps; + + TRY(read_payloads_into(fd, + ps.ps_root_path, + ps.ps_path)); + return Ok(packet{ps}); + } case TPT_LINK_BLOCK: { packet_link pl; TRY(read_payloads_into(fd, + pl.pl_root_path, pl.pl_path, pl.pl_link_value)); return Ok(packet{pl}); diff --git a/src/tailer/tailerpp.hh b/src/tailer/tailerpp.hh index 28d932f1..b4c48c97 100644 --- a/src/tailer/tailerpp.hh +++ b/src/tailer/tailerpp.hh @@ -65,6 +65,7 @@ struct packet_log { }; struct packet_offer_block { + std::string pob_root_path; std::string pob_path; int64_t pob_offset; int64_t pob_length; @@ -72,12 +73,19 @@ struct packet_offer_block { }; struct packet_tail_block { + std::string ptb_root_path; std::string ptb_path; int64_t ptb_offset; std::vector ptb_bits; }; +struct packet_synced { + std::string ps_root_path; + std::string ps_path; +}; + struct packet_link { + std::string pl_root_path; std::string pl_path; std::string pl_link_value; }; @@ -101,7 +109,7 @@ struct packet_possible_path { using packet = mapbox::util::variant< packet_eof, packet_error, packet_offer_block, packet_tail_block, packet_link, packet_preview_error, packet_preview_data, - packet_possible_path>; + packet_possible_path, packet_synced>; struct recv_payload_type {}; struct recv_payload_length {}; diff --git a/src/yajlpp/yajlpp.cc b/src/yajlpp/yajlpp.cc index a5eec1e0..dd9fc5ac 100644 --- a/src/yajlpp/yajlpp.cc +++ b/src/yajlpp/yajlpp.cc @@ -124,6 +124,9 @@ yajl_gen_status json_path_handler_base::gen(yajlpp_gen_context &ygc, yajl_gen ha size_t len; yajl_gen_get_buf(handle, &buf, &len); if (status != yajl_gen_status_ok) { + log_error("yajl_gen failure for: %s -- %d", + jph.jph_property.c_str(), + status); return status; } } @@ -832,7 +835,8 @@ yajlpp_parse_context &yajlpp_parse_context::set_path(const string &path) for (size_t lpc = 0; lpc < path.size(); lpc++) { switch (path[lpc]) { case '/': - this->ypc_path_index_stack.push_back(1 + lpc); + this->ypc_path_index_stack.push_back( + this->ypc_path_index_stack.empty() ? 1 : 0 + lpc); break; } } @@ -846,7 +850,7 @@ const char *yajlpp_parse_context::get_path_fragment(int offset, char *frag_in, size_t start, end; if (offset < 0) { - offset = this->ypc_path_index_stack.size() + offset; + offset = ((int) this->ypc_path_index_stack.size()) + offset; } start = this->ypc_path_index_stack[offset] + ((offset == 0) ? 0 : 1); if ((offset + 1) < (int)this->ypc_path_index_stack.size()) { diff --git a/src/yajlpp/yajlpp.hh b/src/yajlpp/yajlpp.hh index 14e6aa42..957e30f7 100644 --- a/src/yajlpp/yajlpp.hh +++ b/src/yajlpp/yajlpp.hh @@ -447,7 +447,7 @@ public: typename std::enable_if::value>::type* dummy = 0) { yajl_gen_array_open(this->yg_handle); - for (auto elem : container) { + for (const auto& elem : container) { yajl_gen_status rc = (*this)(elem); if (rc != yajl_gen_status_ok) { diff --git a/src/yajlpp/yajlpp_def.hh b/src/yajlpp/yajlpp_def.hh index 5f27b11e..2900b003 100644 --- a/src/yajlpp/yajlpp_def.hh +++ b/src/yajlpp/yajlpp_def.hh @@ -530,6 +530,88 @@ struct json_path_handler : public json_path_handler_base { return *this; } + template< + typename... Args, + std::enable_if_t, Args...>::value, bool> = true + > + json_path_handler &for_field(Args... args) { + this->add_cb(str_field_cb2); + this->jph_str_cb = [args...](yajlpp_parse_context *ypc, + const unsigned char *str, + size_t len) { + auto obj = ypc->ypc_obj_stack.top(); + auto key = ypc->get_path_fragment(-1); + + json_path_handler::get_field(obj, args...)[key] = std::string((const char *) str, len); + + return 1; + }; + this->jph_gen_callback = [args...](yajlpp_gen_context &ygc, + const json_path_handler_base &jph, + yajl_gen handle) { + const auto& field = json_path_handler::get_field(ygc.ygc_obj_stack.top(), args...); + + if (!ygc.ygc_default_stack.empty()) { + const auto& field_def = json_path_handler::get_field(ygc.ygc_default_stack.top(), args...); + + if (field == field_def) { + return yajl_gen_status_ok; + } + } + + { + yajlpp_generator gen(handle); + + for (const auto& pair : field) { + gen(pair.first); + gen(pair.second); + } + } + + return yajl_gen_status_ok; + }; + return *this; + } + + template< + typename... Args, + std::enable_if_t::value, bool> = true + > + json_path_handler &for_field(Args... args) { + this->add_cb(str_field_cb2); + this->jph_str_cb = [args...](yajlpp_parse_context *ypc, + const unsigned char *str, + size_t len) { + auto obj = ypc->ypc_obj_stack.top(); + + json_path_handler::get_field(obj, args...) = std::string((const char *) str, len); + + return 1; + }; + this->jph_gen_callback = [args...](yajlpp_gen_context &ygc, + const json_path_handler_base &jph, + yajl_gen handle) { + const auto& field = json_path_handler::get_field(ygc.ygc_obj_stack.top(), args...); + + if (!ygc.ygc_default_stack.empty()) { + const auto& field_def = json_path_handler::get_field(ygc.ygc_default_stack.top(), args...); + + if (field == field_def) { + return yajl_gen_status_ok; + } + } + + if (ygc.ygc_depth) { + yajl_gen_string(handle, jph.jph_property); + } + + yajlpp_generator gen(handle); + + return gen(field); + }; + return *this; + } + template< typename... Args, std::enable_if_t::value, bool> = true diff --git a/test/Makefile.am b/test/Makefile.am index f32d5d73..f8c5f4a1 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -24,6 +24,14 @@ AM_CPPFLAGS = \ # AM_CFLAGS = -fprofile-arcs -ftest-coverage # AM_CXXFLAGS = -fprofile-arcs -ftest-coverage +remote/ssh_host_rsa_key: + mkdir -p remote + ssh-keygen -f remote/ssh_host_rsa_key -N '' -t rsa + +remote/ssh_host_dsa_key: + mkdir -p remote + ssh-keygen -f remote/ssh_host_dsa_key -N '' -t dsa + noinst_LIBRARIES = \ libtestdummy.a @@ -174,6 +182,7 @@ dist_noinst_SCRIPTS = \ test_logfile.sh \ test_meta.sh \ test_mvwattrline.sh \ + test_remote.sh \ test_scripts.sh \ test_sessions.sh \ test_shlexer.sh \ @@ -408,7 +417,10 @@ DISTCLEANFILES = \ empty \ scripts-empty +all-local: remote/ssh_host_dsa_key remote/ssh_host_rsa_key + distclean-local: + $(RM_V)rm -rf remote $(RM_V)rm -rf sessions $(RM_V)rm -rf tmp $(RM_V)rm -rf rotmp diff --git a/test/logfile_block.1 b/test/logfile_block.1 new file mode 100644 index 00000000..d72738eb --- /dev/null +++ b/test/logfile_block.1 @@ -0,0 +1,2 @@ +Wed May 19 08:00:01 EST 2021 line 1 +Wed May 19 08:00:03 EST 2021 line 3 diff --git a/test/logfile_block.2 b/test/logfile_block.2 new file mode 100644 index 00000000..3d21dd3f --- /dev/null +++ b/test/logfile_block.2 @@ -0,0 +1,2 @@ +Wed May 19 12:00:02 UTC 2021 line 2 +Wed May 19 12:00:04 UTC 2021 line 4 diff --git a/test/test_cmds.sh b/test/test_cmds.sh index 4422ce6f..02ddde6b 100644 --- a/test/test_cmds.sh +++ b/test/test_cmds.sh @@ -262,7 +262,7 @@ check_error_output "config clock-format 1" < remote/sshd_config <