searxng/searx/query.py
Grant Lanham 44a06190bb [refactor] unit tests to utilize parameterized and break down monolithic tests
- for tests which perform the same arrange/act/assert pattern but with different
  data, the data portion has been moved to the ``parameterized.expand`` fields

- for monolithic tests which performed multiple arrange/act/asserts,
  they have been broken up into different unit tests.

- when possible, change generic assert statements to more concise
  asserts (i.e. ``assertIsNone``)

This work ultimately is focused on creating smaller and more concise tests.
While parameterized may make adding new configurations for existing tests
easier, that is just a beneficial side effect.  The main benefit is that smaller
tests are easier to reason about, meaning they are easier to debug when they
start failing.  This improves the developer experience in debugging what went
wrong when refactoring the project.

Total number of tests went from 192 -> 259; or, broke apart larger tests into 69
more concise ones.
2024-10-03 13:20:32 +02:00

351 lines
13 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=invalid-name, missing-module-docstring, missing-class-docstring
from __future__ import annotations
from abc import abstractmethod, ABC
import re
from searx import settings
from searx.sxng_locales import sxng_locales
from searx.engines import categories, engines, engine_shortcuts
from searx.external_bang import get_bang_definition_and_autocomplete
from searx.search import EngineRef
from searx.webutils import VALID_LANGUAGE_CODE
class QueryPartParser(ABC):
    """Abstract base of the parsers that handle one special query token.

    An instance is bound to the query object it updates (``raw_text_query``)
    and knows whether it is allowed to contribute autocomplete suggestions
    (``enable_autocomplete``).
    """

    __slots__ = ("raw_text_query", "enable_autocomplete")

    def __init__(self, raw_text_query, enable_autocomplete):
        self.enable_autocomplete = enable_autocomplete
        self.raw_text_query = raw_text_query

    @staticmethod
    @abstractmethod
    def check(raw_value):
        """Tell whether this parser is responsible for ``raw_value``."""

    @abstractmethod
    def __call__(self, raw_value):
        """Parse ``raw_value`` and store the outcome on ``self.raw_text_query``.

        Returns True when ``raw_value`` has been parsed.  When
        ``self.enable_autocomplete`` is True the implementation may also add
        entries to ``self.raw_text_query.autocomplete_list``.
        """

    def _add_autocomplete(self, value):
        """Append ``value`` to the autocomplete list unless it is already there."""
        suggestions = self.raw_text_query.autocomplete_list
        if value in suggestions:
            return
        suggestions.append(value)
class TimeoutParser(QueryPartParser):
    """Parser for the ``<N`` token that forces the timeout of the search.

    ``<3`` is read as 3 seconds, ``<850`` as 850 milliseconds.
    """

    @staticmethod
    def check(raw_value):
        """Return True if ``raw_value`` starts with the ``<`` prefix."""
        return raw_value[0] == '<'

    def __call__(self, raw_value):
        """Parse the timeout that follows ``<``.

        Returns True if a timeout has been stored on the query.  Example
        suggestions are only offered while nothing follows the prefix.
        """
        value = raw_value[1:]
        found = self._parse(value) if value else False
        if self.enable_autocomplete and not value:
            self._autocomplete()
        return found

    def _parse(self, value):
        """Store ``value`` as ``timeout_limit`` (in seconds) on the query.

        Returns False, leaving the query untouched, if ``value`` is not a
        plain decimal number.
        """
        # str.isdigit() also accepts characters like '²' or '①' which int()
        # rejects with a ValueError (a query such as "<²" would crash).
        # str.isdecimal() only accepts what int() is able to convert.
        if not value.isdecimal():
            return False
        raw_timeout_limit = int(value)
        if raw_timeout_limit < 100:
            # below 100, the unit is the second ( <3 = 3 seconds timeout )
            self.raw_text_query.timeout_limit = float(raw_timeout_limit)
        else:
            # 100 or above, the unit is the millisecond ( <850 = 850 milliseconds timeout )
            self.raw_text_query.timeout_limit = raw_timeout_limit / 1000.0
        return True

    def _autocomplete(self):
        """Suggest one example per unit (seconds and milliseconds)."""
        for suggestion in ['<3', '<850']:
            self._add_autocomplete(suggestion)
