From c752c09e257d2e3b9218df5d883dac125485564f Mon Sep 17 00:00:00 2001 From: danieleperera Date: Tue, 21 Jul 2020 14:03:57 +0000 Subject: [PATCH] fixed indexing issue --- onioningestor/__init__.py | 35 ++++++++----- onioningestor/config.py | 3 ++ onioningestor/databases/__init__.py | 20 +------- onioningestor/databases/elasticsearch.py | 1 - onioningestor/databases/telegram.py | 5 +- onioningestor/onion.py | 12 ++--- onioningestor/operators/__init__.py | 64 ++++++++++++++++-------- onioningestor/operators/html.py | 37 +++++--------- onioningestor/operators/onionscan.py | 54 +++++++------------- 9 files changed, 108 insertions(+), 123 deletions(-) diff --git a/onioningestor/__init__.py b/onioningestor/__init__.py index 8437f68..d5535f0 100644 --- a/onioningestor/__init__.py +++ b/onioningestor/__init__.py @@ -5,6 +5,7 @@ import traceback import threading import collections from queue import Queue +from itertools import islice from . import config from . import loghandler @@ -61,7 +62,7 @@ class Ingestor: # Instantiate operator plugins. self.logger.debug("initializing operators") - self.operators = {name: operator(self.logger, self.blacklist, **kwargs) + self.operators = {name: operator(self.logger, self.config.torController(), self.blacklist, **kwargs) for name, operator, kwargs in self.config.operators()} except Exception as e: @@ -70,6 +71,21 @@ class Ingestor: self.logger.debug(traceback.print_exc()) sys.exit(1) + def iter_batches(self, data, batch_size): + data = iter(data) + while True: + batch = list(islice(data, batch_size)) + if len(batch) == 0: + break + yield batch + + def process(self, onions): + for operator in self.operators: + self.logger.info(f"Processing found onions with operator '{operator}'") + # Set CrawlQueue for every operator + self.operators[operator].set_crawlQueue(self.queue) + # Process list of onions + self.operators[operator].process(onions) def run(self): """Run once, or forever, depending on config.""" @@ -86,7 +102,6 @@ class Ingestor: """Run each source once, passing artifacts to each operator.""" # Start collecting sources self.collect_sources() - # Sources will fill various queues # MonitorQueue has priority high # OnionQueue are those found in clearnet medium @@ -97,16 +112,12 @@ class Ingestor: while not done: try: ## Process onions with each operator. - for operator in self.operators: - self.logger.info(f"Processing found onions with operator '{operator}'") - # Set CrawlQueue for every operator - self.operators[operator].set_crawlQueue(self.queue) - # Process list of onions - self.operators[operator].process(onions) - done = True - ## Save Onions for each storage - for onion in onions: - self.storage.save_pastie(onion[1], 30) + for batched_onions in self.iter_batches(onions, batch_size=10): + self.process(batched_onions) + ## Save Onions for each storage + for onion in batched_onions: + self.storage.save_pastie(onion[1], 30) + done = True except Exception as e: self.logger.error(e) self.logger.error(traceback.print_exc()) diff --git a/onioningestor/config.py b/onioningestor/config.py index 400257f..d0e2af1 100644 --- a/onioningestor/config.py +++ b/onioningestor/config.py @@ -56,6 +56,9 @@ class Config: def blacklist(self): return self.config["general"]["blacklist"].split(",") + def torController(self): + return self.config["general"]["TorController"] + def monitorQueue(self): fp = self.config["monitor"].get("filename", False) q = PriorityQueue(maxsize=0) diff --git a/onioningestor/databases/__init__.py b/onioningestor/databases/__init__.py index 20f6098..3c1b3a9 100644 --- a/onioningestor/databases/__init__.py +++ b/onioningestor/databases/__init__.py @@ -116,24 +116,8 @@ class PastieStorage(): self.logger.debug('{0}: pastie[{1}] saved in {2}s'.format(self.name, pastie.url, delta)) except Exception as e: self.logger.error('{0}: unable to save pastie[{1}]: {2}'.format(self.name, pastie.url, e)) - raise - - def __seen_pastie__(self, pastie_id, **kwargs): - raise NotImplementedError - - def seen_pastie(self, pastie_id, **kwargs): - if not self.lookup: - return False - try: - start = time.time() - self.logger.debug('{0}: looking up pastie[{1}]'.format(self.name, pastie_id)) - res = self.__seen_pastie__(pastie_id, **kwargs) - delta = time.time() - start - self.logger.debug('{0}: pastie[{1}] looked-up in {2}s'.format(self.name, pastie_id, delta)) - return res - except Exception as e: - self.logger.error('{0}: unable to lookup pastie[{1}]: {2}'.format(self.name, pastie_id, e)) - raise + pass + #raise class Notifier(object): def __init__(self, logger, **kwargs): diff --git a/onioningestor/databases/elasticsearch.py b/onioningestor/databases/elasticsearch.py index 3675065..5a2dd00 100644 --- a/onioningestor/databases/elasticsearch.py +++ b/onioningestor/databases/elasticsearch.py @@ -61,7 +61,6 @@ class Plugin(PastieStorage): 'port':self.config['port']}]) self.es.indices.create( index=self.index, - #body=self.mapping, ignore=400) except Exception as e: self.logger.error(e) diff --git a/onioningestor/databases/telegram.py b/onioningestor/databases/telegram.py index ba21682..70796e8 100644 --- a/onioningestor/databases/telegram.py +++ b/onioningestor/databases/telegram.py @@ -14,10 +14,11 @@ class Plugin(PastieStorage): def __save_pastie__(self, pastie): message = ''' -HiddenSite: {site} +HiddenSite +{site} Source : {url} Monitor : {content} -Status : {status} +Status : {status} '''.format( site=pastie.url, url=pastie.source, diff --git a/onioningestor/onion.py b/onioningestor/onion.py index 6496210..8e814ee 100644 --- a/onioningestor/onion.py +++ b/onioningestor/onion.py @@ -10,14 +10,11 @@ class Onion(object): self.monitor = monitor self.denylist = denylist self.datetime = dt.now() + self.operators = {} - def simpleHTML(self, response): - self.simpleHTML = response - # if any match update denylist + def set_operator(self, response): + self.operators.update(response) - def onionscan(self, response): - self.onionscan = response - def asdict(self): d = { 'hiddenService':self.url, @@ -27,8 +24,7 @@ class Onion(object): 'monitor': self.monitor, 'denylist': self.denylist, 'dateFound': self.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%f")+"Z", - 'simpleHTML': self.simpleHTML, - 'onionscan':self.onionscan + 'operators': self.operators, } return d diff --git a/onioningestor/operators/__init__.py b/onioningestor/operators/__init__.py index d154e67..c55cd0f 100644 --- a/onioningestor/operators/__init__.py +++ b/onioningestor/operators/__init__.py @@ -1,11 +1,15 @@ import re import sys import json +import time +import requests from queue import Queue -from itertools import islice from datetime import datetime as dt from concurrent.futures import ThreadPoolExecutor +from stem.control import Controller +from stem import Signal + from onioningestor.onion import Onion class Operator: @@ -22,7 +26,7 @@ class Operator: override other existing methods from this class. """ - def __init__(self, logger, allowed_sources=None): + def __init__(self, logger, config, allowed_sources=None): """Override this constructor in child classes. The arguments above (artifact_types, filter_string, allowed_sources) @@ -47,12 +51,26 @@ class Operator: """ self.logger = logger self.onion = Onion + self.torControl = config deny = allowed_sources or [] self.blacklist = re.compile('|'.join([re.escape(word) for word in deny]), re.IGNORECASE) + # signal TOR for a new connection + def renew_connection(self): + with Controller.from_port(port = int(self.torControl['port'])) as controller: + # Now we switch TOR identities to make sure we have a good connection + self.logger.info('Getting new Tor IP') + # authenticate to our local TOR controller + controller.authenticate(self.torControl['password']) + # send the signal for a new identity + controller.signal(Signal.NEWNYM) + # wait for the new identity to be initialized + time.sleep(controller.get_newnym_wait()) + self.logger.info(f"IP is {requests.get('http://httpbin.org/ip').json()['origin']}") + def set_crawlQueue(self, queue): self.queueCrawl = queue - + def handle_onion(self, url): """Override with the same signature. @@ -61,7 +79,7 @@ class Operator: """ raise NotImplementedError() - def response(self, status, content): + def response(self, operator, status, content): """ status: success/failure content: dict @@ -69,7 +87,7 @@ class Operator: return: dict """ try: - return {'status':status, 'content': content} + return {operator:{'status':status, 'content': content}} except Exception as e: self.logger.error(e) @@ -80,29 +98,33 @@ class Operator: onion.denylist = blacklist onion.status = 'blocked' + def findCrawls(self, content, hiddenService): + crawl = set() + for onion in re.findall(r'\s?(\w+.onion)', str(content)): + if onion != hiddenService: + crawl.add(onion) + for item in crawl: + print(f'crawling queue added: {item}') + self.queueCrawl.put(( + 3, + self.onion( + url=item, + source='crawled', + type='domain', + status='offline', + monitor=False, + denylist=False))) + def collect(self, onions): for onion in onions: - self.logger.info(f'thread function processing {onion}') - if onion.monitor: - self.handle_onion(onion) - else: - if self._onion_is_allowed(onion.url): - self.handle_onion(onion) - - def iter_batches(self, data, batch_size): - data = iter(data) - while True: - batch = list(islice(data, batch_size)) - if len(batch) == 0: - break - yield batch + self.logger.info(f'thread function processing {onion[1]}') + self.handle_onion(onion[1]) + def process(self, onions): """Process all applicable onions.""" for onion in onions: self.handle_onion(onion[1]) - #self.save_pastie() - #with ThreadPoolExecutor(max_workers=1) as executor: # collect_tasks = [executor.submit(self.collect, files_batch) for files_batch in self.iter_batches(onions, batch_size=10)] # for tasks in collect_tasks: diff --git a/onioningestor/operators/html.py b/onioningestor/operators/html.py index fd84b47..768ef8d 100644 --- a/onioningestor/operators/html.py +++ b/onioningestor/operators/html.py @@ -23,8 +23,8 @@ class Plugin(Operator): This plugin collects HTML code from onion link """ - def __init__(self, logger, denylist, **kwargs): - super(Plugin, self).__init__(logger, denylist) + def __init__(self, logger, denylist, config, **kwargs): + super(Plugin, self).__init__(logger, denylist, config) self.name = kwargs['name'] self.logger.info(f"Initializing {self.name}") @@ -37,7 +37,6 @@ class Plugin(Operator): ) self.proxy = kwargs["socks5"] - self.torControl = kwargs["TorController"] self.headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:72.0) Gecko/20100101 Firefox/72.0", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", @@ -57,33 +56,21 @@ class Plugin(Operator): self.logger.debug(traceback.print_exc()) return s - def renew_connection(self): - with Controller.from_port(port=self.torControl["port"]) as controller: - # Now we switch TOR identities to make sure we have a good connection - self.logger.info("Getting new Tor IP") - # authenticate to our local TOR controller - controller.authenticate(self.torControl["password"]) - # send the signal for a new identity - controller.signal(Signal.NEWNYM) - # wait for the new identity to be initialized - time.sleep(controller.get_newnym_wait()) - session = self.get_tor_session() - self.logger.info( - f"IP is {session.get('http://httpbin.org/ip').json()['origin']}" - ) def run_sessions(self, onion): retry = 0 result = None while True: try: - url = "http://" + onion + url = "http://" + onion.url self.logger.info(url) content = self.get_tor_session().get(url) if content.status_code == 200: result = content.text if result: html = BeautifulSoup(result, features="lxml") + ## Find other onion links + self.findCrawls(html, onion.url) if html: index = { "HTML": result, @@ -100,7 +87,8 @@ class Plugin(Operator): "status": "success", "interestingKeywords": list(set(self.interesting.findall(result))), } - return self.response("success", index) + onion.status = 'online' + return self.response(self.name,"success", index) except requests.exceptions.ConnectionError as connection_error: self.logger.error(f"Failed connecting to http://{url}") @@ -114,10 +102,11 @@ class Plugin(Operator): self.renew_connection() if retry > self.retries: self.logger.error("[x] Max retries exceeded") - return self.response("failed", None) + return self.response(self.name,"failed", None) def handle_onion(self, onion): - html = self.run_sessions(onion.url) - if html['status'] == 'success': - self._onion_is_allowed(html['content']['HTML'], onion) - onion.simpleHTML(html) + html = self.run_sessions(onion) + response = html[self.name] + if response['status'] == 'success': + self._onion_is_allowed(response['content']['HTML'], onion) + onion.set_operator(html) diff --git a/onioningestor/operators/onionscan.py b/onioningestor/operators/onionscan.py index 342c874..ff71a45 100644 --- a/onioningestor/operators/onionscan.py +++ b/onioningestor/operators/onionscan.py @@ -1,5 +1,4 @@ import json -import time import traceback import subprocess from threading import Timer @@ -8,6 +7,10 @@ from concurrent.futures import ProcessPoolExecutor import requests +from stem.control import Controller +from stem import Signal + + from onioningestor.operators import Operator @@ -17,50 +20,26 @@ class Plugin(Operator): Handles reading the config file, calling sources, maintaining state and sending artifacts to operators. """ - def __init__(self, logger, denylist, **kwargs): - super(Plugin, self).__init__(logger, denylist) + def __init__(self, logger, denylist, config, **kwargs): + super(Plugin, self).__init__(logger, denylist, config) self.name = kwargs['name'] self.logger = logger self.logger.info(f'Initializing {self.name}') self.onionscan = kwargs['binpath'] self.timeout = int(kwargs.get('timeout', 300)) - self.torControl = 9051 - self.torControl = "Zue5a29v4xE6FciWpPF93rR2M2T" def parseDoc(self, data): data.pop('simpleReport', None) crawls = data.pop('crawls', None) hiddenService = data.pop('hiddenService', None) data['crawls'] = [*crawls] - crawl = set() - for onion in re.findall(r'\s?(\w+.onion)', str(crawls.keys())): - if onion != hiddenService: - crawl.add(onion) - for items in crawl: - print(f'crawling queue added: {item}') - self.queueCrawl.put(( - 3, - self.onion( - url=item, - source='crawled', - type='domain', - status='offline', - monitor=False, - denylist=False))) + try: + if data['linkedOnions']: + self.findCrawls(data['linkedOnions'], hiddenService) + except KeyError as e: + pass return data - # signal TOR for a new connection - def renew_connection(self): - with Controller.from_port(port = self.torControl['port']) as controller: - # Now we switch TOR identities to make sure we have a good connection - self.logger.info('Getting new Tor IP') - # authenticate to our local TOR controller - controller.authenticate(self.torControl['password']) - # send the signal for a new identity - controller.signal(Signal.NEWNYM) - # wait for the new identity to be initialized - time.sleep(controller.get_newnym_wait()) - self.logger.info(f"IP is {requests.get('http://httpbin.org/ip').json()['origin']}") def handle_timeout(self, process, onion): # @@ -74,7 +53,6 @@ class Plugin(Operator): except: pass self.renew_connection() - return def run_onionscan(self, onion): self.logger.info("[*] Running onionscan on %s", onion) @@ -94,21 +72,23 @@ class Plugin(Operator): process_timer.cancel() try: return self.response( + self.name, "success", self.parseDoc(json.loads(stdout))) except json.decoder.JSONDecodeError: return self.response( + self.name, "success", self.parseDoc(stdout)) self.logger.info("[!!!] Process timed out for %s", onion) - print(stdout) - return self.response("failed",stdout) + return self.response(self.name,"failed", None) def handle_onion(self, onion): try: - results = self.run_onionscan(onion.url) - onion.onionscan(results) + if onion.status != 'inactive': + results = self.run_onionscan(onion.url) + onion.set_operator(results) except Exception as e: self.logger.error(e) self.logger.error(traceback.print_exc())