mirror of https://github.com/tstack/lnav
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1191 lines
43 KiB
C++
1191 lines
43 KiB
C++
/**
|
|
* Copyright (c) 2021, Timothy Stack
|
|
*
|
|
* All rights reserved.
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without
|
|
* modification, are permitted provided that the following conditions are met:
|
|
*
|
|
* * Redistributions of source code must retain the above copyright notice, this
|
|
* list of conditions and the following disclaimer.
|
|
* * Redistributions in binary form must reproduce the above copyright notice,
|
|
* this list of conditions and the following disclaimer in the documentation
|
|
* and/or other materials provided with the distribution.
|
|
* * Neither the name of Timothy Stack nor the names of its contributors
|
|
* may be used to endorse or promote products derived from this software
|
|
* without specific prior written permission.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
|
|
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
|
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
|
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
|
|
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
|
|
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
|
|
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
|
|
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
|
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
|
|
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
*/
|
|
|
|
#include <regex>
|
|
|
|
#include "tailer.looper.hh"
|
|
|
|
#include "base/fs_util.hh"
|
|
#include "base/humanize.network.hh"
|
|
#include "base/lnav_log.hh"
|
|
#include "base/paths.hh"
|
|
#include "config.h"
|
|
#include "line_buffer.hh"
|
|
#include "lnav.hh"
|
|
#include "lnav.indexing.hh"
|
|
#include "service_tags.hh"
|
|
#include "tailer.h"
|
|
#include "tailer.looper.cfg.hh"
|
|
#include "tailerbin.h"
|
|
#include "tailerpp.hh"
|
|
|
|
using namespace std::chrono_literals;

/** How long to wait before retrying a host whose tailer could not be started. */
static const auto HOST_RETRY_DELAY = std::chrono::minutes{1};
|
|
|
|
static void
|
|
read_err_pipe(const std::string& netloc,
|
|
auto_fd& err,
|
|
std::vector<std::string>& eq)
|
|
{
|
|
line_buffer lb;
|
|
file_range pipe_range;
|
|
bool done = false;
|
|
|
|
log_info("stderr reader started...");
|
|
lb.set_fd(err);
|
|
while (!done) {
|
|
auto load_res = lb.load_next_line(pipe_range);
|
|
|
|
if (load_res.isErr()) {
|
|
done = true;
|
|
} else {
|
|
auto li = load_res.unwrap();
|
|
|
|
pipe_range = li.li_file_range;
|
|
if (li.li_file_range.empty()) {
|
|
done = true;
|
|
} else {
|
|
lb.read_range(li.li_file_range).then([netloc, &eq](auto sbr) {
|
|
auto line_str
|
|
= string_fragment(sbr.get_data(), 0, sbr.length())
|
|
.trim("\n");
|
|
if (eq.size() < 10) {
|
|
eq.template emplace_back(line_str.to_string());
|
|
}
|
|
|
|
auto level = line_str.startswith("error:")
|
|
? lnav_log_level_t::ERROR
|
|
: line_str.startswith("warning:")
|
|
? lnav_log_level_t::WARNING
|
|
: line_str.startswith("info:")
|
|
? lnav_log_level_t::INFO
|
|
: lnav_log_level_t::DEBUG;
|
|
log_msg_wrapper(level,
|
|
"tailer[%s] %.*s",
|
|
netloc.c_str(),
|
|
line_str.length(),
|
|
line_str.data());
|
|
});
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
static void
|
|
update_tailer_progress(const std::string& netloc, const std::string& msg)
|
|
{
|
|
lnav_data.ld_active_files.fc_progress->writeAccess()
|
|
->sp_tailers[netloc]
|
|
.tp_message
|
|
= msg;
|
|
}
|
|
|
|
static void
|
|
update_tailer_description(
|
|
const std::string& netloc,
|
|
const std::map<std::string, logfile_open_options_base>& desired_paths,
|
|
const std::string& remote_uname)
|
|
{
|
|
std::vector<std::string> paths;
|
|
|
|
for (const auto& des_pair : desired_paths) {
|
|
paths.emplace_back(
|
|
fmt::format(FMT_STRING("{}{}"), netloc, des_pair.first));
|
|
}
|
|
isc::to<main_looper&, services::main_t>().send(
|
|
[netloc, paths, remote_uname](auto& mlooper) {
|
|
auto& fc = lnav_data.ld_active_files;
|
|
|
|
for (const auto& path : paths) {
|
|
auto iter = fc.fc_other_files.find(path);
|
|
|
|
if (iter == fc.fc_other_files.end()) {
|
|
continue;
|
|
}
|
|
|
|
iter->second.ofd_description = remote_uname;
|
|
}
|
|
fc.fc_name_to_errors.erase(netloc);
|
|
});
|
|
}
|
|
|
|
void
|
|
tailer::looper::loop_body()
|
|
{
|
|
auto now = std::chrono::steady_clock::now();
|
|
std::vector<std::string> to_erase;
|
|
|
|
for (auto& qpair : this->l_netlocs_to_paths) {
|
|
auto& netloc = qpair.first;
|
|
auto& rpq = qpair.second;
|
|
|
|
if (now < rpq.rpq_next_attempt_time) {
|
|
continue;
|
|
}
|
|
if (this->l_remotes.count(netloc) == 0) {
|
|
auto create_res = host_tailer::for_host(netloc);
|
|
|
|
if (create_res.isErr()) {
|
|
report_error(netloc, create_res.unwrapErr());
|
|
if (std::any_of(
|
|
rpq.rpq_new_paths.begin(),
|
|
rpq.rpq_new_paths.end(),
|
|
[](const auto& pair) { return !pair.second.loo_tail; }))
|
|
{
|
|
rpq.send_synced_to_main(netloc);
|
|
to_erase.push_back(netloc);
|
|
} else {
|
|
rpq.rpq_next_attempt_time = now + HOST_RETRY_DELAY;
|
|
}
|
|
continue;
|
|
}
|
|
|
|
auto ht = create_res.unwrap();
|
|
this->l_remotes[netloc] = ht;
|
|
this->s_children.add_child_service(ht);
|
|
|
|
rpq.rpq_new_paths.insert(rpq.rpq_existing_paths.begin(),
|
|
rpq.rpq_existing_paths.end());
|
|
rpq.rpq_existing_paths.clear();
|
|
}
|
|
|
|
if (!rpq.rpq_new_paths.empty()) {
|
|
log_debug("%s: new paths to monitor -- %s",
|
|
netloc.c_str(),
|
|
rpq.rpq_new_paths.begin()->first.c_str());
|
|
this->l_remotes[netloc]->send(
|
|
[paths = rpq.rpq_new_paths](auto& ht) {
|
|
for (const auto& pair : paths) {
|
|
log_debug("adding path to tailer -- %s",
|
|
pair.first.c_str());
|
|
ht.open_remote_path(pair.first, std::move(pair.second));
|
|
}
|
|
});
|
|
|
|
rpq.rpq_existing_paths.insert(rpq.rpq_new_paths.begin(),
|
|
rpq.rpq_new_paths.end());
|
|
rpq.rpq_new_paths.clear();
|
|
}
|
|
}
|
|
|
|
for (const auto& netloc : to_erase) {
|
|
this->l_netlocs_to_paths.erase(netloc);
|
|
}
|
|
}
|
|
|
|
void
|
|
tailer::looper::add_remote(const network::path& path,
|
|
logfile_open_options_base options)
|
|
{
|
|
auto netloc_str = fmt::to_string(path.home());
|
|
this->l_netlocs_to_paths[netloc_str].rpq_new_paths[path.p_path]
|
|
= std::move(options);
|
|
}
|
|
|
|
void
|
|
tailer::looper::load_preview(int64_t id, const network::path& path)
|
|
{
|
|
auto netloc_str = fmt::to_string(path.home());
|
|
auto iter = this->l_remotes.find(netloc_str);
|
|
|
|
if (iter == this->l_remotes.end()) {
|
|
auto create_res = host_tailer::for_host(netloc_str);
|
|
|
|
if (create_res.isErr()) {
|
|
auto msg = create_res.unwrapErr();
|
|
isc::to<main_looper&, services::main_t>().send(
|
|
[id, msg](auto& mlooper) {
|
|
if (lnav_data.ld_preview_generation != id) {
|
|
return;
|
|
}
|
|
lnav_data.ld_preview_status_source.get_description()
|
|
.set_cylon(false)
|
|
.clear();
|
|
lnav_data.ld_preview_source.clear();
|
|
lnav_data.ld_bottom_source.grep_error(msg);
|
|
});
|
|
return;
|
|
}
|
|
|
|
auto ht = create_res.unwrap();
|
|
this->l_remotes[netloc_str] = ht;
|
|
this->s_children.add_child_service(ht);
|
|
}
|
|
|
|
this->l_remotes[netloc_str]->send([id, file_path = path.p_path](auto& ht) {
|
|
ht.load_preview(id, file_path);
|
|
});
|
|
}
|
|
|
|
void
|
|
tailer::looper::complete_path(const network::path& path)
|
|
{
|
|
auto netloc_str = fmt::to_string(path.home());
|
|
auto iter = this->l_remotes.find(netloc_str);
|
|
|
|
if (iter == this->l_remotes.end()) {
|
|
auto create_res = host_tailer::for_host(netloc_str);
|
|
|
|
if (create_res.isErr()) {
|
|
return;
|
|
}
|
|
|
|
auto ht = create_res.unwrap();
|
|
this->l_remotes[netloc_str] = ht;
|
|
this->s_children.add_child_service(ht);
|
|
}
|
|
|
|
this->l_remotes[netloc_str]->send(
|
|
[file_path = path.p_path](auto& ht) { ht.complete_path(file_path); });
|
|
}
|
|
|
|
static std::vector<std::string>
|
|
create_ssh_args_from_config(const std::string& dest)
|
|
{
|
|
const auto& cfg = injector::get<const tailer::config&>();
|
|
std::vector<std::string> retval;
|
|
|
|
retval.emplace_back(cfg.c_ssh_cmd);
|
|
if (!cfg.c_ssh_flags.empty()) {
|
|
if (startswith(cfg.c_ssh_flags, "-")) {
|
|
retval.emplace_back(cfg.c_ssh_flags);
|
|
} else {
|
|
retval.emplace_back(
|
|
fmt::format(FMT_STRING("-{}"), cfg.c_ssh_flags));
|
|
}
|
|
}
|
|
for (const auto& pair : cfg.c_ssh_options) {
|
|
if (pair.second.empty()) {
|
|
continue;
|
|
}
|
|
retval.emplace_back(fmt::format(FMT_STRING("-{}"), pair.first));
|
|
retval.emplace_back(pair.second);
|
|
}
|
|
for (const auto& pair : cfg.c_ssh_config) {
|
|
if (pair.second.empty()) {
|
|
continue;
|
|
}
|
|
retval.emplace_back(
|
|
fmt::format(FMT_STRING("-o{}={}"), pair.first, pair.second));
|
|
}
|
|
retval.emplace_back(dest);
|
|
|
|
return retval;
|
|
}
|
|
|
|
Result<std::shared_ptr<tailer::looper::host_tailer>, std::string>
|
|
tailer::looper::host_tailer::for_host(const std::string& netloc)
|
|
{
|
|
log_debug("tailer(%s): transferring tailer to remote", netloc.c_str());
|
|
|
|
update_tailer_progress(netloc, "Transferring tailer...");
|
|
|
|
auto& cfg = injector::get<const tailer::config&>();
|
|
auto tailer_bin_name = fmt::format(FMT_STRING("tailer.bin.{}"), getpid());
|
|
|
|
auto rp = humanize::network::path::from_str(netloc).value();
|
|
auto ssh_dest = rp.p_locality.l_hostname;
|
|
if (rp.p_locality.l_username.has_value()) {
|
|
ssh_dest = fmt::format(FMT_STRING("{}@{}"),
|
|
rp.p_locality.l_username.value(),
|
|
rp.p_locality.l_hostname);
|
|
}
|
|
|
|
{
|
|
auto in_pipe = TRY(auto_pipe::for_child_fd(STDIN_FILENO));
|
|
auto out_pipe = TRY(auto_pipe::for_child_fd(STDOUT_FILENO));
|
|
auto err_pipe = TRY(auto_pipe::for_child_fd(STDERR_FILENO));
|
|
auto child = TRY(lnav::pid::from_fork());
|
|
|
|
in_pipe.after_fork(child.in());
|
|
out_pipe.after_fork(child.in());
|
|
err_pipe.after_fork(child.in());
|
|
|
|
if (child.in_child()) {
|
|
auto arg_strs = create_ssh_args_from_config(ssh_dest);
|
|
std::vector<char*> args;
|
|
|
|
arg_strs.emplace_back(
|
|
fmt::format(cfg.c_transfer_cmd, tailer_bin_name));
|
|
|
|
fmt::print(stderr,
|
|
"tailer({}): executing -- {}\n",
|
|
netloc,
|
|
fmt::join(arg_strs, " "));
|
|
for (const auto& arg : arg_strs) {
|
|
args.push_back((char*) arg.data());
|
|
}
|
|
args.push_back(nullptr);
|
|
|
|
execvp(cfg.c_ssh_cmd.c_str(), args.data());
|
|
_exit(EXIT_FAILURE);
|
|
}
|
|
|
|
std::vector<std::string> error_queue;
|
|
log_debug("tailer(%s): starting err reader", netloc.c_str());
|
|
std::thread err_reader([netloc,
|
|
err = std::move(err_pipe.read_end()),
|
|
&error_queue]() mutable {
|
|
log_set_thread_prefix(
|
|
fmt::format(FMT_STRING("tailer({})"), netloc));
|
|
read_err_pipe(netloc, err, error_queue);
|
|
});
|
|
|
|
log_debug("tailer(%s): writing to child", netloc.c_str());
|
|
auto sf = tailer_bin[0].to_string_fragment();
|
|
ssize_t total_bytes = 0;
|
|
bool write_failed = false;
|
|
|
|
while (total_bytes < sf.length()) {
|
|
log_debug("attempting to write %d", sf.length() - total_bytes);
|
|
auto rc = write(
|
|
in_pipe.write_end(), sf.data(), sf.length() - total_bytes);
|
|
|
|
if (rc < 0) {
|
|
log_error(" tailer(%s): write failed -- %s",
|
|
netloc.c_str(),
|
|
strerror(errno));
|
|
write_failed = true;
|
|
break;
|
|
}
|
|
log_debug(" wrote %d", rc);
|
|
total_bytes += rc;
|
|
}
|
|
|
|
in_pipe.write_end().reset();
|
|
|
|
while (!write_failed) {
|
|
char buffer[1024];
|
|
|
|
auto rc = read(out_pipe.read_end(), buffer, sizeof(buffer));
|
|
if (rc < 0) {
|
|
break;
|
|
}
|
|
if (rc == 0) {
|
|
break;
|
|
}
|
|
log_debug("tailer(%s): transfer output -- %.*s",
|
|
netloc.c_str(),
|
|
rc,
|
|
buffer);
|
|
}
|
|
|
|
auto finished_child = std::move(child).wait_for_child();
|
|
|
|
err_reader.join();
|
|
if (!finished_child.was_normal_exit()
|
|
|| finished_child.exit_status() != EXIT_SUCCESS)
|
|
{
|
|
auto error_msg = error_queue.empty() ? "unknown"
|
|
: error_queue.back();
|
|
return Err(fmt::format(FMT_STRING("failed to ssh to host: {}"),
|
|
error_msg));
|
|
}
|
|
}
|
|
|
|
update_tailer_progress(netloc, "Starting tailer...");
|
|
|
|
auto in_pipe = TRY(auto_pipe::for_child_fd(STDIN_FILENO));
|
|
auto out_pipe = TRY(auto_pipe::for_child_fd(STDOUT_FILENO));
|
|
auto err_pipe = TRY(auto_pipe::for_child_fd(STDERR_FILENO));
|
|
auto child = TRY(lnav::pid::from_fork());
|
|
|
|
in_pipe.after_fork(child.in());
|
|
out_pipe.after_fork(child.in());
|
|
err_pipe.after_fork(child.in());
|
|
|
|
if (child.in_child()) {
|
|
auto arg_strs = create_ssh_args_from_config(ssh_dest);
|
|
std::vector<char*> args;
|
|
|
|
arg_strs.emplace_back(fmt::format(cfg.c_start_cmd, tailer_bin_name));
|
|
|
|
fmt::print(stderr,
|
|
FMT_STRING("tailer({}): executing -- {}\n"),
|
|
netloc,
|
|
fmt::join(arg_strs, " "));
|
|
for (const auto& arg : arg_strs) {
|
|
args.push_back((char*) arg.data());
|
|
}
|
|
args.push_back(nullptr);
|
|
|
|
execvp(cfg.c_ssh_cmd.c_str(), args.data());
|
|
_exit(EXIT_FAILURE);
|
|
}
|
|
|
|
return Ok(std::make_shared<host_tailer>(netloc,
|
|
std::move(child),
|
|
std::move(in_pipe.write_end()),
|
|
std::move(out_pipe.read_end()),
|
|
std::move(err_pipe.read_end())));
|
|
}
|
|
|
|
/** @return The "remotes" directory underneath the lnav work directory. */
static ghc::filesystem::path
remote_cache_path()
{
    ghc::filesystem::path retval = lnav::paths::workdir();

    retval /= "remotes";
    return retval;
}
|
|
|
|
/**
 * Create the remote cache directory if needed and return its location.
 *
 * @return The realpath()-resolved cache directory, or the unresolved path if
 *   realpath() fails.
 */
ghc::filesystem::path
tailer::looper::host_tailer::tmp_path()
{
    const auto cache_dir = remote_cache_path();

    ghc::filesystem::create_directories(cache_dir);

    // realpath() allocates the result; auto_mem takes ownership of it.
    auto_mem<char> resolved;
    resolved = realpath(cache_dir.c_str(), nullptr);
    if (resolved.in() != nullptr) {
        return resolved.in();
    }

    return cache_dir;
}
|
|
|
|
/**
 * Turn a network location into a string that is safe to use as a directory
 * name.
 *
 * @param netloc The network location to scrub.
 * @return A copy of the input where every character that is not a word
 *   character, a '.' or an '@' has been replaced by an underscore.
 */
static std::string
scrub_netloc(const std::string& netloc)
{
    static const std::regex UNSAFE_CHARS(R"([^\w\.\@])");
    std::string scrubbed;

    std::regex_replace(std::back_inserter(scrubbed),
                       netloc.begin(),
                       netloc.end(),
                       UNSAFE_CHARS,
                       "_");

    return scrubbed;
}
|
|
|
|
/**
 * Wrap an already started remote tailer process.
 *
 * The local cache directory is derived from the scrubbed network location,
 * a thread is started to drain the child's stderr into `ht_error_queue` and
 * the state is initialized to `connected`.
 *
 * @param netloc The network location of the remote host.
 * @param child The running ssh child process.
 * @param to_child The descriptor used to send packets to the child.
 * @param from_child The descriptor used to read packets from the child.
 * @param err_from_child The descriptor connected to the child's stderr.
 */
tailer::looper::host_tailer::host_tailer(const std::string& netloc,
                                         auto_pid<process_state::running> child,
                                         auto_fd to_child,
                                         auto_fd from_child,
                                         auto_fd err_from_child)
    : isc::service<host_tailer>(netloc), ht_netloc(netloc),
      ht_local_path(tmp_path() / scrub_netloc(netloc)),
      ht_error_reader(
          // The thread owns the stderr descriptor; the queue stays in *this.
          [host = netloc,
           err_fd = std::move(err_from_child),
           &errors = this->ht_error_queue]() mutable {
              read_err_pipe(host, err_fd, errors);
          }),
      ht_state(connected{
          std::move(child), std::move(to_child), std::move(from_child), {}})
{
}
|
|
|
|
void
|
|
tailer::looper::host_tailer::open_remote_path(const std::string& path,
|
|
logfile_open_options_base loo)
|
|
{
|
|
this->ht_state.match(
|
|
[&](connected& conn) {
|
|
conn.c_desired_paths[path] = std::move(loo);
|
|
send_packet(conn.ht_to_child.get(),
|
|
TPT_OPEN_PATH,
|
|
TPPT_STRING,
|
|
path.c_str(),
|
|
TPPT_DONE);
|
|
},
|
|
[&](const disconnected& d) {
|
|
log_warning("disconnected from host, cannot tail: %s",
|
|
path.c_str());
|
|
},
|
|
[&](const synced& s) {
|
|
log_warning("synced with host, not tailing: %s", path.c_str());
|
|
});
|
|
}
|
|
|
|
void
|
|
tailer::looper::host_tailer::load_preview(int64_t id, const std::string& path)
|
|
{
|
|
this->ht_state.match(
|
|
[&](connected& conn) {
|
|
send_packet(conn.ht_to_child.get(),
|
|
TPT_LOAD_PREVIEW,
|
|
TPPT_STRING,
|
|
path.c_str(),
|
|
TPPT_INT64,
|
|
id,
|
|
TPPT_DONE);
|
|
},
|
|
[&](const disconnected& d) {
|
|
log_warning("disconnected from host, cannot preview: %s",
|
|
path.c_str());
|
|
|
|
auto msg = fmt::format(FMT_STRING("error: disconnected from {}"),
|
|
this->ht_netloc);
|
|
isc::to<main_looper&, services::main_t>().send([=](auto& mlooper) {
|
|
if (lnav_data.ld_preview_generation != id) {
|
|
return;
|
|
}
|
|
lnav_data.ld_preview_status_source.get_description()
|
|
.set_cylon(false)
|
|
.set_value(msg);
|
|
});
|
|
},
|
|
[&](const synced& s) { require(false); });
|
|
}
|
|
|
|
void
|
|
tailer::looper::host_tailer::complete_path(const std::string& path)
|
|
{
|
|
this->ht_state.match(
|
|
[&](connected& conn) {
|
|
send_packet(conn.ht_to_child.get(),
|
|
TPT_COMPLETE_PATH,
|
|
TPPT_STRING,
|
|
path.c_str(),
|
|
TPPT_DONE);
|
|
},
|
|
[&](const disconnected& d) {
|
|
log_warning("disconnected from host, cannot preview: %s",
|
|
path.c_str());
|
|
},
|
|
[&](const synced& s) { require(false); });
|
|
}
|
|
|
|
void
|
|
tailer::looper::host_tailer::loop_body()
|
|
{
|
|
const static uint64_t TOUCH_FREQ = 10000;
|
|
|
|
if (!this->ht_state.is<connected>()) {
|
|
return;
|
|
}
|
|
|
|
this->ht_cycle_count += 1;
|
|
if (this->ht_cycle_count % TOUCH_FREQ == 0) {
|
|
auto now
|
|
= ghc::filesystem::file_time_type{std::chrono::system_clock::now()};
|
|
ghc::filesystem::last_write_time(this->ht_local_path, now);
|
|
}
|
|
|
|
auto& conn = this->ht_state.get<connected>();
|
|
|
|
pollfd pfds[1];
|
|
|
|
pfds[0].fd = conn.ht_from_child.get();
|
|
pfds[0].events = POLLIN;
|
|
pfds[0].revents = 0;
|
|
|
|
auto ready_count = poll(pfds, 1, 100);
|
|
if (ready_count > 0) {
|
|
auto read_res = tailer::read_packet(conn.ht_from_child);
|
|
|
|
if (read_res.isErr()) {
|
|
log_error("read error: %s", read_res.unwrapErr().c_str());
|
|
return;
|
|
}
|
|
|
|
auto packet = read_res.unwrap();
|
|
this->ht_state = packet.match(
|
|
[&](const tailer::packet_eof& te) {
|
|
log_debug("all done!");
|
|
|
|
auto finished_child = std::move(conn).close();
|
|
if (finished_child.exit_status() != 0
|
|
&& !this->ht_error_queue.empty())
|
|
{
|
|
report_error(this->ht_netloc, this->ht_error_queue.back());
|
|
}
|
|
|
|
return state_v{disconnected()};
|
|
},
|
|
[&](const tailer::packet_announce& pa) {
|
|
update_tailer_description(
|
|
this->ht_netloc, conn.c_desired_paths, pa.pa_uname);
|
|
this->ht_uname = pa.pa_uname;
|
|
return std::move(this->ht_state);
|
|
},
|
|
[&](const tailer::packet_log& pl) {
|
|
log_debug("%s\n", pl.pl_msg.c_str());
|
|
return std::move(this->ht_state);
|
|
},
|
|
[&](const tailer::packet_error& pe) {
|
|
log_debug("Got an error: %s -- %s",
|
|
pe.pe_path.c_str(),
|
|
pe.pe_msg.c_str());
|
|
|
|
lnav_data.ld_active_files.fc_progress->writeAccess()
|
|
->sp_tailers.erase(this->ht_netloc);
|
|
|
|
auto desired_iter = conn.c_desired_paths.find(pe.pe_path);
|
|
if (desired_iter != conn.c_desired_paths.end()) {
|
|
report_error(this->get_display_path(pe.pe_path), pe.pe_msg);
|
|
if (!desired_iter->second.loo_tail) {
|
|
conn.c_desired_paths.erase(desired_iter);
|
|
}
|
|
} else {
|
|
auto child_iter = conn.c_child_paths.find(pe.pe_path);
|
|
|
|
if (child_iter != conn.c_child_paths.end()
|
|
&& !child_iter->second.loo_tail)
|
|
{
|
|
conn.c_child_paths.erase(child_iter);
|
|
}
|
|
}
|
|
|
|
auto remote_path = ghc::filesystem::absolute(
|
|
ghc::filesystem::path(pe.pe_path))
|
|
.relative_path();
|
|
auto local_path = this->ht_local_path / remote_path;
|
|
|
|
log_debug("removing %s", local_path.c_str());
|
|
this->ht_active_files.erase(local_path);
|
|
ghc::filesystem::remove_all(local_path);
|
|
|
|
if (conn.c_desired_paths.empty() && conn.c_child_paths.empty())
|
|
{
|
|
log_info("tailer(%s): all desired paths synced",
|
|
this->ht_netloc.c_str());
|
|
return state_v{synced{}};
|
|
}
|
|
|
|
return std::move(this->ht_state);
|
|
},
|
|
[&](const tailer::packet_offer_block& pob) {
|
|
log_debug("Got an offer: %s %lld - %lld",
|
|
pob.pob_path.c_str(),
|
|
pob.pob_offset,
|
|
pob.pob_length);
|
|
|
|
logfile_open_options_base loo;
|
|
if (pob.pob_path == pob.pob_root_path) {
|
|
auto root_iter = conn.c_desired_paths.find(pob.pob_path);
|
|
|
|
if (root_iter == conn.c_desired_paths.end()) {
|
|
log_warning("ignoring unknown root: %s",
|
|
pob.pob_root_path.c_str());
|
|
return std::move(this->ht_state);
|
|
}
|
|
|
|
loo = root_iter->second;
|
|
} else {
|
|
auto child_iter = conn.c_child_paths.find(pob.pob_path);
|
|
if (child_iter == conn.c_child_paths.end()) {
|
|
auto root_iter
|
|
= conn.c_desired_paths.find(pob.pob_root_path);
|
|
|
|
if (root_iter == conn.c_desired_paths.end()) {
|
|
log_warning("ignoring child of unknown root: %s",
|
|
pob.pob_root_path.c_str());
|
|
return std::move(this->ht_state);
|
|
}
|
|
|
|
conn.c_child_paths[pob.pob_path]
|
|
= std::move(root_iter->second);
|
|
child_iter = conn.c_child_paths.find(pob.pob_path);
|
|
}
|
|
|
|
loo = std::move(child_iter->second);
|
|
}
|
|
|
|
update_tailer_description(
|
|
this->ht_netloc, conn.c_desired_paths, this->ht_uname);
|
|
|
|
auto remote_path = ghc::filesystem::absolute(
|
|
ghc::filesystem::path(pob.pob_path))
|
|
.relative_path();
|
|
auto local_path = this->ht_local_path / remote_path;
|
|
auto open_res
|
|
= lnav::filesystem::open_file(local_path, O_RDONLY);
|
|
|
|
if (this->ht_active_files.count(local_path) == 0) {
|
|
this->ht_active_files.insert(local_path);
|
|
|
|
auto custom_name = this->get_display_path(pob.pob_path);
|
|
isc::to<main_looper&, services::main_t>().send(
|
|
[local_path,
|
|
custom_name,
|
|
loo,
|
|
netloc = this->ht_netloc](auto& mlooper) {
|
|
auto& active_fc = lnav_data.ld_active_files;
|
|
auto lpath_str = local_path.string();
|
|
|
|
{
|
|
safe::WriteAccess<safe_scan_progress> sp(
|
|
*active_fc.fc_progress);
|
|
|
|
sp->sp_tailers.erase(netloc);
|
|
}
|
|
if (active_fc.fc_file_names.count(lpath_str) > 0) {
|
|
log_debug("already in fc_file_names");
|
|
return;
|
|
}
|
|
if (active_fc.fc_closed_files.count(custom_name)
|
|
> 0) {
|
|
log_debug("in closed");
|
|
return;
|
|
}
|
|
|
|
file_collection fc;
|
|
|
|
fc.fc_file_names[lpath_str]
|
|
.with_filename(custom_name)
|
|
.with_source(logfile_name_source::REMOTE)
|
|
.with_tail(loo.loo_tail)
|
|
.with_non_utf_visibility(false)
|
|
.with_visible_size_limit(256 * 1024);
|
|
update_active_files(fc);
|
|
});
|
|
}
|
|
|
|
if (open_res.isErr()) {
|
|
log_debug("file not found (%s), sending need block",
|
|
open_res.unwrapErr().c_str());
|
|
send_packet(conn.ht_to_child.get(),
|
|
TPT_NEED_BLOCK,
|
|
TPPT_STRING,
|
|
pob.pob_path.c_str(),
|
|
TPPT_DONE);
|
|
return std::move(this->ht_state);
|
|
}
|
|
|
|
auto fd = open_res.unwrap();
|
|
struct stat st;
|
|
|
|
if (fstat(fd, &st) == -1 || !S_ISREG(st.st_mode)) {
|
|
log_debug("path changed, sending need block");
|
|
ghc::filesystem::remove_all(local_path);
|
|
send_packet(conn.ht_to_child.get(),
|
|
TPT_NEED_BLOCK,
|
|
TPPT_STRING,
|
|
pob.pob_path.c_str(),
|
|
TPPT_DONE);
|
|
return std::move(this->ht_state);
|
|
}
|
|
|
|
if (st.st_size == pob.pob_offset) {
|
|
log_debug("local file is synced, sending need block");
|
|
send_packet(conn.ht_to_child.get(),
|
|
TPT_NEED_BLOCK,
|
|
TPPT_STRING,
|
|
pob.pob_path.c_str(),
|
|
TPPT_DONE);
|
|
return std::move(this->ht_state);
|
|
}
|
|
|
|
constexpr int64_t BUFFER_SIZE = 4 * 1024 * 1024;
|
|
auto buffer = auto_mem<unsigned char>::malloc(BUFFER_SIZE);
|
|
auto remaining = pob.pob_length;
|
|
auto remaining_offset = pob.pob_offset;
|
|
tailer::hash_frag thf;
|
|
SHA256_CTX shactx;
|
|
sha256_init(&shactx);
|
|
|
|
log_debug("checking offer %s[%lld..+%lld]",
|
|
local_path.c_str(),
|
|
remaining_offset,
|
|
remaining);
|
|
while (remaining > 0) {
|
|
auto nbytes = std::min(remaining, BUFFER_SIZE);
|
|
auto bytes_read
|
|
= pread(fd, buffer, nbytes, remaining_offset);
|
|
if (bytes_read == -1) {
|
|
log_debug(
|
|
"unable to read file, sending need block -- %s",
|
|
strerror(errno));
|
|
ghc::filesystem::remove_all(local_path);
|
|
break;
|
|
}
|
|
if (bytes_read == 0) {
|
|
break;
|
|
}
|
|
sha256_update(&shactx, buffer.in(), bytes_read);
|
|
remaining -= bytes_read;
|
|
remaining_offset += bytes_read;
|
|
}
|
|
|
|
if (remaining == 0) {
|
|
sha256_final(&shactx, thf.thf_hash);
|
|
|
|
if (thf == pob.pob_hash) {
|
|
log_debug("local file block is same, sending ack");
|
|
send_packet(conn.ht_to_child.get(),
|
|
TPT_ACK_BLOCK,
|
|
TPPT_STRING,
|
|
pob.pob_path.c_str(),
|
|
TPPT_INT64,
|
|
pob.pob_offset,
|
|
TPPT_INT64,
|
|
pob.pob_length,
|
|
TPPT_INT64,
|
|
(int64_t) st.st_size,
|
|
TPPT_DONE);
|
|
return std::move(this->ht_state);
|
|
}
|
|
log_debug("local file is different, sending need block");
|
|
}
|
|
send_packet(conn.ht_to_child.get(),
|
|
TPT_NEED_BLOCK,
|
|
TPPT_STRING,
|
|
pob.pob_path.c_str(),
|
|
TPPT_DONE);
|
|
return std::move(this->ht_state);
|
|
},
|
|
[&](const tailer::packet_tail_block& ptb) {
|
|
auto remote_path = ghc::filesystem::absolute(
|
|
ghc::filesystem::path(ptb.ptb_path))
|
|
.relative_path();
|
|
auto local_path = this->ht_local_path / remote_path;
|
|
|
|
log_debug("writing tail to: %lld/%ld %s",
|
|
ptb.ptb_offset,
|
|
ptb.ptb_bits.size(),
|
|
local_path.c_str());
|
|
ghc::filesystem::create_directories(local_path.parent_path());
|
|
auto create_res = lnav::filesystem::create_file(
|
|
local_path, O_WRONLY | O_APPEND | O_CREAT, 0600);
|
|
|
|
if (create_res.isErr()) {
|
|
log_error("open: %s", create_res.unwrapErr().c_str());
|
|
} else {
|
|
auto fd = create_res.unwrap();
|
|
ftruncate(fd, ptb.ptb_offset);
|
|
pwrite(fd,
|
|
ptb.ptb_bits.data(),
|
|
ptb.ptb_bits.size(),
|
|
ptb.ptb_offset);
|
|
auto mtime = ghc::filesystem::file_time_type{
|
|
std::chrono::seconds{ptb.ptb_mtime}};
|
|
// XXX This isn't atomic with the write...
|
|
ghc::filesystem::last_write_time(local_path, mtime);
|
|
}
|
|
return std::move(this->ht_state);
|
|
},
|
|
[&](const tailer::packet_synced& ps) {
|
|
if (ps.ps_root_path == ps.ps_path) {
|
|
auto iter = conn.c_desired_paths.find(ps.ps_path);
|
|
|
|
if (iter != conn.c_desired_paths.end()) {
|
|
if (iter->second.loo_tail) {
|
|
conn.c_synced_desired_paths.insert(ps.ps_path);
|
|
} else {
|
|
log_info("synced desired path: %s",
|
|
iter->first.c_str());
|
|
conn.c_desired_paths.erase(iter);
|
|
}
|
|
}
|
|
} else {
|
|
auto iter = conn.c_child_paths.find(ps.ps_path);
|
|
|
|
if (iter != conn.c_child_paths.end()) {
|
|
if (iter->second.loo_tail) {
|
|
conn.c_synced_child_paths.insert(ps.ps_path);
|
|
} else {
|
|
log_info("synced child path: %s",
|
|
iter->first.c_str());
|
|
conn.c_child_paths.erase(iter);
|
|
}
|
|
}
|
|
}
|
|
|
|
if (conn.c_desired_paths.empty() && conn.c_child_paths.empty())
|
|
{
|
|
log_info("tailer(%s): all desired paths synced",
|
|
this->ht_netloc.c_str());
|
|
return state_v{synced{}};
|
|
} else if (!conn.c_initial_sync_done
|
|
&& conn.c_desired_paths.size()
|
|
== conn.c_synced_desired_paths.size()
|
|
&& conn.c_child_paths.size()
|
|
== conn.c_synced_child_paths.size())
|
|
{
|
|
log_info("tailer(%s): all desired paths synced",
|
|
this->ht_netloc.c_str());
|
|
conn.c_initial_sync_done = true;
|
|
|
|
std::set<std::string> synced_files;
|
|
for (const auto& desired_pair : conn.c_desired_paths) {
|
|
synced_files.emplace(fmt::format(
|
|
FMT_STRING("{}{}"), ht_netloc, desired_pair.first));
|
|
}
|
|
isc::to<main_looper&, services::main_t>().send(
|
|
[file_set = std::move(synced_files)](auto& mlooper) {
|
|
file_collection fc;
|
|
|
|
fc.fc_synced_files = file_set;
|
|
update_active_files(fc);
|
|
});
|
|
}
|
|
|
|
return std::move(this->ht_state);
|
|
},
|
|
[&](const tailer::packet_link& pl) {
|
|
auto remote_path = ghc::filesystem::absolute(
|
|
ghc::filesystem::path(pl.pl_path))
|
|
.relative_path();
|
|
auto local_path = this->ht_local_path / remote_path;
|
|
auto remote_link_path = ghc::filesystem::path(pl.pl_link_value);
|
|
std::string link_path;
|
|
|
|
if (remote_link_path.is_absolute()) {
|
|
auto local_link_path = this->ht_local_path
|
|
/ remote_link_path.relative_path();
|
|
|
|
link_path = local_link_path.string();
|
|
} else {
|
|
link_path = remote_link_path.string();
|
|
}
|
|
|
|
log_debug("symlinking %s -> %s",
|
|
local_path.c_str(),
|
|
link_path.c_str());
|
|
ghc::filesystem::create_directories(local_path.parent_path());
|
|
ghc::filesystem::remove_all(local_path);
|
|
if (symlink(link_path.c_str(), local_path.c_str()) < 0) {
|
|
log_error("symlink failed: %s", strerror(errno));
|
|
}
|
|
|
|
if (pl.pl_root_path == pl.pl_path) {
|
|
auto iter = conn.c_desired_paths.find(pl.pl_path);
|
|
|
|
if (iter != conn.c_desired_paths.end()) {
|
|
if (iter->second.loo_tail) {
|
|
conn.c_synced_desired_paths.insert(pl.pl_path);
|
|
} else {
|
|
log_info("synced desired path: %s",
|
|
iter->first.c_str());
|
|
conn.c_desired_paths.erase(iter);
|
|
}
|
|
}
|
|
} else {
|
|
auto iter = conn.c_child_paths.find(pl.pl_path);
|
|
|
|
if (iter != conn.c_child_paths.end()) {
|
|
if (iter->second.loo_tail) {
|
|
conn.c_synced_child_paths.insert(pl.pl_path);
|
|
} else {
|
|
log_info("synced child path: %s",
|
|
iter->first.c_str());
|
|
conn.c_child_paths.erase(iter);
|
|
}
|
|
}
|
|
}
|
|
|
|
return std::move(this->ht_state);
|
|
},
|
|
[&](const tailer::packet_preview_error& ppe) {
|
|
isc::to<main_looper&, services::main_t>().send(
|
|
[ppe](auto& mlooper) {
|
|
if (lnav_data.ld_preview_generation != ppe.ppe_id) {
|
|
log_debug("preview ID mismatch: %lld != %lld",
|
|
lnav_data.ld_preview_generation,
|
|
ppe.ppe_id);
|
|
return;
|
|
}
|
|
lnav_data.ld_preview_status_source.get_description()
|
|
.set_cylon(false)
|
|
.clear();
|
|
lnav_data.ld_preview_source.clear();
|
|
lnav_data.ld_bottom_source.grep_error(ppe.ppe_msg);
|
|
});
|
|
|
|
return std::move(this->ht_state);
|
|
},
|
|
[&](const tailer::packet_preview_data& ppd) {
|
|
isc::to<main_looper&, services::main_t>().send(
|
|
[netloc = this->ht_netloc, ppd](auto& mlooper) {
|
|
if (lnav_data.ld_preview_generation != ppd.ppd_id) {
|
|
log_debug("preview ID mismatch: %lld != %lld",
|
|
lnav_data.ld_preview_generation,
|
|
ppd.ppd_id);
|
|
return;
|
|
}
|
|
std::string str(ppd.ppd_bits.begin(),
|
|
ppd.ppd_bits.end());
|
|
lnav_data.ld_preview_status_source.get_description()
|
|
.set_cylon(false)
|
|
.set_value("For file: %s:%s",
|
|
netloc.c_str(),
|
|
ppd.ppd_path.c_str());
|
|
lnav_data.ld_preview_source.replace_with(str)
|
|
.set_text_format(detect_text_format(str));
|
|
});
|
|
return std::move(this->ht_state);
|
|
},
|
|
[&](const tailer::packet_possible_path& ppp) {
|
|
log_debug("possible path: %s", ppp.ppp_path.c_str());
|
|
auto full_path = fmt::format(
|
|
FMT_STRING("{}{}"), this->ht_netloc, ppp.ppp_path);
|
|
|
|
isc::to<main_looper&, services::main_t>().send(
|
|
[full_path](auto& mlooper) {
|
|
lnav_data.ld_rl_view->add_possibility(
|
|
ln_mode_t::COMMAND, "remote-path", full_path);
|
|
});
|
|
return std::move(this->ht_state);
|
|
});
|
|
|
|
if (!this->ht_state.is<connected>()) {
|
|
this->s_looping = false;
|
|
}
|
|
}
|
|
}
|
|
|
|
std::chrono::milliseconds
|
|
tailer::looper::host_tailer::compute_timeout(mstime_t current_time) const
|
|
{
|
|
return 0s;
|
|
}
|
|
|
|
void
|
|
tailer::looper::host_tailer::stopped()
|
|
{
|
|
if (this->ht_state.is<connected>()) {
|
|
this->ht_state = disconnected();
|
|
}
|
|
if (this->ht_error_reader.joinable()) {
|
|
this->ht_error_reader.join();
|
|
}
|
|
}
|
|
|
|
std::string
|
|
tailer::looper::host_tailer::get_display_path(
|
|
const std::string& remote_path) const
|
|
{
|
|
return fmt::format(FMT_STRING("{}{}"), this->ht_netloc, remote_path);
|
|
}
|
|
|
|
void*
|
|
tailer::looper::host_tailer::run()
|
|
{
|
|
log_set_thread_prefix(
|
|
fmt::format(FMT_STRING("tailer({})"), this->ht_netloc));
|
|
|
|
return service_base::run();
|
|
}
|
|
|
|
/**
 * Close both pipes to the child and wait for it to finish.
 *
 * @return The finished child process.
 */
auto_pid<process_state::finished>
tailer::looper::host_tailer::connected::close() &&
{
    // Close the pipes before waiting so the child is not left blocked on
    // them.
    this->ht_to_child.reset();
    this->ht_from_child.reset();

    auto finished = std::move(this->ht_child).wait_for_child();

    return finished;
}
|
|
|
|
void
|
|
tailer::looper::child_finished(std::shared_ptr<service_base> child)
|
|
{
|
|
auto child_tailer = std::static_pointer_cast<host_tailer>(child);
|
|
|
|
for (auto iter = this->l_remotes.begin(); iter != this->l_remotes.end();
|
|
++iter)
|
|
{
|
|
if (iter->second != child_tailer) {
|
|
continue;
|
|
}
|
|
|
|
if (child_tailer->is_synced()) {
|
|
log_info("synced with netloc '%s', removing", iter->first.c_str());
|
|
auto netloc_iter = this->l_netlocs_to_paths.find(iter->first);
|
|
|
|
if (netloc_iter != this->l_netlocs_to_paths.end()) {
|
|
netloc_iter->second.send_synced_to_main(netloc_iter->first);
|
|
this->l_netlocs_to_paths.erase(netloc_iter);
|
|
}
|
|
}
|
|
lnav_data.ld_active_files.fc_progress->writeAccess()->sp_tailers.erase(
|
|
iter->first);
|
|
this->l_remotes.erase(iter);
|
|
return;
|
|
}
|
|
}
|
|
|
|
void
|
|
tailer::looper::remote_path_queue::send_synced_to_main(
|
|
const std::string& netloc)
|
|
{
|
|
std::set<std::string> synced_files;
|
|
|
|
for (const auto& pair : this->rpq_new_paths) {
|
|
if (!pair.second.loo_tail) {
|
|
synced_files.emplace(
|
|
fmt::format(FMT_STRING("{}{}"), netloc, pair.first));
|
|
}
|
|
}
|
|
for (const auto& pair : this->rpq_existing_paths) {
|
|
if (!pair.second.loo_tail) {
|
|
synced_files.emplace(
|
|
fmt::format(FMT_STRING("{}{}"), netloc, pair.first));
|
|
}
|
|
}
|
|
|
|
isc::to<main_looper&, services::main_t>().send(
|
|
[file_set = std::move(synced_files)](auto& mlooper) {
|
|
file_collection fc;
|
|
|
|
fc.fc_synced_files = file_set;
|
|
update_active_files(fc);
|
|
});
|
|
}
|
|
|
|
void
|
|
tailer::looper::report_error(std::string path, std::string msg)
|
|
{
|
|
log_error("reporting error: %s -- %s", path.c_str(), msg.c_str());
|
|
isc::to<main_looper&, services::main_t>().send([=](auto& mlooper) {
|
|
file_collection fc;
|
|
|
|
fc.fc_name_to_errors.emplace(path,
|
|
file_error_info{
|
|
{},
|
|
msg,
|
|
});
|
|
update_active_files(fc);
|
|
lnav_data.ld_active_files.fc_progress->writeAccess()->sp_tailers.erase(
|
|
path);
|
|
});
|
|
}
|
|
|
|
void
|
|
tailer::cleanup_cache()
|
|
{
|
|
(void) std::async(std::launch::async, []() {
|
|
auto now = std::chrono::system_clock::now();
|
|
auto cache_path = remote_cache_path();
|
|
const auto& cfg = injector::get<const config&>();
|
|
std::vector<ghc::filesystem::path> to_remove;
|
|
|
|
log_debug("cache-ttl %d", cfg.c_cache_ttl.count());
|
|
for (const auto& entry :
|
|
ghc::filesystem::directory_iterator(cache_path))
|
|
{
|
|
auto mtime = ghc::filesystem::last_write_time(entry.path());
|
|
auto exp_time = mtime + cfg.c_cache_ttl;
|
|
if (now < exp_time) {
|
|
continue;
|
|
}
|
|
|
|
to_remove.emplace_back(entry.path());
|
|
}
|
|
|
|
for (auto& entry : to_remove) {
|
|
log_debug("removing cached remote: %s", entry.c_str());
|
|
ghc::filesystem::remove_all(entry);
|
|
}
|
|
});
|
|
}
|