You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
lnav/src/tailer/tailer.looper.cc

1196 lines
44 KiB
C++

/**
* Copyright (c) 2021, Timothy Stack
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* * Neither the name of Timothy Stack nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <regex>
#include "tailer.looper.hh"
#include "base/fs_util.hh"
#include "base/humanize.network.hh"
#include "base/lnav_log.hh"
#include "base/paths.hh"
#include "config.h"
#include "line_buffer.hh"
#include "lnav.hh"
#include "lnav.indexing.hh"
#include "service_tags.hh"
#include "tailer.h"
#include "tailer.looper.cfg.hh"
#include "tailerbin.h"
#include "tailerpp.hh"
// Bring the chrono literal suffixes (e.g. `1min`, `0s`) used in this file
// into scope.
using namespace std::chrono_literals;
// How long tailer::looper::loop_body() waits before retrying a host whose
// tailer could not be created.
static const auto HOST_RETRY_DELAY = 1min;
static void
read_err_pipe(const std::string& netloc,
auto_fd& err,
std::vector<std::string>& eq)
{
line_buffer lb;
file_range pipe_range;
bool done = false;
log_info("stderr reader started...");
lb.set_fd(err);
while (!done) {
auto load_res = lb.load_next_line(pipe_range);
if (load_res.isErr()) {
done = true;
} else {
auto li = load_res.unwrap();
pipe_range = li.li_file_range;
if (li.li_file_range.empty()) {
done = true;
} else {
lb.read_range(li.li_file_range).then([netloc, &eq](auto sbr) {
auto line_str
= string_fragment(sbr.get_data(), 0, sbr.length())
.trim("\n");
if (eq.size() < 10) {
eq.template emplace_back(line_str.to_string());
}
auto level = line_str.startswith("error:")
? lnav_log_level_t::ERROR
: line_str.startswith("warning:")
? lnav_log_level_t::WARNING
: line_str.startswith("info:")
? lnav_log_level_t::INFO
: lnav_log_level_t::DEBUG;
log_msg_wrapper(level,
"tailer[%s] %.*s",
netloc.c_str(),
line_str.length(),
line_str.data());
});
}
}
}
}
static void
update_tailer_progress(const std::string& netloc, const std::string& msg)
{
lnav_data.ld_active_files.fc_progress->writeAccess()
->sp_tailers[netloc]
.tp_message
= msg;
}
/**
 * Ask the main looper to set the description of every desired remote path
 * that is present in the active "other files" collection, and to clear any
 * error recorded under the netloc.
 *
 * @param netloc The network location that prefixes each remote path.
 * @param desired_paths The remote paths requested for this host.
 * @param remote_uname The description to attach to the matching files.
 */
static void
update_tailer_description(
    const std::string& netloc,
    const std::map<std::string, logfile_open_options_base>& desired_paths,
    const std::string& remote_uname)
{
    std::vector<std::string> full_paths;

    for (const auto& entry : desired_paths) {
        full_paths.emplace_back(
            fmt::format(FMT_STRING("{}{}"), netloc, entry.first));
    }

    isc::to<main_looper&, services::main_t>().send(
        [netloc, full_paths, remote_uname](auto& mlooper) {
            auto& active = lnav_data.ld_active_files;

            for (const auto& full_path : full_paths) {
                auto found = active.fc_other_files.find(full_path);

                if (found != active.fc_other_files.end()) {
                    found->second.ofd_description = remote_uname;
                }
            }

            active.fc_name_to_errors->writeAccess()->erase(netloc);
        });
}
/**
 * One iteration of the looper: for every host with queued paths, make sure a
 * host_tailer exists (creating one if needed) and hand any newly requested
 * paths over to it.
 *
 * When a tailer cannot be created, the error is reported and then either:
 *  - the host's queue is dropped and its paths are reported to the main
 *    looper via send_synced_to_main(), if any new path is not being tailed; or
 *  - another attempt is scheduled HOST_RETRY_DELAY from now.
 */
void
tailer::looper::loop_body()
{
    auto now = std::chrono::steady_clock::now();
    // Hosts are removed after the loop so the map is not modified while it
    // is being iterated.
    std::vector<std::string> to_erase;

    for (auto& qpair : this->l_netlocs_to_paths) {
        auto& netloc = qpair.first;
        auto& rpq = qpair.second;

        if (now < rpq.rpq_next_attempt_time) {
            // Still waiting out the retry delay for this host.
            continue;
        }
        if (this->l_remotes.count(netloc) == 0) {
            auto create_res = host_tailer::for_host(netloc);

            if (create_res.isErr()) {
                report_error(netloc, create_res.unwrapErr());
                if (std::any_of(
                        rpq.rpq_new_paths.begin(),
                        rpq.rpq_new_paths.end(),
                        [](const auto& pair) { return !pair.second.loo_tail; }))
                {
                    rpq.send_synced_to_main(netloc);
                    to_erase.push_back(netloc);
                } else {
                    rpq.rpq_next_attempt_time = now + HOST_RETRY_DELAY;
                }
                continue;
            }

            auto ht = create_res.unwrap();

            this->l_remotes[netloc] = ht;
            this->s_children.add_child_service(ht);

            // A fresh tailer knows nothing yet, so the paths that were
            // already handed to a previous tailer must be sent again.
            rpq.rpq_new_paths.insert(rpq.rpq_existing_paths.begin(),
                                     rpq.rpq_existing_paths.end());
            rpq.rpq_existing_paths.clear();
        }

        if (!rpq.rpq_new_paths.empty()) {
            log_debug("%s: new paths to monitor -- %s",
                      netloc.c_str(),
                      rpq.rpq_new_paths.begin()->first.c_str());
            this->l_remotes[netloc]->send(
                [paths = rpq.rpq_new_paths](auto& ht) {
                    for (const auto& pair : paths) {
                        log_debug("adding path to tailer -- %s",
                                  pair.first.c_str());
                        // The captured map is const inside this lambda, so
                        // the options are passed by copy; a std::move() here
                        // would not move anything.
                        ht.open_remote_path(pair.first, pair.second);
                    }
                });

            rpq.rpq_existing_paths.insert(rpq.rpq_new_paths.begin(),
                                          rpq.rpq_new_paths.end());
            rpq.rpq_new_paths.clear();
        }
    }

    for (const auto& netloc : to_erase) {
        this->l_netlocs_to_paths.erase(netloc);
    }
}
/**
 * Queue a remote path so that loop_body() will hand it to the tailer for
 * the path's host.
 *
 * @param path The remote path to open; its home() identifies the host.
 * @param options The open options to associate with the path.
 */