class LanguageParser(QueryPartParser):
    """Parser for the ``:<language>`` token that forces a search language.

    The token is compared with the entries of ``sxng_locales``; each entry is
    unpacked as ``(language id, language name, country, english name, flag)``.
    """

    @staticmethod
    def check(raw_value):
        """Return True if ``raw_value`` starts with the ``:`` prefix."""
        return raw_value[0] == ':'

    def __call__(self, raw_value):
        """Parse the language that follows ``:``.

        Returns True if at least one language has been recognised.
        Suggestions are only offered while the token is not (yet) a language.
        """
        # normalise the token: lower case and "_" --> "-" ("en_US" --> "en-us")
        value = raw_value[1:].lower().replace('_', '-')
        found = self._parse(value) if len(value) > 0 else False
        if self.enable_autocomplete and not found:
            self._autocomplete(value)
        return found

    def _parse(self, value):
        """Append the languages matching ``value`` to the query's ``languages``.

        ``value`` is expected in the normalised form built by ``__call__``.
        Returns True if ``value`` matched a declared locale or is a
        syntactically valid language code (or ``auto``).
        """
        found = False
        # check if any language-code is equal with
        # declared language-codes
        for lc in sxng_locales:
            lang_id, lang_name, country, english_name, _flag = map(str.lower, lc)

            # if correct language-code is found
            # set it as new search-language
            #
            # NOTE(review): ``value`` is lower case while codes with a region
            # are stored as "xx-YY" below, so the membership test seems to
            # catch only codes without a region -- confirm this is intended.
            if (
                value == lang_id or value == lang_name or value == english_name or value.replace('-', ' ') == country
            ) and value not in self.raw_text_query.languages:
                found = True
                lang_parts = lang_id.split('-')
                if len(lang_parts) == 2:
                    # restore the upper case region: "en-us" --> "en-US"
                    self.raw_text_query.languages.append(lang_parts[0] + '-' + lang_parts[1].upper())
                else:
                    self.raw_text_query.languages.append(lang_id)
                # to ensure best match (first match is not necessarily the best one)
                if value == lang_id:
                    break

        # user may set a valid, yet not selectable language
        if VALID_LANGUAGE_CODE.match(value) or value == 'auto':
            lang_parts = value.split('-')
            if len(lang_parts) > 1:
                value = lang_parts[0].lower() + '-' + lang_parts[1].upper()
            if value not in self.raw_text_query.languages:
                self.raw_text_query.languages.append(value)
                found = True

        return found

    def _autocomplete(self, value):
        """Suggest language tokens starting with ``value``.

        Without a ``value`` a few examples are suggested.  All suggestions go
        through ``_add_autocomplete`` so that no entry is listed twice.
        """
        selectable_languages = settings['search']['languages']

        if not value:
            # show some example queries
            if len(selectable_languages) < 10:
                for lang in selectable_languages:
                    self._add_autocomplete(':' + lang)
            else:
                for lang in [":en", ":en_us", ":english", ":united_kingdom"]:
                    self._add_autocomplete(lang)
            return

        for lc in sxng_locales:
            # only suggest the languages selectable in the settings
            if lc[0] not in selectable_languages:
                continue
            lang_id, lang_name, country, english_name, _flag = map(str.lower, lc)

            # check if query starts with language-id
            if lang_id.startswith(value):
                if len(value) <= 2:
                    self._add_autocomplete(':' + lang_id.split('-')[0])
                else:
                    self._add_autocomplete(':' + lang_id)

            # check if query starts with language name
            if lang_name.startswith(value) or english_name.startswith(value):
                self._add_autocomplete(':' + lang_name)

            # check if query starts with country
            # here "new_zealand" is "new-zealand" (see __call__)
            if country.startswith(value.replace('-', ' ')):
                self._add_autocomplete(':' + country.replace(' ', '_'))
