Use wget-at with ZSTD.

pull/5/head
arkiver 4 years ago
parent 831f79f0d9
commit 40063adcaf

5
.gitignore vendored

@ -1,4 +1,7 @@
*~ *~
*.pyc *.pyc
data/
wget-lua wget-lua
wget-at
STOP
BANNED
data/

@ -0,0 +1,9 @@
# Image for running this grab project, built on top of the
# warcforceone/grab-base image.
FROM warcforceone/grab-base
# Enable the Debian buster-backports repository, then install lua-socket from
# the default suite, zstd/libzstd from buster-backports, and the Python
# `zstandard` bindings via pip. All apt-get calls run non-interactively and
# keep existing configuration files (force-confdef/force-confold).
RUN echo deb http://deb.debian.org/debian buster-backports main contrib > /etc/apt/sources.list.d/backports.list \
&& DEBIAN_FRONTEND=noninteractive DEBIAN_PRIORITY=critical apt-get -qqy --no-install-recommends -o Dpkg::Options::=--force-confdef -o Dpkg::Options::=--force-confold -o Dpkg::Options::=--force-unsafe-io update \
&& DEBIAN_FRONTEND=noninteractive DEBIAN_PRIORITY=critical apt-get -qqy --no-install-recommends -o Dpkg::Options::=--force-confdef -o Dpkg::Options::=--force-confold -o Dpkg::Options::=--force-unsafe-io install lua-socket \
&& DEBIAN_FRONTEND=noninteractive DEBIAN_PRIORITY=critical apt-get -qqy --no-install-recommends -o Dpkg::Options::=--force-confdef -o Dpkg::Options::=--force-confold -o Dpkg::Options::=--force-unsafe-io -t buster-backports install zstd libzstd-dev libzstd1 \
&& pip install zstandard
# Copy the whole build context (the project checkout) to /grab.
COPY . /grab
# Fetch a prebuilt binary, store it as /grab/wget-at and mark it executable.
# NOTE(review): the download uses plain HTTP and no checksum or signature is
# verified here - confirm this is acceptable or pin a digest.
RUN wget -O /grab/wget-at http://xor.meo.ws/-qt0VqH8KqsCm5xAkw1Pc7oeXToeyU0u/wget-lua \
&& chmod +x /grab/wget-at

