Multithreaded ncls (#1232)

Thread out ncls to perform the media decode in different threads, in parallel. Only the display needs be locked. On a directory of 200 files on my 39070X, this speeds ncls from ~5s to ~1s. On 75 files, we go from ~.5s to ~.2s. On a single file, we lose about 5%. To facilitate this, ncdirect_render_image() has been split into two helpers, ncdirect_render_frame() and ncdirect_raster_frame().
This commit is contained in:
Nick Black 2020-12-18 15:28:24 -05:00 committed by GitHub
parent d7fd7fa39e
commit 730e9f69ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 141 additions and 35 deletions

View File

@ -143,6 +143,16 @@ namespace ncpp
return ncdirect_render_image (direct, file, align, blitter, scale);
}
struct ncplane* prep_image (const char* file, ncblitter_e blitter, ncscale_e scale) const noexcept
{
return ncdirect_render_frame (direct, file, blitter, scale);
}
int raster_image (struct ncplane* faken, ncalign_e align, ncblitter_e blitter, ncscale_e scale) const noexcept
{
return ncdirect_raster_frame (direct, faken, align, blitter, scale);
}
bool putstr (uint64_t channels, const char* utf8) const NOEXCEPT_MAYBE
{
return error_guard (ncdirect_putstr (direct, channels, utf8), -1);

View File

@ -7,6 +7,8 @@
extern "C" {
#endif
typedef struct ncplane ncdirectv;
#define API __attribute__((visibility("default")))
// ncdirect_init() will call setlocale() to inspect the current locale. If
@ -109,7 +111,8 @@ API int ncdirect_cursor_pop(struct ncdirect* n);
// Display an image using the specified blitter and scaling. The image may
// be arbitrarily many rows -- the output will scroll -- but will only occupy
// the column of the cursor, and those to the right.
// the column of the cursor, and those to the right. The render/raster process
// can be split by using ncdirect_render_frame() and ncdirect_raster_frame().
API int ncdirect_render_image(struct ncdirect* n, const char* filename,
ncalign_e align, ncblitter_e blitter,
ncscale_e scale);
@ -189,6 +192,20 @@ ncdirect_getc_blocking(struct ncdirect* n, ncinput* ni){
// Release 'nc' and any associated resources. 0 on success, non-0 on failure.
API int ncdirect_stop(struct ncdirect* nc);
// Render an image using the specified blitter and scaling, but do not write
// the result. The image may be arbitrarily many rows -- the output will scroll
// -- but will only occupy the column of the cursor, and those to the right.
// To actually write (and free) this, invoke ncdirect_raster_frame().
API ncdirectv* ncdirect_render_frame(struct ncdirect* n, const char* filename,
ncblitter_e blitter, ncscale_e scale);
// Takes the result of ncdirect_render_frame() and writes it to the output. The
// 'align', 'blitter', and 'scale' arguments must be the same as those passed
// to ncdirect_render_frame().
API int ncdirect_raster_frame(struct ncdirect* n, ncdirectv* faken,
ncalign_e align, ncblitter_e blitter,
ncscale_e scale);
#undef API
#ifdef __cplusplus

View File

@ -425,23 +425,42 @@ ncdirect_dump_plane(ncdirect* n, const ncplane* np, int xoff){
return 0;
}
int ncdirect_render_image(ncdirect* n, const char* file, ncalign_e align,
int ncdirect_raster_frame(ncdirect* n, ncdirectv* faken, ncalign_e align,
ncblitter_e blitter, ncscale_e scale){
auto bset = rgba_blitter_low(n->utf8, scale, true, blitter);
if(!bset){
free_plane(faken);
return -1;
}
int lenx = ncplane_dim_x(faken);
int xoff = ncdirect_align(n, align, lenx / encoding_x_scale(bset));
if(ncdirect_dump_plane(n, faken, xoff)){
free_plane(faken);
return -1;
}
int r = ncdirect_flush(n);
free_plane(faken);
return r;
}
ncdirectv* ncdirect_render_frame(ncdirect* n, const char* file,
ncblitter_e blitter, ncscale_e scale){
struct ncvisual* ncv = ncvisual_from_file(file);
if(ncv == nullptr){
return -1;
return nullptr;
}
//fprintf(stderr, "OUR DATA: %p rows/cols: %d/%d\n", ncv->data, ncv->rows, ncv->cols);
int leny = ncv->rows; // we allow it to freely scroll
int lenx = ncv->cols;
if(leny == 0 || lenx == 0){
ncvisual_destroy(ncv);
return -1;
return nullptr;
}
//fprintf(stderr, "render %d/%d to %d+%dx%d scaling: %d\n", ncv->rows, ncv->cols, leny, lenx, scale);
auto bset = rgba_blitter_low(n->utf8, scale, true, blitter);
if(!bset){
return -1;
ncvisual_destroy(ncv);
return nullptr;
}
int disprows, dispcols;
if(scale != NCSCALE_NONE){
@ -469,22 +488,26 @@ int ncdirect_render_image(ncdirect* n, const char* file, ncalign_e align,
};
struct ncplane* faken = ncplane_new_internal(nullptr, nullptr, &nopts);
if(faken == nullptr){
return -1;
ncvisual_destroy(ncv);
return nullptr;
}
if(ncvisual_blit(ncv, disprows, dispcols, faken, bset,
0, 0, 0, 0, leny, lenx, false)){
ncvisual_destroy(ncv);
free_plane(faken);
return -1;
return nullptr;
}
ncvisual_destroy(ncv);
int xoff = ncdirect_align(n, align, lenx / encoding_x_scale(bset));
if(ncdirect_dump_plane(n, faken, xoff)){
return faken;
}
int ncdirect_render_image(ncdirect* n, const char* file, ncalign_e align,
ncblitter_e blitter, ncscale_e scale){
auto faken = ncdirect_render_frame(n, file, blitter, scale);
if(!faken){
return -1;
}
int r = ncdirect_flush(n);
free_plane(faken);
return r;
return ncdirect_raster_frame(n, faken, align, blitter, scale);
}
int ncdirect_fg_palindex(ncdirect* nc, int pidx){

View File

@ -1,10 +1,15 @@
#define NCPP_EXCEPTIONS_PLEASE
#include <queue>
#include <thread>
#include <vector>
#include <atomic>
#include <cstdlib>
#include <fcntl.h>
#include <getopt.h>
#include <iostream>
#include <dirent.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/stat.h>
#include <filesystem>
#include <sys/types.h>
@ -13,8 +18,7 @@
#define AT_NO_AUTOMOUNT 0 // not defined on freebsd
#endif
static void
usage(std::ostream& os, const char* name, int code){
void usage(std::ostream& os, const char* name, int code){
os << "usage: " << name << " -h | [ -lLR ] [ --align type ] paths...\n";
os << " -d: list directories themselves, not their contents\n";
os << " -l: use a long listing format\n";
@ -26,6 +30,17 @@ usage(std::ostream& os, const char* name, int code){
exit(code);
}
struct job {
std::filesystem::path dir;
std::string p;
};
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t outmtx = PTHREAD_MUTEX_INITIALIZER; // guards standard out
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
std::queue<job> work; // jobs available for workers
bool keep_working; // set false when we're done so threads die
// context as configured on the command line
struct lsContext {
ncpp::Direct nc;
@ -36,24 +51,23 @@ struct lsContext {
ncalign_e alignment;
};
static int
handle_path(int dirfd, std::filesystem::path& dir, const char* p, const lsContext& ctx, bool toplevel);
int handle_path(int dirfd, std::filesystem::path& dir, const char* p, const lsContext& ctx, bool toplevel);
// handle a single inode of arbitrary type
static int
handle_inode(std::filesystem::path& dir, const char* p, const struct stat* st, const lsContext& ctx){
int handle_inode(std::filesystem::path& dir, const char* p, const struct stat* st, const lsContext& ctx){
(void)st; // FIXME handle symlink (dereflinks)
std::cout << p << '\n';
auto s = dir / p;
ctx.nc.render_image(s.c_str(), ctx.alignment, NCBLIT_DEFAULT, NCSCALE_SCALE);
(void)ctx; // FIXME handle symlink (dereflinks)
pthread_mutex_lock(&mtx);
work.emplace(job{dir, p});
pthread_mutex_unlock(&mtx);
pthread_cond_signal(&cond);
return 0;
}
// if |directories| is true, only print details of |p|, and return. otherwise,
// if |recursedirs| or |toplevel| is set, we will recurse, passing false as
// toplevel (but preserving |recursedirs|).
static int
handle_dir(int dirfd, std::filesystem::path& pdir, const char* p, const struct stat* st, const lsContext& ctx, bool toplevel){
int handle_dir(int dirfd, std::filesystem::path& pdir, const char* p, const struct stat* st, const lsContext& ctx, bool toplevel){
if(ctx.directories){
return handle_inode(pdir, p, st, ctx);
}
@ -91,8 +105,7 @@ handle_dir(int dirfd, std::filesystem::path& pdir, const char* p, const struct s
return 0;
}
static int
handle_deref(const char* p, const struct stat* st, const lsContext& ctx){
int handle_deref(const char* p, const struct stat* st, const lsContext& ctx){
(void)p;
(void)st;
(void)ctx; // FIXME dereference and rerun on target
@ -101,8 +114,7 @@ handle_deref(const char* p, const struct stat* st, const lsContext& ctx){
// handle some path |p|, either absolute or relative to |dirfd|. |toplevel| is
// true iff the path was directly listed on the command line.
static int
handle_path(int dirfd, std::filesystem::path& pdir, const char* p, const lsContext& ctx, bool toplevel){
int handle_path(int dirfd, std::filesystem::path& pdir, const char* p, const lsContext& ctx, bool toplevel){
struct stat st;
if(fstatat(dirfd, p, &st, AT_NO_AUTOMOUNT)){
std::cerr << "Error running fstatat(" << p << "): " << strerror(errno) << std::endl;
@ -118,10 +130,35 @@ handle_path(int dirfd, std::filesystem::path& pdir, const char* p, const lsConte
return handle_inode(pdir, p, &st, ctx);
}
// return long-term return code
void ncls_thread(const lsContext* ctx) {
while(true){
pthread_mutex_lock(&mtx);
while(work.empty() && keep_working){
pthread_cond_wait(&cond, &mtx);
}
if(!work.empty()){
job j = work.front();
work.pop();
pthread_mutex_unlock(&mtx);
auto s = j.dir / j.p;
auto faken = ctx->nc.prep_image(s.c_str(), NCBLIT_DEFAULT, NCSCALE_SCALE);
pthread_mutex_lock(&outmtx);
std::cout << j.p << '\n';
if(faken){
ctx->nc.raster_image(faken, ctx->alignment, NCBLIT_DEFAULT, NCSCALE_SCALE);
}
pthread_mutex_unlock(&outmtx);
}else if(!keep_working){
pthread_mutex_unlock(&mtx);
return;
}
}
}
// these are our command line arguments. they're the only paths for which
// handle_path() gets toplevel == true.
static int
list_paths(const char* const * argv, const lsContext& ctx){
int list_paths(const char* const * argv, const lsContext& ctx){
int dirfd = open(".", O_DIRECTORY | O_CLOEXEC);
if(dirfd < 0){
std::cerr << "Error opening current directory: " << strerror(errno) << std::endl;
@ -182,15 +219,34 @@ int main(int argc, char* const * argv){
break;
}
}
auto procs = std::thread::hardware_concurrency();
if(procs <= 0){
procs = 4;
}
if(procs > 8){
procs = 8;
}
std::vector<std::thread> threads;
lsContext ctx = {
.nc = ncpp::Direct(),
.longlisting = longlisting,
.recursedirs = recursedirs,
.directories = directories,
.dereflinks = dereflinks,
.alignment = alignment,
ncpp::Direct(),
longlisting,
recursedirs,
directories,
dereflinks,
alignment,
};
keep_working = true;
for(auto s = 0u ; s < procs ; ++s){
threads.emplace_back(std::thread(ncls_thread, &ctx));
}
static const char* const default_args[] = { ".", nullptr };
list_paths(argv[optind] ? argv + optind : default_args, ctx);
keep_working = false;
pthread_cond_broadcast(&cond);
for(auto &t : threads){
//std::cerr << "Waiting on thread " << procs << std::endl;
t.join();
--procs;
}
return EXIT_SUCCESS;
}