You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
148 lines
5.3 KiB
Python
148 lines
5.3 KiB
Python
import sys
|
|
import time
|
|
import queue
|
|
import traceback
|
|
import threading
|
|
import collections
|
|
from queue import Queue, Empty
|
|
from itertools import islice
|
|
|
|
from . import config
|
|
from . import loghandler
|
|
|
|
from onioningestor.databases import StorageDispatcher, StorageThread, StorageSync
|
|
|
|
class Ingestor:
    """Main work logic of the onion ingestor.

    Handles reading the config file, starting storage back ends, running
    sources, and passing every queued onion to each operator before saving it.
    """

    def __init__(self, args):
        """Load logging, configuration, storage engines and operator plugins.

        Args:
            args: parsed command-line arguments; must provide ``logLevel`` and
                ``configFile`` attributes.

        Exits the process with status 1 if any storage engine or operator
        fails to initialise.
        """
        # Load logger
        log = loghandler.LoggerHandler(args.logLevel)
        self.logger = log.start_logging()

        # Load config
        self.config = config.Config(args.configFile, self.logger)
        self.blacklist = self.config.blacklist()

        # Shared queue that sources (and operators) feed and run_once() drains.
        self.queue = self.config.monitorQueue()

        # Whether to save asynchronously (one thread per storage engine) or
        # synchronously from the main loop.
        self.save_thread = self.config.save_thread()

        # Track some statistics about artifacts in a summary object.
        self.summary = collections.Counter()

        # Threads started for storage engines and sources.
        self.threads = []

        try:
            # Load Storage Engines - ElasticSearch, Telegram, Twitter etc
            self.storage = StorageDispatcher(self.logger)

            for name, db, kwargs in self.config.database_engines():
                if self.save_thread:
                    # Start a daemon thread handling this database's storage.
                    self.logger.debug(f"Starting daemon thread for {str(db)}")
                    wrapper = StorageThread(db(self.logger, **kwargs))
                    self.threads.append(wrapper)
                    # Thread.setDaemon() is deprecated; set the attribute.
                    wrapper.daemon = True
                    wrapper.start()
                else:
                    # Save onions synchronously.
                    wrapper = StorageSync(db(self.logger, **kwargs))
                # Register the wrapper in both modes so save_pastie() reaches
                # threaded back ends as well as synchronous ones.
                # NOTE(review): assumes StorageDispatcher.add_storage accepts a
                # StorageThread just like a StorageSync — confirm.
                self.storage.add_storage(wrapper)

            if self.save_thread:
                self.logger.info("Onions will be saved asynchronously")
            else:
                self.logger.info("Onions will be saved synchronously")

            # Instantiate operator plugins.
            self.logger.debug("initializing operators")
            self.operators = {
                name: operator(self.logger, self.config.torController(),
                               self.blacklist, **kwargs)
                for name, operator, kwargs in self.config.operators()
            }

        except Exception as e:
            # Error loading/starting plugins: log it and abort.
            self.logger.error(e)
            # print_exc() writes to stderr and returns None; format_exc()
            # returns the traceback text so it actually reaches the log.
            self.logger.debug(traceback.format_exc())
            sys.exit(1)

    def collect_sources(self):
        """Start every configured source.

        Whatever ``source.run()`` returns (presumably the source's worker
        thread) is appended to ``self.threads``. A source that fails to start
        is logged with its traceback and skipped; the remaining sources still
        run.
        """
        self.logger.debug("Initializing sources")
        for name, collect, kwargs in self.config.sources():
            # Run the source to collect onion links from clear net.
            self.logger.info(f"Running source '{name}'")
            try:
                source = collect(self.logger, **kwargs)
                # The source pushes what it finds onto the shared queue.
                source.set_onionQueue(self.queue)  # priority 2
                t = source.run()
                self.threads.append(t)
            except Exception as e:
                self.logger.error(e)
                self.logger.error(traceback.format_exc())

    def process(self, onion):
        """Pass one queued item to every operator, in dict (config) order.

        Each operator is handed the shared queue first, presumably so it can
        enqueue links it discovers while crawling.
        """
        for operator in self.operators:
            self.logger.info(f"Processing found onions with operator '{operator}'")
            # Set CrawlQueue for every operator
            self.operators[operator].set_crawlQueue(self.queue)
            # Process list of onions
            self.operators[operator].process(onion)

    def run(self):
        """Run once, or forever, depending on config."""
        if self.config.daemon():
            self.logger.info("Running forever, in a loop")
            self.run_forever()
        else:
            self.logger.info("Running once, to completion")
            self.run_once()

    def run_once(self):
        """Run each source once, passing queued onions to each operator.

        Drains ``self.queue`` until no item arrives within 5 seconds or an
        unexpected error occurs. On Ctrl-C every tracked thread gets
        ``kill_received = True`` and the process exits with status 0.
        """
        # Start collecting sources
        self.collect_sources()

        # Sources will fill various queues
        # MonitorQueue has priority high
        # OnionQueue are those found in clearnet medium
        # crawlQueue are those found crawling onionlinks low
        while True:
            try:
                onion = self.queue.get(True, 5)
                # Process onions with each operator.
                self.process(onion)
                # Save onions for each storage. Items look like
                # (priority, onion) pairs — only element 1 is stored.
                self.storage.save_pastie(onion[1], 30)
            except Empty:
                self.logger.info('Queue is empty')
                break
            except Exception as e:
                self.logger.error(e)
                self.logger.error(traceback.format_exc())
                break
            except KeyboardInterrupt:
                print('')
                self.logger.info("Ctrl-c received! Sending kill to threads...")
                for t in self.threads:
                    t.kill_received = True
                self.logger.info('Exiting')
                sys.exit(0)

    def run_forever(self):
        """Run forever, sleeping for the configured interval between each run."""
        while True:
            self.run_once()

            self.logger.debug(f"Sleeping for {self.config.sleep()} seconds")
            time.sleep(self.config.sleep())
|