From 40063adcaf4825af4cc342897eaffaff511139fa Mon Sep 17 00:00:00 2001
From: arkiver
Date: Tue, 30 Jun 2020 19:11:06 -0400
Subject: [PATCH] Use wget-at with ZSTD.

---
 .gitignore      |   5 +-
 Dockerfile      |   9 +++
 README.md       |   2 +-
 get-wget-lua.sh |  32 ++++-----
 pipeline.py     | 188 ++++++++++++++++++++----------------------
 5 files changed, 109 insertions(+), 127 deletions(-)
 create mode 100644 Dockerfile

diff --git a/.gitignore b/.gitignore
index 44ebf6a..2f2421b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,7 @@
 *~
 *.pyc
-data/
 wget-lua
+wget-at
+STOP
+BANNED
+data/
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..ec76068
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,9 @@
+FROM warcforceone/grab-base
+RUN echo deb http://deb.debian.org/debian buster-backports main contrib > /etc/apt/sources.list.d/backports.list \
+  && DEBIAN_FRONTEND=noninteractive DEBIAN_PRIORITY=critical apt-get -qqy --no-install-recommends -o Dpkg::Options::=--force-confdef -o Dpkg::Options::=--force-confold -o Dpkg::Options::=--force-unsafe-io update \
+  && DEBIAN_FRONTEND=noninteractive DEBIAN_PRIORITY=critical apt-get -qqy --no-install-recommends -o Dpkg::Options::=--force-confdef -o Dpkg::Options::=--force-confold -o Dpkg::Options::=--force-unsafe-io install lua-socket \
+  && DEBIAN_FRONTEND=noninteractive DEBIAN_PRIORITY=critical apt-get -qqy --no-install-recommends -o Dpkg::Options::=--force-confdef -o Dpkg::Options::=--force-confold -o Dpkg::Options::=--force-unsafe-io -t buster-backports install zstd libzstd-dev libzstd1 \
+  && pip install zstandard
+COPY . /grab
+RUN wget -O /grab/wget-at http://xor.meo.ws/-qt0VqH8KqsCm5xAkw1Pc7oeXToeyU0u/wget-lua \
+  && chmod +x /grab/wget-at
diff --git a/README.md b/README.md
index d3f2bf1..1034f89 100644
--- a/README.md
+++ b/README.md
@@ -163,5 +163,5 @@ Are you a developer? Help write code for us! Look at our [developer documentatio
 
 ### Other problems
 
-Have an issue not listed here? Join us on IRC and ask! We can be found at irc.efnet.org #shreddit.
+Have an issue not listed here? Join us on IRC and ask! We can be found on IRC hackint #shreddit.
 
diff --git a/get-wget-lua.sh b/get-wget-lua.sh
index d387dfa..9ce79e6 100755
--- a/get-wget-lua.sh
+++ b/get-wget-lua.sh
@@ -1,6 +1,6 @@
 #!/usr/bin/env bash
 #
-# This script downloads and compiles wget-lua.
+# This script clones and compiles wget-lua.
 #
 
 # first, try to detect gnutls or openssl
@@ -18,35 +18,35 @@ then
   fi
 fi
 
-WGET_DOWNLOAD_URL="http://warriorhq.archiveteam.org/downloads/wget-lua/wget-1.14.lua.LATEST.tar.bz2"
+if ! zstd --version | grep -q 1.4.4
+then
+  echo "Need version 1.4.4 of libzstd-dev and zstd"
+  exit 1
+fi
 
 rm -rf get-wget-lua.tmp/
 mkdir -p get-wget-lua.tmp
 cd get-wget-lua.tmp
 
-if builtin type -p curl &>/dev/null
-then
-  curl -L $WGET_DOWNLOAD_URL | tar -xj --strip-components=1
-elif builtin type -p wget &>/dev/null
-then
-  wget --output-document=- $WGET_DOWNLOAD_URL | tar -xj --strip-components=1
-else
-  echo "You need Curl or Wget to download the source files."
-  exit 1
-fi
+git clone https://github.com/archiveteam/wget-lua.git
+
+cd wget-lua
+git checkout v1.20.3-at
+
+#echo -n 1.20.3-at-lua | tee ./.version ./.tarball-version > /dev/null
 
-if ./configure $CONFIGURE_SSL_OPT --disable-nls && make && src/wget -V | grep -q lua
+if ./bootstrap && ./configure $CONFIGURE_SSL_OPT --disable-nls && make && src/wget -V | grep -q lua
 then
-  cp src/wget ../wget-lua
-  cd ../
+  cp src/wget ../../wget-at
+  cd ../../
   echo
   echo
   echo "###################################################################"
   echo
   echo "wget-lua successfully built."
   echo
-  ./wget-lua --help | grep -iE "gnu|warc|lua"
+  ./wget-at --help | grep -iE "gnu|warc|lua"
   rm -rf get-wget-lua.tmp
   exit 0
 else
diff --git a/pipeline.py b/pipeline.py
index 9edc4b5..5d43834 100644
--- a/pipeline.py
+++ b/pipeline.py
@@ -16,8 +16,6 @@ import subprocess
 import sys
 import time
 import string
-import re
-import random
 
 import seesaw
 from seesaw.externalprocess import WgetDownload
@@ -27,8 +25,9 @@ from seesaw.util import find_executable
 
 from tornado import httpclient
 
+import requests
+import zstandard
 
-# check the seesaw version
 if StrictVersion(seesaw.__version__) < StrictVersion('0.8.5'):
     raise Exception('This pipeline needs seesaw version 0.8.5 or higher.')
 
@@ -36,25 +35,18 @@ if StrictVersion(seesaw.__version__) < StrictVersion('0.8.5'):
 ###########################################################################
 # Find a useful Wget+Lua executable.
 #
-# WGET_LUA will be set to the first path that
+# WGET_AT will be set to the first path that
 # 1. does not crash with --version, and
 # 2. prints the required version string
-WGET_LUA = find_executable(
-    'Wget+Lua',
-    ['GNU Wget 1.14.lua.20130523-9a5c', 'GNU Wget 1.14.lua.20160530-955376b'],
-    [
-        './wget-lua',
-        './wget-lua-warrior',
-        './wget-lua-local',
-        '../wget-lua',
-        '../../wget-lua',
-        '/home/warrior/wget-lua',
-        '/usr/bin/wget-lua'
-    ]
+
+WGET_AT = find_executable(
+    'Wget+AT',
+    ['GNU Wget 1.20.3-at.20200401.01'],
+    ['./wget-at']
 )
 
-if not WGET_LUA:
-    raise Exception('No usable Wget+Lua found.')
+if not WGET_AT:
+    raise Exception('No usable Wget+At found.')
 
 
 ###########################################################################
@@ -62,10 +54,10 @@ if not WGET_AT:
 #
 # Update this each time you make a non-cosmetic change.
 # It will be added to the WARC files and reported to the tracker.
-VERSION = '20200102.03'
+VERSION = '20200701.01'
 USER_AGENT = 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; WOW64; Trident/4.0; SLCC1)'
 TRACKER_ID = 'reddit'
-TRACKER_HOST = 'tracker.archiveteam.org'
+TRACKER_HOST = 'trackerproxy.meo.ws'
 
 
 ###########################################################################
@@ -115,8 +107,7 @@ class PrepareDirectories(SimpleTask):
     def process(self, item):
         item_name = item['item_name']
         escaped_item_name = item_name.replace(':', '_').replace('/', '_').replace('~', '_')
-        item_hash = hashlib.sha1(item_name.encode('utf-8')).hexdigest()
-        dirname = '/'.join((item['data_dir'], item_hash))
+        dirname = '/'.join((item['data_dir'], escaped_item_name))
 
         if os.path.isdir(dirname):
             shutil.rmtree(dirname)
@@ -124,83 +115,21 @@ class PrepareDirectories(SimpleTask):
             os.makedirs(dirname)
 
         item['item_dir'] = dirname
-        item['warc_file_base'] = '%s-%s-%s' % (self.warc_prefix, item_hash,
+        item['warc_file_base'] = '%s-%s-%s' % (self.warc_prefix, escaped_item_name[:50],
             time.strftime('%Y%m%d-%H%M%S'))
 
-        open('%(item_dir)s/%(warc_file_base)s.warc.gz' % item, 'w').close()
+        open('%(item_dir)s/%(warc_file_base)s.warc.zst' % item, 'w').close()
         open('%(item_dir)s/%(warc_file_base)s_data.txt' % item, 'w').close()
 
-
-class Deduplicate(SimpleTask):
-    def __init__(self):
-        SimpleTask.__init__(self, 'Deduplicate')
-
-    def process(self, item):
-        digests = {}
-        input_filename = '%(item_dir)s/%(warc_file_base)s.warc.gz' % item
-        output_filename = '%(item_dir)s/%(warc_file_base)s-deduplicated.warc.gz' % item
-        with open(input_filename, 'rb') as f_in, \
-                open(output_filename, 'wb') as f_out:
-            writer = WARCWriter(filebuf=f_out, gzip=True)
-            for record in ArchiveIterator(f_in):
-                url = record.rec_headers.get_header('WARC-Target-URI')
-                if url is not None and url.startswith('<'):
-                    url = re.search('^<(.+)>$', url).group(1)
-                    record.rec_headers.replace_header('WARC-Target-URI', url)
-                if record.rec_headers.get_header('WARC-Type') == 'response':
-                    digest = record.rec_headers.get_header('WARC-Payload-Digest')
-                    if digest in digests:
-                        writer.write_record(
-                            self._record_response_to_revisit(writer, record,
-                                digests[digest])
-                        )
-                    else:
-                        digests[digest] = (
-                            record.rec_headers.get_header('WARC-Record-ID'),
-                            record.rec_headers.get_header('WARC-Date'),
-                            record.rec_headers.get_header('WARC-Target-URI')
-                        )
-                        writer.write_record(record)
-                elif record.rec_headers.get_header('WARC-Type') == 'warcinfo':
-                    record.rec_headers.replace_header('WARC-Filename', output_filename)
-                    writer.write_record(record)
-                else:
-                    writer.write_record(record)
-
-    def _record_response_to_revisit(self, writer, record, duplicate):
-        warc_headers = record.rec_headers
-        warc_headers.replace_header('WARC-Refers-To', duplicate[0])
-        warc_headers.replace_header('WARC-Refers-To-Date', duplicate[1])
-        warc_headers.replace_header('WARC-Refers-To-Target-URI', duplicate[2])
-        warc_headers.replace_header('WARC-Type', 'revisit')
-        warc_headers.replace_header('WARC-Truncated', 'length')
-        warc_headers.replace_header('WARC-Profile',
-            'http://netpreserve.org/warc/1.0/' \
-            'revisit/identical-payload-digest')
-        warc_headers.remove_header('WARC-Block-Digest')
-        warc_headers.remove_header('Content-Length')
-        return writer.create_warc_record(
-            record.rec_headers.get_header('WARC-Target-URI'),
-            'revisit',
-            warc_headers=warc_headers,
-            http_headers=record.http_headers
-        )
-
-
 class MoveFiles(SimpleTask):
     def __init__(self):
         SimpleTask.__init__(self, 'MoveFiles')
 
     def process(self, item):
-        if os.path.exists('%(item_dir)s/%(warc_file_base)s.warc' % item):
-            raise Exception('Please compile wget with zlib support!')
-
-        #os.rename('%(item_dir)s/%(warc_file_base)s-deduplicated.warc.gz' % item,
-        #    '%(data_dir)s/%(warc_file_base)s-deduplicated.warc.gz' % item)
-        os.rename('%(item_dir)s/%(warc_file_base)s.warc.gz' % item,
-            '%(data_dir)s/%(warc_file_base)s.warc.gz' % item)
+        os.rename('%(item_dir)s/%(warc_file_base)s.warc.zst' % item,
+            '%(data_dir)s/%(warc_file_base)s.%(dict_project)s.%(dict_id)s.warc.zst' % item)
         os.rename('%(item_dir)s/%(warc_file_base)s_data.txt' % item,
-            '%(data_dir)s/%(warc_file_base)s_data.txt' % item)
+                  '%(data_dir)s/%(warc_file_base)s_data.txt' % item)
 
         shutil.rmtree('%(item_dir)s' % item)
 
@@ -209,14 +138,11 @@ def get_hash(filename):
     with open(filename, 'rb') as in_file:
         return hashlib.sha1(in_file.read()).hexdigest()
 
-
 CWD = os.getcwd()
 PIPELINE_SHA1 = get_hash(os.path.join(CWD, 'pipeline.py'))
 LUA_SHA1 = get_hash(os.path.join(CWD, 'reddit.lua'))
 
-
 def stats_id_function(item):
-    # NEW for 2014! Some accountability hashes and stats.
     d = {
         'pipeline_hash': PIPELINE_SHA1,
         'lua_hash': LUA_SHA1,
@@ -226,6 +152,41 @@ def stats_id_function(item):
     return d
 
 
+class ZstdDict(object):
+    created = 0
+    data = None
+
+    @classmethod
+    def get_dict(cls):
+        if cls.data is not None and time.time() - cls.created < 1800:
+            return cls.data
+        response = requests.get(
+            'http://tracker.archiveteam.org:25654/dictionary',
+            params={
+                'project': TRACKER_ID
+            }
+        )
+        response.raise_for_status()
+        response = response.json()
+        if cls.data is not None and response['id'] == cls.data['id']:
+            cls.created = time.time()
+            return cls.data
+        print('Downloading latest dictionary.')
+        response_dict = requests.get(response['url'])
+        response_dict.raise_for_status()
+        raw_data = response_dict.content
+        if hashlib.sha256(raw_data).hexdigest() != response['sha256']:
+            raise ValueError('Hash of downloaded dictionary does not match.')
+        if raw_data[:4] == b'\x28\xB5\x2F\xFD':
+            raw_data = zstandard.ZstdDecompressor().decompress(raw_data)
+        cls.data = {
+            'id': response['id'],
+            'dict': raw_data
+        }
+        cls.created = time.time()
+        return cls.data
+
+
 class WgetArgs(object):
     post_chars = string.digits + string.ascii_lowercase
 
@@ -237,11 +198,12 @@ class WgetArgs(object):
 
     def realize(self, item):
         wget_args = [
-            WGET_LUA,
+            WGET_AT,
             '-U', USER_AGENT,
             '-nv',
+            '--no-cookies',
+            '--content-on-error',
             '--lua-script', 'reddit.lua',
-            '--load-cookies', 'cookies.txt',
             '-o', ItemInterpolation('%(item_dir)s/wget.log'),
             '--no-check-certificate',
             '--output-document', ItemInterpolation('%(item_dir)s/wget.tmp'),
@@ -259,16 +221,28 @@ class WgetArgs(object):
             '--warc-file', ItemInterpolation('%(item_dir)s/%(warc_file_base)s'),
             '--warc-header', 'operator: Archive Team',
             '--warc-header', 'reddit-dld-script-version: ' + VERSION,
-            '--warc-header', ItemInterpolation('reddit-item: %(item_name)s')
+            '--warc-header', ItemInterpolation('reddit-item: %(item_name)s'),
+            '--warc-dedup-url-agnostic',
+            '--warc-compression-use-zstd',
+            '--warc-zstd-dict-no-include'
         ]
-
+
+        dict_data = ZstdDict.get_dict()
+        with open(os.path.join(item['item_dir'], 'zstdict'), 'wb') as f:
+            f.write(dict_data['dict'])
+        item['dict_id'] = dict_data['id']
+        item['dict_project'] = TRACKER_ID
+        wget_args.extend([
+            '--warc-zstd-dict', ItemInterpolation('%(item_dir)s/zstdict'),
+        ])
+
         item_name = item['item_name']
         item_type, item_value = item_name.split(':', 1)
-
+
         item['item_type'] = item_type
         item['item_value'] = item_value
-
-        if item_type in ('posts'):
+
+        if item_type == 'posts':
             start, end = item_value.split('-')
             for i in range(int(start), int(end)+1):
                 post_id = self.int_to_str(i)
@@ -277,14 +251,14 @@
             #wget_args.append('https://old.reddit.com/comments/{}'.format(post_id))
         else:
             raise Exception('Unknown item')
-
+
         if 'bind_address' in globals():
             wget_args.extend(['--bind-address', globals()['bind_address']])
             print('')
             print('*** Wget will bind address at {0} ***'.format(
                 globals()['bind_address']))
             print('')
-
+
         return realize(wget_args, item)
 
 ###########################################################################
@@ -314,21 +288,20 @@ pipeline = Pipeline(
             'item_dir': ItemValue('item_dir'),
             'item_value': ItemValue('item_value'),
             'item_type': ItemValue('item_type'),
-            'warc_file_base': ItemValue('warc_file_base')
+            'warc_file_base': ItemValue('warc_file_base'),
         }
     ),
     PrepareStatsForTracker(
         defaults={'downloader': downloader, 'version': VERSION},
         file_groups={
             'data': [
-                ItemInterpolation('%(item_dir)s/%(warc_file_base)s.warc.gz')
-                #ItemInterpolation('%(item_dir)s/%(warc_file_base)s-deduplicated.warc.gz')
+                ItemInterpolation('%(item_dir)s/%(warc_file_base)s.warc.zst')
             ]
         },
        id_function=stats_id_function,
    ),
    MoveFiles(),
-    LimitConcurrent(NumberConfigValue(min=1, max=20, default='20',
+    LimitConcurrent(NumberConfigValue(min=1, max=20, default='2',
        name='shared:rsync_threads', title='Rsync threads',
        description='The maximum number of concurrent uploads.'),
        UploadWithTracker(
@@ -336,13 +309,11 @@ pipeline = Pipeline(
            downloader=downloader,
            version=VERSION,
            files=[
-                ItemInterpolation('%(data_dir)s/%(warc_file_base)s.warc.gz'),
-                #ItemInterpolation('%(data_dir)s/%(warc_file_base)s-deduplicated.warc.gz'),
+                ItemInterpolation('%(data_dir)s/%(warc_file_base)s.%(dict_project)s.%(dict_id)s.warc.zst'),
                ItemInterpolation('%(data_dir)s/%(warc_file_base)s_data.txt')
            ],
            rsync_target_source_path=ItemInterpolation('%(data_dir)s/'),
            rsync_extra_args=[
-                '--sockopts=SO_SNDBUF=8388608,SO_RCVBUF=8388608',
                '--recursive',
                '--partial',
                '--partial-dir', '.rsync-tmp',
@@ -350,11 +321,10 @@
                '--no-compress',
                '--compress-level', '0'
            ]
-        ),
+    ),
    ),
    SendDoneToTracker(
        tracker_url='http://%s/%s' % (TRACKER_HOST, TRACKER_ID),
        stats=ItemValue('stats')
    )
)
-
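
Reviewer note, not part of the patch: because wget-at is invoked with
--warc-compression-use-zstd and --warc-zstd-dict-no-include, the resulting
.warc.zst can only be decompressed with the external dictionary the pipeline
wrote to zstdict, which is why MoveFiles embeds %(dict_project)s.%(dict_id)s
in the upload name. A minimal read-back check with the same zstandard module
the Dockerfile installs might look like the sketch below (file names are
illustrative; assumes a python-zstandard recent enough that stream_reader
accepts read_across_frames):

    # verify_zst.py -- sanity-check a dictionary-compressed WARC (sketch).
    import zstandard

    def iter_warc_zst(warc_path, dict_path):
        # Load the external dictionary saved by the pipeline ('zstdict').
        with open(dict_path, 'rb') as f:
            zdict = zstandard.ZstdCompressionDict(f.read())
        dctx = zstandard.ZstdDecompressor(dict_data=zdict)
        with open(warc_path, 'rb') as f, \
                dctx.stream_reader(f, read_across_frames=True) as reader:
            # WARC records are stored as separate zstd frames, so keep
            # reading across frame boundaries until the file is exhausted.
            while True:
                chunk = reader.read(65536)
                if not chunk:
                    break
                yield chunk

    if __name__ == '__main__':
        total = sum(len(c) for c in iter_warc_zst(
            'reddit-example.warc.zst', 'zstdict'))  # example paths
        print('decompressed %d bytes' % total)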
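The 'posts:START-END' expansion above calls self.int_to_str(i), whose body
lies outside this diff's context. For review purposes, a hypothetical
equivalent of the base-36 conversion it performs over post_chars (digits then
lowercase letters, matching Reddit's short post IDs) is:

    import string

    POST_CHARS = string.digits + string.ascii_lowercase  # as in WgetArgs.post_chars

    def int_to_str(number, alphabet=POST_CHARS):
        # Illustrative stand-in for WgetArgs.int_to_str, not a copy: encode
        # a post number in base 36, e.g. 1000 -> 'rs' (the /comments/rs page).
        if number == 0:
            return alphabet[0]
        digits = []
        while number:
            number, rem = divmod(number, len(alphabet))
            digits.append(alphabet[rem])
        return ''.join(reversed(digits))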