Switch to single Fernet key per session

This moves away from the previous (messy) approach of using two separate
keys for decrypting text and element URLs separately and regenerating
them for new searches. The current implementation of sessions is not very
reliable, which lead to keys being regenerated too soon, which would
break page navigation. Until that can be addressed, the single
key per session approach should work a lot better.

Fixes #250

Fixes #90
This commit is contained in:
Ben Busby 2021-04-01 00:23:30 -04:00
parent 86ae2bda3e
commit a0110fda8d
No known key found for this signature in database
GPG Key ID: 3B08611DF6E62ED2
10 changed files with 54 additions and 83 deletions

View File

@ -1,5 +1,5 @@
from app.request import send_tor_signal
from app.utils.session import generate_user_keys
from app.utils.session import generate_user_key
from app.utils.bangs import gen_bangs_json
from flask import Flask
from flask_session import Session
@ -17,8 +17,7 @@ if os.getenv("WHOOGLE_DOTENV", ''):
load_dotenv(os.path.join(os.path.dirname(os.path.abspath(__file__)),
dotenv_path))
app.user_elements = {}
app.default_key_set = generate_user_keys()
app.default_key = generate_user_key()
app.no_cookie_ips = []
app.config['SECRET_KEY'] = os.urandom(32)
app.config['SESSION_TYPE'] = 'filesystem'

View File

@ -9,7 +9,7 @@ from urllib.parse import parse_qs
class Filter:
def __init__(self, user_keys: dict, mobile=False, config=None) -> None:
def __init__(self, user_key: str, mobile=False, config=None) -> None:
if config is None:
config = {}
@ -19,7 +19,7 @@ class Filter:
self.new_tab = config['new_tab'] if 'new_tab' in config else False
self.alt_redirect = config['alts'] if 'alts' in config else False
self.mobile = mobile
self.user_keys = user_keys
self.user_key = user_key
self.main_divs = ResultSet('')
self._elements = 0
@ -45,15 +45,11 @@ class Filter:
if is_element:
# Element paths are encrypted separately from text, to allow key
# regeneration once all items have been served to the user
enc_path = Fernet(
self.user_keys['element_key']
).encrypt(path.encode()).decode()
enc_path = Fernet(self.user_key).encrypt(path.encode()).decode()
self._elements += 1
return enc_path
return Fernet(
self.user_keys['text_key']
).encrypt(path.encode()).decode()
return Fernet(self.user_key).encrypt(path.encode()).decode()
def clean(self, soup) -> BeautifulSoup:
self.main_divs = soup.find('div', {'id': 'main'})

View File

@ -108,7 +108,7 @@ def gen_query(query, args, config, near_city=None) -> str:
)) if lang else ''
else:
param_dict['lr'] = (
'&lr=' + config.lang_search
'&lr=' + config.lang_search
) if config.lang_search else ''
# 'nfpr' defines the exclusion of results from an auto-corrected query
@ -117,7 +117,7 @@ def gen_query(query, args, config, near_city=None) -> str:
param_dict['cr'] = ('&cr=' + config.ctry) if config.ctry else ''
param_dict['hl'] = (
'&hl=' + config.lang_interface.replace('lang_', '')
'&hl=' + config.lang_interface.replace('lang_', '')
) if config.lang_interface else ''
param_dict['safe'] = '&safe=' + ('active' if config.safe else 'off')

View File