class ExternalBangParser(QueryPartParser):
    """Parser for the ``!!<bang>`` token that selects an external bang."""

    @staticmethod
    def check(raw_value):
        """Return True for ``!!`` followed by at least one more character."""
        return len(raw_value) > 2 and raw_value.startswith('!!')

    def __call__(self, raw_value):
        """Parse the bang that follows ``!!``.

        Returns True if the bang is known.  With autocomplete enabled,
        suggestions are added whether or not the bang has been found.
        """
        bang = raw_value[2:]
        if bang:
            found, suggestions = self._parse(bang)
        else:
            found, suggestions = False, []

        if self.enable_autocomplete:
            self._autocomplete(suggestions)
        return found

    def _parse(self, value):
        """Look up ``value`` and return ``(found, autocomplete candidates)``.

        A known bang is stored as ``external_bang`` on the query.
        """
        definition, suggestions = get_bang_definition_and_autocomplete(value)
        if definition is None:
            return False, suggestions

        self.raw_text_query.external_bang = value
        return True, suggestions

    def _autocomplete(self, bang_ac_list):
        """Suggest the given bangs, or a few examples if there are none."""
        for external_bang in bang_ac_list or ['g', 'ddg', 'bing']:
            self._add_autocomplete('!!' + external_bang)
class BangParser(QueryPartParser):
    """Parser for the ``!<name>`` token that forces an engine or a category."""

    @staticmethod
    def check(raw_value):
        """Return True for a single ``!`` prefix (``!!...`` is not accepted)."""
        # the slice is empty for a lone "!" and never raises
        return raw_value[0] == '!' and raw_value[1:2] != '!'

    def __call__(self, raw_value):
        """Parse the engine / category name that follows the prefix.

        Returns True if the name has been resolved; the query is then flagged
        as ``specific``.
        """
        prefix = raw_value[0]
        # "-" and "_" stand for the spaces of engine and category names
        name = raw_value[1:].replace('-', ' ').replace('_', ' ')

        found = False
        if name:
            found = self._parse(name)

        if found and prefix == '!':
            self.raw_text_query.specific = True

        if self.enable_autocomplete:
            self._autocomplete(prefix, name)
        return found

    def _parse(self, value):
        """Resolve ``value`` and add the matching engine references to the query.

        ``value`` may be an engine shortcut, an engine name or a category
        name.  Returns False if it is none of them.
        """
        # an engine shortcut is replaced by the name it stands for
        if value in engine_shortcuts:  # pylint: disable=consider-using-get
            value = engine_shortcuts[value]

        # the name of an engine: select exactly this engine
        if value in engines:
            self.raw_text_query.enginerefs.append(EngineRef(value, 'none'))
            return True

        if value not in categories:
            return False

        # the name of a category: select all the engines declared under this
        # category, except those listed in disabled_engines
        disabled = self.raw_text_query.disabled_engines
        category_refs = [
            EngineRef(engine.name, value) for engine in categories[value] if (engine.name, value) not in disabled
        ]
        self.raw_text_query.enginerefs.extend(category_refs)
        return True

    def _autocomplete(self, first_char, value):
        """Suggest categories, engines and shortcuts starting with ``value``.

        Without a ``value`` a few examples are suggested.
        """
        if not value:
            # show some example queries
            #
            # NOTE(review): _parse looks up (engine name, category) tuples in
            # disabled_engines; if that is the only shape of its entries, a
            # plain string is never found and this filter hides nothing --
            # confirm.
            for suggestion in ['images', 'wikipedia', 'osm']:
                if suggestion in self.raw_text_query.disabled_engines and suggestion not in categories:
                    continue
                self._add_autocomplete(first_char + suggestion)
            return

        # names are suggested with "_" instead of spaces (see __call__)
        for category in categories:
            if category.startswith(value):
                self._add_autocomplete(first_char + category.replace(' ', '_'))

        for engine in engines:
            if engine.startswith(value):
                self._add_autocomplete(first_char + engine.replace(' ', '_'))

        for engine_shortcut in engine_shortcuts:
            if engine_shortcut.startswith(value):
                self._add_autocomplete(first_char + engine_shortcut)
class FeelingLuckyParser(QueryPartParser):
    """Parser for the bare ``!!`` token: redirect to the first result."""

    @staticmethod
    def check(raw_value):
        """Return True only if ``raw_value`` is exactly ``!!``."""
        return '!!' == raw_value

    def __call__(self, raw_value):
        """Flag the query for a redirect to its first result; always True."""
        query = self.raw_text_query
        query.redirect_to_first_result = True
        return True
