|
|
|
@ -35,6 +35,7 @@
|
|
|
|
|
#include "base/lnav_log.hh"
|
|
|
|
|
#include "base/paths.hh"
|
|
|
|
|
#include "tailer.looper.hh"
|
|
|
|
|
#include "tailer.looper.cfg.hh"
|
|
|
|
|
#include "tailer.h"
|
|
|
|
|
#include "tailerpp.hh"
|
|
|
|
|
#include "lnav.hh"
|
|
|
|
@ -85,6 +86,7 @@ static void read_err_pipe(const std::string &netloc, auto_fd &err,
|
|
|
|
|
void tailer::looper::loop_body()
|
|
|
|
|
{
|
|
|
|
|
auto now = std::chrono::steady_clock::now();
|
|
|
|
|
std::vector<std::string> to_erase;
|
|
|
|
|
|
|
|
|
|
for (auto& qpair : this->l_netlocs_to_paths) {
|
|
|
|
|
auto& netloc = qpair.first;
|
|
|
|
@ -97,8 +99,17 @@ void tailer::looper::loop_body()
|
|
|
|
|
auto create_res = host_tailer::for_host(netloc);
|
|
|
|
|
|
|
|
|
|
if (create_res.isErr()) {
|
|
|
|
|
this->report_error(netloc, create_res.unwrapErr());
|
|
|
|
|
rpq.rpq_next_attempt_time = now + HOST_RETRY_DELAY;
|
|
|
|
|
report_error(netloc, create_res.unwrapErr());
|
|
|
|
|
if (std::any_of(rpq.rpq_new_paths.begin(),
|
|
|
|
|
rpq.rpq_new_paths.end(),
|
|
|
|
|
[](const auto& pair) {
|
|
|
|
|
return !pair.second.loo_tail;
|
|
|
|
|
})) {
|
|
|
|
|
rpq.send_synced_to_main(netloc);
|
|
|
|
|
to_erase.push_back(netloc);
|
|
|
|
|
} else {
|
|
|
|
|
rpq.rpq_next_attempt_time = now + HOST_RETRY_DELAY;
|
|
|
|
|
}
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -114,12 +125,13 @@ void tailer::looper::loop_body()
|
|
|
|
|
if (!rpq.rpq_new_paths.empty()) {
|
|
|
|
|
log_debug("%s: new paths to monitor -- %s",
|
|
|
|
|
netloc.c_str(),
|
|
|
|
|
rpq.rpq_new_paths.begin()->c_str());
|
|
|
|
|
rpq.rpq_new_paths.begin()->first.c_str());
|
|
|
|
|
this->l_remotes[netloc]->send(
|
|
|
|
|
[paths = rpq.rpq_new_paths](auto &ht) {
|
|
|
|
|
for (const auto &path : paths) {
|
|
|
|
|
log_debug("adding path to tailer -- %s", path.c_str());
|
|
|
|
|
ht.open_remote_path(path);
|
|
|
|
|
for (const auto &pair : paths) {
|
|
|
|
|
log_debug("adding path to tailer -- %s",
|
|
|
|
|
pair.first.c_str());
|
|
|
|
|
ht.open_remote_path(pair.first, pair.second);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
@ -128,12 +140,18 @@ void tailer::looper::loop_body()
|
|
|
|
|
rpq.rpq_new_paths.clear();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (const auto& netloc : to_erase) {
|
|
|
|
|
this->l_netlocs_to_paths.erase(netloc);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void tailer::looper::add_remote(const network::path& path)
|
|
|
|
|
void tailer::looper::add_remote(const network::path &path,
|
|
|
|
|
logfile_open_options options)
|
|
|
|
|
{
|
|
|
|
|
auto netloc_str = fmt::format("{}", path.home());
|
|
|
|
|
this->l_netlocs_to_paths[netloc_str].rpq_new_paths.insert(path.p_path);
|
|
|
|
|
this->l_netlocs_to_paths[netloc_str].rpq_new_paths[path.p_path] =
|
|
|
|
|
std::move(options);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void tailer::looper::load_preview(int64_t id, const network::path& path)
|
|
|
|
@ -192,11 +210,47 @@ void tailer::looper::complete_path(const network::path& path)
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static std::vector<std::string>
|
|
|
|
|
create_ssh_args_from_config(const std::string& dest)
|
|
|
|
|
{
|
|
|
|
|
auto& cfg = injector::get<const tailer::config&>();
|
|
|
|
|
std::vector<std::string> retval;
|
|
|
|
|
|
|
|
|
|
retval.emplace_back(cfg.c_ssh_cmd);
|
|
|
|
|
if (!cfg.c_ssh_flags.empty()) {
|
|
|
|
|
if (startswith(cfg.c_ssh_flags, "-")) {
|
|
|
|
|
retval.emplace_back(cfg.c_ssh_flags);
|
|
|
|
|
} else {
|
|
|
|
|
retval.emplace_back(fmt::format("-{}", cfg.c_ssh_flags));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
for (const auto& pair : cfg.c_ssh_options) {
|
|
|
|
|
if (pair.second.empty()) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
retval.emplace_back(fmt::format("-{}", pair.first));
|
|
|
|
|
retval.emplace_back(pair.second);
|
|
|
|
|
}
|
|
|
|
|
for (const auto& pair : cfg.c_ssh_config) {
|
|
|
|
|
if (pair.second.empty()) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
retval.emplace_back(fmt::format(
|
|
|
|
|
"-o{}={}", pair.first, pair.second));
|
|
|
|
|
}
|
|
|
|
|
retval.emplace_back(dest);
|
|
|
|
|
|
|
|
|
|
return retval;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Result<std::shared_ptr<tailer::looper::host_tailer>, std::string>
|
|
|
|
|
tailer::looper::host_tailer::for_host(const std::string& netloc)
|
|
|
|
|
{
|
|
|
|
|
log_debug("tailer(%s): transferring tailer to remote", netloc.c_str());
|
|
|
|
|
|
|
|
|
|
auto& cfg = injector::get<const tailer::config&>();
|
|
|
|
|
auto tailer_bin_name = fmt::format("tailer.bin.{}", getpid());
|
|
|
|
|
|
|
|
|
|
auto rp = humanize::network::path::from_str(netloc).value();
|
|
|
|
|
auto ssh_dest = rp.p_locality.l_hostname;
|
|
|
|
|
if (rp.p_locality.l_username.has_value()) {
|
|
|
|
@ -216,12 +270,21 @@ tailer::looper::host_tailer::for_host(const std::string& netloc)
|
|
|
|
|
err_pipe.after_fork(child.in());
|
|
|
|
|
|
|
|
|
|
if (child.in_child()) {
|
|
|
|
|
execlp("ssh", "ssh",
|
|
|
|
|
"-oStrictHostKeyChecking=no",
|
|
|
|
|
"-oBatchMode=yes",
|
|
|
|
|
ssh_dest.c_str(),
|
|
|
|
|
"cat > tailer.bin && chmod ugo+rx ./tailer.bin",
|
|
|
|
|
nullptr);
|
|
|
|
|
auto arg_strs = create_ssh_args_from_config(ssh_dest);
|
|
|
|
|
std::vector<char *> args;
|
|
|
|
|
|
|
|
|
|
arg_strs.emplace_back(fmt::format(
|
|
|
|
|
"cat > {} && chmod ugo+rx ./{}", tailer_bin_name, tailer_bin_name));
|
|
|
|
|
|
|
|
|
|
fmt::print(stderr, "tailer({}): executing -- {}\n",
|
|
|
|
|
netloc,
|
|
|
|
|
fmt::join(arg_strs, " "));
|
|
|
|
|
for (const auto& arg : arg_strs) {
|
|
|
|
|
args.push_back((char *) arg.data());
|
|
|
|
|
}
|
|
|
|
|
args.push_back(nullptr);
|
|
|
|
|
|
|
|
|
|
execvp(cfg.c_ssh_cmd.c_str(), args.data());
|
|
|
|
|
exit(EXIT_FAILURE);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -283,13 +346,21 @@ tailer::looper::host_tailer::for_host(const std::string& netloc)
|
|
|
|
|
err_pipe.after_fork(child.in());
|
|
|
|
|
|
|
|
|
|
if (child.in_child()) {
|
|
|
|
|
execlp("ssh", "ssh",
|
|
|
|
|
// "-q",
|
|
|
|
|
"-oStrictHostKeyChecking=no",
|
|
|
|
|
"-oBatchMode=yes",
|
|
|
|
|
ssh_dest.c_str(),
|
|
|
|
|
"./tailer.bin",
|
|
|
|
|
nullptr);
|
|
|
|
|
auto arg_strs = create_ssh_args_from_config(ssh_dest);
|
|
|
|
|
std::vector<char *> args;
|
|
|
|
|
|
|
|
|
|
arg_strs.emplace_back(fmt::format(
|
|
|
|
|
"./{}", tailer_bin_name, tailer_bin_name));
|
|
|
|
|
|
|
|
|
|
fmt::print(stderr, "tailer({}): executing -- {}\n",
|
|
|
|
|
netloc,
|
|
|
|
|
fmt::join(arg_strs, " "));
|
|
|
|
|
for (const auto& arg : arg_strs) {
|
|
|
|
|
args.push_back((char *) arg.data());
|
|
|
|
|
}
|
|
|
|
|
args.push_back(nullptr);
|
|
|
|
|
|
|
|
|
|
execvp(cfg.c_ssh_cmd.c_str(), args.data());
|
|
|
|
|
exit(EXIT_FAILURE);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -346,11 +417,12 @@ tailer::looper::host_tailer::host_tailer(const std::string &netloc,
|
|
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void tailer::looper::host_tailer::open_remote_path(const std::string& path)
|
|
|
|
|
void tailer::looper::host_tailer::open_remote_path(const std::string& path,
|
|
|
|
|
logfile_open_options loo)
|
|
|
|
|
{
|
|
|
|
|
this->ht_state.match(
|
|
|
|
|
[&](connected& conn) {
|
|
|
|
|
conn.c_desired_paths.insert(path);
|
|
|
|
|
conn.c_desired_paths[path] = std::move(loo);
|
|
|
|
|
send_packet(conn.ht_to_child.get(),
|
|
|
|
|
TPT_OPEN_PATH,
|
|
|
|
|
TPPT_STRING, path.c_str(),
|
|
|
|
@ -359,6 +431,9 @@ void tailer::looper::host_tailer::open_remote_path(const std::string& path)
|
|
|
|
|
[&](const disconnected& d) {
|
|
|
|
|
log_warning("disconnected from host, cannot tail: %s",
|
|
|
|
|
path.c_str());
|
|
|
|
|
},
|
|
|
|
|
[&](const synced& s) {
|
|
|
|
|
log_warning("synced with host, not tailing: %s", path.c_str());
|
|
|
|
|
}
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
@ -387,6 +462,9 @@ void tailer::looper::host_tailer::load_preview(int64_t id, const std::string &pa
|
|
|
|
|
.set_cylon(false)
|
|
|
|
|
.set_value(msg);
|
|
|
|
|
});
|
|
|
|
|
},
|
|
|
|
|
[&](const synced& s) {
|
|
|
|
|
require(false);
|
|
|
|
|
}
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
@ -403,6 +481,9 @@ void tailer::looper::host_tailer::complete_path(const std::string &path)
|
|
|
|
|
[&](const disconnected& d) {
|
|
|
|
|
log_warning("disconnected from host, cannot preview: %s",
|
|
|
|
|
path.c_str());
|
|
|
|
|
},
|
|
|
|
|
[&](const synced& s) {
|
|
|
|
|
require(false);
|
|
|
|
|
}
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
@ -469,6 +550,35 @@ void tailer::looper::host_tailer::loop_body()
|
|
|
|
|
log_debug("Got an offer: %s %lld - %lld", pob.pob_path.c_str(),
|
|
|
|
|
pob.pob_offset, pob.pob_length);
|
|
|
|
|
|
|
|
|
|
logfile_open_options loo;
|
|
|
|
|
if (pob.pob_path == pob.pob_root_path) {
|
|
|
|
|
auto root_iter = conn.c_desired_paths.find(pob.pob_path);
|
|
|
|
|
|
|
|
|
|
if (root_iter == conn.c_desired_paths.end()) {
|
|
|
|
|
log_warning("ignoring unknown root: %s",
|
|
|
|
|
pob.pob_root_path.c_str());
|
|
|
|
|
return std::move(this->ht_state);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
loo = std::move(root_iter->second);
|
|
|
|
|
} else {
|
|
|
|
|
auto child_iter = conn.c_child_paths.find(pob.pob_path);
|
|
|
|
|
if (child_iter == conn.c_child_paths.end()) {
|
|
|
|
|
auto root_iter = conn.c_desired_paths.find(pob.pob_root_path);
|
|
|
|
|
|
|
|
|
|
if (root_iter == conn.c_desired_paths.end()) {
|
|
|
|
|
log_warning("ignoring child of unknown root: %s",
|
|
|
|
|
pob.pob_root_path.c_str());
|
|
|
|
|
return std::move(this->ht_state);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
conn.c_child_paths[pob.pob_path] = std::move(root_iter->second);
|
|
|
|
|
child_iter = conn.c_child_paths.find(pob.pob_path);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
loo = std::move(child_iter->second);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
auto remote_path = ghc::filesystem::absolute(
|
|
|
|
|
ghc::filesystem::path(pob.pob_path)).relative_path();
|
|
|
|
|
auto local_path = this->ht_local_path / remote_path;
|
|
|
|
@ -479,7 +589,7 @@ void tailer::looper::host_tailer::loop_body()
|
|
|
|
|
|
|
|
|
|
auto custom_name = this->get_display_path(pob.pob_path);
|
|
|
|
|
isc::to<main_looper &, services::main_t>()
|
|
|
|
|
.send([local_path, custom_name](auto &mlooper) {
|
|
|
|
|
.send([local_path, custom_name, loo](auto &mlooper) {
|
|
|
|
|
auto &active_fc = lnav_data.ld_active_files;
|
|
|
|
|
auto lpath_str = local_path.string();
|
|
|
|
|
|
|
|
|
@ -496,7 +606,8 @@ void tailer::looper::host_tailer::loop_body()
|
|
|
|
|
|
|
|
|
|
fc.fc_file_names[lpath_str]
|
|
|
|
|
.with_filename(custom_name)
|
|
|
|
|
.with_source(logfile_name_source::REMOTE);
|
|
|
|
|
.with_source(logfile_name_source::REMOTE)
|
|
|
|
|
.with_tail(loo.loo_tail);
|
|
|
|
|
update_active_files(fc);
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
@ -575,6 +686,39 @@ void tailer::looper::host_tailer::loop_body()
|
|
|
|
|
}
|
|
|
|
|
return std::move(this->ht_state);
|
|
|
|
|
},
|
|
|
|
|
[&](const tailer::packet_synced &ps) {
|
|
|
|
|
if (ps.ps_root_path == ps.ps_path) {
|
|
|
|
|
auto iter = conn.c_desired_paths.find(ps.ps_path);
|
|
|
|
|
|
|
|
|
|
if (iter != conn.c_desired_paths.end()) {
|
|
|
|
|
if (!iter->second.loo_tail) {
|
|
|
|
|
log_info("synced desired path: %s",
|
|
|
|
|
iter->first.c_str());
|
|
|
|
|
conn.c_desired_paths.erase(iter);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
auto iter = conn.c_child_paths.find(ps.ps_path);
|
|
|
|
|
|
|
|
|
|
if (iter != conn.c_child_paths.end()) {
|
|
|
|
|
if (!iter->second.loo_tail) {
|
|
|
|
|
log_info("synced child path: %s",
|
|
|
|
|
iter->first.c_str());
|
|
|
|
|
conn.c_child_paths.erase(iter);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (conn.c_desired_paths.empty() &&
|
|
|
|
|
conn.c_child_paths.empty()) {
|
|
|
|
|
log_info("tailer(%s): all desired paths synced",
|
|
|
|
|
this->ht_netloc.c_str());
|
|
|
|
|
return state_v{synced{}};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return std::move(this->ht_state);
|
|
|
|
|
},
|
|
|
|
|
[&](const tailer::packet_link &pl) {
|
|
|
|
|
auto remote_path = ghc::filesystem::absolute(
|
|
|
|
|
ghc::filesystem::path(pl.pl_path)).relative_path();
|
|
|
|
@ -657,7 +801,7 @@ void tailer::looper::host_tailer::loop_body()
|
|
|
|
|
}
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
if (this->ht_state.is<disconnected>()) {
|
|
|
|
|
if (!this->ht_state.is<connected>()) {
|
|
|
|
|
this->s_looping = false;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -671,7 +815,7 @@ tailer::looper::host_tailer::compute_timeout(mstime_t current_time) const
|
|
|
|
|
|
|
|
|
|
void tailer::looper::host_tailer::stopped()
|
|
|
|
|
{
|
|
|
|
|
if (!this->ht_state.is<disconnected>()) {
|
|
|
|
|
if (this->ht_state.is<connected>()) {
|
|
|
|
|
this->ht_state = disconnected();
|
|
|
|
|
}
|
|
|
|
|
if (this->ht_error_reader.joinable()) {
|
|
|
|
@ -712,11 +856,45 @@ tailer::looper::child_finished(std::shared_ptr<service_base> child)
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (child_tailer->is_synced()) {
|
|
|
|
|
log_info("synced with netloc '%s', removing", iter->first.c_str());
|
|
|
|
|
auto netloc_iter = this->l_netlocs_to_paths.find(iter->first);
|
|
|
|
|
|
|
|
|
|
if (netloc_iter != this->l_netlocs_to_paths.end()) {
|
|
|
|
|
netloc_iter->second.send_synced_to_main(netloc_iter->first);
|
|
|
|
|
this->l_netlocs_to_paths.erase(netloc_iter);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
this->l_remotes.erase(iter);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void
|
|
|
|
|
tailer::looper::remote_path_queue::send_synced_to_main(const std::string& netloc)
|
|
|
|
|
{
|
|
|
|
|
std::set<std::string> synced_files;
|
|
|
|
|
|
|
|
|
|
for (const auto& pair : this->rpq_new_paths) {
|
|
|
|
|
if (!pair.second.loo_tail) {
|
|
|
|
|
synced_files.emplace(fmt::format("{}{}", netloc, pair.first));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
for (const auto& pair : this->rpq_existing_paths) {
|
|
|
|
|
if (!pair.second.loo_tail) {
|
|
|
|
|
synced_files.emplace(fmt::format("{}{}", netloc, pair.first));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
isc::to<main_looper&, services::main_t>()
|
|
|
|
|
.send([file_set = std::move(synced_files)](auto& mlooper) {
|
|
|
|
|
file_collection fc;
|
|
|
|
|
|
|
|
|
|
fc.fc_synced_files = file_set;
|
|
|
|
|
update_active_files(fc);
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void tailer::looper::report_error(std::string path, std::string msg)
|
|
|
|
|
{
|
|
|
|
|
isc::to<main_looper&, services::main_t>()
|
|
|
|
|