diff --git a/onioningestor/__init__.py b/onioningestor/__init__.py index d5535f0..4e262ea 100644 --- a/onioningestor/__init__.py +++ b/onioningestor/__init__.py @@ -4,7 +4,7 @@ import queue import traceback import threading import collections -from queue import Queue +from queue import Queue, Empty from itertools import islice from . import config @@ -71,92 +71,77 @@ class Ingestor: self.logger.debug(traceback.print_exc()) sys.exit(1) - def iter_batches(self, data, batch_size): - data = iter(data) - while True: - batch = list(islice(data, batch_size)) - if len(batch) == 0: - break - yield batch - - def process(self, onions): + def collect_sources(self): + self.logger.debug("Initializing sources") + for name, collect, kwargs in self.config.sources(): + # Run the source to collect onion links from clear net. + self.logger.info(f"Running source '{name}'") + try: + # get the generator of onions + source = collect(self.logger, **kwargs) + source.set_onionQueue(self.queue) #priority 2 + t = source.run() + self.threads.append(t) + #self.logger.info(f'Starting of thread: {t.currentThread().name}') + #t.start() + except Exception as e: + self.logger.error(e) + self.logger.error(traceback.print_exc()) + continue + + def process(self, onion): for operator in self.operators: self.logger.info(f"Processing found onions with operator '{operator}'") # Set CrawlQueue for every operator self.operators[operator].set_crawlQueue(self.queue) # Process list of onions - self.operators[operator].process(onions) + self.operators[operator].process(onion) def run(self): """Run once, or forever, depending on config.""" - self.run_once() - #if self.config.daemon(): - # self.logger.info("Running forever, in a loop") - # self.run_forever() - #else: - # self.logger.info("Running once, to completion") - # self.run_once() + if self.config.daemon(): + self.logger.info("Running forever, in a loop") + self.run_forever() + else: + self.logger.info("Running once, to completion") + self.run_once() def run_once(self): """Run each source once, passing artifacts to each operator.""" # Start collecting sources - self.collect_sources() + # self.collect_sources() # Sources will fill various queues # MonitorQueue has priority high # OnionQueue are those found in clearnet medium # crawlQueue are those found crawling onionlinks low - onions = list(self.queue.queue) done = False - if onions: - while not done: - try: - ## Process onions with each operator. - for batched_onions in self.iter_batches(onions, batch_size=10): - self.process(batched_onions) - ## Save Onions for each storage - for onion in batched_onions: - self.storage.save_pastie(onion[1], 30) - done = True - except Exception as e: - self.logger.error(e) - self.logger.error(traceback.print_exc()) - break - except KeyboardInterrupt: - print('') - self.logger.info("Ctrl-c received! Sending kill to threads...") - for t in self.threads: - t.kill_received = True - self.logger.info('Exiting') - sys.exit(0) - else: - for t in self.threads: - t.kill_received = True - self.logger.info(f"Sleeping for {self.config.sleep()} seconds") - time.sleep(self.config.sleep()) - + while not done: + try: + onion = self.queue.get(True, 5) + ## Process onions with each operator. + self.process(onion) + ## Save Onions for each storage + self.storage.save_pastie(onion[1], 30) + except Empty: + self.logger.info('Queue is empty') + done = True + except Exception as e: + self.logger.error(e) + self.logger.error(traceback.print_exc()) + break + except KeyboardInterrupt: + print('') + self.logger.info("Ctrl-c received! Sending kill to threads...") + for t in self.threads: + t.kill_received = True + self.logger.info('Exiting') + sys.exit(0) def run_forever(self): """Run forever, sleeping for the configured interval between each run.""" while True: self.run_once() - - def collect_sources(self): - self.logger.debug("Initializing sources") - for name, collect, kwargs in self.config.sources(): - # Run the source to collect onion links from clear net. - self.logger.info(f"Running source '{name}'") - try: - # get the generator of onions - source = collect(self.logger, **kwargs) - source.set_onionQueue(self.queue) #priority 2 - t = source.run() - self.threads.append(t) - #self.logger.info(f'Starting of thread: {t.currentThread().name}') - #t.start() - except Exception as e: - self.logger.error(e) - self.logger.error(traceback.print_exc()) - continue - + self.logger.debug(f"Sleeping for {self.config.sleep()} seconds") + time.sleep(self.config.sleep()) diff --git a/onioningestor/config.py b/onioningestor/config.py index d0e2af1..ca7d5d1 100644 --- a/onioningestor/config.py +++ b/onioningestor/config.py @@ -60,9 +60,9 @@ class Config: return self.config["general"]["TorController"] def monitorQueue(self): - fp = self.config["monitor"].get("filename", False) + fp = Path(self.config["monitor"].get("filename", "this_File_Does_notExsit")) q = PriorityQueue(maxsize=0) - if fp: + if fp.is_file(): with open(fp, 'r') as f: monitorOnions = f.read().splitlines() for monitor in monitorOnions: @@ -77,7 +77,7 @@ class Config: denylist=False))) return q else: - return None + return q def logging(self): """Returns logging config dictionary.""" diff --git a/onioningestor/operators/__init__.py b/onioningestor/operators/__init__.py index c55cd0f..1a9ee53 100644 --- a/onioningestor/operators/__init__.py +++ b/onioningestor/operators/__init__.py @@ -115,16 +115,9 @@ class Operator: monitor=False, denylist=False))) - def collect(self, onions): - for onion in onions: - self.logger.info(f'thread function processing {onion[1]}') - self.handle_onion(onion[1]) - - - def process(self, onions): + def process(self, onion): """Process all applicable onions.""" - for onion in onions: - self.handle_onion(onion[1]) + self.handle_onion(onion[1]) #with ThreadPoolExecutor(max_workers=1) as executor: # collect_tasks = [executor.submit(self.collect, files_batch) for files_batch in self.iter_batches(onions, batch_size=10)] # for tasks in collect_tasks: diff --git a/onioningestor/operators/onionscan.py b/onioningestor/operators/onionscan.py index ff71a45..5fd6785 100644 --- a/onioningestor/operators/onionscan.py +++ b/onioningestor/operators/onionscan.py @@ -34,8 +34,8 @@ class Plugin(Operator): hiddenService = data.pop('hiddenService', None) data['crawls'] = [*crawls] try: - if data['linkedOnions']: - self.findCrawls(data['linkedOnions'], hiddenService) + if data['identifierReport'].get('linkedOnions', False): + self.findCrawls(data['identifierReport']['linkedOnions'], hiddenService) except KeyError as e: pass return data