@ -56,15 +56,12 @@ def before_request_func():
session['config'] = json.load(open(app.config['DEFAULT_CONFIG'])) \
if os.path.exists(app.config['DEFAULT_CONFIG']) else {}
session['uuid'] = str(uuid.uuid4())
session['fernet_keys'] = generate_user_keys(True)
session['key'] = generate_user_key(True)
# Flag cookies as possibly disabled in order to prevent against
# unnecessary session directory expansion
g.cookies_disabled = True
if session['uuid'] not in app.user_elements:
app.user_elements.update({session['uuid']: 0})
# Handle https upgrade
if needs_https(request.url):
return redirect(
@ -88,13 +85,6 @@ def before_request_func():
@app.after_request
def after_request_func(resp):
if app.user_elements[session['uuid']] <= 0 and '/element' in request.url:
# Regenerate element key if all elements have been served to user
session['fernet_keys'][
'element_key'] = '' if not g.cookies_disabled else \
app.default_key_set['element_key']
app.user_elements[session['uuid']] = 0
# Check if address consistently has cookies blocked,
# in which case start removing session files after creation.
#
@ -125,7 +115,7 @@ def unknown_page(e):
@auth_required
def index():
# Reset keys
session['fernet_keys'] = generate_user_keys(g.cookies_disabled)
session['key'] = generate_user_key(g.cookies_disabled)
# Redirect if an error was raised
if 'error_message' in session and session['error_message']:
@ -193,9 +183,6 @@ def autocomplete():
@app.route('/search', methods=['GET', 'POST'])
@auth_required
def search():
# Reset element counter
app.user_elements[session['uuid']] = 0
# Update user config if specified in search args
g.user_config = g.user_config.from_params(g.request_params)
@ -213,7 +200,7 @@ def search():
# Generate response and number of external elements from the page
try:
response, elements = search_util.generate_response()
response = search_util.generate_response()
except TorError as e:
session['error_message'] = e.message + (
"\\n\\nTor config is now disabled!" if e.disable else "")
@ -221,13 +208,9 @@ def search():
'tor']
return redirect(url_for('.index'))
if search_util.feeling_lucky or elements < 0:
if search_util.feeling_lucky:
return redirect(response, code=303)
# Keep count of external elements to fetch before
# the element key can be regenerated
app.user_elements[session['uuid']] = elements
# Return 503 if temporarily blocked by captcha
resp_code = 503 if has_captcha(str(response)) else 200
@ -309,13 +292,12 @@ def imgres():
@app.route('/element')
@auth_required
def element():
cipher_suite = Fernet(session['fernet_keys']['element_key'])
cipher_suite = Fernet(session['key'])
src_url = cipher_suite.decrypt(request.args.get('url').encode()).decode()
src_type = request.args.get('type')
try:
file_data = g.user_request.send(base_url=src_url).content
app.user_elements[session['uuid']] -= 1
tmp_mem = io.BytesIO()
tmp_mem.write(file_data)
tmp_mem.seek(0)

View File