void
tailer::looper::add_remote(const network::path& path,
                           logfile_open_options_base options)
{
    auto netloc_str = fmt::to_string(path.home());
    auto& queue = this->l_netlocs_to_paths[netloc_str];

    queue.rpq_new_paths[path.p_path] = std::move(options);
}
/**
 * Request a preview of a remote file, creating the tailer for the host if
 * one is not already registered.
 *
 * If the tailer cannot be created, the main looper is asked to clear the
 * preview and show the error, provided the preview generation still matches
 * the given ID.
 *
 * @param id The preview generation this request belongs to.
 * @param path The remote file to preview.
 */
void
tailer::looper::load_preview(int64_t id, const network::path& path)
{
    auto netloc_str = fmt::to_string(path.home());

    if (this->l_remotes.find(netloc_str) == this->l_remotes.end()) {
        auto create_res = host_tailer::for_host(netloc_str);

        if (create_res.isErr()) {
            auto msg = create_res.unwrapErr();

            isc::to<main_looper&, services::main_t>().send(
                [id, msg](auto& mlooper) {
                    if (lnav_data.ld_preview_generation == id) {
                        lnav_data.ld_preview_status_source[0]
                            .get_description()
                            .set_cylon(false)
                            .clear();
                        lnav_data.ld_preview_source[0].clear();
                        lnav_data.ld_bottom_source.grep_error(msg);
                    }
                });
            return;
        }

        auto new_tailer = create_res.unwrap();

        this->l_remotes[netloc_str] = new_tailer;
        this->s_children.add_child_service(new_tailer);
    }

    this->l_remotes[netloc_str]->send(
        [id, file_path = path.p_path](auto& ht) {
            ht.load_preview(id, file_path);
        });
}
/**
 * Request completions for a remote path, creating the tailer for the host
 * if one is not already registered.  Nothing happens if the tailer cannot
 * be created.
 *
 * @param path The partial remote path to complete.
 */
void
tailer::looper::complete_path(const network::path& path)
{
    auto netloc_str = fmt::to_string(path.home());

    if (this->l_remotes.find(netloc_str) == this->l_remotes.end()) {
        auto create_res = host_tailer::for_host(netloc_str);

        if (create_res.isErr()) {
            return;
        }

        auto new_tailer = create_res.unwrap();

        this->l_remotes[netloc_str] = new_tailer;
        this->s_children.add_child_service(new_tailer);
    }

    this->l_remotes[netloc_str]->send([file_path = path.p_path](auto& ht) {
        ht.complete_path(file_path);
    });
}
/**
 * Build the argument vector for invoking ssh from the tailer configuration.
 *
 * The result is, in order: the ssh command, the configured flags (prefixed
 * with '-' if they do not already start with one), a "-<name> <value>" pair
 * for every option with a non-empty value, a "-o<name>=<value>" argument
 * for every config entry with a non-empty value, and finally the
 * destination.
 *
 * @param dest The ssh destination to append as the last argument.
 * @return The arguments, without the remote command to execute.
 */
static std::vector<std::string>
create_ssh_args_from_config(const std::string& dest)
{
    const auto& cfg = injector::get<const tailer::config&>();
    std::vector<std::string> args;

    args.emplace_back(cfg.c_ssh_cmd);

    if (!cfg.c_ssh_flags.empty()) {
        const bool has_dash = startswith(cfg.c_ssh_flags, "-");

        if (has_dash) {
            args.emplace_back(cfg.c_ssh_flags);
        } else {
            args.emplace_back(
                fmt::format(FMT_STRING("-{}"), cfg.c_ssh_flags));
        }
    }

    for (const auto& opt : cfg.c_ssh_options) {
        if (!opt.second.empty()) {
            args.emplace_back(fmt::format(FMT_STRING("-{}"), opt.first));
            args.emplace_back(opt.second);
        }
    }

    for (const auto& conf_entry : cfg.c_ssh_config) {
        if (!conf_entry.second.empty()) {
            args.emplace_back(fmt::format(
                FMT_STRING("-o{}={}"), conf_entry.first, conf_entry.second));
        }
    }

    args.emplace_back(dest);

    return args;
}
/**
 * Create a host_tailer for the given network location.
 *
 * This is done in two steps, each of which forks an ssh child:
 *  1. The configured transfer command is run on the remote host and the
 *     embedded tailer binary is written to its stdin.  The call waits for
 *     this child to finish.
 *  2. The configured start command is run on the remote host and the pipes
 *     to that child are handed to the new host_tailer.
 *
 * @param netloc The network location (optionally "user@host") to connect to.
 * @return The new tailer, or an error message if the location could not be
 *   parsed, a pipe/fork failed, or the transfer child did not exit cleanly.
 */
