Improve static typing throughout repo

Eventually this should be part of a separate mypy CI build, but for
now it's a general guideline. Future commits and PRs should be
validated for static typing wherever possible.

For reference, the testing commands used for this commit were:

mypy --ignore-missing-imports --pretty --disallow-untyped-calls app/
mypy --ignore-missing-imports --pretty --disallow-untyped-calls test/
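
If these checks later move into a dedicated CI build, the flags could
live in a config file instead of being repeated on the command line. A
minimal sketch, assuming a mypy.ini at the repo root (the file itself is
not part of this commit):

[mypy]
ignore_missing_imports = True
pretty = True
disallow_untyped_calls = True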
pull/264/head
Ben Busby committed 3 years ago
parent 892b646a4e
commit 8ad8e66d37

@@ -1,6 +1,7 @@
 from app.request import VALID_PARAMS
 from app.utils.results import *
-from bs4.element import ResultSet
+from bs4 import BeautifulSoup
+from bs4.element import ResultSet, Tag
 from cryptography.fernet import Fernet
 import re
 import urllib.parse as urlparse
@@ -8,7 +9,7 @@ from urllib.parse import parse_qs


 class Filter:
-    def __init__(self, user_keys: dict, mobile=False, config=None):
+    def __init__(self, user_keys: dict, mobile=False, config=None) -> None:
         if config is None:
             config = {}
@@ -29,7 +30,7 @@ class Filter:
     def elements(self):
         return self._elements

-    def reskin(self, page):
+    def reskin(self, page: str) -> str:
         # Aesthetic only re-skinning
         if self.dark:
             page = page.replace(
@@ -39,22 +40,22 @@ class Filter:
         return page

-    def encrypt_path(self, msg, is_element=False):
+    def encrypt_path(self, path, is_element=False) -> str:
         # Encrypts path to avoid plaintext results in logs
         if is_element:
             # Element paths are encrypted separately from text, to allow key
             # regeneration once all items have been served to the user
             enc_path = Fernet(
                 self.user_keys['element_key']
-            ).encrypt(msg.encode()).decode()
+            ).encrypt(path.encode()).decode()
             self._elements += 1
             return enc_path

         return Fernet(
             self.user_keys['text_key']
-        ).encrypt(msg.encode()).decode()
+        ).encrypt(path.encode()).decode()

-    def clean(self, soup):
+    def clean(self, soup) -> BeautifulSoup:
         self.main_divs = soup.find('div', {'id': 'main'})
         self.remove_ads()
         self.fix_question_section()
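
A note on the -> str annotation above: Fernet.encrypt() takes and
returns bytes, so the path is encoded before encryption and the
resulting token decoded back to str. A standalone round-trip sketch
(key and path values are illustrative):

from cryptography.fernet import Fernet

key = Fernet.generate_key()                              # urlsafe base64 bytes
token = Fernet(key).encrypt('/url?q=example'.encode())   # encrypted bytes token
enc_path = token.decode()                                # str, safe to embed in a URL
assert Fernet(key).decrypt(enc_path.encode()) == b'/url?q=example'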
@@ -90,7 +91,12 @@ class Filter:
         return soup

-    def remove_ads(self):
+    def remove_ads(self) -> None:
+        """Removes ads found in the list of search result divs
+
+        Returns:
+            None (The soup object is modified directly)
+        """
         if not self.main_divs:
             return
@ -99,7 +105,16 @@ class Filter:
if has_ad_content(_.text)] if has_ad_content(_.text)]
_ = div.decompose() if len(div_ads) else None _ = div.decompose() if len(div_ads) else None
def fix_question_section(self): def fix_question_section(self) -> None:
"""Collapses the "People Also Asked" section into a "details" element
These sections are typically the only sections in the results page that
are structured as <div><h2>Title</h2><div>...</div></div>, so they are
extracted by checking all result divs for h2 children.
Returns:
None (The soup object is modified directly)
"""
if not self.main_divs: if not self.main_divs:
return return
@@ -126,7 +141,14 @@ class Filter:
         for question in questions:
             question['style'] = 'padding: 10px; font-style: italic;'

-    def update_element_src(self, element, mime):
+    def update_element_src(self, element: Tag, mime: str) -> None:
+        """Encrypts the original src of an element and rewrites the element
+        src to use the "/element?src=" pass-through
+
+        Returns:
+            None (The soup element is modified directly)
+        """
         src = element['src']

         if src.startswith('//'):
@@ -145,7 +167,8 @@ class Filter:
             src,
             is_element=True) + '&type=' + urlparse.quote(mime)

-    def update_styling(self, soup):
+    def update_styling(self, soup) -> None:
+        """"""
         # Remove unnecessary button(s)
         for button in soup.find_all('button'):
             button.decompose()
@@ -168,7 +191,17 @@ class Filter:
         except AttributeError:
             pass

-    def update_link(self, link):
+    def update_link(self, link: Tag) -> None:
+        """Update internal link paths with encrypted path, otherwise remove
+        unnecessary redirects and/or marketing params from the url
+
+        Args:
+            link: A bs4 Tag element to inspect and update
+
+        Returns:
+            None (the tag is updated directly)
+        """
         # Replace href with only the intended destination (no "utm" type tags)
         href = link['href'].replace('https://www.google.com', '')
         if 'advanced_search' in href or 'tbm=shop' in href:
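
As a generic illustration of the "remove marketing params" behavior the
docstring describes (the helper name and approach are illustrative,
not part of this commit):

import urllib.parse as urlparse

def strip_marketing_params(href: str) -> str:
    # Hypothetical sketch: drop "utm_*" query params from a URL
    parts = urlparse.urlparse(href)
    kept = [(k, v) for k, v in urlparse.parse_qsl(parts.query)
            if not k.startswith('utm_')]
    return urlparse.urlunparse(parts._replace(query=urlparse.urlencode(kept)))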

@@ -29,10 +29,10 @@ class TorError(Exception):
         altogether).
     """

-    def __init__(self, message, disable=False):
+    def __init__(self, message, disable=False) -> None:
         self.message = message
         self.disable = disable
-        super().__init__(self.message)
+        super().__init__(message)


 def send_tor_signal(signal: Signal) -> bool:
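
Passing message (rather than self.message) to super().__init__ is
equivalent at runtime; the exception still carries both attributes. A
self-contained usage sketch with illustrative values:

class TorError(Exception):
    def __init__(self, message, disable=False) -> None:
        self.message = message
        self.disable = disable
        super().__init__(message)

try:
    raise TorError('Tor connection failed', disable=True)
except TorError as e:
    print(str(e))      # Tor connection failed
    print(e.disable)   # True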
@@ -64,7 +64,7 @@ def gen_query(query, args, config, near_city=None) -> str:
     # Use :past(hour/day/week/month/year) if available
     # example search "new restaurants :past month"
-    sub_lang = ''
+    lang = ''
     if ':past' in query and 'tbs' not in args:
         time_range = str.strip(query.split(':past', 1)[-1])
         param_dict['tbs'] = '&tbs=' + ('qdr:' + str.lower(time_range[0]))
@ -79,9 +79,10 @@ def gen_query(query, args, config, near_city=None) -> str:
# Example: # Example:
# &tbs=qdr:h,lr:lang_1pl # &tbs=qdr:h,lr:lang_1pl
# -- the lr param needs to be extracted and remove the leading '1' # -- the lr param needs to be extracted and remove the leading '1'
sub_lang = [_ for _ in result_tbs.split(',') if 'lr:' in _] result_params = [_ for _ in result_tbs.split(',') if 'lr:' in _]
sub_lang = sub_lang[0][sub_lang[0].find('lr:') + if len(result_params) > 0:
3:len(sub_lang[0])] if len(sub_lang) > 0 else '' result_param = result_params[0]
lang = result_param[result_param.find('lr:') + 3:len(result_param)]
# Ensure search query is parsable # Ensure search query is parsable
query = urlparse.quote(query) query = urlparse.quote(query)
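
Tracing the rewritten extraction with the tbs value from the comment
above:

result_tbs = 'qdr:h,lr:lang_1pl'
result_params = [_ for _ in result_tbs.split(',') if 'lr:' in _]     # ['lr:lang_1pl']
result_param = result_params[0]
lang = result_param[result_param.find('lr:') + 3:len(result_param)]
print(lang)   # lang_1pl -- the leading '1' is stripped later, in the next hunk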
@@ -103,8 +104,8 @@ def gen_query(query, args, config, near_city=None) -> str:
     if 'source' in args:
         param_dict['source'] = '&source=' + args.get('source')
         param_dict['lr'] = ('&lr=' + ''.join(
-            [_ for _ in sub_lang if not _.isdigit()]
-        )) if sub_lang else ''
+            [_ for _ in lang if not _.isdigit()]
+        )) if lang else ''
     else:
         param_dict['lr'] = (
             '&lr=' + config.lang_search
@@ -150,12 +151,12 @@ class Request:
         # Set up proxy, if previously configured
         if os.environ.get('WHOOGLE_PROXY_LOC'):
             auth_str = ''
-            if os.environ.get('WHOOGLE_PROXY_USER'):
-                auth_str = os.environ.get('WHOOGLE_PROXY_USER') + \
-                           ':' + os.environ.get('WHOOGLE_PROXY_PASS')
+            if os.environ.get('WHOOGLE_PROXY_USER', ''):
+                auth_str = os.environ.get('WHOOGLE_PROXY_USER', '') + \
+                           ':' + os.environ.get('WHOOGLE_PROXY_PASS', '')
             self.proxies = {
-                'http': os.environ.get('WHOOGLE_PROXY_TYPE') + '://' +
-                        auth_str + '@' + os.environ.get('WHOOGLE_PROXY_LOC'),
+                'http': os.environ.get('WHOOGLE_PROXY_TYPE', '') + '://' +
+                        auth_str + '@' + os.environ.get('WHOOGLE_PROXY_LOC', ''),
             }
             self.proxies['https'] = self.proxies['http'].replace('http',
                                                                  'https')
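
Without a default, os.environ.get returns Optional[str], which mypy
rejects in the string concatenations above; the added '' defaults keep
every piece a plain str. A runnable sketch of the resulting proxy URLs
(all environment values below are assumed):

import os

os.environ.update({'WHOOGLE_PROXY_TYPE': 'http',
                   'WHOOGLE_PROXY_USER': 'user',
                   'WHOOGLE_PROXY_PASS': 'pass',
                   'WHOOGLE_PROXY_LOC': 'localhost:1080'})
auth_str = os.environ.get('WHOOGLE_PROXY_USER', '') + \
           ':' + os.environ.get('WHOOGLE_PROXY_PASS', '')
http_proxy = (os.environ.get('WHOOGLE_PROXY_TYPE', '') + '://' +
              auth_str + '@' + os.environ.get('WHOOGLE_PROXY_LOC', ''))
print(http_proxy)                           # http://user:pass@localhost:1080
print(http_proxy.replace('http', 'https'))  # https://user:pass@localhost:1080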

@@ -347,7 +347,7 @@ def window():
     return render_template('display.html', response=results)


-def run_app():
+def run_app() -> None:
     parser = argparse.ArgumentParser(
         description='Whoogle Search console runner')
     parser.add_argument(

@@ -57,6 +57,7 @@ def get_first_link(soup: BeautifulSoup) -> str:
         # Return the first search result URL
         if 'url?q=' in a['href']:
             return filter_link_args(a['href'])
+    return ''


 def get_site_alt(link: str) -> str:
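
The added return '' matters for the -> str annotation: when the loop
finds no match and control falls off the end, the function implicitly
returns None, which mypy reports as a missing return statement for a
non-Optional return type. A minimal reproduction with a hypothetical
function:

def first_match(items: list) -> str:
    for item in items:
        if item.startswith('x'):
            return item
    return ''   # without this line, mypy reports: Missing return statement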

@@ -24,15 +24,24 @@ def needs_https(url: str) -> bool:
         bool: True/False representing the need to upgrade
     """
-    https_only = os.getenv('HTTPS_ONLY', False)
+    https_only = bool(os.getenv('HTTPS_ONLY', 0))
     is_heroku = url.endswith('.herokuapp.com')
     is_http = url.startswith('http://')

     return (is_heroku and is_http) or (https_only and is_http)


-def has_captcha(site_contents: str) -> bool:
-    return CAPTCHA in site_contents
+def has_captcha(results: str) -> bool:
+    """Checks to see if the search results are blocked by a captcha
+
+    Args:
+        results: The search page html as a string
+
+    Returns:
+        bool: True/False indicating if a captcha element was found
+
+    """
+    return CAPTCHA in results


 class Search:
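
On the HTTPS_ONLY change: bool(os.getenv('HTTPS_ONLY', 0)) makes
https_only a real bool for the return expression, though note it only
tests whether the variable is set; any non-empty string, including '0',
is truthy:

import os

os.environ['HTTPS_ONLY'] = '1'
print(bool(os.getenv('HTTPS_ONLY', 0)))   # True
del os.environ['HTTPS_ONLY']
print(bool(os.getenv('HTTPS_ONLY', 0)))   # False -- falls back to the default 0
os.environ['HTTPS_ONLY'] = '0'
print(bool(os.getenv('HTTPS_ONLY', 0)))   # True -- '0' is a non-empty string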
@@ -118,23 +127,23 @@ class Search:
         """
         mobile = 'Android' in self.user_agent or 'iPhone' in self.user_agent

-        content_filter = Filter(
-            self.session['fernet_keys'],
-            mobile=mobile,
-            config=self.config)
-        full_query = gen_query(
-            self.query,
-            self.request_params,
-            self.config,
-            content_filter.near)
+        content_filter = Filter(self.session['fernet_keys'],
+                                mobile=mobile,
+                                config=self.config)
+        full_query = gen_query(self.query,
+                               self.request_params,
+                               self.config,
+                               content_filter.near)
         get_body = g.user_request.send(query=full_query)

         # Produce cleanable html soup from response
         html_soup = bsoup(content_filter.reskin(get_body.text), 'html.parser')
-        html_soup.insert(
-            0,
-            bsoup(TOR_BANNER, 'html.parser')
-            if g.user_request.tor_valid else bsoup('', 'html.parser'))
+
+        # Indicate whether or not a Tor connection is active
+        tor_banner = bsoup('', 'html.parser')
+        if g.user_request.tor_valid:
+            tor_banner = bsoup(TOR_BANNER, 'html.parser')
+        html_soup.insert(0, tor_banner)

         if self.feeling_lucky:
             return get_first_link(html_soup), 0
