diff --git a/app/__init__.py b/app/__init__.py index 7a4db3f..bff20fc 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -1,6 +1,6 @@ from app.request import send_tor_signal -from app.utils.session_utils import generate_user_keys -from app.utils.gen_ddg_bangs import gen_bangs_json +from app.utils.session import generate_user_keys +from app.utils.bangs import gen_bangs_json from flask import Flask from flask_session import Session import json diff --git a/app/filter.py b/app/filter.py index 4c953d5..66e9c6e 100644 --- a/app/filter.py +++ b/app/filter.py @@ -1,5 +1,5 @@ from app.request import VALID_PARAMS -from app.utils.filter_utils import * +from app.utils.results import * from bs4.element import ResultSet from cryptography.fernet import Fernet import re @@ -208,7 +208,7 @@ class Filter: # Add no-js option if self.nojs: - gen_nojs(link) + append_nojs(link) else: link['href'] = href diff --git a/app/request.py b/app/request.py index 00e2ce0..6e752fb 100644 --- a/app/request.py +++ b/app/request.py @@ -23,8 +23,8 @@ class TorError(Exception): """Exception raised for errors in Tor requests. Attributes: - message -- a message describing the error that occurred - disable -- optionally disables Tor in the user config (note: + message: a message describing the error that occurred + disable: optionally disables Tor in the user config (note: this should only happen if the connection has been dropped altogether). """ @@ -133,9 +133,9 @@ class Request: search suggestions, and loading of external content (images, audio, etc). 
Attributes: - normal_ua -- the user's current user agent - root_path -- the root path of the whoogle instance - config -- the user's current whoogle configuration + normal_ua: the user's current user agent + root_path: the root path of the whoogle instance + config: the user's current whoogle configuration """ def __init__(self, normal_ua, root_path, config: Config): diff --git a/app/routes.py b/app/routes.py index acb38a6..b084f71 100644 --- a/app/routes.py +++ b/app/routes.py @@ -16,8 +16,9 @@ from requests import exceptions from app import app from app.models.config import Config from app.request import Request, TorError -from app.utils.session_utils import valid_user_session -from app.utils.routing_utils import * +from app.utils.bangs import resolve_bang +from app.utils.session import valid_user_session +from app.utils.search import * # Load DDG bang json files only on init bang_json = json.load(open(app.config['BANG_FILE'])) @@ -199,13 +200,13 @@ def search(): # Update user config if specified in search args g.user_config = g.user_config.from_params(g.request_params) - search_util = RoutingUtils(request, g.user_config, session, - cookies_disabled=g.cookies_disabled) + search_util = Search(request, g.user_config, session, + cookies_disabled=g.cookies_disabled) query = search_util.new_search_query() - resolved_bangs = search_util.bang_operator(bang_json) - if resolved_bangs != '': - return redirect(resolved_bangs) + bang = resolve_bang(query=query, bangs_dict=bang_json) + if bang != '': + return redirect(bang) # Redirect to home if invalid/blank search if not query: diff --git a/app/utils/bangs.py b/app/utils/bangs.py new file mode 100644 index 0000000..56daf4f --- /dev/null +++ b/app/utils/bangs.py @@ -0,0 +1,61 @@ +import json +import requests + +DDG_BANGS = 'https://duckduckgo.com/bang.v255.js' + + +def gen_bangs_json(bangs_file: str) -> None: + """Generates a json file from the DDG bangs list + + Args: + bangs_file: The str path to the new DDG bangs json 
file + + Returns: + None + + """ + try: + # Request full list from DDG + r = requests.get(DDG_BANGS) + r.raise_for_status() + except requests.exceptions.HTTPError as err: + raise SystemExit(err) + + # Convert to json + data = json.loads(r.text) + + # Set up a json object (with better formatting) for all available bangs + bangs_data = {} + + for row in data: + bang_command = '!' + row['t'] + bangs_data[bang_command] = { + 'url': row['u'].replace('{{{s}}}', '{}'), + 'suggestion': bang_command + ' (' + row['s'] + ')' + } + + json.dump(bangs_data, open(bangs_file, 'w')) + + +def resolve_bang(query: str, bangs_dict: dict) -> str: + """Transforms a user's query to a bang search, if an operator is found + + Args: + query: The search query + bangs_dict: The dict of available bang operators, with corresponding + format string search URLs + (e.g. "!w": "https://en.wikipedia.org...?search={}") + + Returns: + str: A formatted redirect for a bang search, or an empty str if there + wasn't a match or didn't contain a bang operator + + """ + split_query = query.split(' ') + for operator in bangs_dict.keys(): + if operator not in split_query: + continue + + return bangs_dict[operator]['url'].format( + query.replace(operator, '').strip()) + return '' diff --git a/app/utils/gen_ddg_bangs.py b/app/utils/gen_ddg_bangs.py deleted file mode 100644 index 0ed3953..0000000 --- a/app/utils/gen_ddg_bangs.py +++ /dev/null @@ -1,26 +0,0 @@ -import json -import requests - - -def gen_bangs_json(bangs_file): - # Request list - try: - r = requests.get('https://duckduckgo.com/bang.v255.js') - r.raise_for_status() - except requests.exceptions.HTTPError as err: - raise SystemExit(err) - - # Convert to json - data = json.loads(r.text) - - # Set up a json object (with better formatting) for all available bangs - bangs_data = {} - - for row in data: - bang_command = '!'
+ row['t'] - bangs_data[bang_command] = { - 'url': row['u'].replace('{{{s}}}', '{}'), - 'suggestion': bang_command + ' (' + row['s'] + ')' - } - - json.dump(bangs_data, open(bangs_file, 'w')) diff --git a/app/utils/filter_utils.py b/app/utils/results.py similarity index 59% rename from app/utils/filter_utils.py rename to app/utils/results.py index 76b99ba..a1d20dc 100644 --- a/app/utils/filter_utils.py +++ b/app/utils/results.py @@ -28,12 +28,30 @@ SITE_ALTS = { } -def has_ad_content(element: str): +def has_ad_content(element: str) -> bool: + """Inspects an HTML element for ad related content + + Args: + element: The HTML element to inspect + + Returns: + bool: True/False for the element containing an ad + + """ return element.upper() in (value.upper() for value in BLACKLIST) \ or 'ⓘ' in element -def get_first_link(soup): +def get_first_link(soup: BeautifulSoup) -> str: + """Retrieves the first result link from the query response + + Args: + soup: The BeautifulSoup response body + + Returns: + str: A str link to the first result + + """ # Replace hrefs with only the intended destination (no "utm" type tags) for a in soup.find_all('a', href=True): # Return the first search result URL @@ -41,7 +59,16 @@ def get_first_link(soup): return filter_link_args(a['href']) -def get_site_alt(link: str): +def get_site_alt(link: str) -> str: + """Returns an alternative to a particular site, if one is configured + + Args: + link: A string result URL to check against the SITE_ALTS map + + Returns: + str: An updated (or ignored) result link + + """ for site_key in SITE_ALTS.keys(): if site_key not in link: continue @@ -55,13 +82,22 @@ def get_site_alt(link: str): return link -def filter_link_args(query_link): - parsed_link = urlparse.urlparse(query_link) +def filter_link_args(link: str) -> str: + """Filters out unnecessary URL args from a result link + + Args: + link: The string result link to check for extraneous URL params + + Returns: + str: An updated (or ignored) result link + 
+ """ + parsed_link = urlparse.urlparse(link) link_args = parse_qs(parsed_link.query) safe_args = {} if len(link_args) == 0 and len(parsed_link) > 0: - return query_link + return link for arg in link_args.keys(): if arg in SKIP_ARGS: @@ -70,19 +106,28 @@ def filter_link_args(query_link): safe_args[arg] = link_args[arg] # Remove original link query and replace with filtered args - query_link = query_link.replace(parsed_link.query, '') + link = link.replace(parsed_link.query, '') if len(safe_args) > 0: - query_link = query_link + urlparse.urlencode(safe_args, doseq=True) + link = link + urlparse.urlencode(safe_args, doseq=True) else: - query_link = query_link.replace('?', '') + link = link.replace('?', '') + + return link + + +def append_nojs(result: BeautifulSoup) -> None: + """Appends a no-Javascript alternative for a search result - return query_link + Args: + result: The search result to append a no-JS link to + Returns: + None -def gen_nojs(sibling): + """ nojs_link = BeautifulSoup(features='html.parser').new_tag('a') - nojs_link['href'] = '/window?location=' + sibling['href'] + nojs_link['href'] = '/window?location=' + result['href'] nojs_link['style'] = 'display:block;width:100%;' nojs_link.string = 'NoJS Link: ' + nojs_link['href'] - sibling.append(BeautifulSoup('


', 'html.parser')) - sibling.append(nojs_link) + result.append(BeautifulSoup('


', 'html.parser')) + result.append(nojs_link) diff --git a/app/utils/routing_utils.py b/app/utils/search.py similarity index 69% rename from app/utils/routing_utils.py rename to app/utils/search.py index 4cbbb16..84a457e 100644 --- a/app/utils/routing_utils.py +++ b/app/utils/search.py @@ -1,5 +1,5 @@ from app.filter import Filter, get_first_link -from app.utils.session_utils import generate_user_keys +from app.utils.session import generate_user_keys from app.request import gen_query from bs4 import BeautifulSoup as bsoup from cryptography.fernet import Fernet, InvalidToken @@ -11,6 +11,18 @@ TOR_BANNER = '

You are using Tor


' def needs_https(url: str) -> bool: + """Checks if the current instance needs to be upgraded to HTTPS + + Note that all Heroku instances are available by default over HTTPS, but + do not automatically set up a redirect when visited over HTTP. + + Args: + url: The instance url + + Returns: + bool: True/False representing the need to upgrade + + """ https_only = os.getenv('HTTPS_ONLY', False) is_heroku = url.endswith('.herokuapp.com') is_http = url.startswith('http://') @@ -18,7 +30,15 @@ def needs_https(url: str) -> bool: return (is_heroku and is_http) or (https_only and is_http) -class RoutingUtils: +class Search: + """Search query preprocessor - used before submitting the query or + redirecting to another site + + Attributes: + request: the incoming flask request + config: the current user config settings + session: the flask user session + """ def __init__(self, request, config, session, cookies_disabled=False): method = request.method self.request_params = request.args if method == 'GET' else request.form @@ -31,19 +51,28 @@ class RoutingUtils: self.search_type = self.request_params.get( 'tbm') if 'tbm' in self.request_params else '' - def __getitem__(self, name): + def __getitem__(self, name) -> Any: return getattr(self, name) - def __setitem__(self, name, value): + def __setitem__(self, name, value) -> None: return setattr(self, name, value) - def __delitem__(self, name): + def __delitem__(self, name) -> None: return delattr(self, name) - def __contains__(self, name): + def __contains__(self, name) -> bool: return hasattr(self, name) def new_search_query(self) -> str: + """Parses a plaintext query into a valid string for submission + + Also decrypts the query string, if encrypted (in the case of + paginated results). 
+ + Returns: + str: A valid query string + + """ # Generate a new element key each time a new search is performed self.session['fernet_keys']['element_key'] = generate_user_keys( cookies_disabled=self.cookies_disabled)['element_key'] @@ -70,17 +99,18 @@ class RoutingUtils: self.query = q[2:] if self.feeling_lucky else q return self.query - def bang_operator(self, bangs_dict: dict) -> str: - split_query = self.query.split(' ') - for operator in bangs_dict.keys(): - if operator not in split_query: - continue + def generate_response(self) -> Tuple[Any, int]: + """Generates a response for the user's query - return bangs_dict[operator]['url'].format( - self.query.replace(operator, '').strip()) - return '' + Returns: + Tuple[Any, int]: A tuple in the format (response, # of elements) + For example, in the case of a "feeling lucky" + search, the response is a result URL, with no + encrypted elements to account for. Otherwise, the + response is a BeautifulSoup response body, with + N encrypted elements to track before key regen. - def generate_response(self) -> Tuple[Any, int]: + """ mobile = 'Android' in self.user_agent or 'iPhone' in self.user_agent content_filter = Filter( @@ -102,7 +132,7 @@ class RoutingUtils: if g.user_request.tor_valid else bsoup('', 'html.parser')) if self.feeling_lucky: - return get_first_link(html_soup), 1 + return get_first_link(html_soup), 0 else: formatted_results = content_filter.clean(html_soup) diff --git a/app/utils/session.py b/app/utils/session.py new file mode 100644 index 0000000..f34d725 --- /dev/null +++ b/app/utils/session.py @@ -0,0 +1,45 @@ +from cryptography.fernet import Fernet +from flask import current_app as app + +REQUIRED_SESSION_VALUES = ['uuid', 'config', 'fernet_keys'] + + +def generate_user_keys(cookies_disabled=False) -> dict: + """Generates a set of user keys + + Args: + cookies_disabled: Flag for whether or not cookies are disabled by the + user. 
If so, the user can only use the default key + set generated on app init for queries. + + Returns: + dict: A new Fernet key set + + """ + if cookies_disabled: + return app.default_key_set + + # Generate/regenerate unique key per user + return { + 'element_key': Fernet.generate_key(), + 'text_key': Fernet.generate_key() + } + + +def valid_user_session(session: dict) -> bool: + """Validates the current user session + + Args: + session: The current Flask user session + + Returns: + bool: True/False indicating that all required session values are + available + + """ + # Generate secret key for user if unavailable + for value in REQUIRED_SESSION_VALUES: + if value not in session: + return False + + return True diff --git a/app/utils/session_utils.py b/app/utils/session_utils.py deleted file mode 100644 index f959abe..0000000 --- a/app/utils/session_utils.py +++ /dev/null @@ -1,24 +0,0 @@ -from cryptography.fernet import Fernet -from flask import current_app as app - -REQUIRED_SESSION_VALUES = ['uuid', 'config', 'fernet_keys'] - - -def generate_user_keys(cookies_disabled=False) -> dict: - if cookies_disabled: - return app.default_key_set - - # Generate/regenerate unique key per user - return { - 'element_key': Fernet.generate_key(), - 'text_key': Fernet.generate_key() - } - - -def valid_user_session(session): - # Generate secret key for user if unavailable - for value in REQUIRED_SESSION_VALUES: - if value not in session: - return False - - return True diff --git a/test/conftest.py b/test/conftest.py index 4b19636..f0912de 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,5 +1,5 @@ from app import app -from app.utils.session_utils import generate_user_keys +from app.utils.session import generate_user_keys import pytest import random diff --git a/test/test_misc.py b/test/test_misc.py index 92fcadb..e399b4a 100644 --- a/test/test_misc.py +++ b/test/test_misc.py @@ -1,4 +1,4 @@ -from app.utils.session_utils import generate_user_keys, valid_user_session +from 
app.utils.session import generate_user_keys, valid_user_session def test_generate_user_keys(): diff --git a/test/test_results.py b/test/test_results.py index 74af29c..c0f7fd1 100644 --- a/test/test_results.py +++ b/test/test_results.py @@ -1,6 +1,6 @@ from bs4 import BeautifulSoup from app.filter import Filter -from app.utils.session_utils import generate_user_keys +from app.utils.session import generate_user_keys from datetime import datetime from dateutil.parser import *