From 1cca8da0fb786e16b3834efc4f934ffa920fad15 Mon Sep 17 00:00:00 2001 From: David Shen Date: Fri, 22 Mar 2024 11:54:53 -0400 Subject: [PATCH] Move bang loading and suggestions to bangs.py --- app/__init__.py | 10 +++++-- app/routes.py | 43 ++++------------------------- app/utils/bangs.py | 69 +++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 78 insertions(+), 44 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index b108ab4..d84997a 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -1,7 +1,7 @@ from app.filter import clean_query from app.request import send_tor_signal from app.utils.session import generate_key -from app.utils.bangs import gen_bangs_json +from app.utils.bangs import gen_bangs_json, load_all_bangs from app.utils.misc import gen_file_hash, read_config_bool from base64 import b64encode from bs4 import MarkupResemblesLocatorWarning @@ -139,7 +139,9 @@ app.config['CSP'] = 'default-src \'none\';' \ 'connect-src \'self\';' # Generate DDG bang filter +generating_bangs = False if not os.path.exists(app.config['BANG_FILE']): + generating_bangs = True json.dump({}, open(app.config['BANG_FILE'], 'w')) bangs_thread = threading.Thread( target=gen_bangs_json, @@ -180,7 +182,11 @@ send_tor_signal(Signal.HEARTBEAT) warnings.simplefilter('ignore', MarkupResemblesLocatorWarning) from app import routes # noqa -routes.load_bangs() + +# The gen_bangs_json function takes care of loading bangs, so skip it here if +# it's already being loaded +if not generating_bangs: + load_all_bangs(app.config['BANG_FILE']) # Disable logging from imported modules logging.config.dictConfig({ diff --git a/app/routes.py b/app/routes.py index a3fa868..331f4e8 100644 --- a/app/routes.py +++ b/app/routes.py @@ -8,7 +8,8 @@ import re import urllib.parse as urlparse import uuid import validators -import glob +import sys +import traceback from datetime import datetime, timedelta from functools import wraps @@ -17,7 +18,7 @@ from app import app from app.models.config import Config from app.models.endpoint import Endpoint from app.request import Request, TorError -from app.utils.bangs import resolve_bang +from app.utils.bangs import suggest_bang, resolve_bang from app.utils.misc import empty_gif, placeholder_img, get_proxy_host_url, \ fetch_favicon from app.filter import Filter @@ -37,38 +38,10 @@ from cryptography.fernet import Fernet, InvalidToken from cryptography.exceptions import InvalidSignature from werkzeug.datastructures import MultiDict -# Load DDG bang json files only on init -bang_json = {} - ac_var = 'WHOOGLE_AUTOCOMPLETE' autocomplete_enabled = os.getenv(ac_var, '1') -def load_bangs(): - global bang_json - bangs = {} - bang_files = glob.glob(os.path.join(app.config['BANG_PATH'], '*.json')) - - # Normalize the paths - bang_files = [os.path.normpath(f) for f in bang_files] - - # Move the ddg bangs file to the beginning - ddg_bangs_file = os.path.normpath(app.config['BANG_FILE']) - bang_files = sorted([f for f in bang_files if f != ddg_bangs_file]) - bang_files.insert(0, ddg_bangs_file) - - for i, bang_file in enumerate(bang_files): - try: - bangs |= json.load(open(bang_file)) - except json.decoder.JSONDecodeError: - # Ignore decoding error only for the ddg bangs file, since this can - # occur if file is still being written - if i != 0: - raise - - bang_json = dict(sorted(bangs.items())) - - def get_search_name(tbm): for tab in app.config['HEADER_TABS'].values(): if tab['tbm'] == tbm: @@ -156,7 +129,6 @@ def session_required(f): @app.before_request def before_request_func(): - global bang_json session.permanent = True # Check for latest version if needed @@ -198,10 +170,6 @@ def before_request_func(): g.app_location = g.user_config.url - # Attempt to reload bangs json if not generated yet - if not bang_json and os.path.getsize(app.config['BANG_FILE']) > 4: - load_bangs() - @app.after_request def after_request_func(resp): @@ -303,8 +271,7 @@ def autocomplete(): # Search bangs if the query begins with "!", but not "! " (feeling lucky) if q.startswith('!') and len(q) > 1 and not q.startswith('! '): - return jsonify([q, [bang_json[_]['suggestion'] for _ in bang_json if - _.startswith(q)]]) + return jsonify([q, suggest_bang(q)]) if not q and not request.data: return jsonify({'?': []}) @@ -335,7 +302,7 @@ def search(): search_util = Search(request, g.user_config, g.session_key) query = search_util.new_search_query() - bang = resolve_bang(query, bang_json) + bang = resolve_bang(query) if bang: return redirect(bang) diff --git a/app/utils/bangs.py b/app/utils/bangs.py index ac18f6a..ea6a060 100644 --- a/app/utils/bangs.py +++ b/app/utils/bangs.py @@ -1,10 +1,58 @@ import json import requests import urllib.parse as urlparse +import os +import glob +bangs_dict = {} DDG_BANGS = 'https://duckduckgo.com/bang.js' +def load_all_bangs(ddg_bangs_file: str, ddg_bangs: dict = {}): + """Loads all the bang files in alphabetical order + + Args: + ddg_bangs_file: The str path to the new DDG bangs json file + ddg_bangs: The dict of ddg bangs. If this is empty, it will load the + bangs from the file + + Returns: + None + + """ + global bangs_dict + ddg_bangs_file = os.path.normpath(ddg_bangs_file) + + if (bangs_dict and not ddg_bangs) or os.path.getsize(ddg_bangs_file) <= 4: + return + + bangs = {} + bangs_dir = os.path.dirname(ddg_bangs_file) + bang_files = glob.glob(os.path.join(bangs_dir, '*.json')) + + # Normalize the paths + bang_files = [os.path.normpath(f) for f in bang_files] + + # Move the ddg bangs file to the beginning + bang_files = sorted([f for f in bang_files if f != ddg_bangs_file]) + + if ddg_bangs: + bangs |= ddg_bangs + else: + bang_files.insert(0, ddg_bangs_file) + + for i, bang_file in enumerate(bang_files): + try: + bangs |= json.load(open(bang_file)) + except json.decoder.JSONDecodeError: + # Ignore decoding error only for the ddg bangs file, since this can + # occur if file is still being written + if i != 0: + raise + + bangs_dict = dict(sorted(bangs.items())) + + def gen_bangs_json(bangs_file: str) -> None: """Generates a json file from the DDG bangs list @@ -37,22 +85,35 @@ def gen_bangs_json(bangs_file: str) -> None: json.dump(bangs_data, open(bangs_file, 'w')) print('* Finished creating ddg bangs json') + load_all_bangs(bangs_file, bangs_data) + + +def suggest_bang(query: str) -> list[str]: + """Suggests bangs for a user's query + + Args: + query: The search query + + Returns: + list[str]: A list of bang suggestions + + """ + global bangs_dict + return [bangs_dict[_]['suggestion'] for _ in bangs_dict if _.startswith(query)] -def resolve_bang(query: str, bangs_dict: dict) -> str: +def resolve_bang(query: str) -> str: """Transform's a user's query to a bang search, if an operator is found Args: query: The search query - bangs_dict: The dict of available bang operators, with corresponding - format string search URLs - (i.e. "!w": "https://en.wikipedia.org...?search={}") Returns: str: A formatted redirect for a bang search, or an empty str if there wasn't a match or didn't contain a bang operator """ + global bangs_dict #if ! not in query simply return (speed up processing) if '!' not in query: