[remote] draft of the tailer

pull/857/head
Timothy Stack 3 years ago
parent cc1e79d1cc
commit 444e7e3289

@ -1,6 +1,6 @@
# aminclude_static.am generated automatically by Autoconf
# from AX_AM_MACROS_STATIC on Mon Mar 29 09:04:25 PDT 2021
# from AX_AM_MACROS_STATIC on Thu Apr 22 08:56:31 PDT 2021
# Code coverage

@ -293,6 +293,7 @@ AC_CONFIG_FILES([src/base/Makefile])
AC_CONFIG_FILES([src/fmtlib/Makefile])
AC_CONFIG_FILES([src/pcrepp/Makefile])
AC_CONFIG_FILES([src/pugixml/Makefile])
AC_CONFIG_FILES([src/tailer/Makefile])
AC_CONFIG_FILES([src/yajl/Makefile])
AC_CONFIG_FILES([src/yajlpp/Makefile])
AC_CONFIG_FILES([test/Makefile])

@ -14,6 +14,8 @@ set(VCS_PACKAGE_STRING "test")
configure_file(config.cmake.h.in config.h)
add_subdirectory(base)
add_subdirectory(remote)
add_subdirectory(tailer)
add_executable(bin2c bin2c.hh bin2c.c)
target_link_libraries(bin2c ZLIB::zlib)

@ -1,7 +1,7 @@
include $(top_srcdir)/aminclude_static.am
SUBDIRS = fmtlib base pcrepp pugixml yajl yajlpp .
SUBDIRS = fmtlib base tailer pcrepp pugixml yajl yajlpp .
bin_PROGRAMS = lnav