@ -163,5 +163,5 @@ Are you a developer? Help write code for us! Look at our [developer documentatio
### Other problems ### Other problems
Have an issue not listed here? Join us on IRC and ask! We can be found at irc.efnet.org #shreddit. Have an issue not listed here? Join us on IRC and ask! We can be found on IRC hackint #shreddit.

@ -1,6 +1,6 @@
#!/usr/bin/env bash #!/usr/bin/env bash
# #
# This script downloads and compiles wget-lua. # This script clones and compiles wget-lua.
# #
# first, try to detect gnutls or openssl # first, try to detect gnutls or openssl
@ -18,35 +18,35 @@ then
fi fi
fi fi
WGET_DOWNLOAD_URL="http://warriorhq.archiveteam.org/downloads/wget-lua/wget-1.14.lua.LATEST.tar.bz2" if ! zstd --version | grep -q 1.4.4
then
echo "Need version 1.4.4 of libzstd-dev and zstd"
exit 1
fi
rm -rf get-wget-lua.tmp/ rm -rf get-wget-lua.tmp/
mkdir -p get-wget-lua.tmp mkdir -p get-wget-lua.tmp
cd get-wget-lua.tmp cd get-wget-lua.tmp
if builtin type -p curl &>/dev/null git clone https://github.com/archiveteam/wget-lua.git
then
curl -L $WGET_DOWNLOAD_URL | tar -xj --strip-components=1 cd wget-lua
elif builtin type -p wget &>/dev/null git checkout v1.20.3-at
then
wget --output-document=- $WGET_DOWNLOAD_URL | tar -xj --strip-components=1 #echo -n 1.20.3-at-lua | tee ./.version ./.tarball-version > /dev/null
else
echo "You need Curl or Wget to download the source files."
exit 1
fi
if ./configure $CONFIGURE_SSL_OPT --disable-nls && make && src/wget -V | grep -q lua if ./bootstrap && ./configure $CONFIGURE_SSL_OPT --disable-nls && make && src/wget -V | grep -q lua
then then
cp src/wget ../wget-lua cp src/wget ../../wget-at
cd ../ cd ../../
echo echo
echo echo
echo "###################################################################" echo "###################################################################"
echo echo
echo "wget-lua successfully built." echo "wget-lua successfully built."
echo echo
./wget-lua --help | grep -iE "gnu|warc|lua" ./wget-at --help | grep -iE "gnu|warc|lua"
rm -rf get-wget-lua.tmp rm -rf get-wget-lua.tmp
exit 0 exit 0
else else

@ -16,8 +16,6 @@ import subprocess
import sys import sys
import time import time
import string import string
import re
import random
import seesaw import seesaw
from seesaw.externalprocess import WgetDownload from seesaw.externalprocess import WgetDownload
@ -27,8 +25,9 @@ from seesaw.util import find_executable
from tornado import httpclient from tornado import httpclient
import requests
import zstandard
# check the seesaw version
if StrictVersion(seesaw.__version__) < StrictVersion('0.8.5'): if StrictVersion(seesaw.__version__) < StrictVersion('0.8.5'):
raise Exception('This pipeline needs seesaw version 0.8.5 or higher.') raise Exception('This pipeline needs seesaw version 0.8.5 or higher.')
@ -36,25 +35,18 @@ if StrictVersion(seesaw.__version__) < StrictVersion('0.8.5'):
########################################################################### ###########################################################################
# Find a useful Wget+Lua executable. # Find a useful Wget+Lua executable.
# #
# WGET_LUA will be set to the first path that # WGET_AT will be set to the first path that
# 1. does not crash with --version, and # 1. does not crash with --version, and
# 2. prints the required version string # 2. prints the required version string
WGET_LUA = find_executable(
'Wget+Lua', WGET_AT = find_executable(
['GNU Wget 1.14.lua.20130523-9a5c', 'GNU Wget 1.14.lua.20160530-955376b'], 'Wget+AT',
[ ['GNU Wget 1.20.3-at.20200401.01'],
'./wget-lua', ['./wget-at']
'./wget-lua-warrior',
'./wget-lua-local',
'../wget-lua',
'../../wget-lua',
'/home/warrior/wget-lua',
'/usr/bin/wget-lua'
]
) )
if not WGET_LUA: if not WGET_AT:
raise Exception('No usable Wget+Lua found.') raise Exception('No usable Wget+At found.')
########################################################################### ###########################################################################
@ -62,10 +54,10 @@ if not WGET_LUA:
# #
# Update this each time you make a non-cosmetic change. # Update this each time you make a non-cosmetic change.
# It will be added to the WARC files and reported to the tracker. # It will be added to the WARC files and reported to the tracker.
VERSION = '20200102.03' VERSION = '20200701.01'
USER_AGENT = 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; WOW64; Trident/4.0; SLCC1)' USER_AGENT = 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; WOW64; Trident/4.0; SLCC1)'
TRACKER_ID = 'reddit' TRACKER_ID = 'reddit'
TRACKER_HOST = 'tracker.archiveteam.org' TRACKER_HOST = 'trackerproxy.meo.ws'
########################################################################### ###########################################################################
@ -115,8 +107,7 @@ class PrepareDirectories(SimpleTask):
def process(self, item): def process(self, item):
item_name = item['item_name'] item_name = item['item_name']
escaped_item_name = item_name.replace(':', '_').replace('/', '_').replace('~', '_') escaped_item_name = item_name.replace(':', '_').replace('/', '_').replace('~', '_')
item_hash = hashlib.sha1(item_name.encode('utf-8')).hexdigest() dirname = '/'.join((item['data_dir'], escaped_item_name))
dirname = '/'.join((item['data_dir'], item_hash))
if os.path.isdir(dirname): if os.path.isdir(dirname):
shutil.rmtree(dirname) shutil.rmtree(dirname)
@ -124,83 +115,21 @@ class PrepareDirectories(SimpleTask):
os.makedirs(dirname) os.makedirs(dirname)
item['item_dir'] = dirname item['item_dir'] = dirname
item['warc_file_base'] = '%s-%s-%s' % (self.warc_prefix, item_hash, item['warc_file_base'] = '%s-%s-%s' % (self.warc_prefix, escaped_item_name[:50],
time.strftime('%Y%m%d-%H%M%S')) time.strftime('%Y%m%d-%H%M%S'))
open('%(item_dir)s/%(warc_file_base)s.warc.gz' % item, 'w').close() open('%(item_dir)s/%(warc_file_base)s.warc.zst' % item, 'w').close()
open('%(item_dir)s/%(warc_file_base)s_data.txt' % item, 'w').close() open('%(item_dir)s/%(warc_file_base)s_data.txt' % item, 'w').close()
class Deduplicate(SimpleTask):
    """Rewrite an item's WARC so that repeated payloads become revisit records.

    Reads ``<item_dir>/<warc_file_base>.warc.gz`` and writes
    ``<item_dir>/<warc_file_base>-deduplicated.warc.gz``. The first response
    seen for each ``WARC-Payload-Digest`` is kept unchanged; later responses
    carrying the same digest are replaced by ``revisit`` records that refer
    back to the first one.
    """

    def __init__(self):
        SimpleTask.__init__(self, 'Deduplicate')

    def process(self, item):
        """Copy every record of the item's WARC into the deduplicated WARC.

        ``item`` must provide the ``item_dir`` and ``warc_file_base`` keys
        used to build the input and output file names.
        """
        # Maps payload digest -> (record id, date, target URI) of the first
        # response record that carried that digest.
        digests = {}
        input_filename = '%(item_dir)s/%(warc_file_base)s.warc.gz' % item
        output_filename = '%(item_dir)s/%(warc_file_base)s-deduplicated.warc.gz' % item
        with open(input_filename, 'rb') as f_in, \
                open(output_filename, 'wb') as f_out:
            writer = WARCWriter(filebuf=f_out, gzip=True)
            for record in ArchiveIterator(f_in):
                headers = record.rec_headers
                url = headers.get_header('WARC-Target-URI')
                if url is not None and url.startswith('<'):
                    # Strip the angle brackets wrapped around the target URI.
                    url = re.search(r'^<(.+)>$', url).group(1)
                    headers.replace_header('WARC-Target-URI', url)
                record_type = headers.get_header('WARC-Type')
                if record_type == 'response':
                    digest = headers.get_header('WARC-Payload-Digest')
                    if digest is None:
                        # Without a payload digest there is nothing to compare
                        # on; keying such records under None would wrongly turn
                        # every later digest-less response into a revisit.
                        writer.write_record(record)
                    elif digest in digests:
                        writer.write_record(
                            self._record_response_to_revisit(writer, record,
                                                             digests[digest])
                        )
                    else:
                        digests[digest] = (
                            headers.get_header('WARC-Record-ID'),
                            headers.get_header('WARC-Date'),
                            headers.get_header('WARC-Target-URI')
                        )
                        writer.write_record(record)
                elif record_type == 'warcinfo':
                    # Point the warcinfo record at the file being written.
                    headers.replace_header('WARC-Filename', output_filename)
                    writer.write_record(record)
                else:
                    writer.write_record(record)

    def _record_response_to_revisit(self, writer, record, duplicate):
        """Build a revisit record that stands in for a duplicate response.

        ``duplicate`` is the ``(record id, date, target URI)`` tuple stored
        for the first response with the same payload digest. The WARC headers
        of ``record`` are modified in place and handed to
        ``writer.create_warc_record`` together with the original HTTP headers.
        """
        warc_headers = record.rec_headers
        warc_headers.replace_header('WARC-Refers-To', duplicate[0])
        warc_headers.replace_header('WARC-Refers-To-Date', duplicate[1])
        warc_headers.replace_header('WARC-Refers-To-Target-URI', duplicate[2])
        warc_headers.replace_header('WARC-Type', 'revisit')
        warc_headers.replace_header('WARC-Truncated', 'length')
        warc_headers.replace_header('WARC-Profile',
                                    'http://netpreserve.org/warc/1.0/' \
                                    'revisit/identical-payload-digest')
        # Drop the headers that described the original (full) record block.
        warc_headers.remove_header('WARC-Block-Digest')
        warc_headers.remove_header('Content-Length')
        return writer.create_warc_record(
            record.rec_headers.get_header('WARC-Target-URI'),
            'revisit',
            warc_headers=warc_headers,
            http_headers=record.http_headers
        )