Result<std::shared_ptr<tailer::looper::host_tailer>, std::string>
tailer::looper::host_tailer::for_host(const std::string& netloc)
{
    log_debug("tailer(%s): transferring tailer to remote", netloc.c_str());

    update_tailer_progress(netloc, "Transferring tailer...");

    auto& cfg = injector::get<const tailer::config&>();
    auto tailer_bin_name = fmt::format(FMT_STRING("tailer.bin.{}"), getpid());

    // Do not dereference the parse result blindly, a malformed location
    // should be reported as an error like any other failure.
    auto rp_opt = humanize::network::path::from_str(netloc);
    if (!rp_opt.has_value()) {
        return Err(
            fmt::format(FMT_STRING("invalid network location: {}"), netloc));
    }
    auto rp = rp_opt.value();
    auto ssh_dest = rp.p_locality.l_hostname;
    if (rp.p_locality.l_username.has_value()) {
        ssh_dest = fmt::format(FMT_STRING("{}@{}"),
                               rp.p_locality.l_username.value(),
                               rp.p_locality.l_hostname);
    }

    {
        auto in_pipe = TRY(auto_pipe::for_child_fd(STDIN_FILENO));
        auto out_pipe = TRY(auto_pipe::for_child_fd(STDOUT_FILENO));
        auto err_pipe = TRY(auto_pipe::for_child_fd(STDERR_FILENO));
        auto child = TRY(lnav::pid::from_fork());

        in_pipe.after_fork(child.in());
        out_pipe.after_fork(child.in());
        err_pipe.after_fork(child.in());

        if (child.in_child()) {
            auto arg_strs = create_ssh_args_from_config(ssh_dest);
            std::vector<char*> args;

            arg_strs.emplace_back(
                fmt::format(cfg.c_transfer_cmd, tailer_bin_name));

            fmt::print(stderr,
                       "tailer({}): executing -- {}\n",
                       netloc,
                       fmt::join(arg_strs, " "));
            for (const auto& arg : arg_strs) {
                // execvp() takes non-const pointers but does not modify the
                // strings.
                args.push_back(const_cast<char*>(arg.data()));
            }
            args.push_back(nullptr);

            execvp(cfg.c_ssh_cmd.c_str(), args.data());
            // Only reached if the exec failed.
            _exit(EXIT_FAILURE);
        }

        std::vector<std::string> error_queue;

        log_debug("tailer(%s): starting err reader", netloc.c_str());
        // error_queue is captured by reference; the thread is joined below
        // before the queue is read or goes out of scope.
        std::thread err_reader([netloc,
                                err = std::move(err_pipe.read_end()),
                                &error_queue]() mutable {
            log_set_thread_prefix(
                fmt::format(FMT_STRING("tailer({})"), netloc));
            read_err_pipe(netloc, err, error_queue);
        });

        log_debug("tailer(%s): writing to child", netloc.c_str());
        auto sf = tailer_bin[0].to_string_fragment();
        ssize_t total_bytes = 0;
        bool write_failed = false;

        while (total_bytes < sf.length()) {
            const auto remaining = sf.length() - total_bytes;

            log_debug("attempting to write %d", static_cast<int>(remaining));
            // Resume after the bytes already written; write() may accept
            // less than what was requested.
            auto rc = write(
                in_pipe.write_end(), sf.data() + total_bytes, remaining);

            if (rc < 0) {
                if (errno == EINTR) {
                    // Interrupted before anything was written, try again.
                    continue;
                }
                log_error("  tailer(%s): write failed -- %s",
                          netloc.c_str(),
                          strerror(errno));
                write_failed = true;
                break;
            }

            log_debug("  wrote %d", static_cast<int>(rc));
            total_bytes += rc;
        }

        // Close our end so the remote command sees end-of-file.
        in_pipe.write_end().reset();

        while (!write_failed) {
            char buffer[1024];

            auto rc = read(out_pipe.read_end(), buffer, sizeof(buffer));
            if (rc <= 0) {
                // Error or end-of-file, either way there is no more output.
                break;
            }

            // The "%.*s" precision must be an int, not a ssize_t.
            log_debug("tailer(%s): transfer output -- %.*s",
                      netloc.c_str(),
                      static_cast<int>(rc),
                      buffer);
        }

        auto finished_child = std::move(child).wait_for_child();

        err_reader.join();
        if (!finished_child.was_normal_exit()
            || finished_child.exit_status() != EXIT_SUCCESS)
        {
            // Use the last line the child wrote to stderr as the reason.
            auto error_msg = error_queue.empty() ? "unknown"
                                                 : error_queue.back();
            return Err(fmt::format(FMT_STRING("failed to ssh to host: {}"),
                                   error_msg));
        }
    }

    update_tailer_progress(netloc, "Starting tailer...");

    auto in_pipe = TRY(auto_pipe::for_child_fd(STDIN_FILENO));
    auto out_pipe = TRY(auto_pipe::for_child_fd(STDOUT_FILENO));
    auto err_pipe = TRY(auto_pipe::for_child_fd(STDERR_FILENO));
    auto child = TRY(lnav::pid::from_fork());

    in_pipe.after_fork(child.in());
    out_pipe.after_fork(child.in());
    err_pipe.after_fork(child.in());

    if (child.in_child()) {
        auto arg_strs = create_ssh_args_from_config(ssh_dest);
        std::vector<char*> args;

        arg_strs.emplace_back(fmt::format(cfg.c_start_cmd, tailer_bin_name));

        fmt::print(stderr,
                   FMT_STRING("tailer({}): executing -- {}\n"),
                   netloc,
                   fmt::join(arg_strs, " "));
        for (const auto& arg : arg_strs) {
            args.push_back(const_cast<char*>(arg.data()));
        }
        args.push_back(nullptr);

        execvp(cfg.c_ssh_cmd.c_str(), args.data());
        // Only reached if the exec failed.
        _exit(EXIT_FAILURE);
    }

    return Ok(std::make_shared<host_tailer>(netloc,
                                            std::move(child),
                                            std::move(in_pipe.write_end()),
                                            std::move(out_pipe.read_end()),
                                            std::move(err_pipe.read_end())));
}
/**
 * @return The directory under the lnav work directory where files mirrored
 *   from remote hosts are kept.
 */
