Fixes #18 (hopefully :)

pull/58/head
lanjelot 8 years ago
parent 116040eb8b
commit 8fdf6ac3ca

@ -631,18 +631,12 @@ TODO
# logging {{{ # logging {{{
class Logger: class Logger:
def __init__(self, pipe): def __init__(self, queue):
self.pipe = pipe self.queue = queue
self.name = multiprocessing.current_process().name self.name = multiprocessing.current_process().name
# neat but wont work on windows
# def __getattr__(self, action):
# def send(*args):
# self.pipe.send((self.name, action, args))
# return send
def send(self, action, *args): def send(self, action, *args):
self.pipe.send((self.name, action, args)) self.queue.put((self.name, action, args))
def quit(self): def quit(self):
self.send('quit') self.send('quit')
@ -729,7 +723,7 @@ class MsgFilter(logging.Filter):
else: else:
return 1 return 1
def process_logs(pipe, indicatorsfmt, argv, log_dir): def process_logs(queue, indicatorsfmt, argv, log_dir):
ignore_ctrlc() ignore_ctrlc()
@ -814,7 +808,7 @@ def process_logs(pipe, indicatorsfmt, argv, log_dir):
while True: while True:
pname, action, args = pipe.recv() pname, action, args = queue.get()
if action == 'quit': if action == 'quit':
if log_dir: if log_dir:
@ -1397,14 +1391,14 @@ Please read the README inside for more examples and usage information.
self.ns.start_time = 0 self.ns.start_time = 0
self.ns.total_size = 1 self.ns.total_size = 1
pipe = multiprocessing.Pipe(duplex=False) log_queue = multiprocessing.Queue()
logsvc = Process(name='LogSvc', target=process_logs, args=(pipe[0], module.Response.indicatorsfmt, argv, build_logdir(opts.log_dir, opts.auto_log))) logsvc = Process(name='LogSvc', target=process_logs, args=(log_queue, module.Response.indicatorsfmt, argv, build_logdir(opts.log_dir, opts.auto_log)))
logsvc.daemon = True logsvc.daemon = True
logsvc.start() logsvc.start()
global logger global logger
logger = Logger(pipe[1]) logger = Logger(log_queue)
if opts.debug: if opts.debug:
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
@ -1617,23 +1611,23 @@ Please read the README inside for more examples and usage information.
# consumers # consumers
for num in range(self.num_threads): for num in range(self.num_threads):
report_queue = multiprocessing.Queue(maxsize=1000) report_queue = multiprocessing.Queue(maxsize=1000)
t = Process(name='Consumer-%d' % num, target=self.consume, args=(task_queues[num], report_queue, logger.pipe)) t = Process(name='Consumer-%d' % num, target=self.consume, args=(task_queues[num], report_queue, logger.queue))
t.daemon = True t.daemon = True
t.start() t.start()
self.thread_report.append(report_queue) self.thread_report.append(report_queue)
self.thread_progress.append(Progress()) self.thread_progress.append(Progress())
# producer # producer
t = Process(name='Producer', target=self.produce, args=(task_queues, logger.pipe)) t = Process(name='Producer', target=self.produce, args=(task_queues, logger.queue))
t.daemon = True t.daemon = True
t.start() t.start()
def produce(self, task_queues, pipe): def produce(self, task_queues, log_queue):
ignore_ctrlc() ignore_ctrlc()
global logger global logger
logger = Logger(pipe) logger = Logger(log_queue)
iterables = [] iterables = []
total_size = 1 total_size = 1
@ -1766,13 +1760,13 @@ Please read the README inside for more examples and usage information.
logger.debug('producer exits') logger.debug('producer exits')
def consume(self, task_queue, report_queue, pipe): def consume(self, task_queue, report_queue, log_queue):
ignore_ctrlc() ignore_ctrlc()
handle_alarm() handle_alarm()
global logger global logger
logger = Logger(pipe) logger = Logger(log_queue)
module = self.module() module = self.module()

Loading…
Cancel
Save