[sixel] safe, reliable worker engine #2573

pull/2610/head
nick black 2 years ago committed by nick black
parent 5d3266ce33
commit 7fdd0e5b4d

@ -5,9 +5,12 @@
#define RGBSIZE 3
// number of worker threads FIXME
// number of worker threads FIXME fit to local machine
#define POPULATION 3
// a worker can have up to three qstates enqueued for work
#define WORKERDEPTH 3
// this palette entry is a sentinel for a transparent pixel (and thus caps
// the palette at 65535 other entries).
#define TRANS_PALETTE_ENTRY 65535
@ -15,21 +18,36 @@
// bytes per element in the auxiliary vector
#define AUXVECELEMSIZE 2
// returns the number of individual sixels necessary to represent the specified
// pixel geometry. these might encompass more pixel rows than |dimy| would
// suggest, up to the next multiple of 6 (i.e. a single row becomes a 6-row
// bitmap; as do two, three, four, five, or six rows). input is scaled geometry.
static inline int
sixelcount(int dimy, int dimx){
return (dimy + 5) / 6 * dimx;
}
// three scaled sixel [0..100x3] components plus a population count.
typedef struct qsample {
unsigned char comps[RGBSIZE];
uint32_t pop;
} qsample;
// returns the number of sixel bands (horizontal series of sixels, aka 6 rows)
// for |dimy| source rows. sixels are encoded as a series of sixel bands.
static inline int
sixelbandcount(int dimy){
return sixelcount(dimy, 1);
}
// lowest samples for each node. first-order nodes track 1000 points in
// sixelspace (10x10x10). there are eight possible second-order nodes from a
// fractured first-order node, covering 125 points each (5x5x5).
typedef struct qnode {
qsample q;
// cidx plays two roles. during merge, we select the active set, and extract
// them (since they'll be sorted, we can't operate directly on the octree).
// here, we use cidx to map back to the initial octree entry, as we need
// update them (from the active set) at the end of merging. afterwards, the
// high bit indicates that it was chosen, and the cidx is a valid index into
// the final color table. it is otherwise a link to the merged qnode.
// during initial filtering, qlink determines whether a node has fractured:
// if qlink is non-zero, it is a one-biased index to an onode.
// FIXME combine these once more, but for now to keep it easy, we have two.
// qlink links back into the octree.
uint16_t qlink;
uint16_t cidx;
} qnode;
// an octree-style node, used for fractured first-order nodes. the first
// bit is whether we're on the top or bottom of the R, then G, then B.
typedef struct onode {
qnode* q[8];
} onode;
// we set P2 based on whether there is any transparency in the sixel. if not,
// use SIXEL_P2_ALLOPAQUE (0), for faster drawing in certain terminals.
@ -66,12 +84,99 @@ typedef struct sixelmap {
sixel_p2_e p2; // set to SIXEL_P2_TRANS if we have transparent pixels
} sixelmap;
// second pass: construct data for extracted colors over the sixels. the
// map will be persisted in the sprixel; the remainder is lost.
// FIXME kill this off; use sixelmap directly
typedef struct sixeltable {
sixelmap* map; // copy of palette indices / transparency bits
} sixeltable;
typedef struct qstate {
int refcount; // initialized to worker count
atomic_int bandbuilder; // threads take bands as their work unit
// we always work in terms of quantized colors (quantization is the first
// step of rendering), using indexes into the derived palette. the actual
// palette need only be stored during the initial render, since the sixel
// header can be preserved, and the palette is unchanged by wipes/restores.
unsigned char* table; // |colors| x RGBSIZE components
qnode* qnodes;
onode* onodes;
unsigned dynnodes_free;
unsigned dynnodes_total;
unsigned onodes_free;
unsigned onodes_total;
const struct blitterargs* bargs;
const uint32_t* data;
int linesize;
sixelmap* smap;
// these are the leny and lenx passed to sixel_blit(), which are likely
// different from those reachable through bargs->len{y,x}!
int leny, lenx;
} qstate;
// a work_queue per worker thread. if used == WORKERDEPTH, this thread is
// backed up, and we cannot enqueue to it. writeto wraps around the array.
typedef struct work_queue {
qstate* qstates[WORKERDEPTH];
unsigned writeto;
unsigned used;
} work_queue;
// we keep a few worker threads (POPULATION) spun up to assist with
// quantization. each has an array of up to WORKERDEPTH qstates to work on.
typedef struct sixel_engine {
pthread_mutex_t lock;
pthread_cond_t cond;
work_queue queues[POPULATION];
pthread_t tids[POPULATION];
bool done;
} sixel_engine;
// enqueue |qs| to any workers with available space. the number of workers with
// a reference will be stored in |qs|->refcount.
static void
enqueue_to_workers(sixel_engine* eng, qstate* qs){
int usecount = 0;
pthread_mutex_lock(&eng->lock);
for(int i = 0 ; i < POPULATION ; ++i){
work_queue* wq = &eng->queues[i];
if(wq->used < WORKERDEPTH){
wq->qstates[wq->writeto] = qs;
++wq->used;
++usecount;
}
if(++wq->writeto == WORKERDEPTH){
wq->writeto = 0;
}
}
qs->refcount = usecount;
pthread_mutex_unlock(&eng->lock);
if(usecount){
pthread_cond_broadcast(&eng->cond);
}
}
// block until all workers have finished up with |qs|
static void
block_on_workers(sixel_engine* eng, qstate* qs){
pthread_mutex_lock(&eng->lock);
while(qs->refcount){
pthread_cond_wait(&eng->cond, &eng->lock);
}
pthread_mutex_unlock(&eng->lock);
}
// FIXME make this part of the context, sheesh
static sixel_engine globsengine;
// returns the number of individual sixels necessary to represent the specified
// pixel geometry. these might encompass more pixel rows than |dimy| would
// suggest, up to the next multiple of 6 (i.e. a single row becomes a 6-row
// bitmap; as do two, three, four, five, or six rows). input is scaled geometry.
static inline int
sixelcount(int dimy, int dimx){
return (dimy + 5) / 6 * dimx;
}
// returns the number of sixel bands (horizontal series of sixels, aka 6 rows)
// for |dimy| source rows. sixels are encoded as a series of sixel bands.
static inline int
sixelbandcount(int dimy){
return sixelcount(dimy, 1);
}
// whip up a sixelmap sans data for the specified pixel geometry and color
// register count.
@ -113,37 +218,6 @@ void sixelmap_free(sixelmap *s){
}
}
// three scaled sixel [0..100x3] components plus a population count.
typedef struct qsample {
unsigned char comps[RGBSIZE];
uint32_t pop;
} qsample;
// lowest samples for each node. first-order nodes track 1000 points in
// sixelspace (10x10x10). there are eight possible second-order nodes from a
// fractured first-order node, covering 125 points each (5x5x5).
typedef struct qnode {
qsample q;
// cidx plays two roles. during merge, we select the active set, and extract
// them (since they'll be sorted, we can't operate directly on the octree).
// here, we use cidx to map back to the initial octree entry, as we need
// update them (from the active set) at the end of merging. afterwards, the
// high bit indicates that it was chosen, and the cidx is a valid index into
// the final color table. it is otherwise a link to the merged qnode.
// during initial filtering, qlink determines whether a node has fractured:
// if qlink is non-zero, it is a one-biased index to an onode.
// FIXME combine these once more, but for now to keep it easy, we have two.
// qlink links back into the octree.
uint16_t qlink;
uint16_t cidx;
} qnode;
// an octree-style node, used for fractured first-order nodes. the first
// bit is whether we're on the top or bottom of the R, then G, then B.
typedef struct onode {
qnode* q[8];
} onode;
// convert rgb [0..255] to sixel [0..99]
static inline unsigned
ss(unsigned c){
@ -184,30 +258,6 @@ qidx(const qnode* q){
return q->cidx & ~0x8000u;
}
typedef struct qstate {
atomic_int refcount; // initialized to worker count
atomic_int bandbuilder; // threads take bands as their work unit
// we always work in terms of quantized colors (quantization is the first
// step of rendering), using indexes into the derived palette. the actual
// palette need only be stored during the initial render, since the sixel
// header can be preserved, and the palette is unchanged by wipes/restores.
unsigned char* table; // |colors| x RGBSIZE components
qnode* qnodes;
onode* onodes;
unsigned dynnodes_free;
unsigned dynnodes_total;
unsigned onodes_free;
unsigned onodes_total;
const struct blitterargs* bargs;
const uint32_t* data;
int linesize;
sixelmap* smap;
// these are the leny and lenx passed to sixel_blit(), which are likely
// different from those reachable through bargs->len{y,x}!
int leny, lenx;
struct qstate* next; // next in the threading engine's queue
} qstate;
#define QNODECOUNT 1000
// create+zorch an array of QNODECOUNT qnodes. this is 1000 entries covering
@ -218,28 +268,31 @@ typedef struct qstate {
// we must have 8 dynnodes available for every onode we create, or we can run
// into a situation where we don't have an available dynnode
// (see insert_color()).
static int
alloc_qstate(unsigned colorregs, qstate* qs){
qs->dynnodes_free = colorregs;
qs->dynnodes_total = qs->dynnodes_free;
if((qs->qnodes = malloc((QNODECOUNT + qs->dynnodes_total) * sizeof(qnode))) == NULL){
return -1;
}
qs->onodes_free = qs->dynnodes_total / 8;
qs->onodes_total = qs->onodes_free;
if((qs->onodes = malloc(qs->onodes_total * sizeof(*qs->onodes))) == NULL){
free(qs->qnodes);
return -1;
}
// don't technically need to clear the components, as we could
// check the pop, but it's hidden under the compulsory cache misses.
// we only initialize the static nodes, not the dynamic ones--we know
// when we pull a dynamic one that it needs its popcount initialized.
memset(qs->qnodes, 0, sizeof(qnode) * QNODECOUNT);
qs->table = NULL;
qs->refcount = 1 + POPULATION;
qs->next = NULL;
return 0;
static qstate*
alloc_qstate(unsigned colorregs){
qstate* qs = malloc(sizeof(*qs));
if(qs){
qs->dynnodes_free = colorregs;
qs->dynnodes_total = qs->dynnodes_free;
if((qs->qnodes = malloc((QNODECOUNT + qs->dynnodes_total) * sizeof(qnode))) == NULL){
free(qs);
return NULL;
}
qs->onodes_free = qs->dynnodes_total / 8;
qs->onodes_total = qs->onodes_free;
if((qs->onodes = malloc(qs->onodes_total * sizeof(*qs->onodes))) == NULL){
free(qs->qnodes);
free(qs);
return NULL;
}
// don't technically need to clear the components, as we could
// check the pop, but it's hidden under the compulsory cache misses.
// we only initialize the static nodes, not the dynamic ones--we know
// when we pull a dynamic one that it needs its popcount initialized.
memset(qs->qnodes, 0, sizeof(qnode) * QNODECOUNT);
qs->table = NULL;
}
return qs;
}
// free internals of qstate object
@ -250,6 +303,7 @@ free_qstate(qstate *qs){
free(qs->qnodes);
free(qs->onodes);
free(qs->table);
free(qs);
}
}
@ -983,21 +1037,6 @@ build_sixel_band(qstate* qs, int bnum){
return 0;
}
// we keep a few worker threads spun up to assist with quantization.
typedef struct sixel_engine {
// FIXME we'll want maybe one per core in our cpuset?
pthread_t tids[POPULATION];
unsigned workers;
unsigned workers_wanted;
pthread_mutex_t lock;
pthread_cond_t cond;
bool done;
qstate* qs;
} sixel_engine;
// FIXME make this part of the context, sheesh
static sixel_engine globsengine;
static int
bandworker(qstate* qs){
int b;
@ -1005,7 +1044,6 @@ bandworker(qstate* qs){
if(build_sixel_band(qs, b) < 0){
return -1;
}
//fprintf(stderr, "%lu DID BAND %d on %p\n", pthread_self(), b, qs);
}
return 0;
}
@ -1020,11 +1058,7 @@ build_data_table(qstate* qs){
return -1;
}
qs->bandbuilder = 0;
pthread_mutex_lock(&globsengine.lock);
// FIXME need enqueue it
globsengine.qs = qs;
pthread_mutex_unlock(&globsengine.lock);
pthread_cond_broadcast(&globsengine.cond);
enqueue_to_workers(&globsengine, qs);
size_t tsize = RGBSIZE * smap->colors;
qs->table = malloc(tsize);
if(qs->table == NULL){
@ -1032,7 +1066,7 @@ build_data_table(qstate* qs){
}
load_color_table(qs);
bandworker(qs);
// FIXME need to drop our reference, possibly drop qs
block_on_workers(&globsengine, qs);
return 0;
}
@ -1364,28 +1398,28 @@ int sixel_blit(ncplane* n, int linesize, const void* data, int leny, int lenx,
return -1;
}
assert(n->tam);
qstate qs;
if(alloc_qstate(bargs->u.pixel.colorregs, &qs)){
qstate* qs;
if((qs = alloc_qstate(bargs->u.pixel.colorregs)) == NULL){
logerror("couldn't allocate qstate");
sixelmap_free(smap);
return -1;
}
qs.bargs = bargs;
qs.data = data;
qs.linesize = linesize;
qs.smap = smap;
qs.leny = leny;
qs.lenx = lenx;
if(extract_color_table(&qs)){
qs->bargs = bargs;
qs->data = data;
qs->linesize = linesize;
qs->smap = smap;
qs->leny = leny;
qs->lenx = lenx;
if(extract_color_table(qs)){
free(bargs->u.pixel.spx->needs_refresh);
bargs->u.pixel.spx->needs_refresh = NULL;
sixelmap_free(smap);
free_qstate(&qs);
free_qstate(qs);
return -1;
}
// takes ownership of sixelmap on success
int r = sixel_blit_inner(&qs, smap, bargs, n->tam);
free_qstate(&qs);
int r = sixel_blit_inner(qs, smap, bargs, n->tam);
free_qstate(qs);
if(r < 0){
sixelmap_free(smap);
// FIXME free refresh table?
@ -1484,47 +1518,52 @@ int sixel_draw(const tinfo* ti, const ncpile* p, sprixel* s, fbuf* f,
// a quantization worker.
static void *
sixel_worker(void* v){
sixel_engine *sengine = v;
pthread_mutex_lock(&globsengine.lock);
if(++sengine->workers < sengine->workers_wanted){
pthread_mutex_unlock(&globsengine.lock);
// don't bail on a failure here
if(pthread_create(&globsengine.tids[sengine->workers], NULL, sixel_worker, sengine)){
logerror("couldn't spin up sixel worker %u", sengine->workers);
}
}else{
pthread_mutex_unlock(&globsengine.lock);
}
sixel_engine *sengine = &globsengine;
work_queue* wq = v;
qstate* qs = NULL;
pthread_mutex_lock(&globsengine.lock);
unsigned bufpos = 0; // index into worker queue
do{
while((sengine->qs == NULL || sengine->qs == qs) && !sengine->done){
pthread_cond_wait(&globsengine.cond, &globsengine.lock);
pthread_mutex_lock(&sengine->lock);
while(wq->used == 0 && !sengine->done){
pthread_cond_wait(&sengine->cond, &sengine->lock);
}
if(sengine->done){
pthread_mutex_unlock(&globsengine.lock);
return NULL;
if(!sengine->done){
qs = wq->qstates[bufpos];
}else{
qs = NULL;
}
pthread_mutex_unlock(&sengine->lock);
if(qs == NULL){
break;
}
qs = sengine->qs;
pthread_mutex_unlock(&globsengine.lock);
bandworker(qs);
bool sendsignal = false;
pthread_mutex_lock(&sengine->lock);
--wq->used;
if(--qs->refcount == 0){
pthread_mutex_lock(&globsengine.lock);
qstate* qnext = qs->next;
sengine->qs = qnext;
}else{
pthread_mutex_lock(&globsengine.lock);
sendsignal = true;
}
pthread_mutex_unlock(&sengine->lock);
if(sendsignal){
pthread_cond_broadcast(&sengine->cond);
}
if(++bufpos == WORKERDEPTH){
bufpos = 0;
}
}while(1);
return NULL;
}
static int
sixel_init_core(const char* initstr, int fd){
globsengine.workers = 0;
globsengine.workers_wanted = sizeof(globsengine.tids) / sizeof(*globsengine.tids);
// don't fail on an error here
if(pthread_create(globsengine.tids, NULL, sixel_worker, &globsengine)){
logerror("couldn't spin up sixel workers");
const int workers_wanted = sizeof(globsengine.tids) / sizeof(*globsengine.tids);
for(int w = 0 ; w < workers_wanted ; ++w){
if(pthread_create(&globsengine.tids[w], NULL, sixel_worker, &globsengine.queues[w])){
logerror("couldn't spin up sixel worker %d/%d", w, workers_wanted);
// FIXME kill any created workers
return -1;
}
}
return tty_emit(initstr, fd);
}
@ -1623,10 +1662,9 @@ int sixel_rebuild(sprixel* s, int ycell, int xcell, uint8_t* auxvec){
void sixel_cleanup(tinfo* ti){
(void)ti; // FIXME pick up globsengine from ti!
unsigned tids = 0;
const unsigned tids = POPULATION;
pthread_mutex_lock(&globsengine.lock);
globsengine.done = 1;
tids = globsengine.workers;
pthread_mutex_unlock(&globsengine.lock);
pthread_cond_broadcast(&globsengine.cond);
// FIXME what if we spawned another worker since taking zee lock?

Loading…
Cancel
Save