@ -1,5 +1,5 @@
from app.filter import Filter, get_first_link
from app.utils.session import generate_user_keys
from app.utils.session import generate_user_key
from app.request import gen_query
from bs4 import BeautifulSoup as bsoup
from cryptography.fernet import Fernet, InvalidToken
@ -87,10 +87,6 @@ class Search:
str: A valid query string
"""
# Generate a new element key each time a new search is performed
self.session['fernet_keys']['element_key'] = generate_user_keys(
cookies_disabled=self.cookies_disabled)['element_key']
q = self.request_params.get('q')
if q is None or len(q) == 0:
@ -98,36 +94,26 @@ class Search:
else:
# Attempt to decrypt if this is an internal link
try:
q = Fernet(
self.session['fernet_keys']['text_key']
).decrypt(q.encode()).decode()
q = Fernet(self.session['key']).decrypt(q.encode()).decode()
except InvalidToken:
pass
# Reset text key
self.session['fernet_keys']['text_key'] = generate_user_keys(
cookies_disabled=self.cookies_disabled)['text_key']
# Strip leading '! ' for "feeling lucky" queries
self.feeling_lucky = q.startswith('! ')
self.query = q[2:] if self.feeling_lucky else q
return self.query
def generate_response(self) -> Tuple[Any, int]:
def generate_response(self) -> str:
"""Generates a response for the user's query
Returns:
Tuple[Any, int]: A tuple in the format (response, # of elements)
For example, in the case of a "feeling lucky"
search, the response is a result URL, with no
encrypted elements to account for. Otherwise, the
response is a BeautifulSoup response body, with
N encrypted elements to track before key regen.
str: A string response to the search query, in the form of a URL
or string representation of HTML content.
"""
mobile = 'Android' in self.user_agent or 'iPhone' in self.user_agent
content_filter = Filter(self.session['fernet_keys'],
content_filter = Filter(self.session['key'],
mobile=mobile,
config=self.config)
full_query = gen_query(self.query,
@ -146,7 +132,7 @@ class Search:
html_soup.insert(0, tor_banner)
if self.feeling_lucky:
return get_first_link(html_soup), 0
return get_first_link(html_soup)
else:
formatted_results = content_filter.clean(html_soup)
@ -161,4 +147,4 @@ class Search:
continue
link['href'] += param_str
return formatted_results, content_filter.elements
return str(formatted_results)

View File

@ -1,29 +1,26 @@
from cryptography.fernet import Fernet
from flask import current_app as app
REQUIRED_SESSION_VALUES = ['uuid', 'config', 'fernet_keys']
REQUIRED_SESSION_VALUES = ['uuid', 'config', 'key']
def generate_user_keys(cookies_disabled=False) -> dict:
"""Generates a set of user keys
def generate_user_key(cookies_disabled=False) -> bytes:
"""Generates a key for encrypting searches and element URLs
Args:
cookies_disabled: Flag for whether or not cookies are disabled by the
user. If so, the user can only use the default key
set generated on app init for queries.
generated on app init for queries.
Returns:
dict: A new Fernet key set
str: A unique Fernet key
"""
if cookies_disabled:
return app.default_key_set
return app.default_key
# Generate/regenerate unique key per user
return {
'element_key': Fernet.generate_key(),
'text_key': Fernet.generate_key()
}
return Fernet.generate_key()
def valid_user_session(session: dict) -> bool:

View File

@ -1,5 +1,5 @@
from app import app
from app.utils.session import generate_user_keys
from app.utils.session import generate_user_key
import pytest
import random
@ -18,6 +18,6 @@ def client():
with app.test_client() as client:
with client.session_transaction() as session:
session['uuid'] = 'test'
session['fernet_keys'] = generate_user_keys()
session['key'] = generate_user_key()
session['config'] = {}
yield client

View File

@ -1,20 +1,26 @@
from app.utils.session import generate_user_keys, valid_user_session
from cryptography.fernet import Fernet
from app.utils.session import generate_user_key, valid_user_session
def test_generate_user_keys():
keys = generate_user_keys()
assert 'text_key' in keys
assert 'element_key' in keys
assert keys['text_key'] not in keys['element_key']
key = generate_user_key()
assert Fernet(key)
assert generate_user_key() != key
def test_valid_session(client):
assert not valid_user_session({'fernet_keys': '', 'config': {}})
assert not valid_user_session({'key': '', 'config': {}})
with client.session_transaction() as session:
assert valid_user_session(session)
def test_request_key_generation(client):
def test_query_decryption(client):
# FIXME: Handle decryption errors in search.py and rewrite test
# This previously was used to test swapping decryption keys between
# queries. While this worked in theory and usually didn't cause problems,
# they were tied to session IDs and those are really unreliable (meaning
# that occasionally page navigation would break).
rv = client.get('/')
cookie = rv.headers['Set-Cookie']
@ -23,11 +29,9 @@ def test_request_key_generation(client):
with client.session_transaction() as session:
assert valid_user_session(session)
text_key = session['fernet_keys']['text_key']
rv = client.get('/search?q=test+2', headers={'Cookie': cookie})
assert rv._status_code == 200
with client.session_transaction() as session:
assert valid_user_session(session)
assert text_key not in session['fernet_keys']['text_key']

View File

@ -1,13 +1,13 @@
from bs4 import BeautifulSoup
from app.filter import Filter
from app.utils.session import generate_user_keys
from app.utils.session import generate_user_key
from datetime import datetime
from dateutil.parser import *
def get_search_results(data):
secret_key = generate_user_keys()
soup = Filter(user_keys=secret_key).clean(
secret_key = generate_user_key()
soup = Filter(user_key=secret_key).clean(
BeautifulSoup(data, 'html.parser'))
main_divs = soup.find('div', {'id': 'main'})

View File

@ -19,14 +19,21 @@ def test_feeling_lucky(client):
def test_ddg_bang(client):
# Bang at beginning of query
rv = client.get('/search?q=!gh%20whoogle')
assert rv._status_code == 302
assert rv.headers.get('Location').startswith('https://github.com')
rv = client.get('/search?q=!w%20github')
# Move bang to end of query
rv = client.get('/search?q=github%20!w')
assert rv._status_code == 302
assert rv.headers.get('Location').startswith('https://en.wikipedia.org')
# Move bang to middle of query
rv = client.get('/search?q=big%20!r%20chungus')
assert rv._status_code == 302
assert rv.headers.get('Location').startswith('https://www.reddit.com')
def test_config(client):
rv = client.post('/config', data=demo_config)