static ghc::filesystem::path
remote_cache_path()
{
    const auto base_dir = lnav::paths::workdir();

    return base_dir / "remotes";
}
/**
 * Make sure the remote cache directory exists and return its location.
 *
 * @return The path as resolved by realpath(), or the unresolved path if
 *   realpath() fails.
 */
ghc::filesystem::path
tailer::looper::host_tailer::tmp_path()
{
    auto cache_dir = remote_cache_path();

    ghc::filesystem::create_directories(cache_dir);

    auto_mem<char> canonical;

    canonical = realpath(cache_dir.c_str(), nullptr);
    if (canonical.in() != nullptr) {
        return canonical.in();
    }

    return cache_dir;
}
/**
 * Turn a network location into a string that is safe to use as a single
 * path component.
 *
 * @param netloc The network location to scrub.
 * @return A copy of the input where every character other than a word
 *   character, '.', or '@' has been replaced by an underscore.
 */
static std::string
scrub_netloc(const std::string& netloc)
{
    const static std::regex UNSAFE_CHARS(R"([^\w\.\@])");
    const std::string replacement = "_";

    return std::regex_replace(netloc, UNSAFE_CHARS, replacement);
}
// Construct the service that talks to an already-forked tailer child.
//
// netloc         - The network location; used as the service name and to
//                  derive the local cache directory.
// child          - The running child process.
// to_child       - Descriptor for sending packets to the child.
// from_child     - Descriptor for reading packets from the child.
// err_from_child - Descriptor for the child's stderr; handed to the
//                  error-reader thread.
tailer::looper::host_tailer::host_tailer(const std::string& netloc,
auto_pid<process_state::running> child,
auto_fd to_child,
auto_fd from_child,
auto_fd err_from_child)
: isc::service<host_tailer>(netloc), ht_netloc(netloc),
// The local mirror for this host lives in a per-host directory under
// tmp_path(), named after the scrubbed netloc.
ht_local_path(tmp_path() / scrub_netloc(netloc)),
// The stderr reader thread is started right here and appends to
// ht_error_queue through a reference.
// NOTE(review): this relies on ht_error_queue being declared (and therefore
// constructed) before ht_error_reader -- confirm against the header.
ht_error_reader([netloc,
err = std::move(err_from_child),
&eq = this->ht_error_queue]() mutable {
read_err_pipe(netloc, err, eq);
}),
// The tailer starts out in the connected state, owning the child and the
// packet pipes; the remaining fields of `connected` are value-initialized.
ht_state(connected{
std::move(child), std::move(to_child), std::move(from_child), {}})
{
}
/**
 * Start following a path on the remote host.
 *
 * While connected, the path is recorded as desired and a TPT_OPEN_PATH
 * packet is sent to the child.  In the other states only a warning is
 * logged.
 *
 * @param path The remote path to open.
 * @param loo The open options to remember for the path.
 */
void
tailer::looper::host_tailer::open_remote_path(const std::string& path,
                                              logfile_open_options_base loo)
{
    const auto* path_cstr = path.c_str();

    this->ht_state.match(
        [&](connected& conn) {
            conn.c_desired_paths[path] = std::move(loo);

            const auto child_fd = conn.ht_to_child.get();
            send_packet(
                child_fd, TPT_OPEN_PATH, TPPT_STRING, path_cstr, TPPT_DONE);
        },
        [&](const disconnected&) {
            log_warning("disconnected from host, cannot tail: %s", path_cstr);
        },
        [&](const synced&) {
            log_warning("synced with host, not tailing: %s", path_cstr);
        });
}
/**
 * Ask the remote tailer for a preview of a file.
 *
 * While connected, a TPT_LOAD_PREVIEW packet is sent to the child.  If the
 * host is disconnected, the main looper is asked to show an error in the
 * preview status, provided the preview generation still matches.  Calling
 * this in the synced state violates a precondition.
 *
 * @param id The preview generation this request belongs to.
 * @param path The remote path to preview.
 */
void
tailer::looper::host_tailer::load_preview(int64_t id, const std::string& path)
{
    this->ht_state.match(
        [&](connected& conn) {
            const auto child_fd = conn.ht_to_child.get();

            send_packet(child_fd,
                        TPT_LOAD_PREVIEW,
                        TPPT_STRING,
                        path.c_str(),
                        TPPT_INT64,
                        id,
                        TPPT_DONE);
        },
        [&](const disconnected&) {
            log_warning("disconnected from host, cannot preview: %s",
                        path.c_str());

            auto msg = fmt::format(FMT_STRING("error: disconnected from {}"),
                                   this->ht_netloc);
            auto& main_svc = isc::to<main_looper&, services::main_t>();

            main_svc.send([id, msg](auto& mlooper) {
                if (lnav_data.ld_preview_generation == id) {
                    lnav_data.ld_preview_status_source[0]
                        .get_description()
                        .set_cylon(false)
                        .set_value(msg);
                }
            });
        },
        [&](const synced&) { require(false); });
}
/**
 * Ask the remote tailer for possible completions of a path.
 *
 * While connected, a TPT_COMPLETE_PATH packet is sent to the child.  If the
 * host is disconnected, a warning is logged.  Calling this in the synced
 * state violates a precondition.
 *
 * @param path The partial remote path to complete.
 */