class RawTextQuery:
    """parse raw text query (the value from the html input)

    The query is split on whitespace.  Each part is offered to the parsers
    of ``PARSER_CLASSES``: a part that has been parsed is collected in
    ``query_parts``, every other part in ``user_query_parts``.
    """

    # the first parser whose check() accepts a part is the only one applied
    PARSER_CLASSES = [
        TimeoutParser,  # force the timeout
        LanguageParser,  # force a language
        ExternalBangParser,  # external bang (must be before BangParser)
        BangParser,  # force an engine or category
        FeelingLuckyParser,  # redirect to the first link in the results list
    ]

    def __init__(self, query: str, disabled_engines: list):
        """Parse ``query``.

        ``disabled_engines`` is read by the parsers when they select the
        engines; a false value is replaced by an empty list.
        """
        assert isinstance(query, str)
        # input parameters
        self.query = query
        self.disabled_engines = disabled_engines if disabled_engines else []

        # parsed values
        self.enginerefs = []
        self.languages = []
        self.timeout_limit = None
        self.external_bang = None
        self.specific = False
        self.autocomplete_list = []

        # internal properties
        self.query_parts = []  # use self.getFullQuery()
        self.user_query_parts = []  # use self.getQuery()
        # (list, index) of the last non-blank part, None if there is no part
        self.autocomplete_location = None
        self.redirect_to_first_result = False

        self._parse_query()

    def _parse_query(self):
        """
        parse self.query, if tags are set, which
        change the search engine or search-language
        """
        # split query, including whitespaces
        raw_query_parts = re.split(r'(\s+)', self.query)

        last_index_location = None
        # autocomplete is only enabled for the very last item of the split; a
        # query ending with whitespace ends with an empty item, which is
        # skipped below
        autocomplete_index = len(raw_query_parts) - 1

        for i, query_part in enumerate(raw_query_parts):
            # part does only contain spaces, skip
            if query_part.isspace() or query_part == '':
                continue

            # parse special commands
            special_part = False
            for parser_class in RawTextQuery.PARSER_CLASSES:
                if parser_class.check(query_part):
                    special_part = parser_class(self, i == autocomplete_index)(query_part)
                    break

            # append query part to query_part list
            qlist = self.query_parts if special_part else self.user_query_parts
            qlist.append(query_part)
            last_index_location = (qlist, len(qlist) - 1)

        self.autocomplete_location = last_index_location

    def get_autocomplete_full_query(self, text):
        """Return the full query with the last part replaced by ``text``.

        The part is replaced in place, this modifies the query object.
        ``autocomplete_location`` must be set, that is, the query must have
        at least one non-blank part.
        """
        qlist, position = self.autocomplete_location
        qlist[position] = text
        return self.getFullQuery()

    def changeQuery(self, query):
        """Replace the user part of the query by ``query`` and return self.

        The parsed (special) parts are kept and the autocomplete list is
        reset.
        """
        self.user_query_parts = query.strip().split()
        self.query = self.getFullQuery()
        self.autocomplete_location = (self.user_query_parts, len(self.user_query_parts) - 1)
        self.autocomplete_list = []
        return self

    def getQuery(self):
        """Return the user part of the query (without the special parts)."""
        return ' '.join(self.user_query_parts)

    def getFullQuery(self):
        """
        get full query including whitespaces
        """
        return f"{' '.join(self.query_parts)} {self.getQuery()}".strip()

    def __str__(self):
        return self.getFullQuery()

    def __repr__(self):
        # the closing ">" comes after the last field, so that every field is
        # inside of the "<RawTextQuery ...>" wrapper
        return (
            f"<{self.__class__.__name__} "
            + f"query={self.query!r} "
            + f"disabled_engines={self.disabled_engines!r}\n "
            + f"languages={self.languages!r} "
            + f"timeout_limit={self.timeout_limit!r} "
            + f"external_bang={self.external_bang!r} "
            + f"specific={self.specific!r} "
            + f"enginerefs={self.enginerefs!r}\n "
            + f"autocomplete_list={self.autocomplete_list!r}\n "
            + f"query_parts={self.query_parts!r}\n "
            + f"user_query_parts={self.user_query_parts!r}\n "
            + f"redirect_to_first_result={self.redirect_to_first_result!r}>"
        )