You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
lnav/src/tailer/tailer.looper.cc

928 lines
33 KiB
C++

/**
* Copyright (c) 2021, Timothy Stack
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* * Neither the name of Timothy Stack nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "config.h"
#include <regex>
#include "base/humanize.network.hh"
#include "base/lnav_log.hh"
#include "base/paths.hh"
#include "tailer.looper.hh"
#include "tailer.looper.cfg.hh"
#include "tailer.h"
#include "tailerpp.hh"
#include "lnav.hh"
#include "service_tags.hh"
#include "line_buffer.hh"
#include "tailerbin.h"
using namespace std::chrono_literals;
static const auto HOST_RETRY_DELAY = 1min;
static void read_err_pipe(const std::string &netloc, auto_fd &err,
std::vector<std::string> &eq)
{
line_buffer lb;
file_range pipe_range;
bool done = false;
log_info("stderr reader started...");
lb.set_fd(err);
while (!done) {
auto load_res = lb.load_next_line(pipe_range);
if (load_res.isErr()) {
done = true;
} else {
auto li = load_res.unwrap();
pipe_range = li.li_file_range;
if (li.li_file_range.empty()) {
done = true;
} else {
lb.read_range(li.li_file_range).then([netloc, &eq](auto sbr) {
auto line_str = string_fragment(sbr.get_data(),
0,
sbr.length());
line_str.trim("\n");
if (eq.size() < 10) {
eq.template emplace_back(line_str.to_string());
}
log_debug("%.*s", line_str.length(), line_str.data());
});
}
}
}
}
void tailer::looper::loop_body()
{
auto now = std::chrono::steady_clock::now();
std::vector<std::string> to_erase;
for (auto& qpair : this->l_netlocs_to_paths) {
auto& netloc = qpair.first;
auto& rpq = qpair.second;
if (now < rpq.rpq_next_attempt_time) {
continue;
}
if (this->l_remotes.count(netloc) == 0) {
auto create_res = host_tailer::for_host(netloc);
if (create_res.isErr()) {
report_error(netloc, create_res.unwrapErr());
if (std::any_of(rpq.rpq_new_paths.begin(),
rpq.rpq_new_paths.end(),
[](const auto& pair) {
return !pair.second.loo_tail;
})) {
rpq.send_synced_to_main(netloc);
to_erase.push_back(netloc);
} else {
rpq.rpq_next_attempt_time = now + HOST_RETRY_DELAY;
}
continue;
}
auto ht = create_res.unwrap();
this->l_remotes[netloc] = ht;
this->s_children.add_child_service(ht);
rpq.rpq_new_paths.insert(rpq.rpq_existing_paths.begin(),
rpq.rpq_existing_paths.end());
rpq.rpq_existing_paths.clear();
}
if (!rpq.rpq_new_paths.empty()) {
log_debug("%s: new paths to monitor -- %s",
netloc.c_str(),
rpq.rpq_new_paths.begin()->first.c_str());
this->l_remotes[netloc]->send(
[paths = rpq.rpq_new_paths](auto &ht) {
for (const auto &pair : paths) {
log_debug("adding path to tailer -- %s",
pair.first.c_str());
ht.open_remote_path(pair.first, pair.second);
}
});
rpq.rpq_existing_paths.insert(rpq.rpq_new_paths.begin(),
rpq.rpq_new_paths.end());
rpq.rpq_new_paths.clear();
}
}
for (const auto& netloc : to_erase) {
this->l_netlocs_to_paths.erase(netloc);
}
}
void tailer::looper::add_remote(const network::path &path,
logfile_open_options options)
{
auto netloc_str = fmt::format("{}", path.home());
this->l_netlocs_to_paths[netloc_str].rpq_new_paths[path.p_path] =
std::move(options);
}
void tailer::looper::load_preview(int64_t id, const network::path& path)
{
auto netloc_str = fmt::format("{}", path.home());
auto iter = this->l_remotes.find(netloc_str);
if (iter == this->l_remotes.end()) {
auto create_res = host_tailer::for_host(netloc_str);
if (create_res.isErr()) {
auto msg = create_res.unwrapErr();
isc::to<main_looper&, services::main_t>()
.send([id, msg](auto& mlooper) {
if (lnav_data.ld_preview_generation != id) {
return;
}
lnav_data.ld_preview_status_source.get_description()
.set_cylon(false)
.clear();
lnav_data.ld_preview_source.clear();
lnav_data.ld_bottom_source.grep_error(msg);
});
return;
}
auto ht = create_res.unwrap();
this->l_remotes[netloc_str] = ht;
this->s_children.add_child_service(ht);
}
this->l_remotes[netloc_str]->send([id, file_path = path.p_path](auto &ht) {
ht.load_preview(id, file_path);
});
}
void tailer::looper::complete_path(const network::path& path)
{
auto netloc_str = fmt::format("{}", path.home());
auto iter = this->l_remotes.find(netloc_str);
if (iter == this->l_remotes.end()) {
auto create_res = host_tailer::for_host(netloc_str);
if (create_res.isErr()) {
return;
}
auto ht = create_res.unwrap();
this->l_remotes[netloc_str] = ht;
this->s_children.add_child_service(ht);
}
this->l_remotes[netloc_str]->send([file_path = path.p_path](auto &ht) {
ht.complete_path(file_path);
});
}
static std::vector<std::string>
create_ssh_args_from_config(const std::string& dest)
{
auto& cfg = injector::get<const tailer::config&>();
std::vector<std::string> retval;
retval.emplace_back(cfg.c_ssh_cmd);
if (!cfg.c_ssh_flags.empty()) {
if (startswith(cfg.c_ssh_flags, "-")) {
retval.emplace_back(cfg.c_ssh_flags);
} else {
retval.emplace_back(fmt::format("-{}", cfg.c_ssh_flags));
}
}
for (const auto& pair : cfg.c_ssh_options) {
if (pair.second.empty()) {
continue;
}
retval.emplace_back(fmt::format("-{}", pair.first));
retval.emplace_back(pair.second);
}
for (const auto& pair : cfg.c_ssh_config) {
if (pair.second.empty()) {
continue;
}
retval.emplace_back(fmt::format(
"-o{}={}", pair.first, pair.second));
}
retval.emplace_back(dest);
return retval;
}
Result<std::shared_ptr<tailer::looper::host_tailer>, std::string>
tailer::looper::host_tailer::for_host(const std::string& netloc)
{
log_debug("tailer(%s): transferring tailer to remote", netloc.c_str());
auto& cfg = injector::get<const tailer::config&>();
auto tailer_bin_name = fmt::format("tailer.bin.{}", getpid());
auto rp = humanize::network::path::from_str(netloc).value();
auto ssh_dest = rp.p_locality.l_hostname;
if (rp.p_locality.l_username.has_value()) {
ssh_dest = fmt::format("{}@{}",
rp.p_locality.l_username.value(),
rp.p_locality.l_hostname);
}
{
auto in_pipe = TRY(auto_pipe::for_child_fd(STDIN_FILENO));
auto out_pipe = TRY(auto_pipe::for_child_fd(STDOUT_FILENO));
auto err_pipe = TRY(auto_pipe::for_child_fd(STDERR_FILENO));
auto child = TRY(lnav::pid::from_fork());
in_pipe.after_fork(child.in());
out_pipe.after_fork(child.in());
err_pipe.after_fork(child.in());
if (child.in_child()) {
auto arg_strs = create_ssh_args_from_config(ssh_dest);
std::vector<char *> args;
arg_strs.emplace_back(fmt::format(
"cat > {} && chmod ugo+rx ./{}", tailer_bin_name, tailer_bin_name));
fmt::print(stderr, "tailer({}): executing -- {}\n",
netloc,
fmt::join(arg_strs, " "));
for (const auto& arg : arg_strs) {
args.push_back((char *) arg.data());
}
args.push_back(nullptr);
execvp(cfg.c_ssh_cmd.c_str(), args.data());
exit(EXIT_FAILURE);
}
std::vector<std::string> error_queue;
log_debug("starting err reader");
std::thread err_reader([netloc, err = std::move(err_pipe.read_end()), &error_queue]() mutable {
log_set_thread_prefix(fmt::format("tailer({})", netloc));
read_err_pipe(netloc, err, error_queue);
});
log_debug("writing to child");
auto sf = tailer_bin[0].to_string_fragment();
ssize_t total_bytes = 0;
while (total_bytes < sf.length()) {
log_debug("attempting to write %d", sf.length() - total_bytes);
auto rc = write(in_pipe.write_end(), sf.data(), sf.length() - total_bytes);
log_debug("wrote %d", rc);
if (rc < 0) {
break;
}
total_bytes += rc;
}
in_pipe.write_end().reset();
while (true) {
char buffer[1024];
auto rc = read(out_pipe.read_end(), buffer, sizeof(buffer));
if (rc < 0) {
break;
}
if (rc == 0) {
break;
}
log_debug("tailer(%s): transfer output -- %.*s",
netloc.c_str(), rc, buffer);
}
auto finished_child = std::move(child).wait_for_child();
err_reader.join();
if (!finished_child.was_normal_exit() ||
finished_child.exit_status() != EXIT_SUCCESS) {
auto error_msg = error_queue.empty() ? "unknown" : error_queue.back();
return Err(fmt::format("failed to ssh to host: {}", error_msg));
}
}
auto in_pipe = TRY(auto_pipe::for_child_fd(STDIN_FILENO));
auto out_pipe = TRY(auto_pipe::for_child_fd(STDOUT_FILENO));
auto err_pipe = TRY(auto_pipe::for_child_fd(STDERR_FILENO));
auto child = TRY(lnav::pid::from_fork());
in_pipe.after_fork(child.in());
out_pipe.after_fork(child.in());
err_pipe.after_fork(child.in());
if (child.in_child()) {
auto arg_strs = create_ssh_args_from_config(ssh_dest);
std::vector<char *> args;
arg_strs.emplace_back(fmt::format(
"./{}", tailer_bin_name, tailer_bin_name));
fmt::print(stderr, "tailer({}): executing -- {}\n",
netloc,
fmt::join(arg_strs, " "));
for (const auto& arg : arg_strs) {
args.push_back((char *) arg.data());
}
args.push_back(nullptr);
execvp(cfg.c_ssh_cmd.c_str(), args.data());
exit(EXIT_FAILURE);
}
return Ok(std::make_shared<host_tailer>(
netloc,
std::move(child),
std::move(in_pipe.write_end()),
std::move(out_pipe.read_end()),
std::move(err_pipe.read_end())
));
}
ghc::filesystem::path tailer::looper::host_tailer::tmp_path()
{
auto local_path = lnav::paths::workdir() / "remotes";
ghc::filesystem::create_directories(local_path);
auto_mem<char> resolved_path;
resolved_path = realpath(local_path.c_str(), nullptr);
if (resolved_path.in() == nullptr) {
return local_path;
}
return resolved_path.in();
}
static
std::string scrub_netloc(const std::string& netloc)
{
const static std::regex TO_SCRUB(R"([^\w\.\@])");
return std::regex_replace(netloc, TO_SCRUB, "_");
}
tailer::looper::host_tailer::host_tailer(const std::string &netloc,
auto_pid<process_state::RUNNING> child,
auto_fd to_child,
auto_fd from_child,
auto_fd err_from_child)
: isc::service<host_tailer>(netloc),
ht_netloc(netloc),
ht_local_path(tmp_path() / scrub_netloc(netloc)),
ht_error_reader([
netloc, err = std::move(err_from_child), &eq = this->ht_error_queue]() mutable {
read_err_pipe(netloc, err, eq);
}),
ht_state(connected{
std::move(child),
std::move(to_child),
std::move(from_child),
{}
})
{
}
void tailer::looper::host_tailer::open_remote_path(const std::string& path,
logfile_open_options loo)
{
this->ht_state.match(
[&](connected& conn) {
conn.c_desired_paths[path] = std::move(loo);
send_packet(conn.ht_to_child.get(),
TPT_OPEN_PATH,
TPPT_STRING, path.c_str(),
TPPT_DONE);
},
[&](const disconnected& d) {
log_warning("disconnected from host, cannot tail: %s",
path.c_str());
},
[&](const synced& s) {
log_warning("synced with host, not tailing: %s", path.c_str());
}
);
}
void tailer::looper::host_tailer::load_preview(int64_t id, const std::string &path)
{
this->ht_state.match(
[&](connected& conn) {
send_packet(conn.ht_to_child.get(),
TPT_LOAD_PREVIEW,
TPPT_STRING, path.c_str(),
TPPT_INT64, id,
TPPT_DONE);
},
[&](const disconnected& d) {
log_warning("disconnected from host, cannot preview: %s",
path.c_str());
auto msg = fmt::format("error: disconnected from {}", this->ht_netloc);
isc::to<main_looper&, services::main_t>()
.send([=](auto& mlooper) {
if (lnav_data.ld_preview_generation != id) {
return;
}
lnav_data.ld_preview_status_source.get_description()
.set_cylon(false)
.set_value(msg);
});
},
[&](const synced& s) {
require(false);
}
);
}
void tailer::looper::host_tailer::complete_path(const std::string &path)
{
this->ht_state.match(
[&](connected& conn) {
send_packet(conn.ht_to_child.get(),
TPT_COMPLETE_PATH,
TPPT_STRING, path.c_str(),
TPPT_DONE);
},
[&](const disconnected& d) {
log_warning("disconnected from host, cannot preview: %s",
path.c_str());
},
[&](const synced& s) {
require(false);
}
);
}
void tailer::looper::host_tailer::loop_body()
{
if (!this->ht_state.is<connected>()) {
return;
}
auto& conn = this->ht_state.get<connected>();
pollfd pfds[1];
pfds[0].fd = conn.ht_from_child.get();
pfds[0].events = POLLIN;
pfds[0].revents = 0;
auto ready_count = poll(pfds, 1, 100);
if (ready_count > 0) {
auto read_res = tailer::read_packet(conn.ht_from_child);
if (read_res.isErr()) {
log_error("read error: %s", read_res.unwrapErr().c_str());
exit(EXIT_FAILURE);
}
auto packet = read_res.unwrap();
this->ht_state = packet.match(
[&](const tailer::packet_eof &te) {
log_debug("all done!");
auto finished_child = std::move(conn).close();
if (finished_child.exit_status() != 0 &&
!this->ht_error_queue.empty()) {
report_error(this->ht_netloc,
this->ht_error_queue.back());
}
return state_v{disconnected()};
},
[&](const tailer::packet_log &pl) {
log_debug("%s\n", pl.pl_msg.c_str());
return std::move(this->ht_state);
},
[&](const tailer::packet_error &pe) {
log_debug("Got an error: %s -- %s", pe.pe_path.c_str(),
pe.pe_msg.c_str());
auto desired_iter = conn.c_desired_paths.find(pe.pe_path);
if (desired_iter != conn.c_desired_paths.end()) {
report_error(this->get_display_path(pe.pe_path), pe.pe_msg);
if (!desired_iter->second.loo_tail) {
conn.c_desired_paths.erase(desired_iter);
}
}
else {
auto child_iter = conn.c_child_paths.find(pe.pe_path);
if (child_iter != conn.c_child_paths.end() &&
!child_iter->second.loo_tail) {
conn.c_child_paths.erase(child_iter);
}
}
auto remote_path = ghc::filesystem::absolute(
ghc::filesystem::path(pe.pe_path)).relative_path();
auto local_path = this->ht_local_path / remote_path;
log_debug("removing %s", local_path.c_str());
this->ht_active_files.erase(local_path);
ghc::filesystem::remove_all(local_path);
if (conn.c_desired_paths.empty() &&
conn.c_child_paths.empty()) {
log_info("tailer(%s): all desired paths synced",
this->ht_netloc.c_str());
return state_v{synced{}};
}
return std::move(this->ht_state);
},
[&](const tailer::packet_offer_block &pob) {
log_debug("Got an offer: %s %lld - %lld", pob.pob_path.c_str(),
pob.pob_offset, pob.pob_length);
logfile_open_options loo;
if (pob.pob_path == pob.pob_root_path) {
auto root_iter = conn.c_desired_paths.find(pob.pob_path);
if (root_iter == conn.c_desired_paths.end()) {
log_warning("ignoring unknown root: %s",
pob.pob_root_path.c_str());
return std::move(this->ht_state);
}
loo = std::move(root_iter->second);
} else {
auto child_iter = conn.c_child_paths.find(pob.pob_path);
if (child_iter == conn.c_child_paths.end()) {
auto root_iter = conn.c_desired_paths.find(pob.pob_root_path);
if (root_iter == conn.c_desired_paths.end()) {
log_warning("ignoring child of unknown root: %s",
pob.pob_root_path.c_str());
return std::move(this->ht_state);
}
conn.c_child_paths[pob.pob_path] = std::move(root_iter->second);
child_iter = conn.c_child_paths.find(pob.pob_path);
}
loo = std::move(child_iter->second);
}
auto remote_path = ghc::filesystem::absolute(
ghc::filesystem::path(pob.pob_path)).relative_path();
auto local_path = this->ht_local_path / remote_path;
auto fd = auto_fd(::open(local_path.c_str(), O_RDONLY));
if (this->ht_active_files.count(local_path) == 0) {
this->ht_active_files.insert(local_path);
auto custom_name = this->get_display_path(pob.pob_path);
isc::to<main_looper &, services::main_t>()
.send([local_path, custom_name, loo](auto &mlooper) {
auto &active_fc = lnav_data.ld_active_files;
auto lpath_str = local_path.string();
if (active_fc.fc_file_names.count(lpath_str) > 0) {
log_debug("already in fc_file_names");
return;
}
if (active_fc.fc_closed_files.count(custom_name) > 0) {
log_debug("in closed");
return;
}
file_collection fc;
fc.fc_file_names[lpath_str]
.with_filename(custom_name)
.with_source(logfile_name_source::REMOTE)
.with_tail(loo.loo_tail);
update_active_files(fc);
});
}
if (fd == -1) {
log_debug("file not found, sending need block");
send_packet(conn.ht_to_child.get(),
TPT_NEED_BLOCK,
TPPT_STRING, pob.pob_path.c_str(),
TPPT_DONE);
return std::move(this->ht_state);
}
struct stat st;
if (fstat(fd, &st) == -1 || !S_ISREG(st.st_mode)) {
log_debug("path changed, sending need block");
ghc::filesystem::remove_all(local_path);
send_packet(conn.ht_to_child.get(),
TPT_NEED_BLOCK,
TPPT_STRING, pob.pob_path.c_str(),
TPPT_DONE);
return std::move(this->ht_state);
}
auto_mem<char> buffer;
buffer = (char *) malloc(pob.pob_length);
auto bytes_read = pread(fd, buffer, pob.pob_length,
pob.pob_offset);
if (bytes_read == pob.pob_length) {
tailer::hash_frag thf;
calc_sha_256(thf.thf_hash, buffer, bytes_read);
if (thf == pob.pob_hash) {
log_debug("local file block is same, sending ack");
send_packet(conn.ht_to_child.get(),
TPT_ACK_BLOCK,
TPPT_STRING, pob.pob_path.c_str(),
TPPT_DONE);
return std::move(this->ht_state);
}
log_debug("local file is different, sending need block");
} else if (bytes_read == -1) {
log_debug("unable to read file, sending need block -- %s",
strerror(errno));
ghc::filesystem::remove_all(local_path);
}
send_packet(conn.ht_to_child.get(),
TPT_NEED_BLOCK,
TPPT_STRING, pob.pob_path.c_str(),
TPPT_DONE);
return std::move(this->ht_state);
},
[&](const tailer::packet_tail_block &ptb) {
auto remote_path = ghc::filesystem::absolute(
ghc::filesystem::path(ptb.ptb_path)).relative_path();
auto local_path = this->ht_local_path / remote_path;
log_debug("writing tail to: %lld/%ld %s",
ptb.ptb_offset,
ptb.ptb_bits.size(),
local_path.c_str());
ghc::filesystem::create_directories(local_path.parent_path());
auto fd = auto_fd(
::open(local_path.c_str(), O_WRONLY | O_APPEND | O_CREAT,
0600));
if (fd == -1) {
log_error("open: %s", strerror(errno));
} else {
ftruncate(fd, ptb.ptb_offset);
pwrite(fd,
ptb.ptb_bits.data(), ptb.ptb_bits.size(),
ptb.ptb_offset);
}
return std::move(this->ht_state);
},
[&](const tailer::packet_synced &ps) {
if (ps.ps_root_path == ps.ps_path) {
auto iter = conn.c_desired_paths.find(ps.ps_path);
if (iter != conn.c_desired_paths.end()) {
if (!iter->second.loo_tail) {
log_info("synced desired path: %s",
iter->first.c_str());
conn.c_desired_paths.erase(iter);
}
}
} else {
auto iter = conn.c_child_paths.find(ps.ps_path);
if (iter != conn.c_child_paths.end()) {
if (!iter->second.loo_tail) {
log_info("synced child path: %s",
iter->first.c_str());
conn.c_child_paths.erase(iter);
}
}
}
if (conn.c_desired_paths.empty() &&
conn.c_child_paths.empty()) {
log_info("tailer(%s): all desired paths synced",
this->ht_netloc.c_str());
return state_v{synced{}};
}
return std::move(this->ht_state);
},
[&](const tailer::packet_link &pl) {
auto remote_path = ghc::filesystem::absolute(
ghc::filesystem::path(pl.pl_path)).relative_path();
auto local_path = this->ht_local_path / remote_path;
auto remote_link_path = ghc::filesystem::path(pl.pl_link_value);
std::string link_path;
if (remote_path.is_absolute()) {
auto local_link_path =
this->ht_local_path / remote_link_path.relative_path();
link_path = local_link_path.string();
} else {
link_path = remote_link_path.string();
}
log_debug("symlinking %s -> %s",
local_path.c_str(),
link_path.c_str());
ghc::filesystem::create_directories(local_path.parent_path());
ghc::filesystem::remove_all(local_path);
if (symlink(link_path.c_str(), local_path.c_str()) < 0) {
log_error("symlink failed: %s", strerror(errno));
}
return std::move(this->ht_state);
},
[&](const tailer::packet_preview_error &ppe) {
isc::to<main_looper&, services::main_t>()
.send([ppe](auto& mlooper) {
if (lnav_data.ld_preview_generation != ppe.ppe_id) {
log_debug("preview ID mismatch: %lld != %lld",
lnav_data.ld_preview_generation,
ppe.ppe_id);
return;
}
lnav_data.ld_preview_status_source.get_description()
.set_cylon(false)
.clear();
lnav_data.ld_preview_source.clear();
lnav_data.ld_bottom_source.grep_error(ppe.ppe_msg);
});
return std::move(this->ht_state);
},
[&](const tailer::packet_preview_data &ppd) {
isc::to<main_looper&, services::main_t>()
.send([netloc = this->ht_netloc, ppd](auto& mlooper) {
if (lnav_data.ld_preview_generation != ppd.ppd_id) {
log_debug("preview ID mismatch: %lld != %lld",
lnav_data.ld_preview_generation,
ppd.ppd_id);
return;
}
std::string str(ppd.ppd_bits.begin(),
ppd.ppd_bits.end());
lnav_data.ld_preview_status_source.get_description()
.set_cylon(false)
.set_value("For file: %s:%s",
netloc.c_str(),
ppd.ppd_path.c_str());
lnav_data.ld_preview_source
.replace_with(str)
.set_text_format(detect_text_format(str));
});
return std::move(this->ht_state);
},
[&](const tailer::packet_possible_path &ppp) {
log_debug("possible path: %s", ppp.ppp_path.c_str());
auto full_path = fmt::format("{}{}",
this->ht_netloc,
ppp.ppp_path);
isc::to<main_looper&, services::main_t>()
.send([full_path](auto& mlooper) {
lnav_data.ld_rl_view->add_possibility(
LNM_COMMAND, "remote-path", full_path);
});
return std::move(this->ht_state);
}
);
if (!this->ht_state.is<connected>()) {
this->s_looping = false;
}
}
}
std::chrono::milliseconds
tailer::looper::host_tailer::compute_timeout(mstime_t current_time) const
{
return 0s;
}
void tailer::looper::host_tailer::stopped()
{
if (this->ht_state.is<connected>()) {
this->ht_state = disconnected();
}
if (this->ht_error_reader.joinable()) {
this->ht_error_reader.join();
}
}
std::string tailer::looper::host_tailer::get_display_path(const std::string& remote_path) const
{
return fmt::format("{}{}", this->ht_netloc, remote_path);
}
void *tailer::looper::host_tailer::run()
{
log_set_thread_prefix(fmt::format("tailer({})", this->ht_netloc));
return service_base::run();
}
auto_pid<process_state::FINISHED>
tailer::looper::host_tailer::connected::close() &&
{
this->ht_to_child.reset();
this->ht_from_child.reset();
return std::move(this->ht_child).wait_for_child();
}
void
tailer::looper::child_finished(std::shared_ptr<service_base> child)
{
auto child_tailer = std::static_pointer_cast<host_tailer>(child);
for (auto iter = this->l_remotes.begin();
iter != this->l_remotes.end();
++iter) {
if (iter->second != child_tailer) {
continue;
}
if (child_tailer->is_synced()) {
log_info("synced with netloc '%s', removing", iter->first.c_str());
auto netloc_iter = this->l_netlocs_to_paths.find(iter->first);
if (netloc_iter != this->l_netlocs_to_paths.end()) {
netloc_iter->second.send_synced_to_main(netloc_iter->first);
this->l_netlocs_to_paths.erase(netloc_iter);
}
}
this->l_remotes.erase(iter);
return;
}
}
void
tailer::looper::remote_path_queue::send_synced_to_main(const std::string& netloc)
{
std::set<std::string> synced_files;
for (const auto& pair : this->rpq_new_paths) {
if (!pair.second.loo_tail) {
synced_files.emplace(fmt::format("{}{}", netloc, pair.first));
}
}
for (const auto& pair : this->rpq_existing_paths) {
if (!pair.second.loo_tail) {
synced_files.emplace(fmt::format("{}{}", netloc, pair.first));
}
}
isc::to<main_looper&, services::main_t>()
.send([file_set = std::move(synced_files)](auto& mlooper) {
file_collection fc;
fc.fc_synced_files = file_set;
update_active_files(fc);
});
}
void tailer::looper::report_error(std::string path, std::string msg)
{
isc::to<main_looper&, services::main_t>()
.send([=](auto& mlooper) {
file_collection fc;
fc.fc_name_to_errors[path] = msg;
update_active_files(fc);
});
}