void
tailer::looper::host_tailer::complete_path(const std::string& path)
{
    this->ht_state.match(
        [&](connected& conn) {
            send_packet(conn.ht_to_child.get(),
                        TPT_COMPLETE_PATH,
                        TPPT_STRING,
                        path.c_str(),
                        TPPT_DONE);
        },
        [&](const disconnected& d) {
            // This is a completion request, not a preview, so say so in the
            // log to avoid confusion with load_preview().
            log_warning("disconnected from host, cannot complete path: %s",
                        path.c_str());
        },
        [&](const synced& s) { require(false); });
}
// One iteration of the host tailer's service loop.
//
// While in the connected state this:
//  1. periodically refreshes the modification time of the local cache
//     directory for this host;
//  2. polls the child's output descriptor for up to 100ms; and
//  3. if data is ready, reads a single packet and dispatches on its type.
//
// Every packet handler returns the state the tailer should be in afterwards.
// Most of them hand back the current state with std::move(this->ht_state);
// only the EOF handler (-> disconnected) and the handlers that find nothing
// left to sync (-> synced) change it.  Once the state is no longer
// `connected`, s_looping is cleared.
void
tailer::looper::host_tailer::loop_body()
{
// Number of loop iterations between updates of the cache directory's mtime.
const static uint64_t TOUCH_FREQ = 10000;
if (!this->ht_state.is<connected>()) {
return;
}
this->ht_cycle_count += 1;
if (this->ht_cycle_count % TOUCH_FREQ == 0) {
// Mark the local cache directory as recently used.  cleanup_cache() removes
// cache entries based on their last write time, so this presumably keeps an
// active host's cache from expiring.
auto now
= ghc::filesystem::file_time_type{std::chrono::system_clock::now()};
ghc::filesystem::last_write_time(this->ht_local_path, now);
}
auto& conn = this->ht_state.get<connected>();
pollfd pfds[1];
pfds[0].fd = conn.ht_from_child.get();
pfds[0].events = POLLIN;
pfds[0].revents = 0;
// Wait at most 100ms for the child to send something.
auto ready_count = poll(pfds, 1, 100);
if (ready_count > 0) {
auto read_res = tailer::read_packet(conn.ht_from_child);
if (read_res.isErr()) {
// NOTE(review): the state is left as `connected` after a read error, so the
// next iteration will poll the same descriptor again -- confirm this cannot
// spin on a persistent error.
log_error("read error: %s", read_res.unwrapErr().c_str());
return;
}
auto packet = read_res.unwrap();
this->ht_state = packet.match(
// The child is done: close the pipes, reap the child, and report the last
// stderr line if it exited with a non-zero status.
[&](const tailer::packet_eof& te) {
log_debug("all done!");
auto finished_child = std::move(conn).close();
if (finished_child.exit_status() != 0
&& !this->ht_error_queue.empty())
{
report_error(this->ht_netloc, this->ht_error_queue.back());
}
return state_v{disconnected()};
},
// The child announced itself: remember the uname string and use it as the
// description of the desired files.
[&](const tailer::packet_announce& pa) {
update_tailer_description(
this->ht_netloc, conn.c_desired_paths, pa.pa_uname);
this->ht_uname = pa.pa_uname;
return std::move(this->ht_state);
},
// A log message from the child, forwarded to the local log.
[&](const tailer::packet_log& pl) {
log_debug("%s\n", pl.pl_msg.c_str());
return std::move(this->ht_state);
},
// The child reported an error for a path.
[&](const tailer::packet_error& pe) {
log_debug("Got an error: %s -- %s",
pe.pe_path.c_str(),
pe.pe_msg.c_str());
// Drop the progress entry for this host.
lnav_data.ld_active_files.fc_progress->writeAccess()
->sp_tailers.erase(this->ht_netloc);
auto desired_iter = conn.c_desired_paths.find(pe.pe_path);
if (desired_iter != conn.c_desired_paths.end()) {
// Only errors on paths that were explicitly requested are reported.
report_error(this->get_display_path(pe.pe_path), pe.pe_msg);
// Paths that are not being tailed are forgotten after an error.
if (!desired_iter->second.loo_tail) {
conn.c_desired_paths.erase(desired_iter);
}
} else {
auto child_iter = conn.c_child_paths.find(pe.pe_path);
if (child_iter != conn.c_child_paths.end()
&& !child_iter->second.loo_tail)
{
conn.c_child_paths.erase(child_iter);
}
}
// Map the remote path onto the local cache directory and remove whatever
// was mirrored there.
auto remote_path = ghc::filesystem::absolute(
ghc::filesystem::path(pe.pe_path))
.relative_path();
auto local_path = this->ht_local_path / remote_path;
log_debug("removing %s", local_path.c_str());
this->ht_active_files.erase(local_path);
ghc::filesystem::remove_all(local_path);
// Nothing left to follow, so the tailer is finished.
if (conn.c_desired_paths.empty() && conn.c_child_paths.empty())
{
log_info("tailer(%s): all desired paths synced",
this->ht_netloc.c_str());
return state_v{synced{}};
}
return std::move(this->ht_state);
},
// The child offered a block of a file, identified by offset, length, and a
// hash.  The local copy of that range is hashed with SHA-256; a
// TPT_ACK_BLOCK is sent if the hashes match and a TPT_NEED_BLOCK otherwise.
[&](const tailer::packet_offer_block& pob) {
log_debug("Got an offer: %s %lld - %lld",
pob.pob_path.c_str(),
pob.pob_offset,
pob.pob_length);
// Find the open options that apply to the offered path.
logfile_open_options_base loo;
if (pob.pob_path == pob.pob_root_path) {
auto root_iter = conn.c_desired_paths.find(pob.pob_path);
if (root_iter == conn.c_desired_paths.end()) {
log_warning("ignoring unknown root: %s",
pob.pob_root_path.c_str());
return std::move(this->ht_state);
}
loo = root_iter->second;
} else {
auto child_iter = conn.c_child_paths.find(pob.pob_path);
if (child_iter == conn.c_child_paths.end()) {
// First time this child is seen; it takes the options of its root.
auto root_iter
= conn.c_desired_paths.find(pob.pob_root_path);
if (root_iter == conn.c_desired_paths.end()) {
log_warning("ignoring child of unknown root: %s",
pob.pob_root_path.c_str());
return std::move(this->ht_state);
}
// NOTE(review): this moves the options out of the root's entry in
// c_desired_paths, and the move below does the same to the child's entry.
// Other handlers later read loo_tail from those entries -- confirm that a
// moved-from logfile_open_options_base still holds the expected values, or
// whether these should be copies.
conn.c_child_paths[pob.pob_path]
= std::move(root_iter->second);
child_iter = conn.c_child_paths.find(pob.pob_path);
}
loo = std::move(child_iter->second);
}
update_tailer_description(
this->ht_netloc, conn.c_desired_paths, this->ht_uname);
auto remote_path = ghc::filesystem::absolute(
ghc::filesystem::path(pob.pob_path))
.relative_path();
auto local_path = this->ht_local_path / remote_path;
auto open_res
= lnav::filesystem::open_file(local_path, O_RDONLY);
// The first offer for a path also registers the local mirror file with the
// main looper so that it gets opened.
if (this->ht_active_files.count(local_path) == 0) {
this->ht_active_files.insert(local_path);
auto custom_name = this->get_display_path(pob.pob_path);
isc::to<main_looper&, services::main_t>().send(
[local_path,
custom_name,
loo,
netloc = this->ht_netloc](auto& mlooper) {
auto& active_fc = lnav_data.ld_active_files;
auto lpath_str = local_path.string();
{
safe::WriteAccess<safe_scan_progress> sp(
*active_fc.fc_progress);
sp->sp_tailers.erase(netloc);
}
if (active_fc.fc_file_names.count(lpath_str) > 0) {
log_debug("already in fc_file_names");
return;
}
if (active_fc.fc_closed_files.count(custom_name)
> 0) {
log_debug("in closed");
return;
}
file_collection fc;
fc.fc_file_names[lpath_str]
.with_filename(custom_name)
.with_source(logfile_name_source::REMOTE)
.with_tail(loo.loo_tail)
.with_non_utf_visibility(false)
.with_visible_size_limit(256 * 1024);
update_active_files(fc);
});
}
// No local copy could be opened, so the block is needed.
if (open_res.isErr()) {
log_debug("file not found (%s), sending need block",
open_res.unwrapErr().c_str());
send_packet(conn.ht_to_child.get(),
TPT_NEED_BLOCK,
TPPT_STRING,
pob.pob_path.c_str(),
TPPT_DONE);
return std::move(this->ht_state);
}
auto fd = open_res.unwrap();
struct stat st;
// The local path exists but is not a regular file (or cannot be stat'd):
// remove it and ask for the block.
if (fstat(fd, &st) == -1 || !S_ISREG(st.st_mode)) {
log_debug("path changed, sending need block");
ghc::filesystem::remove_all(local_path);
send_packet(conn.ht_to_child.get(),
TPT_NEED_BLOCK,
TPPT_STRING,
pob.pob_path.c_str(),
TPPT_DONE);
return std::move(this->ht_state);
}
// The local file ends exactly where the offered block begins, so there is
// nothing local to compare against.
if (st.st_size == pob.pob_offset) {
log_debug("local file is synced, sending need block");
send_packet(conn.ht_to_child.get(),
TPT_NEED_BLOCK,
TPPT_STRING,
pob.pob_path.c_str(),
TPPT_DONE);
return std::move(this->ht_state);
}
// Hash the offered range of the local file, reading at most BUFFER_SIZE
// bytes at a time.
constexpr int64_t BUFFER_SIZE = 4 * 1024 * 1024;
auto buffer = auto_mem<unsigned char>::malloc(BUFFER_SIZE);
auto remaining = pob.pob_length;
auto remaining_offset = pob.pob_offset;
tailer::hash_frag thf;
SHA256_CTX shactx;
sha256_init(&shactx);
log_debug("checking offer %s[%lld..+%lld]",
local_path.c_str(),
remaining_offset,
remaining);
while (remaining > 0) {
auto nbytes = std::min(remaining, BUFFER_SIZE);
auto bytes_read
= pread(fd, buffer, nbytes, remaining_offset);
if (bytes_read == -1) {
log_debug(
"unable to read file, sending need block -- %s",
strerror(errno));
ghc::filesystem::remove_all(local_path);
break;
}
// End of the local file before the whole range was read; `remaining` stays
// non-zero, which leads to the need-block request below.
if (bytes_read == 0) {
break;
}
sha256_update(&shactx, buffer.in(), bytes_read);
remaining -= bytes_read;
remaining_offset += bytes_read;
}
// Only compare hashes if the whole offered range was read locally.
if (remaining == 0) {
sha256_final(&shactx, thf.thf_hash);
if (thf == pob.pob_hash) {
log_debug("local file block is same, sending ack");
send_packet(conn.ht_to_child.get(),
TPT_ACK_BLOCK,
TPPT_STRING,
pob.pob_path.c_str(),
TPPT_INT64,
pob.pob_offset,
TPPT_INT64,
pob.pob_length,
TPPT_INT64,
(int64_t) st.st_size,
TPPT_DONE);
return std::move(this->ht_state);
}
log_debug("local file is different, sending need block");
}
send_packet(conn.ht_to_child.get(),
TPT_NEED_BLOCK,
TPPT_STRING,
pob.pob_path.c_str(),
TPPT_DONE);
return std::move(this->ht_state);
},
// The child sent file content: write it into the local mirror at the given
// offset, truncating the file to that offset first.
[&](const tailer::packet_tail_block& ptb) {
auto remote_path = ghc::filesystem::absolute(
ghc::filesystem::path(ptb.ptb_path))
.relative_path();
auto local_path = this->ht_local_path / remote_path;
log_debug("writing tail to: %lld/%ld %s",
ptb.ptb_offset,
ptb.ptb_bits.size(),
local_path.c_str());
ghc::filesystem::create_directories(local_path.parent_path());
auto create_res = lnav::filesystem::create_file(
local_path, O_WRONLY | O_APPEND | O_CREAT, 0600);
if (create_res.isErr()) {
log_error("open: %s", create_res.unwrapErr().c_str());
} else {
auto fd = create_res.unwrap();
// NOTE(review): the results of ftruncate() and pwrite() are not checked, so
// a failed or short write goes unnoticed here.
ftruncate(fd, ptb.ptb_offset);
pwrite(fd,
ptb.ptb_bits.data(),
ptb.ptb_bits.size(),
ptb.ptb_offset);
// Give the local file the modification time carried in the packet.
auto mtime = ghc::filesystem::file_time_type{
std::chrono::seconds{ptb.ptb_mtime}};
// XXX This isn't atomic with the write...
ghc::filesystem::last_write_time(local_path, mtime);
}
return std::move(this->ht_state);
},
// The child reported that a path is synced.  Tailed paths are recorded in
// the matching "synced" set; paths that are not tailed are forgotten.
[&](const tailer::packet_synced& ps) {
if (ps.ps_root_path == ps.ps_path) {
auto iter = conn.c_desired_paths.find(ps.ps_path);
if (iter != conn.c_desired_paths.end()) {
if (iter->second.loo_tail) {
conn.c_synced_desired_paths.insert(ps.ps_path);
} else {
log_info("synced desired path: %s",
iter->first.c_str());
conn.c_desired_paths.erase(iter);
}
}
} else {
auto iter = conn.c_child_paths.find(ps.ps_path);
if (iter != conn.c_child_paths.end()) {
if (iter->second.loo_tail) {
conn.c_synced_child_paths.insert(ps.ps_path);
} else {
log_info("synced child path: %s",
iter->first.c_str());
conn.c_child_paths.erase(iter);
}
}
}
// Nothing left to follow: the tailer is finished.
if (conn.c_desired_paths.empty() && conn.c_child_paths.empty())
{
log_info("tailer(%s): all desired paths synced",
this->ht_netloc.c_str());
return state_v{synced{}};
} else if (!conn.c_initial_sync_done
&& conn.c_desired_paths.size()
== conn.c_synced_desired_paths.size()
&& conn.c_child_paths.size()
== conn.c_synced_child_paths.size())
{
// Every remaining path has been synced at least once.  Tell the main
// looper, once, which files have finished their initial sync.
log_info("tailer(%s): all desired paths synced",
this->ht_netloc.c_str());
conn.c_initial_sync_done = true;
std::set<std::string> synced_files;
for (const auto& desired_pair : conn.c_desired_paths) {
synced_files.emplace(fmt::format(
FMT_STRING("{}{}"), ht_netloc, desired_pair.first));
}
isc::to<main_looper&, services::main_t>().send(
[file_set = std::move(synced_files)](auto& mlooper) {
file_collection fc;
fc.fc_synced_files = file_set;
update_active_files(fc);
});
}
return std::move(this->ht_state);
},
// The child reported a symbolic link: recreate it in the local mirror.
[&](const tailer::packet_link& pl) {
auto remote_path = ghc::filesystem::absolute(
ghc::filesystem::path(pl.pl_path))
.relative_path();
auto local_path = this->ht_local_path / remote_path;
auto remote_link_path = ghc::filesystem::path(pl.pl_link_value);
std::string link_path;
// Absolute link targets are re-rooted under the local mirror directory;
// relative targets are used as-is.
if (remote_link_path.is_absolute()) {
auto local_link_path = this->ht_local_path
/ remote_link_path.relative_path();
link_path = local_link_path.string();
} else {
link_path = remote_link_path.string();
}
log_debug("symlinking %s -> %s",
local_path.c_str(),
link_path.c_str());
ghc::filesystem::create_directories(local_path.parent_path());
// Remove whatever is at the local path so the symlink can be created.
ghc::filesystem::remove_all(local_path);
if (symlink(link_path.c_str(), local_path.c_str()) < 0) {
log_error("symlink failed: %s", strerror(errno));
}
// A link needs no content transfer, so it is treated as synced right away
// using the same bookkeeping as the packet_synced handler.
// NOTE(review): unlike the packet_synced handler, this does not check
// whether all paths are now gone/synced -- confirm that is intended.
if (pl.pl_root_path == pl.pl_path) {
auto iter = conn.c_desired_paths.find(pl.pl_path);
if (iter != conn.c_desired_paths.end()) {
if (iter->second.loo_tail) {
conn.c_synced_desired_paths.insert(pl.pl_path);
} else {
log_info("synced desired path: %s",
iter->first.c_str());
conn.c_desired_paths.erase(iter);
}
}
} else {
auto iter = conn.c_child_paths.find(pl.pl_path);
if (iter != conn.c_child_paths.end()) {
if (iter->second.loo_tail) {
conn.c_synced_child_paths.insert(pl.pl_path);
} else {
log_info("synced child path: %s",
iter->first.c_str());
conn.c_child_paths.erase(iter);
}
}
}
return std::move(this->ht_state);
},
// A preview request failed: clear the preview and show the error, unless
// the preview generation has moved on since the request was made.
[&](const tailer::packet_preview_error& ppe) {
isc::to<main_looper&, services::main_t>().send(
[ppe](auto& mlooper) {
if (lnav_data.ld_preview_generation != ppe.ppe_id) {
log_debug("preview ID mismatch: %lld != %lld",
lnav_data.ld_preview_generation,
ppe.ppe_id);
return;
}
lnav_data.ld_preview_status_source[0]
.get_description()
.set_cylon(false)
.clear();
lnav_data.ld_preview_source[0].clear();
lnav_data.ld_bottom_source.grep_error(ppe.ppe_msg);
});
return std::move(this->ht_state);
},
// Preview content arrived: show it, unless the preview generation has moved
// on since the request was made.
[&](const tailer::packet_preview_data& ppd) {
isc::to<main_looper&, services::main_t>().send(
[netloc = this->ht_netloc, ppd](auto& mlooper) {
if (lnav_data.ld_preview_generation != ppd.ppd_id) {
log_debug("preview ID mismatch: %lld != %lld",
lnav_data.ld_preview_generation,
ppd.ppd_id);
return;
}
std::string str(ppd.ppd_bits.begin(),
ppd.ppd_bits.end());
lnav_data.ld_preview_status_source[0]
.get_description()
.set_cylon(false)
.set_value("For file: %s:%s",
netloc.c_str(),
ppd.ppd_path.c_str());
lnav_data.ld_preview_source[0]
.replace_with(str)
.set_text_format(detect_text_format(str));
});
return std::move(this->ht_state);
},
// A completion candidate for a remote path: offer it, prefixed with the
// netloc, as a "remote-path" possibility in command mode.
[&](const tailer::packet_possible_path& ppp) {
log_debug("possible path: %s", ppp.ppp_path.c_str());
auto full_path = fmt::format(
FMT_STRING("{}{}"), this->ht_netloc, ppp.ppp_path);
isc::to<main_looper&, services::main_t>().send(
[full_path](auto& mlooper) {
lnav_data.ld_rl_view->add_possibility(
ln_mode_t::COMMAND, "remote-path", full_path);
});
return std::move(this->ht_state);
});
// Once disconnected or synced there is nothing more to do, so stop looping.
if (!this->ht_state.is<connected>()) {
this->s_looping = false;
}
}
}
/**
 * @param current_time Unused.
 * @return Always a zero-length timeout.
 */
