# encoding=utf8
import datetime
from distutils.version import StrictVersion
import hashlib
import os.path
import random
import re

from seesaw.config import realize, NumberConfigValue
from seesaw.externalprocess import ExternalProcess
from seesaw.item import ItemInterpolation, ItemValue
from seesaw.task import SimpleTask, LimitConcurrent
from seesaw.tracker import GetItemFromTracker, PrepareStatsForTracker, \
    UploadWithTracker, SendDoneToTracker
import shutil
import socket
import subprocess
import sys
import time
import string

import seesaw
from seesaw.externalprocess import WgetDownload
from seesaw.pipeline import Pipeline
from seesaw.project import Project
from seesaw.util import find_executable

from tornado import httpclient

import requests
import zstandard

if StrictVersion(seesaw.__version__) < StrictVersion('0.8.5'):
    raise Exception('This pipeline needs seesaw version 0.8.5 or higher.')


###########################################################################
# Find a useful Wget+Lua executable.
#
# WGET_AT will be set to the first path that
# 1. does not crash with --version, and
# 2. prints the required version string
class HigherVersion:
    def __init__(self, expression, min_version):
        self._expression = re.compile(expression)
        self._min_version = min_version

    def search(self, text):
        for result in self._expression.findall(text):
            if result >= self._min_version:
                print('Found version {}.'.format(result))
                return True
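
# Note: find_executable() below probes each candidate's version output (the
# comment above mentions --version) and feeds it to HigherVersion.search().
# Because the regex matches fixed-width numeric fields (e.g.
# 'GNU Wget 1.21.3-at.20230623.01'), the plain string comparison
# `result >= self._min_version` orders the same way as a field-by-field
# numeric comparison would.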

WGET_AT = find_executable(
    'Wget+AT',
    HigherVersion(
        r'(GNU Wget 1\.[0-9]{2}\.[0-9]{1}-at\.[0-9]{8}\.[0-9]{2})[^0-9a-zA-Z\.-_]',
        'GNU Wget 1.21.3-at.20230623.01'
    ),
    [
        './wget-at',
        '/home/warrior/data/wget-at-gnutls'
    ]
)

if not WGET_AT:
    raise Exception('No usable Wget+At found.')

###########################################################################
# The version number of this pipeline definition.
#
# Update this each time you make a non-cosmetic change.
# It will be added to the WARC files and reported to the tracker.
VERSION = '20231127.02'
TRACKER_ID = 'reddit'
TRACKER_HOST = 'legacy-api.arpa.li'
MULTI_ITEM_SIZE = 100

###########################################################################
# This section defines project-specific tasks.
#
# Simple tasks (tasks that do not need any concurrency) are based on the
# SimpleTask class and have a process(item) method that is called for
# each item.
class CheckIP(SimpleTask):
    def __init__(self):
        SimpleTask.__init__(self, 'CheckIP')
        self._counter = 0

    def process(self, item):
        # NEW for 2014! Check if we are behind firewall/proxy
        if self._counter <= 0:
            item.log_output('Checking IP address.')
            ip_set = set()

            ip_set.add(socket.gethostbyname('twitter.com'))
            #ip_set.add(socket.gethostbyname('facebook.com'))
            ip_set.add(socket.gethostbyname('youtube.com'))
            ip_set.add(socket.gethostbyname('microsoft.com'))
            ip_set.add(socket.gethostbyname('icanhas.cheezburger.com'))
            ip_set.add(socket.gethostbyname('archiveteam.org'))

            if len(ip_set) != 5:
                item.log_output('Got IP addresses: {0}'.format(ip_set))
                item.log_output(
                    'Are you behind a firewall/proxy? That is a big no-no!')
                raise Exception(
                    'Are you behind a firewall/proxy? That is a big no-no!')

        # Check only occasionally
        if self._counter <= 0:
            self._counter = 10
        else:
            self._counter -= 1
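
# If DNS lookups are being intercepted (for example by a transparent proxy or
# a hijacking resolver), several of the hostnames above tend to resolve to the
# same address, so the set ends up with fewer than five entries and the check
# fails the item early instead of archiving bogus responses.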


class PrepareDirectories(SimpleTask):
    def __init__(self, warc_prefix):
        SimpleTask.__init__(self, 'PrepareDirectories')
        self.warc_prefix = warc_prefix

    def process(self, item):
        item_name = item['item_name']
        item_name_hash = hashlib.sha1(item_name.encode('utf8')).hexdigest()
        escaped_item_name = item_name_hash
        dirname = '/'.join((item['data_dir'], escaped_item_name))

        if os.path.isdir(dirname):
            shutil.rmtree(dirname)

        os.makedirs(dirname)

        item['item_dir'] = dirname
        item['warc_file_base'] = '-'.join([
            self.warc_prefix,
            item_name_hash,
            time.strftime('%Y%m%d-%H%M%S')
        ])

        open('%(item_dir)s/%(warc_file_base)s.warc.zst' % item, 'w').close()
        open('%(item_dir)s/%(warc_file_base)s_data.txt' % item, 'w').close()
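
# For illustration (hypothetical values): an item whose name hashes to
# 'ab12...ef' gets the working directory <data_dir>/ab12...ef/ with empty
# placeholder files reddit-ab12...ef-20231127-120000.warc.zst and
# reddit-ab12...ef-20231127-120000_data.txt, which the download and upload
# steps below expect to exist.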


class MoveFiles(SimpleTask):
    def __init__(self):
        SimpleTask.__init__(self, 'MoveFiles')

    def process(self, item):
        os.rename('%(item_dir)s/%(warc_file_base)s.warc.zst' % item,
              '%(data_dir)s/%(warc_file_base)s.%(dict_project)s.%(dict_id)s.warc.zst' % item)
        os.rename('%(item_dir)s/%(warc_file_base)s_data.txt' % item,
              '%(data_dir)s/%(warc_file_base)s_data.txt' % item)

        shutil.rmtree('%(item_dir)s' % item)


class SetBadUrls(SimpleTask):
    def __init__(self):
        SimpleTask.__init__(self, 'SetBadUrls')

    def process(self, item):
        item['item_name_original'] = item['item_name']
        items = item['item_name'].split('\0')
        items_lower = [s.lower() for s in items]
        with open('%(item_dir)s/%(warc_file_base)s_bad-items.txt' % item, 'r') as f:
            for aborted_item in f:
                aborted_item = aborted_item.strip().lower()
                index = items_lower.index(aborted_item)
                item.log_output('Item {} is aborted.'.format(aborted_item))
                items.pop(index)
                items_lower.pop(index)
        item['item_name'] = '\0'.join(items)


class MaybeSendDoneToTracker(SendDoneToTracker):
    def enqueue(self, item):
        if len(item['item_name']) == 0:
            return self.complete_item(item)
        return super(MaybeSendDoneToTracker, self).enqueue(item)
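
# The *_bad-items.txt file is expected to be written during the wget-at run
# (presumably by reddit.lua) and lists the items in the batch that had to be
# aborted. SetBadUrls removes those from item['item_name'], so only the
# successfully archived items get reported as done, and MaybeSendDoneToTracker
# skips the tracker call entirely when every item in the batch was aborted.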


def get_hash(filename):
    with open(filename, 'rb') as in_file:
        return hashlib.sha1(in_file.read()).hexdigest()


CWD = os.getcwd()
PIPELINE_SHA1 = get_hash(os.path.join(CWD, 'pipeline.py'))
LUA_SHA1 = get_hash(os.path.join(CWD, 'reddit.lua'))


def stats_id_function(item):
    d = {
        'pipeline_hash': PIPELINE_SHA1,
        'lua_hash': LUA_SHA1,
        'python_version': sys.version,
    }

    return d
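
# stats_id_function is passed to PrepareStatsForTracker below; reporting the
# SHA-1 of pipeline.py and reddit.lua with each item lets the tracker see which
# script revisions produced the uploaded data.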


class ZstdDict(object):
    created = 0
    data = None

    @classmethod
    def get_dict(cls):
        if cls.data is not None and time.time() - cls.created < 1800:
            return cls.data
        response = requests.get(
            'https://legacy-api.arpa.li/dictionary',
            params={
                'project': 'reddit'
            }
        )
        response.raise_for_status()
        response = response.json()
        if cls.data is not None and response['id'] == cls.data['id']:
            cls.created = time.time()
            return cls.data
        print('Downloading latest dictionary.')
        response_dict = requests.get(response['url'])
        response_dict.raise_for_status()
        raw_data = response_dict.content
        if hashlib.sha256(raw_data).hexdigest() != response['sha256']:
            raise ValueError('Hash of downloaded dictionary does not match.')
        if raw_data[:4] == b'\x28\xB5\x2F\xFD':
            raw_data = zstandard.ZstdDecompressor().decompress(raw_data)
        cls.data = {
            'id': response['id'],
            'dict': raw_data
        }
        cls.created = time.time()
        return cls.data
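
# get_dict() caches the dictionary at class level for 30 minutes. When the
# cache is stale it re-queries the dictionary endpoint; if the advertised id is
# unchanged only the timestamp is refreshed, otherwise the new dictionary is
# downloaded, verified against the advertised SHA-256, and decompressed when it
# is itself zstd-compressed (b'\x28\xB5\x2F\xFD' is the zstd magic number).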


class WgetArgs(object):
    post_chars = string.digits + string.ascii_lowercase

    def int_to_str(self, i):
        d, m = divmod(i, 36)
        if d > 0:
            return self.int_to_str(d) + self.post_chars[m]
        return self.post_chars[m]
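
    # int_to_str() renders an integer in base 36 using 0-9a-z, the alphabet
    # reddit uses for its post and comment IDs; for example, int_to_str(12345)
    # returns '9ix'. It is a helper and is not called elsewhere in this file.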

    def realize(self, item):
        with open('user-agents', 'r') as f:
            user_agent = random.choice(list(f)).strip()

        wget_args = [
            WGET_AT,
            '-U', user_agent,
            '-nv',
            '--host-lookups', 'dns',
            '--hosts-file', '/dev/null',
            '--resolvconf-file', '/dev/null',
            '--dns-servers', '9.9.9.10,149.112.112.10,2620:fe::10,2620:fe::fe:10',
            '--reject-reserved-subnets',
            '--load-cookies', 'cookies.txt',
            '--content-on-error',
            '--no-http-keep-alive',
            '--lua-script', 'reddit.lua',
            '-o', ItemInterpolation('%(item_dir)s/wget.log'),
            '--no-check-certificate',
            '--output-document', ItemInterpolation('%(item_dir)s/wget.tmp'),
            '--truncate-output',
            '-e', 'robots=off',
            '--rotate-dns',
            '--recursive', '--level=inf',
            '--no-parent',
            '--page-requisites',
            '--timeout', '30',
            '--tries', 'inf',
            '--domains', 'reddit.com',
            '--span-hosts',
            '--waitretry', '30',
            '--warc-file', ItemInterpolation('%(item_dir)s/%(warc_file_base)s'),
            '--warc-header', 'operator: Archive Team',
            '--warc-header', 'x-wget-at-project-version: ' + VERSION,
            '--warc-header', 'x-wget-at-project-name: ' + TRACKER_ID,
            '--warc-dedup-url-agnostic',
            '--warc-compression-use-zstd',
            '--warc-zstd-dict-no-include',
            '--header', 'Accept-Language: en-US;q=0.9, en;q=0.8',
            '--ciphers', 'SECURE128'
        ]

        dict_data = ZstdDict.get_dict()
        with open(os.path.join(item['item_dir'], 'zstdict'), 'wb') as f:
            f.write(dict_data['dict'])
        item['dict_id'] = dict_data['id']
        item['dict_project'] = 'reddit'
        wget_args.extend([
            '--warc-zstd-dict', ItemInterpolation('%(item_dir)s/zstdict'),
        ])
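
        # The WARC is zstd-compressed against the shared dictionary written to
        # %(item_dir)s/zstdict, and --warc-zstd-dict-no-include keeps the
        # dictionary itself out of the WARC; this is presumably why MoveFiles
        # embeds dict_project and dict_id in the final file name, so the right
        # dictionary can be located again when the WARC is decompressed.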

        for item_name in item['item_name'].split('\0'):
            wget_args.extend(['--warc-header', 'x-wget-at-project-item-name: '+item_name])
            wget_args.append('item-name://'+item_name)
            item_type, item_value = item_name.split(':', 1)
            if item_type == 'post':
                wget_args.extend(['--warc-header', 'reddit-post: '+item_value])
                wget_args.append('https://www.reddit.com/api/info.json?id=t3_'+item_value)
            elif item_type == 'comment':
                wget_args.extend(['--warc-header', 'reddit-comment: '+item_value])
                wget_args.append('https://www.reddit.com/api/info.json?id=t1_'+item_value)
            elif item_type == 'url':
                wget_args.extend(['--warc-header', 'reddit-media-url: '+item_value])
                wget_args.append(item_value)
            else:
                raise Exception('Unknown item')

        item['item_name_newline'] = item['item_name'].replace('\0', '\n')
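
        # Tracker items arrive NUL-separated, up to MULTI_ITEM_SIZE per batch,
        # each of the form 'type:value'. For example (hypothetical IDs), an
        # item 'post:abc123' seeds https://www.reddit.com/api/info.json?id=t3_abc123,
        # 'comment:def456' seeds the t1_ variant, and 'url:<link>' seeds the
        # link itself; the recursive wget-at run driven by reddit.lua then
        # expands these starting points.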

        if 'bind_address' in globals():
            wget_args.extend(['--bind-address', globals()['bind_address']])
            print('')
            print('*** Wget will bind address at {0} ***'.format(
                globals()['bind_address']))
            print('')

        return realize(wget_args, item)


###########################################################################
# Initialize the project.
#
# This will be shown in the warrior management panel. The logo should not
# be too big. The deadline is optional.
project = Project(
    title='reddit',
    project_html='''
    <img class="project-logo" alt="Project logo" src="https://www.archiveteam.org/images/b/b5/Reddit_logo.png" height="50px" title=""/>
    <h2>reddit.com <span class="links"><a href="https://reddit.com/">Website</a> &middot; <a href="http://tracker.archiveteam.org/reddit/">Leaderboard</a></span></h2>
    <p>Archiving everything from reddit.</p>
    '''
)
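
# Note: 'downloader' (used below) and the optional 'bind_address' checked in
# WgetArgs.realize() are not defined in this file; the seesaw runner
# (run-pipeline or the warrior) provides them in the global namespace before
# this script is evaluated.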

pipeline = Pipeline(
    CheckIP(),
    GetItemFromTracker('http://{}/{}/multi={}/'
        .format(TRACKER_HOST, TRACKER_ID, MULTI_ITEM_SIZE),
        downloader, VERSION),
    PrepareDirectories(warc_prefix='reddit'),
    WgetDownload(
        WgetArgs(),
        max_tries=2,
        accept_on_exit_code=[0, 4, 8],
        env={
            'item_dir': ItemValue('item_dir'),
            'item_names': ItemValue('item_name_newline'),
            'warc_file_base': ItemValue('warc_file_base'),
        }
    ),
    SetBadUrls(),
    PrepareStatsForTracker(
        defaults={'downloader': downloader, 'version': VERSION},
        file_groups={
            'data': [
                ItemInterpolation('%(item_dir)s/%(warc_file_base)s.warc.zst')
            ]
        },
        id_function=stats_id_function,
    ),
    MoveFiles(),
    LimitConcurrent(NumberConfigValue(min=1, max=20, default='20',
        name='shared:rsync_threads', title='Rsync threads',
        description='The maximum number of concurrent uploads.'),
        UploadWithTracker(
            'http://%s/%s' % (TRACKER_HOST, TRACKER_ID),
            downloader=downloader,
            version=VERSION,
            files=[
                ItemInterpolation('%(data_dir)s/%(warc_file_base)s.%(dict_project)s.%(dict_id)s.warc.zst'),
                ItemInterpolation('%(data_dir)s/%(warc_file_base)s_data.txt')
            ],
            rsync_target_source_path=ItemInterpolation('%(data_dir)s/'),
            rsync_extra_args=[
                '--recursive',
                '--min-size', '1',
                '--no-compress',
                '--compress-level', '0'
            ]
        ),
    ),
    MaybeSendDoneToTracker(
        tracker_url='http://%s/%s' % (TRACKER_HOST, TRACKER_ID),
        stats=ItemValue('stats')
    )
)