mirror of https://github.com/searxng/searxng
Merge pull request #2592 from dalf/update-external-bangs
[mod] add utils/fetch_external_bangs.pypull/1/head
commit
bc590cbc47
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,123 @@
|
||||
from searx.external_bang import get_node, resolve_bang_definition, get_bang_url, get_bang_definition_and_autocomplete
|
||||
from searx.search import SearchQuery, EngineRef
|
||||
from searx.testing import SearxTestCase
|
||||
|
||||
|
||||
TEST_DB = {
|
||||
'trie': {
|
||||
'exam': {
|
||||
'ple': '//example.com/' + chr(2) + chr(1) + '0',
|
||||
'*': '//wikipedia.org/wiki/' + chr(2) + chr(1) + '0',
|
||||
},
|
||||
'sea': {
|
||||
'*': 'sea' + chr(2) + chr(1) + '0',
|
||||
'rch': {
|
||||
'*': 'search' + chr(2) + chr(1) + '0',
|
||||
'ing': 'searching' + chr(2) + chr(1) + '0',
|
||||
},
|
||||
's': {
|
||||
'on': 'season' + chr(2) + chr(1) + '0',
|
||||
'capes': 'seascape' + chr(2) + chr(1) + '0',
|
||||
}
|
||||
},
|
||||
'error': ['error in external_bangs.json']
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class TestGetNode(SearxTestCase):
|
||||
|
||||
DB = {
|
||||
'trie': {
|
||||
'exam': {
|
||||
'ple': 'test',
|
||||
'*': 'not used',
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def test_found(self):
|
||||
node, before, after = get_node(TestGetNode.DB, 'example')
|
||||
|
||||
self.assertEqual(node, 'test')
|
||||
self.assertEqual(before, 'example')
|
||||
self.assertEqual(after, '')
|
||||
|
||||
def test_get_partial(self):
|
||||
node, before, after = get_node(TestGetNode.DB, 'examp')
|
||||
self.assertEqual(node, TestGetNode.DB['trie']['exam'])
|
||||
self.assertEqual(before, 'exam')
|
||||
self.assertEqual(after, 'p')
|
||||
|
||||
def test_not_found(self):
|
||||
node, before, after = get_node(TestGetNode.DB, 'examples')
|
||||
self.assertEqual(node, 'test')
|
||||
self.assertEqual(before, 'example')
|
||||
self.assertEqual(after, 's')
|
||||
|
||||
|
||||
class TestResolveBangDefinition(SearxTestCase):
|
||||
|
||||
def test_https(self):
|
||||
url, rank = resolve_bang_definition('//example.com/' + chr(2) + chr(1) + '42', 'query')
|
||||
self.assertEqual(url, 'https://example.com/query')
|
||||
self.assertEqual(rank, 42)
|
||||
|
||||
def test_http(self):
|
||||
url, rank = resolve_bang_definition('http://example.com/' + chr(2) + chr(1) + '0', 'text')
|
||||
self.assertEqual(url, 'http://example.com/text')
|
||||
self.assertEqual(rank, 0)
|
||||
|
||||
|
||||
class TestGetBangDefinitionAndAutocomplete(SearxTestCase):
|
||||
|
||||
def test_found(self):
|
||||
global TEST_DB
|
||||
bang_definition, new_autocomplete = get_bang_definition_and_autocomplete('exam', external_bangs_db=TEST_DB)
|
||||
self.assertEqual(bang_definition, TEST_DB['trie']['exam']['*'])
|
||||
self.assertEqual(new_autocomplete, ['example'])
|
||||
|
||||
def test_found_optimized(self):
|
||||
global TEST_DB
|
||||
bang_definition, new_autocomplete = get_bang_definition_and_autocomplete('example', external_bangs_db=TEST_DB)
|
||||
self.assertEqual(bang_definition, TEST_DB['trie']['exam']['ple'])
|
||||
self.assertEqual(new_autocomplete, [])
|
||||
|
||||
def test_partial(self):
|
||||
global TEST_DB
|
||||
bang_definition, new_autocomplete = get_bang_definition_and_autocomplete('examp', external_bangs_db=TEST_DB)
|
||||
self.assertEqual(bang_definition, None)
|
||||
self.assertEqual(new_autocomplete, ['example'])
|
||||
|
||||
def test_partial2(self):
|
||||
global TEST_DB
|
||||
bang_definition, new_autocomplete = get_bang_definition_and_autocomplete('sea', external_bangs_db=TEST_DB)
|
||||
self.assertEqual(bang_definition, TEST_DB['trie']['sea']['*'])
|
||||
self.assertEqual(new_autocomplete, ['search', 'searching', 'seascapes', 'season'])
|
||||
|
||||
def test_error(self):
|
||||
global TEST_DB
|
||||
bang_definition, new_autocomplete = get_bang_definition_and_autocomplete('error', external_bangs_db=TEST_DB)
|
||||
self.assertEqual(bang_definition, None)
|
||||
self.assertEqual(new_autocomplete, [])
|
||||
|
||||
def test_actual_data(self):
|
||||
bang_definition, new_autocomplete = get_bang_definition_and_autocomplete('duckduckgo')
|
||||
self.assertTrue(bang_definition.startswith('//duckduckgo.com/?q='))
|
||||
self.assertEqual(new_autocomplete, [])
|
||||
|
||||
|
||||
class TestExternalBangJson(SearxTestCase):
|
||||
|
||||
def test_no_external_bang_query(self):
|
||||
result = get_bang_url(SearchQuery('test', engineref_list=[EngineRef('wikipedia', 'general')]))
|
||||
self.assertEqual(result, None)
|
||||
|
||||
def test_get_bang_url(self):
|
||||
global TEST_DB
|
||||
url = get_bang_url(SearchQuery('test', engineref_list=[], external_bang='example'), external_bangs_db=TEST_DB)
|
||||
self.assertEqual(url, 'https://example.com/test')
|
||||
|
||||
def test_actual_data(self):
|
||||
google_url = get_bang_url(SearchQuery('test', engineref_list=[], external_bang='g'))
|
||||
self.assertEqual(google_url, 'https://www.google.com/search?q=test')
|
@ -0,0 +1,161 @@
|
||||
#!/usr/bin/env python
|
||||
"""
|
||||
Update searx/data/external_bangs.json using the duckduckgo bangs.
|
||||
|
||||
https://duckduckgo.com/newbang loads
|
||||
* a javascript which provides the bang version ( https://duckduckgo.com/bv1.js )
|
||||
* a JSON file which contains the bangs ( https://duckduckgo.com/bang.v260.js for example )
|
||||
|
||||
This script loads the javascript, then the bangs.
|
||||
|
||||
The javascript URL may change in the future ( for example https://duckduckgo.com/bv2.js ),
|
||||
but most probably it will requires to update RE_BANG_VERSION
|
||||
"""
|
||||
# pylint: disable=C0116
|
||||
|
||||
import sys
|
||||
import json
|
||||
import re
|
||||
from os.path import realpath, dirname, join
|
||||
|
||||
import requests
|
||||
|
||||
# set path
|
||||
sys.path.append(realpath(dirname(realpath(__file__)) + '/../'))
|
||||
|
||||
from searx import searx_dir # pylint: disable=E0401 C0413
|
||||
|
||||
|
||||
# from https://duckduckgo.com/newbang
|
||||
URL_BV1 = 'https://duckduckgo.com/bv1.js'
|
||||
RE_BANG_VERSION = re.compile(r'\/bang\.v([0-9]+)\.js')
|
||||
HTTPS_COLON = 'https:'
|
||||
HTTP_COLON = 'http:'
|
||||
|
||||
|
||||
def get_bang_url():
|
||||
response = requests.get(URL_BV1)
|
||||
response.raise_for_status()
|
||||
|
||||
r = RE_BANG_VERSION.findall(response.text)
|
||||
return f'https://duckduckgo.com/bang.v{r[0]}.js', r[0]
|
||||
|
||||
|
||||
def fetch_ddg_bangs(url):
|
||||
response = requests.get(url)
|
||||
response.raise_for_status()
|
||||
return json.loads(response.content.decode())
|
||||
|
||||
|
||||
def merge_when_no_leaf(node):
|
||||
"""Minimize the number of nodes
|
||||
|
||||
A -> B -> C
|
||||
B is child of A
|
||||
C is child of B
|
||||
|
||||
If there are no C equals to '*', then each C are merged into A
|
||||
|
||||
For example:
|
||||
d -> d -> g -> * (ddg*)
|
||||
-> i -> g -> * (dig*)
|
||||
becomes
|
||||
d -> dg -> *
|
||||
-> ig -> *
|
||||
"""
|
||||
restart = False
|
||||
if not isinstance(node, dict):
|
||||
return
|
||||
|
||||
# create a copy of the keys so node can be modified
|
||||
keys = list(node.keys())
|
||||
|
||||
for key in keys:
|
||||
if key == '*':
|
||||
continue
|
||||
|
||||
value = node[key]
|
||||
value_keys = list(value.keys())
|
||||
if '*' not in value_keys:
|
||||
for value_key in value_keys:
|
||||
node[key + value_key] = value[value_key]
|
||||
merge_when_no_leaf(node[key + value_key])
|
||||
del node[key]
|
||||
restart = True
|
||||
else:
|
||||
merge_when_no_leaf(value)
|
||||
|
||||
if restart:
|
||||
merge_when_no_leaf(node)
|
||||
|
||||
|
||||
def optimize_leaf(parent, parent_key, node):
|
||||
if not isinstance(node, dict):
|
||||
return
|
||||
|
||||
if len(node) == 1 and '*' in node and parent is not None:
|
||||
parent[parent_key] = node['*']
|
||||
else:
|
||||
for key, value in node.items():
|
||||
optimize_leaf(node, key, value)
|
||||
|
||||
|
||||
def parse_ddg_bangs(ddg_bangs):
|
||||
bang_trie = {}
|
||||
bang_urls = {}
|
||||
|
||||
for bang_definition in ddg_bangs:
|
||||
# bang_list
|
||||
bang_url = bang_definition['u']
|
||||
if '{{{s}}}' not in bang_url:
|
||||
# ignore invalid bang
|
||||
continue
|
||||
|
||||
bang_url = bang_url.replace('{{{s}}}', chr(2))
|
||||
|
||||
# only for the https protocol: "https://example.com" becomes "//example.com"
|
||||
if bang_url.startswith(HTTPS_COLON + '//'):
|
||||
bang_url = bang_url[len(HTTPS_COLON):]
|
||||
|
||||
#
|
||||
if bang_url.startswith(HTTP_COLON + '//') and bang_url[len(HTTP_COLON):] in bang_urls:
|
||||
# if the bang_url uses the http:// protocol, and the same URL exists in https://
|
||||
# then reuse the https:// bang definition. (written //example.com)
|
||||
bang_def_output = bang_urls[bang_url[len(HTTP_COLON):]]
|
||||
else:
|
||||
# normal use case : new http:// URL or https:// URL (without "https:", see above)
|
||||
bang_rank = str(bang_definition['r'])
|
||||
bang_def_output = bang_url + chr(1) + bang_rank
|
||||
bang_def_output = bang_urls.setdefault(bang_url, bang_def_output)
|
||||
|
||||
bang_urls[bang_url] = bang_def_output
|
||||
|
||||
# bang name
|
||||
bang = bang_definition['t']
|
||||
|
||||
# bang_trie
|
||||
t = bang_trie
|
||||
for bang_letter in bang:
|
||||
t = t.setdefault(bang_letter, {})
|
||||
t = t.setdefault('*', bang_def_output)
|
||||
|
||||
# optimize the trie
|
||||
merge_when_no_leaf(bang_trie)
|
||||
optimize_leaf(None, None, bang_trie)
|
||||
|
||||
return bang_trie
|
||||
|
||||
|
||||
def get_bangs_filename():
|
||||
return join(join(searx_dir, "data"), "external_bangs.json")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
bangs_url, bangs_version = get_bang_url()
|
||||
print(f'fetch bangs from {bangs_url}')
|
||||
output = {
|
||||
'version': bangs_version,
|
||||
'trie': parse_ddg_bangs(fetch_ddg_bangs(bangs_url))
|
||||
}
|
||||
with open(get_bangs_filename(), 'w') as fp:
|
||||
json.dump(output, fp, ensure_ascii=False, indent=4)
|
Loading…
Reference in New Issue