std::chrono::milliseconds
tailer::looper::host_tailer::compute_timeout(mstime_t current_time) const
{
    return std::chrono::milliseconds::zero();
}
void
tailer::looper::host_tailer::stopped()
{
if (this->ht_state.is<connected>()) {
this->ht_state = disconnected();
}
if (this->ht_error_reader.joinable()) {
this->ht_error_reader.join();
}
}
/**
 * @param remote_path A path on the remote host.
 * @return The remote path prefixed with this tailer's network location.
 */
std::string
tailer::looper::host_tailer::get_display_path(
    const std::string& remote_path) const
{
    const auto& netloc = this->ht_netloc;

    return fmt::format(FMT_STRING("{}{}"), netloc, remote_path);
}
/**
 * Thread entry point: tag this thread's log messages with the network
 * location and then run the generic service loop.
 *
 * @return Whatever service_base::run() returns.
 */
void*
tailer::looper::host_tailer::run()
{
    auto thread_prefix
        = fmt::format(FMT_STRING("tailer({})"), this->ht_netloc);

    log_set_thread_prefix(std::move(thread_prefix));

    return service_base::run();
}
/**
 * Shut down the connection to the child: both packet pipes are closed
 * first and then the child process is waited for.
 *
 * @return The finished child process.
 */