@ -56,6 +56,14 @@ public:
return *this;
};
pid_t in() const {
return this->ap_child;
}
bool failed() const {
return this->ap_child == -1;
}
bool in_child() const
{
return this->ap_child == 0;

@ -10,6 +10,7 @@ add_library(base STATIC
isc.cc
lnav.gzip.cc
lnav_log.cc
network.tcp.cc
string_util.cc
time_util.cc
@ -27,6 +28,7 @@ add_library(base STATIC
isc.hh
lrucache.hpp
math_util.hh
network.tcp.hh
result.h
time_util.hh
)
@ -46,6 +48,8 @@ add_executable(test_base
humanize.file_size.tests.cc
humanize.time.tests.cc
string_util.tests.cc
network.tcp.tests.cc
test_base.cc)
target_link_libraries(test_base base)
add_test(NAME test_base COMMAND test_base)

@ -30,6 +30,7 @@ noinst_HEADERS = \
lnav.gzip.hh \
lrucache.hpp \
math_util.hh \
network.tcp.hh \
opt_util.hh \
result.h \
string_util.hh \
@ -44,6 +45,7 @@ libbase_a_SOURCES = \
isc.cc \
lnav.gzip.cc \
lnav_log.cc \
network.tcp.cc \
string_util.cc \
time_util.cc

@ -0,0 +1,70 @@
/**
* Copyright (c) 2021, Timothy Stack
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* * Neither the name of Timothy Stack nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "config.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include "fmt/format.h"
#include "network.tcp.hh"
namespace network {
namespace tcp {
Result<auto_fd, std::string> connect(const char *hostname,
const char *servname)
{
struct addrinfo hints, *ai;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
auto rc = getaddrinfo(hostname, servname, &hints, &ai);
if (rc != 0) {
return Err(fmt::format("unable to resolve {}:{} -- {}",
hostname, servname, gai_strerror(rc)));
}
auto retval = auto_fd(socket(ai->ai_family, ai->ai_socktype, 0));
rc = ::connect(retval, ai->ai_addr, ai->ai_addrlen);
if (rc != 0) {
return Err(fmt::format("unable to connect to {}:{} -- {}",
hostname, servname, strerror(rc)));
}
return Ok(retval);
}
}
}

@ -0,0 +1,47 @@
/**
* Copyright (c) 2021, Timothy Stack
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* * Neither the name of Timothy Stack nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef lnav_network_tcp_hh
#define lnav_network_tcp_hh
#include <string>
#include "auto_fd.hh"
#include "result.h"
namespace network {
namespace tcp {
Result<auto_fd, std::string> connect(const char *hostname,
const char *servname);
}
}
#endif

@ -0,0 +1,52 @@
/**
* Copyright (c) 2021, Timothy Stack
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* * Neither the name of Timothy Stack nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "config.h"
#include <iostream>
#include "doctest.hh"
#include "network.tcp.hh"
TEST_CASE ("bad hostname")
{
auto connect_res = network::tcp::connect("foobar.bazzer", "http");
CHECK(connect_res.unwrapErr() ==
"unable to resolve foobar.bazzer:http -- nodename nor servname "
"provided, or not known");
}
TEST_CASE ("bad servname")
{
auto connect_res = network::tcp::connect("www.cnn.com", "non-existent");
CHECK(connect_res.unwrapErr() ==
"unable to resolve www.cnn.com:non-existent -- nodename nor "
"servname provided, or not known");
}

@ -0,0 +1,18 @@
add_library(remote STATIC
../config.h
remote.ssh.cc
remote.ssh.hh
)
target_include_directories(
remote
PUBLIC
.
..
../fmtlib
${CMAKE_CURRENT_BINARY_DIR}/..
)
target_link_libraries(remote cppfmt PkgConfig::libpcre)

@ -0,0 +1,20 @@
include $(top_srcdir)/aminclude_static.am
AM_CPPFLAGS = \
$(CODE_COVERAGE_CPPFLAGS) \
-Wall \
-I$(top_srcdir)/src/ \
-I$(top_srcdir)/src/fmtlib
AM_LIBS = $(CODE_COVERAGE_LIBS)
AM_CFLAGS = $(CODE_COVERAGE_CFLAGS)
AM_CXXFLAGS = $(CODE_COVERAGE_CXXFLAGS)
noinst_LIBRARIES = libremote.a
noinst_HEADERS = \
remote.ssh.hh
libremote_a_SOURCES = \
remote.ssh.cc

@ -0,0 +1,38 @@
/**
* Copyright (c) 2021, Timothy Stack
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* * Neither the name of Timothy Stack nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "config.h"
#include "remote.ssh.hh"
namespace remote {
namespace ssh {
}
}

@ -0,0 +1,41 @@
/**
* Copyright (c) 2021, Timothy Stack
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* * Neither the name of Timothy Stack nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef lnav_remote_ssh_hh
#define lnav_remote_ssh_hh
#include "base/result.h"
namespace remote {
namespace ssh {
}
}
#endif

@ -0,0 +1,42 @@
add_library(
tailercommon
sha-256.c
sha-256.h
tailer.c
tailer.h
)
add_executable(
tailer
tailer.main.c
)
target_link_libraries(tailer tailercommon)
add_library(
tailerpp
tailerpp.hh
tailerpp.cc
)
target_link_libraries(tailerpp base)
add_executable(
drive_tailer
drive_tailer.cc
)
target_include_directories(
drive_tailer
PUBLIC
.
..
../fmtlib
${CMAKE_CURRENT_BINARY_DIR}/..
)
target_link_libraries(drive_tailer base tailercommon tailerpp)

@ -0,0 +1,40 @@
noinst_LIBRARIES = libtailercommon.a libtailerpp.a
noinst_HEADERS = \
sha-256.h \
tailer.h \
tailerpp.hh
libtailercommon_a_SOURCES = \
sha-256.c \
tailer.c
libtailerpp_a_CPPFLAGS = \
-I$(srcdir)/.. \
-I$(srcdir)/../fmtlib
libtailerpp_a_SOURCES = \
tailerpp.cc
noinst_PROGRAMS = \
drive_tailer \
tailer
tailer_SOURCES = \
tailer.main.c
tailer_LDADD = libtailercommon.a
drive_tailer_CPPFLAGS = \
-I$(srcdir)/.. \
-I$(srcdir)/../fmtlib
drive_tailer_SOURCES = \
drive_tailer.cc
drive_tailer_LDADD = \
../base/libbase.a \
../fmtlib/libcppfmt.a \
libtailercommon.a \
libtailerpp.a

@ -0,0 +1,177 @@
/**
* Copyright (c) 2021, Timothy Stack
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* * Neither the name of Timothy Stack nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <unistd.h>
#include "config.h"
#include "ghc/filesystem.hpp"
#include "auto_pid.hh"
#include "auto_fd.hh"
#include "tailerpp.hh"
int main(int argc, char *const *argv)
{
auto tmppath = ghc::filesystem::temp_directory_path() / "drive_tailer";
ghc::filesystem::remove_all(tmppath);
ghc::filesystem::create_directories(tmppath);
auto_pipe in_pipe(STDIN_FILENO);
auto_pipe out_pipe(STDOUT_FILENO);
in_pipe.open();
out_pipe.open();
auto child = auto_pid(fork());
if (child.failed()) {
exit(EXIT_FAILURE);
}
in_pipe.after_fork(child.in());
out_pipe.after_fork(child.in());
if (child.in_child()) {
auto this_exe = ghc::filesystem::path(argv[0]);
auto exe_dir = this_exe.parent_path();
auto tailer_exe = exe_dir / "tailer";
execvp(tailer_exe.c_str(), argv);
exit(EXIT_FAILURE);
}
fprintf(stderr, "info: child pid %d\n", child.in());
auto &to_child = in_pipe.write_end();
auto &from_child = out_pipe.read_end();
for (int lpc = 1; lpc < argc; lpc++) {
send_packet(to_child,
TPT_OPEN_PATH,
TPPT_STRING, argv[lpc],
TPPT_DONE);
}
bool done = false;
while (!done) {
auto read_res = tailer::read_packet(from_child);
if (read_res.isErr()) {
fprintf(stderr, "read error: %s\n", read_res.unwrapErr().c_str());
exit(EXIT_FAILURE);
}
auto packet = read_res.unwrap();
packet.match(
[&](const tailer::packet_eof &te) {
printf("all done!\n");
done = true;
},
[&](const tailer::packet_error &te) {
printf("Got an error: %s -- %s\n", te.te_path.c_str(),
te.te_msg.c_str());
auto remote_path = ghc::filesystem::absolute(
ghc::filesystem::path(te.te_path)).relative_path();
auto local_path = tmppath / remote_path;
printf("removing %s\n", local_path.c_str());
ghc::filesystem::remove_all(local_path);
},
[&](const tailer::packet_offer_block &tob) {
printf("Got an offer: %s %lld - %lld\n", tob.tob_path.c_str(),
tob.tob_offset, tob.tob_length);
auto remote_path = ghc::filesystem::absolute(
ghc::filesystem::path(tob.tob_path)).relative_path();
auto local_path = tmppath / remote_path;
auto fd = auto_fd(open(local_path.c_str(), O_RDONLY));
fprintf(stderr, "offer fd %d\n", fd.get());
if (fd == -1) {
printf("sending need block\n");
send_packet(to_child.get(),
TPT_NEED_BLOCK,
TPPT_STRING, tob.tob_path.c_str(),
TPPT_DONE);
return;
}
auto_mem<char> buffer;
buffer = (char *) malloc(tob.tob_length);
auto bytes_read = pread(fd, buffer, tob.tob_length,
tob.tob_offset);
fprintf(stderr, "debug: bytes_read %ld\n", bytes_read);
if (bytes_read == tob.tob_length) {
tailer::hash_frag thf;
calc_sha_256(thf.thf_hash, buffer, bytes_read);
if (thf == tob.tob_hash) {
send_packet(to_child.get(),
TPT_ACK_BLOCK,
TPPT_STRING, tob.tob_path.c_str(),
TPPT_DONE);
return;
}
} else if (bytes_read == -1) {
ghc::filesystem::remove(local_path);
}
send_packet(to_child.get(),
TPT_NEED_BLOCK,
TPPT_STRING, tob.tob_path.c_str(),
TPPT_DONE);
},
[&](const tailer::packet_tail_block &ttb) {
//printf("got a tail: %s %ld\n", ttb.ttb_path.c_str(),
// ttb.ttb_bits.size());
auto remote_path = ghc::filesystem::absolute(
ghc::filesystem::path(ttb.ttb_path)).relative_path();
auto local_path = tmppath / remote_path;
ghc::filesystem::create_directories(local_path.parent_path());
auto fd = auto_fd(
open(local_path.c_str(), O_WRONLY | O_APPEND | O_CREAT,
0600));
if (fd == -1) {
perror("open");
} else {
write(fd, ttb.ttb_bits.data(), ttb.ttb_bits.size());
}
}
);
}
child.wait_for_child();
if (!child.was_normal_exit()) {
fprintf(stderr, "error: child exited abnormally\n");
}
}

@ -0,0 +1,219 @@
#include <stdint.h>
#include <string.h>
#include "sha-256.h"
#define CHUNK_SIZE 64
#define TOTAL_LEN_LEN 8
/*
* ABOUT bool: this file does not use bool in order to be as pre-C99 compatible as possible.
*/
/*
* Comments from pseudo-code at https://en.wikipedia.org/wiki/SHA-2 are reproduced here.
* When useful for clarification, portions of the pseudo-code are reproduced here too.
*/
/*
* Initialize array of round constants:
* (first 32 bits of the fractional parts of the cube roots of the first 64 primes 2..311):
*/
static const uint32_t k[] = {
0x428a2f98, 0x71374491, 0xb5c0fbcf, 0xe9b5dba5, 0x3956c25b, 0x59f111f1, 0x923f82a4, 0xab1c5ed5,
0xd807aa98, 0x12835b01, 0x243185be, 0x550c7dc3, 0x72be5d74, 0x80deb1fe, 0x9bdc06a7, 0xc19bf174,
0xe49b69c1, 0xefbe4786, 0x0fc19dc6, 0x240ca1cc, 0x2de92c6f, 0x4a7484aa, 0x5cb0a9dc, 0x76f988da,
0x983e5152, 0xa831c66d, 0xb00327c8, 0xbf597fc7, 0xc6e00bf3, 0xd5a79147, 0x06ca6351, 0x14292967,
0x27b70a85, 0x2e1b2138, 0x4d2c6dfc, 0x53380d13, 0x650a7354, 0x766a0abb, 0x81c2c92e, 0x92722c85,
0xa2bfe8a1, 0xa81a664b, 0xc24b8b70, 0xc76c51a3, 0xd192e819, 0xd6990624, 0xf40e3585, 0x106aa070,
0x19a4c116, 0x1e376c08, 0x2748774c, 0x34b0bcb5, 0x391c0cb3, 0x4ed8aa4a, 0x5b9cca4f, 0x682e6ff3,
0x748f82ee, 0x78a5636f, 0x84c87814, 0x8cc70208, 0x90befffa, 0xa4506ceb, 0xbef9a3f7, 0xc67178f2
};
struct buffer_state {
const uint8_t * p;
size_t len;
size_t total_len;
int single_one_delivered; /* bool */
int total_len_delivered; /* bool */
};
static inline uint32_t right_rot(uint32_t value, unsigned int count)
{
/*
* Defined behaviour in standard C for all count where 0 < count < 32,
* which is what we need here.
*/
return value >> count | value << (32 - count);
}
static void init_buf_state(struct buffer_state * state, const void * input, size_t len)
{
state->p = input;
state->len = len;
state->total_len = len;
state->single_one_delivered = 0;
state->total_len_delivered = 0;
}
/* Return value: bool */
static int calc_chunk(uint8_t chunk[CHUNK_SIZE], struct buffer_state * state)
{
size_t space_in_chunk;
if (state->total_len_delivered) {
return 0;
}
if (state->len >= CHUNK_SIZE) {
memcpy(chunk, state->p, CHUNK_SIZE);
state->p += CHUNK_SIZE;
state->len -= CHUNK_SIZE;
return 1;
}
memcpy(chunk, state->p, state->len);
chunk += state->len;
space_in_chunk = CHUNK_SIZE - state->len;
state->p += state->len;
state->len = 0;
/* If we are here, space_in_chunk is one at minimum. */
if (!state->single_one_delivered) {
*chunk++ = 0x80;
space_in_chunk -= 1;
state->single_one_delivered = 1;
}
/*
* Now:
* - either there is enough space left for the total length, and we can conclude,
* - or there is too little space left, and we have to pad the rest of this chunk with zeroes.
* In the latter case, we will conclude at the next invokation of this function.
*/
if (space_in_chunk >= TOTAL_LEN_LEN) {
const size_t left = space_in_chunk - TOTAL_LEN_LEN;
size_t len = state->total_len;
int i;
memset(chunk, 0x00, left);
chunk += left;
/* Storing of len * 8 as a big endian 64-bit without overflow. */
chunk[7] = (uint8_t) (len << 3);
len >>= 5;
for (i = 6; i >= 0; i--) {
chunk[i] = (uint8_t) len;
len >>= 8;
}
state->total_len_delivered = 1;
} else {
memset(chunk, 0x00, space_in_chunk);
}
return 1;
}
/*
* Limitations:
* - Since input is a pointer in RAM, the data to hash should be in RAM, which could be a problem
* for large data sizes.
* - SHA algorithms theoretically operate on bit strings. However, this implementation has no support
* for bit string lengths that are not multiples of eight, and it really operates on arrays of bytes.
* In particular, the len parameter is a number of bytes.
*/
void calc_sha_256(uint8_t hash[32], const void * input, size_t len)
{
/*
* Note 1: All integers (expect indexes) are 32-bit unsigned integers and addition is calculated modulo 2^32.
* Note 2: For each round, there is one round constant k[i] and one entry in the message schedule array w[i], 0 = i = 63
* Note 3: The compression function uses 8 working variables, a through h
* Note 4: Big-endian convention is used when expressing the constants in this pseudocode,
* and when parsing message block data from bytes to words, for example,
* the first word of the input message "abc" after padding is 0x61626380
*/
/*
* Initialize hash values:
* (first 32 bits of the fractional parts of the square roots of the first 8 primes 2..19):
*/
uint32_t h[] = { 0x6a09e667, 0xbb67ae85, 0x3c6ef372, 0xa54ff53a, 0x510e527f, 0x9b05688c, 0x1f83d9ab, 0x5be0cd19 };
unsigned i, j;
/* 512-bit chunks is what we will operate on. */
uint8_t chunk[64];
struct buffer_state state;
init_buf_state(&state, input, len);
while (calc_chunk(chunk, &state)) {
uint32_t ah[8];
const uint8_t *p = chunk;
/* Initialize working variables to current hash value: */
for (i = 0; i < 8; i++)
ah[i] = h[i];
/* Compression function main loop: */
for (i = 0; i < 4; i++) {
/*
* The w-array is really w[64], but since we only need
* 16 of them at a time, we save stack by calculating
* 16 at a time.
*
* This optimization was not there initially and the
* rest of the comments about w[64] are kept in their
* initial state.
*/
/*
* create a 64-entry message schedule array w[0..63] of 32-bit words
* (The initial values in w[0..63] don't matter, so many implementations zero them here)
* copy chunk into first 16 words w[0..15] of the message schedule array
*/
uint32_t w[16];
for (j = 0; j < 16; j++) {
if (i == 0) {
w[j] = (uint32_t) p[0] << 24 | (uint32_t) p[1] << 16 |
(uint32_t) p[2] << 8 | (uint32_t) p[3];
p += 4;
} else {
/* Extend the first 16 words into the remaining 48 words w[16..63] of the message schedule array: */
const uint32_t s0 = right_rot(w[(j + 1) & 0xf], 7) ^ right_rot(w[(j + 1) & 0xf], 18) ^ (w[(j + 1) & 0xf] >> 3);
const uint32_t s1 = right_rot(w[(j + 14) & 0xf], 17) ^ right_rot(w[(j + 14) & 0xf], 19) ^ (w[(j + 14) & 0xf] >> 10);
w[j] = w[j] + s0 + w[(j + 9) & 0xf] + s1;
}
const uint32_t s1 = right_rot(ah[4], 6) ^ right_rot(ah[4], 11) ^ right_rot(ah[4], 25);
const uint32_t ch = (ah[4] & ah[5]) ^ (~ah[4] & ah[6]);
const uint32_t temp1 = ah[7] + s1 + ch + k[i << 4 | j] + w[j];
const uint32_t s0 = right_rot(ah[0], 2) ^ right_rot(ah[0], 13) ^ right_rot(ah[0], 22);
const uint32_t maj = (ah[0] & ah[1]) ^ (ah[0] & ah[2]) ^ (ah[1] & ah[2]);
const uint32_t temp2 = s0 + maj;
ah[7] = ah[6];
ah[6] = ah[5];
ah[5] = ah[4];
ah[4] = ah[3] + temp1;
ah[3] = ah[2];
ah[2] = ah[1];
ah[1] = ah[0];
ah[0] = temp1 + temp2;
}
}
/* Add the compressed chunk to the current hash value: */
for (i = 0; i < 8; i++)
h[i] += ah[i];
}
/* Produce the final hash value (big-endian): */
for (i = 0, j = 0; i < 8; i++)
{
hash[j++] = (uint8_t) (h[i] >> 24);
hash[j++] = (uint8_t) (h[i] >> 16);
hash[j++] = (uint8_t) (h[i] >> 8);
hash[j++] = (uint8_t) h[i];
}
}

@ -0,0 +1,15 @@
#include <stdint.h>
#define SHA_256_HASH_SIZE 32
#ifdef __cplusplus
extern "C" {
#endif
void
calc_sha_256(uint8_t hash[SHA_256_HASH_SIZE], const void *input, size_t len);
#ifdef __cplusplus
};
#endif

@ -0,0 +1,96 @@
/**
* Copyright (c) 2021, Timothy Stack
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* * Neither the name of Timothy Stack nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <stdarg.h>
#include "sha-256.h"
#include "tailer.h"
ssize_t send_packet(int fd,
tailer_packet_type_t tpt,
tailer_packet_payload_type_t payload_type,
...)
{
va_list args;
int done = 0;
va_start(args, payload_type);
write(fd, &tpt, sizeof(tpt));
do {
write(fd, &payload_type, sizeof(payload_type));
switch (payload_type) {
case TPPT_STRING: {
char *str = va_arg(args, char *);
uint32_t length = strlen(str);
write(fd, &length, sizeof(length));
write(fd, str, length);
break;
}
case TPPT_HASH: {
const char *hash = va_arg(args, const char *);
write(fd, hash, SHA_256_HASH_SIZE);
break;
}
case TPPT_INT64: {
int64_t i = va_arg(args, int64_t);
write(fd, &i, sizeof(i));
break;
}
case TPPT_BITS: {
int64_t length = va_arg(args, int64_t);
const char *bits = va_arg(args, const char *);
write(fd, &length, sizeof(length));
write(fd, bits, length);
break;
}
case TPPT_DONE: {
done = 1;
break;
}
default: {
assert(0);
break;
}
}
if (!done) {
payload_type = va_arg(args, tailer_packet_payload_type_t);
}
} while (!done);
va_end(args);
return 0;
}

@ -0,0 +1,66 @@
/**
* Copyright (c) 2021, Timothy Stack
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* * Neither the name of Timothy Stack nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef lnav_tailer_h
#define lnav_tailer_h
#include <sys/types.h>
typedef enum {
TPPT_DONE,
TPPT_STRING,
TPPT_HASH,
TPPT_INT64,
TPPT_BITS,
} tailer_packet_payload_type_t;
typedef enum {
TPT_ERROR,
TPT_OPEN_PATH,
TPT_CLOSE_PATH,
TPT_OFFER_BLOCK,
TPT_NEED_BLOCK,
TPT_ACK_BLOCK,
TPT_TAIL_BLOCK,
} tailer_packet_type_t;
#ifdef __cplusplus
extern "C" {
#endif
ssize_t send_packet(int fd,
tailer_packet_type_t tpt,
tailer_packet_payload_type_t payload_type,
...);
#ifdef __cplusplus
};
#endif
#endif

@ -0,0 +1,511 @@
/**
* Copyright (c) 2021, Timothy Stack
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* * Neither the name of Timothy Stack nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <dirent.h>
#include <stdarg.h>
#include <limits.h>
#include <poll.h>
#include <sys/stat.h>
#include <sys/fcntl.h>
#include <unistd.h>
#include "sha-256.h"
#include "tailer.h"
struct node {
struct node *n_succ;
struct node *n_pred;
};
struct list {
struct node *l_head;
struct node *l_tail;
struct node *l_tail_pred;
};
void list_init(struct list *l)
{
l->l_head = (struct node *) &l->l_tail;
l->l_tail = NULL;
l->l_tail_pred = (struct node *) &l->l_head;
}
void list_move(struct list *dst, struct list *src)
{
if (src->l_head->n_succ == NULL) {
list_init(dst);
return;
}
dst->l_head = src->l_head;
dst->l_head->n_pred = (struct node *) &dst->l_head;
dst->l_tail = NULL;
dst->l_tail_pred = src->l_tail_pred;
dst->l_tail_pred->n_succ = (struct node *) &dst->l_tail;
list_init(src);
}
void list_remove(struct node *n)
{
n->n_pred->n_succ = n->n_succ;
n->n_succ->n_pred = n->n_pred;
n->n_succ = NULL;
n->n_pred = NULL;
}
struct node *list_remove_head(struct list *l)
{
struct node *retval = NULL;
if (l->l_head->n_succ != NULL) {
retval = l->l_head;
list_remove(l->l_head);
}
return retval;
}
void list_append(struct list *l, struct node *n)
{
n->n_pred = l->l_tail_pred;
n->n_succ = (struct node *) &l->l_tail;
l->l_tail_pred->n_succ = n;
l->l_tail_pred = n;
}
typedef enum {
CS_INIT,
CS_OFFERED,
CS_TAILING,
} client_state_t;
typedef enum {
PS_UNKNOWN,
PS_OK,
PS_ERROR,
} path_state_t;
struct client_path_state {
struct node cps_node;
char *cps_path;
path_state_t cps_last_path_state;
struct stat cps_last_stat;
int64_t cps_client_file_offset;
int64_t cps_client_file_read_length;
client_state_t cps_client_state;
struct list cps_children;
};
struct client_path_state *create_client_path_state(const char *path)
{
struct client_path_state *retval = malloc(sizeof(struct client_path_state));
retval->cps_path = strdup(path);
retval->cps_last_path_state = PS_UNKNOWN;
memset(&retval->cps_last_stat, 0, sizeof(retval->cps_last_stat));
retval->cps_client_file_offset = 0;
retval->cps_client_file_read_length = 0;
retval->cps_client_state = CS_INIT;
list_init(&retval->cps_children);
return retval;
}
void delete_client_path_state(struct client_path_state *cps)
{
free(cps->cps_path);
while (cps->cps_children.l_head->n_succ != NULL) {
struct client_path_state *child_cps = (struct client_path_state *) cps->cps_children.l_head;
list_remove(&child_cps->cps_node);
delete_client_path_state(child_cps);
}
free(cps);
}
void dump_client_path_states(struct list *path_list)
{
struct client_path_state *curr = (struct client_path_state *) path_list->l_head;
while (curr->cps_node.n_succ != NULL) {
fprintf(stderr, "debug: path %s\n", curr->cps_path);
dump_client_path_states(&curr->cps_children);
curr = (struct client_path_state *) curr->cps_node.n_succ;
}
curr = (struct client_path_state *) path_list->l_tail_pred;
while (curr->cps_node.n_pred != NULL) {
fprintf(stderr, "debug: back path %s\n", curr->cps_path);
dump_client_path_states(&curr->cps_children);
curr = (struct client_path_state *) curr->cps_node.n_pred;
}
}
void send_error(struct client_path_state *cps, char *msg, ...)
{
char buffer[1024];
va_list args;
va_start(args, msg);
vsnprintf(buffer, sizeof(buffer), msg, args);
va_end(args);
send_packet(STDOUT_FILENO,
TPT_ERROR,
TPPT_STRING, cps->cps_path,
TPPT_STRING, buffer,
TPPT_DONE);
}
void set_client_path_state_error(struct client_path_state *cps)
{
if (cps->cps_last_path_state != PS_ERROR) {
// tell client of the problem
send_error(cps, "unable to open -- %s", strerror(errno));
}
cps->cps_last_path_state = PS_ERROR;
cps->cps_client_file_offset = 0;
cps->cps_client_state = CS_INIT;
}
static int readall(int sock, void *buf, size_t len)
{
char *cbuf = (char *) buf;
off_t offset = 0;
while (len > 0) {
ssize_t rc = read(sock, &cbuf[offset], len);
if (rc == -1) {
switch (errno) {
case EAGAIN:
case EINTR:
break;
default:
return -1;
}
}
else if (rc == 0) {
errno = EIO;
return -1;
}
else {
len -= rc;
offset += rc;
}
}
return 0;
}
static tailer_packet_payload_type_t read_payload_type(int sock)
{
tailer_packet_payload_type_t retval = TPPT_DONE;
readall(sock, &retval, sizeof(retval));
return retval;
}
static char *readstr(int sock)
{
tailer_packet_payload_type_t payload_type = read_payload_type(sock);
if (payload_type != TPPT_STRING) {
fprintf(stderr, "error: expected string, got: %d\n", payload_type);
return NULL;
}
int32_t length;
if (readall(sock, &length, sizeof(length)) == -1) {
fprintf(stderr, "error: unable to read string length\n");
return NULL;
}
char *retval = malloc(length + 1);
if (retval == NULL) {
return NULL;
}
if (readall(sock, retval, length) == -1) {
fprintf(stderr, "error: unable to read string of length: %d\n", length);
free(retval);
return NULL;
}
retval[length] = '\0';
return retval;
}
struct list client_path_list;
struct client_path_state *find_client_path_state(struct list *path_list, const char *path)
{
struct client_path_state *curr = (struct client_path_state *) path_list->l_head;
while (curr->cps_node.n_succ != NULL) {
if (strcmp(curr->cps_path, path) == 0) {
return curr;
}
if (strncmp(curr->cps_path, path, strlen(curr->cps_path)) == 0) {
struct client_path_state *child =
find_client_path_state(&curr->cps_children, path);
if (child != NULL) {
return child;
}
}
curr = (struct client_path_state *) curr->cps_node.n_succ;
}
return NULL;
}
int poll_paths(struct list *path_list)
{
struct client_path_state *curr = (struct client_path_state *) path_list->l_head;
int retval = 0;
while (curr->cps_node.n_succ != NULL) {
struct stat st;
int rc = lstat(curr->cps_path, &st);
if (rc == -1) {
set_client_path_state_error(curr);
} else if (S_ISREG(st.st_mode)) {
switch (curr->cps_client_state) {
case CS_INIT:
case CS_TAILING: {
if (curr->cps_client_file_offset < st.st_size) {
int fd = open(curr->cps_path, O_RDONLY);
if (fd == -1) {
set_client_path_state_error(curr);
} else {
char buffer[64 * 1024];
int64_t bytes_read = pread(fd,
buffer, sizeof(buffer),
curr->cps_client_file_offset);
if (bytes_read == -1) {
set_client_path_state_error(curr);
} else if (curr->cps_client_state == CS_INIT) {
uint8_t hash[SHA_256_HASH_SIZE];
calc_sha_256(hash, buffer, bytes_read);
curr->cps_client_file_read_length = bytes_read;
send_packet(STDOUT_FILENO,
TPT_OFFER_BLOCK,
TPPT_STRING, curr->cps_path,
TPPT_INT64,
curr->cps_client_file_offset,
TPPT_INT64, bytes_read,
TPPT_HASH, hash,
TPPT_DONE);
curr->cps_client_state = CS_OFFERED;
} else {
send_packet(STDOUT_FILENO,
TPT_TAIL_BLOCK,
TPPT_STRING, curr->cps_path,
TPPT_BITS, bytes_read, buffer,
TPPT_DONE);
curr->cps_client_file_offset += bytes_read;
}
close(fd);
retval = 1;
}
}
break;
}
case CS_OFFERED: {
// Still waiting for the client ack
break;
}
}
} else if (S_ISDIR(st.st_mode)) {
DIR *dir = opendir(curr->cps_path);
if (dir == NULL) {
set_client_path_state_error(curr);
} else {
struct list prev_children;
struct dirent *entry;
list_move(&prev_children, &curr->cps_children);
while ((entry = readdir(dir)) != NULL) {
if (strcmp(entry->d_name, ".") == 0 ||
strcmp(entry->d_name, "..") == 0) {
continue;
}
char full_path[PATH_MAX];
snprintf(full_path, sizeof(full_path),
"%s/%s",
curr->cps_path, entry->d_name);
struct client_path_state *child = find_client_path_state(&prev_children, full_path);
if (child == NULL) {
// new file
fprintf(stderr, "info: monitoring child path: %s\n", full_path);
child = create_client_path_state(full_path);
} else {
list_remove(&child->cps_node);
}
list_append(&curr->cps_children, &child->cps_node);
}
closedir(dir);
struct client_path_state *child;
while ((child = (struct client_path_state *) list_remove_head(
&prev_children)) != NULL) {
send_error(child, "deleted");
delete_client_path_state(child);
}
retval += poll_paths(&curr->cps_children);
}
}
curr = (struct client_path_state *) curr->cps_node.n_succ;
}
return retval;
}
int main(int argc, char *argv[])
{
int done = 0, timeout = 0;
list_init(&client_path_list);
while (!done) {
struct pollfd pfds[1];
pfds[0].fd = STDIN_FILENO;
pfds[0].events = POLLIN;
int ready_count = poll(pfds, 1, timeout);
if (ready_count) {
tailer_packet_type_t type;
if (readall(STDIN_FILENO, &type, sizeof(type)) == -1) {
fprintf(stderr, "info: exiting...\n");
done = 1;
} else {
switch (type) {
case TPT_OPEN_PATH:
case TPT_CLOSE_PATH: {
const char *path = readstr(STDIN_FILENO);
if (path == NULL) {
fprintf(stderr, "error: unable to get path to open\n");
done = 1;
} else if (read_payload_type(STDIN_FILENO) != TPPT_DONE) {
fprintf(stderr, "error: invalid open packet\n");
done = 1;
} else if (type == TPT_OPEN_PATH) {
struct client_path_state *cps = create_client_path_state(path);
fprintf(stderr, "info: monitoring path: %s\n", path);
list_append(&client_path_list, &cps->cps_node);
} else {
struct client_path_state *cps = find_client_path_state(&client_path_list, path);
if (cps == NULL) {
fprintf(stderr, "warning: path is not open: %s\n", path);
} else {
list_remove(&cps->cps_node);
delete_client_path_state(cps);
}
};
break;
}
case TPT_ACK_BLOCK:
case TPT_NEED_BLOCK: {
char *path = readstr(STDIN_FILENO);
// fprintf(stderr, "info: block packet path: %s\n", path);
if (path == NULL) {
fprintf(stderr, "error: unable to get block path\n");
done = 1;
} else if (read_payload_type(STDIN_FILENO) != TPPT_DONE) {
fprintf(stderr, "error: invalid block packet\n");
done = 1;
} else {
struct client_path_state *cps = find_client_path_state(&client_path_list, path);
if (cps == NULL) {
fprintf(stderr, "warning: unknown path in block packet: %s\n", path);
} else if (type == TPT_NEED_BLOCK) {
// fprintf(stderr, "info: client is tailing: %s\n", path);
cps->cps_client_state = CS_TAILING;
} else if (type == TPT_ACK_BLOCK) {
// fprintf(stderr, "info: client acked: %s\n", path);
cps->cps_client_file_offset +=
cps->cps_client_file_read_length;
cps->cps_client_state = CS_INIT;
}
free(path);
}
break;
}
default: {
assert(0);
}
}
}
}
if (!done) {
if (poll_paths(&client_path_list)) {
timeout = 0;
} else {
timeout = 1000;
fprintf(stderr, "all synced!\n");
}
}
}
return EXIT_SUCCESS;
}

@ -0,0 +1,101 @@
/**
* Copyright (c) 2021, Timothy Stack
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* * Neither the name of Timothy Stack nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "tailerpp.hh"
namespace tailer {
int readall(int sock, void *buf, size_t len)
{
char *cbuf = (char *) buf;
off_t offset = 0;
while (len > 0) {
ssize_t rc = read(sock, &cbuf[offset], len);
if (rc == -1) {
switch (errno) {
case EAGAIN:
case EINTR:
break;
default:
return -1;
}
} else if (rc == 0) {
errno = EIO;
return -1;
} else {
len -= rc;
offset += rc;
}
}
return 0;
}
Result<packet, std::string> read_packet(int fd)
{
tailer_packet_type_t type;
if (readall(fd, &type, sizeof(type)) == -1) {
return Ok(packet{packet_eof{}});
}
switch (type) {
case TPT_ERROR: {
packet_error te;
TRY(read_payloads_into(fd, te.te_path, te.te_msg));
return Ok(packet{te});
}
case TPT_OFFER_BLOCK: {
packet_offer_block tob;
TRY(read_payloads_into(fd,
tob.tob_path,
tob.tob_offset,
tob.tob_length,
tob.tob_hash));
return Ok(packet{tob});
}
case TPT_TAIL_BLOCK: {
packet_tail_block ttb;
TRY(read_payloads_into(fd,
ttb.ttb_path,
ttb.ttb_bits));
return Ok(packet{ttb});
}
default:
assert(0);
break;
}
}
}

@ -0,0 +1,188 @@
/**
* Copyright (c) 2021, Timothy Stack
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* * Neither the name of Timothy Stack nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef lnav_tailerpp_hh
#define lnav_tailerpp_hh
#include <string>
#include "sha-256.h"
#include "auto_mem.hh"
#include "base/result.h"
#include "fmt/format.h"
#include "mapbox/variant.hpp"
#include "tailer.h"
namespace tailer {
struct packet_eof {
};
struct packet_error {
std::string te_path;
std::string te_msg;
};
struct hash_frag {
uint8_t thf_hash[SHA_256_HASH_SIZE];
bool operator==(const hash_frag &other) const
{
return memcmp(this->thf_hash, other.thf_hash, sizeof(this->thf_hash)) ==
0;
}
};
struct packet_offer_block {
std::string tob_path;
int64_t tob_offset;
int64_t tob_length;
hash_frag tob_hash;
};
struct packet_tail_block {
std::string ttb_path;
std::vector<uint8_t> ttb_bits;
};
using packet = mapbox::util::variant<
packet_eof, packet_error, packet_offer_block, packet_tail_block>;
int readall(int sock, void *buf, size_t len);
inline Result<void, std::string> read_payloads_into(int fd)
{
tailer_packet_payload_type_t payload_type;
readall(fd, &payload_type, sizeof(payload_type));
if (payload_type != TPPT_DONE) {
return Err(std::string("not done"));
}
return Ok();
}
template<typename ...Ts>
Result<void, std::string>
read_payloads_into(int fd, std::vector<uint8_t> &bits, Ts &...args)
{
tailer_packet_payload_type_t payload_type;
if (readall(fd, &payload_type, sizeof(payload_type)) == -1) {
return Err(fmt::format("unable to read bits payload type"));
}
if (payload_type != TPPT_BITS) {
return Err(
fmt::format("expecting bits payload, found: {}", payload_type));
}
int64_t length;
if (readall(fd, &length, sizeof(length)) == -1) {
return Err(std::string("unable to read bits length"));
}
bits.resize(length);
if (readall(fd, bits.data(), length) == -1) {
return Err(fmt::format("unable to read bits of length: {}", length));
}
return read_payloads_into(fd, args...);
}
template<typename ...Ts>
Result<void, std::string>
read_payloads_into(int fd, hash_frag &thf, Ts &...args)
{
tailer_packet_payload_type_t payload_type;
readall(fd, &payload_type, sizeof(payload_type));
if (payload_type != TPPT_HASH) {
return Err(
fmt::format("expecting int64 payload, found: {}", payload_type));
}
readall(fd, thf.thf_hash, SHA_256_HASH_SIZE);
return read_payloads_into(fd, args...);
}
template<typename ...Ts>
Result<void, std::string> read_payloads_into(int fd, int64_t &i, Ts &...args)
{
tailer_packet_payload_type_t payload_type;
readall(fd, &payload_type, sizeof(payload_type));
if (payload_type != TPPT_INT64) {
return Err(
fmt::format("expecting int64 payload, found: {}", payload_type));
}
readall(fd, &i, sizeof(i));
return read_payloads_into(fd, args...);
}
template<typename ...Ts>
Result<void, std::string>
read_payloads_into(int fd, std::string &str, Ts &...args)
{
tailer_packet_payload_type_t payload_type;
readall(fd, &payload_type, sizeof(payload_type));
if (payload_type != TPPT_STRING) {
printf("not a string! %d\n", payload_type);
return Err(
fmt::format("expecting string payload, found: {}", payload_type));
}
int32_t length;
if (readall(fd, &length, sizeof(length)) == -1) {
return Err(std::string("unable to read string length"));
}
auto_mem<char> child_str;
child_str = (char *) malloc(length);
if (child_str == nullptr) {
return Err(fmt::format("string size is too large: {}", length));
}
if (readall(fd, child_str, length) == -1) {
return Err(fmt::format("unable to read string of size: {}", length));
}
str.assign(child_str.in(), length);
return read_payloads_into(fd, args...);
}
Result<packet, std::string> read_packet(int fd);
}
#endif
Loading…
Cancel
Save