free queue contents properly; use input queue for memory limiting

pull/2/head
root 15 years ago
parent 0b9fc52e47
commit 8a90a6dce7

@ -15,8 +15,8 @@ CLEANUP
BUGS
* fast input or slow output -> blocks pile up, huge memory usage
* performance lags under IO?
* slow input -> CPUs idle while waiting for input
* performance lags under IO?
EFFICIENCY
* should use ordered list in collator

@ -218,9 +218,10 @@ void *decode_block_start(off_t block_seek) {
return bw;
}
queue_t *queue_new(void) {
queue_t *queue_new(queue_free_t freer) {
queue_t *q = malloc(sizeof(queue_t));
q->first = q->last = NULL;
q->freer = freer;
pthread_mutex_init(&q->mutex, NULL);
pthread_cond_init(&q->pop_cond, NULL);
return q;
@ -229,7 +230,8 @@ queue_t *queue_new(void) {
void queue_free(queue_t *q) {
for (queue_item_t *i = q->first; i; ) {
queue_item_t *tmp = i->next;
free(i->data);
if (q->freer)
q->freer(i->type, i->data);
free(i);
i = tmp;
}

@ -36,12 +36,16 @@ struct queue_item_t {
queue_item_t *next;
};
typedef void (*queue_free_t)(int type, void *p);
typedef struct {
queue_item_t *first;
queue_item_t *last;
pthread_mutex_t mutex;
pthread_cond_t pop_cond;
queue_free_t freer;
} queue_t;
@ -70,8 +74,7 @@ void read_file_index(void);
void dump_file_index(void);
void free_file_index(void);
queue_t *queue_new(void);
queue_t *queue_new(queue_free_t freer);
void queue_free(queue_t *q);
// data should be on heap, if present
void queue_push(queue_t *q, int type, void *data);
int queue_pop(queue_t *q, void **datap);

@ -27,7 +27,7 @@ struct io_block_t {
static size_t gNumEncodeThreads = 0;
static pthread_t *gEncodeThreads = NULL;
static pthread_t gReadThread;
static queue_t *gEncodeQ, *gWriteQ;
static queue_t *gReadQ, *gEncodeQ, *gWriteQ;
static size_t gBlockInSize = 0, gBlockOutSize = 0;
static off_t gMultiHeaderStart = 0;
@ -47,7 +47,7 @@ static size_t gFileIndexBufPos = 0;
static void *read_thread(void *data);
static void *encode_thread(void *data);
static void block_queue_free(int type, void *p);
static bool is_multi_header(const char *name);
static void add_file(off_t offset, const char *name);
@ -90,8 +90,16 @@ int main(int argc, char **argv) {
// thread setup
gNumEncodeThreads = num_threads();
gEncodeThreads = malloc(gNumEncodeThreads * sizeof(pthread_t));
gEncodeQ = queue_new();
gWriteQ = queue_new();
gReadQ = queue_new(block_queue_free);
gEncodeQ = queue_new(block_queue_free);
gWriteQ = queue_new(block_queue_free);
for (size_t i = 0; i < (int)(gNumEncodeThreads * 2 + 4); ++i) {
// create blocks, including a margin of error
io_block_t *ib = malloc(sizeof(io_block_t));
ib->input = malloc(gBlockInSize);
ib->output = malloc(gBlockOutSize);
queue_push(gReadQ, MSG_BLOCK, ib);
}
if (pthread_create(&gReadThread, NULL, &read_thread, NULL))
die("Error creating read thread");
for (int i = 0; i < gNumEncodeThreads; ++i) {
@ -133,6 +141,7 @@ int main(int argc, char **argv) {
die("Error joining read thread");
queue_free(gEncodeQ);
queue_free(gWriteQ);
queue_free(gReadQ);
free(gEncodeThreads);
return 0;
@ -167,13 +176,9 @@ static void *read_thread(void *data) {
// write last block, if necessary
if (gReadBlock) {
if (gReadBlock->insize) {
queue_push(gEncodeQ, MSG_BLOCK, gReadBlock);
} else { // if this block had only one read, and it was EOF
free(gReadBlock->input);
free(gReadBlock->output);
free(gReadBlock);
}
// if this block had only one read, and it was EOF, it's waste
queue_push(gReadBlock->insize ? gEncodeQ : gReadQ, MSG_BLOCK, gReadBlock);
gReadBlock = NULL;
}
// stop the other threads
@ -191,9 +196,7 @@ static void *read_thread(void *data) {
static ssize_t tar_read(struct archive *ar, void *ref, const void **bufp) {
if (!gReadBlock) {
gReadBlock = malloc(sizeof(io_block_t));
gReadBlock->input = malloc(gBlockInSize);
gReadBlock->output = malloc(gBlockOutSize);
queue_pop(gReadQ, (void**)&gReadBlock);
gReadBlock->insize = 0;
gReadBlock->seq = gBlockNum++;
}
@ -251,6 +254,22 @@ static void add_file(off_t offset, const char *name) {
gLastFile = f;
}
static void block_queue_free(int type, void *p) {
switch (type) {
case MSG_BLOCK: {
io_block_t *ib = (io_block_t*)p;
free(ib->input);
free(ib->output);
free(ib);
break;
}
case MSG_STOP:
break;
default:
die("Unknown msg type %d", type);
}
}
#pragma mark ENCODING
@ -324,9 +343,7 @@ static void write_blocks(io_block_t **ibs, size_t *seq) {
} else {
*ibs = ib->next;
}
free(ib->input);
free(ib->output);
free(ib);
queue_push(gReadQ, MSG_BLOCK, ib);
++*seq;
block_missing = false;

Loading…
Cancel
Save