auto_pid<process_state::finished>
tailer::looper::host_tailer::connected::close() &&
{
    this->ht_to_child.reset();
    this->ht_from_child.reset();

    auto finished = std::move(this->ht_child).wait_for_child();

    return finished;
}
void
tailer::looper::child_finished(std::shared_ptr<service_base> child)
{
auto child_tailer = std::static_pointer_cast<host_tailer>(child);
for (auto iter = this->l_remotes.begin(); iter != this->l_remotes.end();
++iter)
{
if (iter->second != child_tailer) {
continue;
}
if (child_tailer->is_synced()) {
log_info("synced with netloc '%s', removing", iter->first.c_str());
auto netloc_iter = this->l_netlocs_to_paths.find(iter->first);
if (netloc_iter != this->l_netlocs_to_paths.end()) {
netloc_iter->second.send_synced_to_main(netloc_iter->first);
this->l_netlocs_to_paths.erase(netloc_iter);
}
}
lnav_data.ld_active_files.fc_progress->writeAccess()->sp_tailers.erase(
iter->first);
this->l_remotes.erase(iter);
return;
}
}
void
tailer::looper::remote_path_queue::send_synced_to_main(
const std::string& netloc)
{
std::set<std::string> synced_files;
for (const auto& pair : this->rpq_new_paths) {
if (!pair.second.loo_tail) {
synced_files.emplace(
fmt::format(FMT_STRING("{}{}"), netloc, pair.first));
}
}
for (const auto& pair : this->rpq_existing_paths) {
if (!pair.second.loo_tail) {
synced_files.emplace(
fmt::format(FMT_STRING("{}{}"), netloc, pair.first));
}
}
isc::to<main_looper&, services::main_t>().send(
[file_set = std::move(synced_files)](auto& mlooper) {
file_collection fc;
fc.fc_synced_files = file_set;
update_active_files(fc);
});
}
/**
 * Log an error and ask the main looper to record it against the given name
 * and to drop the progress entry stored under that same name.
 *
 * @param path The name the error is recorded under.
 * @param msg The error message.
 */
