You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
173 lines
6.0 KiB
Python
173 lines
6.0 KiB
Python
import io
|
|
import importlib
|
|
import traceback
|
|
|
|
import yaml
|
|
|
|
from pathlib import Path
|
|
|
|
# Dotted package paths that plugin module names are appended to when a
# source / operator plugin is imported.
SOURCE = 'onioningestor.sources'
OPERATOR = 'onioningestor.operators'

# Per-plugin config keys that Config consumes itself instead of forwarding
# them verbatim as plugin keyword arguments.
INTERNAL_OPTIONS = ['saved_state', 'module', 'credentials']

# Names of config keys that receive special treatment.
ARTIFACT_TYPES = 'artifact_types'
FILTER_STRING = 'filter'
ALLOWED_SOURCES = 'allowed_sources'
NAME = 'name'
|
|
|
|
|
|
class Config:
    """Config read operations and convenience accessors.

    Wraps the dictionary parsed from a YAML config file and exposes the
    individual sections (``general``, ``notifiers``, ``logging``,
    ``credentials``, ``sources``, ``operators``) through small methods.
    """

    def __init__(self, filename, logger):
        """Read and parse the YAML config file.

        :param filename: path of the YAML config file to load.
        :param logger: logger-like object; ``info`` and ``error`` are called
            on it here and by other methods.
        :raises yaml.error.YAMLError: if the file is not valid YAML. The
            error is logged first and then re-raised, because an instance
            without a parsed ``config`` cannot serve any accessor.
        """
        self.logger = logger
        self.filename = filename
        with io.open(self.filename, 'r') as f:
            try:
                self.logger.info("Loading config file")
                # An empty file parses to None; normalise to an empty dict
                # so the ``.get``-based accessors keep working.
                self.config = yaml.safe_load(f.read()) or {}
            except yaml.error.YAMLError:
                self.logger.error("YAML error in config")
                raise

    @staticmethod
    def _load_plugin(plugin_type, plugin):
        """Import ``<plugin_type>.<plugin>`` and return its ``Plugin`` attribute.

        :param plugin_type: dotted package path the plugin lives in.
        :param plugin: module name inside that package.
        :returns: the module's ``Plugin`` attribute, or ``None`` when the
            import or the attribute lookup fails. Failures are not raised:
            the exception message is printed and the traceback is written
            via ``traceback.print_exc()``.
        """
        try:
            module = importlib.import_module('.'.join([plugin_type, plugin]))
            return module.Plugin
        except Exception as e:
            print(e)
            # print_exc() writes the traceback itself and returns None, so
            # its result must not be passed to print().
            traceback.print_exc()
            return None

    def daemon(self):
        """Returns boolean, are we daemonizing?"""
        return self.config['general']['daemon']

    def elasticsearch(self):
        """Returns elasticsearch config."""
        return self.config['general']['elasticsearch']

    def sleep(self):
        """Returns number of seconds to sleep between iterations, if daemonizing."""
        return self.config['general']['sleep']

    def blacklist(self):
        """Returns the blacklist entries as a list of strings.

        The ``general.blacklist`` value is a comma-separated string.
        Surrounding whitespace is stripped from every entry and empty
        entries are dropped, so ``"a, b,"`` yields ``['a', 'b']`` and an
        empty string yields ``[]``.
        """
        raw_entries = self.config['general']['blacklist'].split(',')
        return [entry.strip() for entry in raw_entries if entry.strip()]

    # NOTE(review): disabled code kept for reference. If it is revived, the
    # ``interestingKeywords`` branch splits ``blacklist`` instead of
    # ``interestingKeywords``, which looks like a copy/paste bug.
    # def onionscanner(self):
    #     """Returns onionscanner config dict"""
    #     screenshots = self.config['onionscanner'].pop('screenshots_path', None)
    #     if screenshots:
    #         self.config['onionscanner']['screenshots_path'] = Path(screenshots)
    #     else:
    #         self.config['onionscanner']['screenshots_path'] = Path(__file__).parents[1]/'screenshots'
    #     blacklist = self.config['onionscanner'].pop('blacklist', None)
    #     if blacklist:
    #         self.config['onionscanner']['blacklist'] = blacklist.split(',')
    #     interestingKeywords = self.config['onionscanner'].pop('interestingKeywords', None)
    #     if interestingKeywords:
    #         self.config['onionscanner']['interestingKeywords'] = blacklist.split(',')
    #     return self.config['onionscanner']

    def notifiers(self):
        """Returns notifiers config dictionary (empty if not configured)."""
        return self.config.get('notifiers', {})

    def logging(self):
        """Returns logging config dictionary (empty if not configured)."""
        return self.config.get('logging', {})

    def credentials(self, credential_name):
        """Return the credentials entry whose ``name`` is ``credential_name``.

        :param credential_name: value to match against each entry's
            ``name`` key.
        :returns: the first matching entry (the dict stored in the config,
            not a copy), or ``{}`` when nothing matches or the config has
            no ``credentials`` section.
        """
        for credential in self.config.get('credentials') or []:
            if NAME in credential and credential[NAME] == credential_name:
                return credential
        return {}

    def _credential_kwargs(self, credential_name):
        """Return the named credentials as plugin kwargs, without ``name``."""
        return {
            key: value
            for key, value in self.credentials(credential_name).items()
            if key != NAME
        }

    def sources(self):
        """Return a list of (name, Source class, {kwargs}) tuples.

        Every key of a source entry that is not an internal option is
        forwarded as a keyword argument (this includes ``name``); a
        ``credentials`` key is replaced by the fields of the credentials
        entry it names. The class element is ``None`` when the plugin
        module could not be loaded.
        """
        sources = []

        for source in self.config['sources']:
            kwargs = {}
            for key, value in source.items():
                if key not in INTERNAL_OPTIONS:
                    kwargs[key] = value
                elif key == 'credentials':
                    # Grab these named credentials
                    kwargs.update(self._credential_kwargs(value))

            # load and initialize the plugin
            self.logger.info(f"Found source '{source[NAME]}'")
            sources.append((source[NAME], self._load_plugin(SOURCE, source['module']), kwargs))

        self.logger.info(f"Found {len(sources)} total sources")
        return sources

    def operators(self):
        """Return a list of (name, Operator class, {kwargs}) tuples.

        Keys of an operator entry are translated as follows: internal
        options and ``name`` are dropped, ``credentials`` is replaced by
        the fields of the credentials entry it names, ``filter`` is passed
        on as ``filter_string``, ``artifact_types`` is mapped through
        ``STRING_MAP`` (unknown names are ignored), and everything else is
        forwarded unchanged. The class element is ``None`` when the plugin
        module could not be loaded.
        """
        operators = []
        for operator in self.config['operators']:
            kwargs = {}
            for key, value in operator.items():
                if key == 'credentials':
                    # Grab these named credentials
                    kwargs.update(self._credential_kwargs(value))
                elif key in INTERNAL_OPTIONS or key == NAME:
                    # internal options are consumed here; the name key is
                    # excluded from operator kwargs, since it's not used
                    continue
                elif key == ARTIFACT_TYPES:
                    # parse out special artifact_types option
                    artifact_types = []
                    for artifact in value:
                        try:
                            # NOTE(review): `threatingestor` is not imported
                            # in the visible part of this file, so this
                            # lookup raises NameError (not caught below) as
                            # soon as an operator sets `artifact_types`.
                            # The right module could not be determined from
                            # this file -- TODO confirm and import it.
                            artifact_types.append(threatingestor.artifacts.STRING_MAP[artifact.lower().strip()])
                        except KeyError:
                            # ignore invalid artifact types
                            pass
                    kwargs[key] = artifact_types
                elif key == FILTER_STRING:
                    # pass in special filter_string option
                    kwargs['filter_string'] = value
                else:
                    kwargs[key] = value

            # load and initialize the plugin
            self.logger.info(f"Found operator '{operator[NAME]}'")
            operators.append((operator[NAME], self._load_plugin(OPERATOR, operator['module']), kwargs))

        self.logger.info(f"Found {len(operators)} total operators")
        return operators
|