class MoveFiles(SimpleTask): class MoveFiles(SimpleTask):
def __init__(self): def __init__(self):
SimpleTask.__init__(self, 'MoveFiles') SimpleTask.__init__(self, 'MoveFiles')
def process(self, item): def process(self, item):
if os.path.exists('%(item_dir)s/%(warc_file_base)s.warc' % item): os.rename('%(item_dir)s/%(warc_file_base)s.warc.zst' % item,
raise Exception('Please compile wget with zlib support!') '%(data_dir)s/%(warc_file_base)s.%(dict_project)s.%(dict_id)s.warc.zst' % item)
#os.rename('%(item_dir)s/%(warc_file_base)s-deduplicated.warc.gz' % item,
# '%(data_dir)s/%(warc_file_base)s-deduplicated.warc.gz' % item)
os.rename('%(item_dir)s/%(warc_file_base)s.warc.gz' % item,
'%(data_dir)s/%(warc_file_base)s.warc.gz' % item)
os.rename('%(item_dir)s/%(warc_file_base)s_data.txt' % item, os.rename('%(item_dir)s/%(warc_file_base)s_data.txt' % item,
'%(data_dir)s/%(warc_file_base)s_data.txt' % item) '%(data_dir)s/%(warc_file_base)s_data.txt' % item)
shutil.rmtree('%(item_dir)s' % item) shutil.rmtree('%(item_dir)s' % item)
@ -209,14 +138,11 @@ def get_hash(filename):
with open(filename, 'rb') as in_file: with open(filename, 'rb') as in_file:
return hashlib.sha1(in_file.read()).hexdigest() return hashlib.sha1(in_file.read()).hexdigest()
CWD = os.getcwd() CWD = os.getcwd()
PIPELINE_SHA1 = get_hash(os.path.join(CWD, 'pipeline.py')) PIPELINE_SHA1 = get_hash(os.path.join(CWD, 'pipeline.py'))
LUA_SHA1 = get_hash(os.path.join(CWD, 'reddit.lua')) LUA_SHA1 = get_hash(os.path.join(CWD, 'reddit.lua'))
def stats_id_function(item): def stats_id_function(item):
# NEW for 2014! Some accountability hashes and stats.
d = { d = {
'pipeline_hash': PIPELINE_SHA1, 'pipeline_hash': PIPELINE_SHA1,
'lua_hash': LUA_SHA1, 'lua_hash': LUA_SHA1,
@ -226,6 +152,41 @@ def stats_id_function(item):
return d return d
class ZstdDict(object):
    """Class-level cache of the zstd dictionary used for this project's WARCs.

    The dictionary metadata is requested from the tracker's ``/dictionary``
    endpoint for ``TRACKER_ID``; the dictionary itself is downloaded from the
    URL given in that metadata and verified against the advertised SHA-256.
    """

    # time.time() value of the last moment the cached dictionary was fetched
    # or confirmed to still be current.
    created = 0
    # Cached {'id': ..., 'dict': <bytes>} mapping; None until the first fetch.
    data = None
    # Seconds a cached dictionary is reused before the tracker is asked again.
    max_age = 1800
    # Seconds to wait on each HTTP request; without a timeout a stalled server
    # would block the pipeline indefinitely.
    request_timeout = 60

    @classmethod
    def get_dict(cls):
        """Return the current dictionary as ``{'id': ..., 'dict': bytes}``.

        A cached dictionary younger than ``max_age`` seconds is returned
        without any network access. Otherwise the tracker is queried, and the
        dictionary is downloaded only if its id differs from the cached one.

        Raises:
            requests.RequestException: on HTTP errors or timeouts.
            ValueError: if the downloaded data does not match the SHA-256
                reported by the tracker.
        """
        if cls.data is not None and time.time() - cls.created < cls.max_age:
            return cls.data
        response = requests.get(
            'http://tracker.archiveteam.org:25654/dictionary',
            params={
                'project': TRACKER_ID
            },
            timeout=cls.request_timeout
        )
        response.raise_for_status()
        info = response.json()
        if cls.data is not None and info['id'] == cls.data['id']:
            # Same dictionary as before: just restart the cache lifetime.
            cls.created = time.time()
            return cls.data
        print('Downloading latest dictionary.')
        response_dict = requests.get(info['url'], timeout=cls.request_timeout)
        response_dict.raise_for_status()
        raw_data = response_dict.content
        # The hash is checked on the bytes as downloaded, before any
        # decompression.
        if hashlib.sha256(raw_data).hexdigest() != info['sha256']:
            raise ValueError('Hash of downloaded dictionary does not match.')
        # A leading zstd frame magic number means the dictionary was shipped
        # compressed; unpack it so callers get the raw dictionary bytes.
        if raw_data[:4] == b'\x28\xB5\x2F\xFD':
            raw_data = zstandard.ZstdDecompressor().decompress(raw_data)
        cls.data = {
            'id': info['id'],
            'dict': raw_data
        }
        cls.created = time.time()
        return cls.data
class WgetArgs(object): class WgetArgs(object):
post_chars = string.digits + string.ascii_lowercase post_chars = string.digits + string.ascii_lowercase
@ -237,11 +198,12 @@ class WgetArgs(object):
def realize(self, item): def realize(self, item):
wget_args = [ wget_args = [
WGET_LUA, WGET_AT,
'-U', USER_AGENT, '-U', USER_AGENT,
'-nv', '-nv',
'--no-cookies',
'--content-on-error',
'--lua-script', 'reddit.lua', '--lua-script', 'reddit.lua',
'--load-cookies', 'cookies.txt',
'-o', ItemInterpolation('%(item_dir)s/wget.log'), '-o', ItemInterpolation('%(item_dir)s/wget.log'),
'--no-check-certificate', '--no-check-certificate',
'--output-document', ItemInterpolation('%(item_dir)s/wget.tmp'), '--output-document', ItemInterpolation('%(item_dir)s/wget.tmp'),
@ -259,16 +221,28 @@ class WgetArgs(object):
'--warc-file', ItemInterpolation('%(item_dir)s/%(warc_file_base)s'), '--warc-file', ItemInterpolation('%(item_dir)s/%(warc_file_base)s'),
'--warc-header', 'operator: Archive Team', '--warc-header', 'operator: Archive Team',
'--warc-header', 'reddit-dld-script-version: ' + VERSION, '--warc-header', 'reddit-dld-script-version: ' + VERSION,
'--warc-header', ItemInterpolation('reddit-item: %(item_name)s') '--warc-header', ItemInterpolation('reddit-item: %(item_name)s'),
'--warc-dedup-url-agnostic',
'--warc-compression-use-zstd',
'--warc-zstd-dict-no-include'
] ]
dict_data = ZstdDict.get_dict()
with open(os.path.join(item['item_dir'], 'zstdict'), 'wb') as f:
f.write(dict_data['dict'])
item['dict_id'] = dict_data['id']
item['dict_project'] = TRACKER_ID
wget_args.extend([
'--warc-zstd-dict', ItemInterpolation('%(item_dir)s/zstdict'),
])
item_name = item['item_name'] item_name = item['item_name']
item_type, item_value = item_name.split(':', 1) item_type, item_value = item_name.split(':', 1)
item['item_type'] = item_type item['item_type'] = item_type
item['item_value'] = item_value item['item_value'] = item_value
if item_type in ('posts'): if item_type == 'posts':
start, end = item_value.split('-') start, end = item_value.split('-')
for i in range(int(start), int(end)+1): for i in range(int(start), int(end)+1):
post_id = self.int_to_str(i) post_id = self.int_to_str(i)
@ -277,14 +251,14 @@ class WgetArgs(object):
#wget_args.append('https://old.reddit.com/comments/{}'.format(post_id)) #wget_args.append('https://old.reddit.com/comments/{}'.format(post_id))
else: else:
raise Exception('Unknown item') raise Exception('Unknown item')
if 'bind_address' in globals(): if 'bind_address' in globals():
wget_args.extend(['--bind-address', globals()['bind_address']]) wget_args.extend(['--bind-address', globals()['bind_address']])
print('') print('')
print('*** Wget will bind address at {0} ***'.format( print('*** Wget will bind address at {0} ***'.format(
globals()['bind_address'])) globals()['bind_address']))
print('') print('')
return realize(wget_args, item) return realize(wget_args, item)
########################################################################### ###########################################################################
@ -314,21 +288,20 @@ pipeline = Pipeline(
'item_dir': ItemValue('item_dir'), 'item_dir': ItemValue('item_dir'),
'item_value': ItemValue('item_value'), 'item_value': ItemValue('item_value'),
'item_type': ItemValue('item_type'), 'item_type': ItemValue('item_type'),
'warc_file_base': ItemValue('warc_file_base') 'warc_file_base': ItemValue('warc_file_base'),
} }
), ),
PrepareStatsForTracker( PrepareStatsForTracker(
defaults={'downloader': downloader, 'version': VERSION}, defaults={'downloader': downloader, 'version': VERSION},
file_groups={ file_groups={
'data': [ 'data': [
ItemInterpolation('%(item_dir)s/%(warc_file_base)s.warc.gz') ItemInterpolation('%(item_dir)s/%(warc_file_base)s.warc.zst')
#ItemInterpolation('%(item_dir)s/%(warc_file_base)s-deduplicated.warc.gz')
] ]
}, },
id_function=stats_id_function, id_function=stats_id_function,
), ),
MoveFiles(), MoveFiles(),
LimitConcurrent(NumberConfigValue(min=1, max=20, default='20', LimitConcurrent(NumberConfigValue(min=1, max=20, default='2',
name='shared:rsync_threads', title='Rsync threads', name='shared:rsync_threads', title='Rsync threads',
description='The maximum number of concurrent uploads.'), description='The maximum number of concurrent uploads.'),
UploadWithTracker( UploadWithTracker(
@ -336,13 +309,11 @@ pipeline = Pipeline(
downloader=downloader, downloader=downloader,
version=VERSION, version=VERSION,
files=[ files=[
ItemInterpolation('%(data_dir)s/%(warc_file_base)s.warc.gz'), ItemInterpolation('%(data_dir)s/%(warc_file_base)s.%(dict_project)s.%(dict_id)s.warc.zst'),
#ItemInterpolation('%(data_dir)s/%(warc_file_base)s-deduplicated.warc.gz'),
ItemInterpolation('%(data_dir)s/%(warc_file_base)s_data.txt') ItemInterpolation('%(data_dir)s/%(warc_file_base)s_data.txt')
], ],
rsync_target_source_path=ItemInterpolation('%(data_dir)s/'), rsync_target_source_path=ItemInterpolation('%(data_dir)s/'),
rsync_extra_args=[ rsync_extra_args=[
'--sockopts=SO_SNDBUF=8388608,SO_RCVBUF=8388608',
'--recursive', '--recursive',
'--partial', '--partial',
'--partial-dir', '.rsync-tmp', '--partial-dir', '.rsync-tmp',
@ -350,11 +321,10 @@ pipeline = Pipeline(
'--no-compress', '--no-compress',
'--compress-level', '0' '--compress-level', '0'
] ]
), ),
), ),
SendDoneToTracker( SendDoneToTracker(
tracker_url='http://%s/%s' % (TRACKER_HOST, TRACKER_ID), tracker_url='http://%s/%s' % (TRACKER_HOST, TRACKER_ID),
stats=ItemValue('stats') stats=ItemValue('stats')
) )
) )

Loading…
Cancel
Save