void
tailer::looper::report_error(std::string path, std::string msg)
{
    log_error("reporting error: %s -- %s", path.c_str(), msg.c_str());

    auto& main_svc = isc::to<main_looper&, services::main_t>();

    main_svc.send([path, msg](auto& mlooper) {
        file_collection fc;

        fc.fc_name_to_errors->writeAccess()->emplace(path,
                                                     file_error_info{
                                                         {},
                                                         msg,
                                                     });
        update_active_files(fc);

        auto& active = lnav_data.ld_active_files;

        active.fc_progress->writeAccess()->sp_tailers.erase(path);
    });
}
void
tailer::cleanup_cache()
{
(void) std::async(std::launch::async, []() {
auto now = std::chrono::system_clock::now();
auto cache_path = remote_cache_path();
const auto& cfg = injector::get<const config&>();
std::vector<ghc::filesystem::path> to_remove;
log_debug("cache-ttl %d", cfg.c_cache_ttl.count());
for (const auto& entry :
ghc::filesystem::directory_iterator(cache_path))
{
auto mtime = ghc::filesystem::last_write_time(entry.path());
auto exp_time = mtime + cfg.c_cache_ttl;
if (now < exp_time) {
continue;
}
to_remove.emplace_back(entry.path());
}
for (auto& entry : to_remove) {
log_debug("removing cached remote: %s", entry.c_str());
ghc::filesystem::remove_all(entry);
}
});
}