# Archive Team 'reddit' grab: seesaw pipeline definition (pipeline.py).
# encoding=utf8
import datetime
from distutils.version import StrictVersion
import hashlib
import os.path
import random
import re
import shutil
import socket
import string
import subprocess
import sys
import time

import seesaw
from seesaw.config import realize, NumberConfigValue
from seesaw.externalprocess import ExternalProcess, WgetDownload
from seesaw.item import ItemInterpolation, ItemValue
from seesaw.pipeline import Pipeline
from seesaw.project import Project
from seesaw.task import SimpleTask, LimitConcurrent
from seesaw.tracker import GetItemFromTracker, PrepareStatsForTracker, \
    UploadWithTracker, SendDoneToTracker
from seesaw.util import find_executable

from tornado import httpclient
# check the seesaw version
# The pipeline relies on seesaw features introduced in 0.8.5; refuse to
# start on anything older rather than failing obscurely mid-run.
if StrictVersion(seesaw.__version__) < StrictVersion('0.8.5'):
    raise Exception('This pipeline needs seesaw version 0.8.5 or higher.')
###########################################################################
# Find a useful Wget+Lua executable.
#
# WGET_LUA will be set to the first path that
# 1. does not crash with --version, and
# 2. prints the required version string
WGET_LUA = find_executable(
    'Wget+Lua',
    ['GNU Wget 1.14.lua.20130523-9a5c', 'GNU Wget 1.14.lua.20160530-955376b'],
    [
        './wget-lua',
        './wget-lua-warrior',
        './wget-lua-local',
        '../wget-lua',
        '../../wget-lua',
        '/home/warrior/wget-lua',
        '/usr/bin/wget-lua'
    ]
)
if not WGET_LUA:
    raise Exception('No usable Wget+Lua found.')
###########################################################################
# The version number of this pipeline definition.
#
# Update this each time you make a non-cosmetic change.
# It will be added to the WARC files and reported to the tracker.
VERSION = '20200102.03'
# User agent sent with every request made by wget.
USER_AGENT = 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; WOW64; Trident/4.0; SLCC1)'
# Tracker project slug and host used for item leases, stats and uploads.
TRACKER_ID = 'reddit'
TRACKER_HOST = 'tracker.archiveteam.org'
###########################################################################
# This section defines project-specific tasks.
#
# Simple tasks (tasks that do not need any concurrency) are based on the
# SimpleTask class and have a process(item) method that is called for
# each item.
class CheckIP(SimpleTask):
    """Detect DNS-tampering firewalls/proxies before downloading.

    Resolves six well-known hostnames; if they do not yield six distinct
    addresses, something (captive portal, transparent proxy) is answering
    for all of them and the item is aborted.
    """

    def __init__(self):
        SimpleTask.__init__(self, 'CheckIP')
        self._counter = 0

    def process(self, item):
        # NEW for 2014! Check if we are behind firewall/proxy
        if self._counter <= 0:
            item.log_output('Checking IP address.')
            ip_set = set()
            ip_set.add(socket.gethostbyname('twitter.com'))
            ip_set.add(socket.gethostbyname('facebook.com'))
            ip_set.add(socket.gethostbyname('youtube.com'))
            ip_set.add(socket.gethostbyname('microsoft.com'))
            ip_set.add(socket.gethostbyname('icanhas.cheezburger.com'))
            ip_set.add(socket.gethostbyname('archiveteam.org'))

            # Six distinct hosts must resolve to six distinct addresses.
            if len(ip_set) != 6:
                item.log_output('Got IP addresses: {0}'.format(ip_set))
                item.log_output(
                    'Are you behind a firewall/proxy? That is a big no-no!')
                raise Exception(
                    'Are you behind a firewall/proxy? That is a big no-no!')

        # Check only occasionally: re-check every 10 items.
        if self._counter <= 0:
            self._counter = 10
        else:
            self._counter -= 1
class PrepareDirectories(SimpleTask):
    """Create a fresh per-item working directory and empty output files.

    Sets item['item_dir'] and item['warc_file_base'] for the later tasks.
    """

    def __init__(self, warc_prefix):
        SimpleTask.__init__(self, 'PrepareDirectories')
        self.warc_prefix = warc_prefix

    def process(self, item):
        item_name = item['item_name']
        # Key the directory on the SHA-1 of the item name: short, unique,
        # and free of characters that are unsafe in paths.
        item_hash = hashlib.sha1(item_name.encode('utf-8')).hexdigest()
        dirname = '/'.join((item['data_dir'], item_hash))

        # Start from a clean slate if a previous attempt left files behind.
        if os.path.isdir(dirname):
            shutil.rmtree(dirname)
        os.makedirs(dirname)

        item['item_dir'] = dirname
        item['warc_file_base'] = '%s-%s-%s' % (self.warc_prefix, item_hash,
                                               time.strftime('%Y%m%d-%H%M%S'))

        # Pre-create (truncate) the files wget and the lua script append to.
        open('%(item_dir)s/%(warc_file_base)s.warc.gz' % item, 'w').close()
        open('%(item_dir)s/%(warc_file_base)s_data.txt' % item, 'w').close()
class Deduplicate(SimpleTask):
    """Rewrite the item's WARC so repeated response payloads become
    'revisit' records pointing at the first occurrence of the payload.

    NOTE(review): WARCWriter and ArchiveIterator (presumably from warcio)
    are not imported anywhere in this file as shown, and this task is not
    wired into the pipeline below — it appears disabled. Confirm the
    imports before re-enabling.
    """

    def __init__(self):
        SimpleTask.__init__(self, 'Deduplicate')

    def process(self, item):
        # Maps payload digest -> (record id, date, target URI) of the
        # first response seen with that digest.
        digests = {}
        input_filename = '%(item_dir)s/%(warc_file_base)s.warc.gz' % item
        output_filename = '%(item_dir)s/%(warc_file_base)s-deduplicated.warc.gz' % item
        with open(input_filename, 'rb') as f_in, \
                open(output_filename, 'wb') as f_out:
            writer = WARCWriter(filebuf=f_out, gzip=True)
            for record in ArchiveIterator(f_in):
                url = record.rec_headers.get_header('WARC-Target-URI')
                # Strip angle brackets some writers put around the URI.
                if url is not None and url.startswith('<'):
                    url = re.search('^<(.+)>$', url).group(1)
                    record.rec_headers.replace_header('WARC-Target-URI', url)
                if record.rec_headers.get_header('WARC-Type') == 'response':
                    digest = record.rec_headers.get_header('WARC-Payload-Digest')
                    if digest in digests:
                        # Seen before: write a revisit record instead of
                        # duplicating the payload.
                        writer.write_record(
                            self._record_response_to_revisit(writer, record,
                                                             digests[digest])
                        )
                    else:
                        # First occurrence: remember where it lives and
                        # copy the record through unchanged.
                        digests[digest] = (
                            record.rec_headers.get_header('WARC-Record-ID'),
                            record.rec_headers.get_header('WARC-Date'),
                            record.rec_headers.get_header('WARC-Target-URI')
                        )
                        writer.write_record(record)
                elif record.rec_headers.get_header('WARC-Type') == 'warcinfo':
                    # warcinfo must carry the name of the file it is in.
                    record.rec_headers.replace_header('WARC-Filename', output_filename)
                    writer.write_record(record)
                else:
                    writer.write_record(record)

    def _record_response_to_revisit(self, writer, record, duplicate):
        """Convert a response record into a revisit record referring to
        *duplicate* = (record id, date, target URI) of the original."""
        warc_headers = record.rec_headers
        warc_headers.replace_header('WARC-Refers-To', duplicate[0])
        warc_headers.replace_header('WARC-Refers-To-Date', duplicate[1])
        warc_headers.replace_header('WARC-Refers-To-Target-URI', duplicate[2])
        warc_headers.replace_header('WARC-Type', 'revisit')
        warc_headers.replace_header('WARC-Truncated', 'length')
        warc_headers.replace_header('WARC-Profile',
                                    'http://netpreserve.org/warc/1.0/' \
                                    'revisit/identical-payload-digest')
        warc_headers.remove_header('WARC-Block-Digest')
        warc_headers.remove_header('Content-Length')
        return writer.create_warc_record(
            record.rec_headers.get_header('WARC-Target-URI'),
            'revisit',
            warc_headers=warc_headers,
            http_headers=record.http_headers
        )
class MoveFiles(SimpleTask):
    """Move the finished WARC and data file from the per-item working
    directory into data_dir, then delete the working directory."""

    def __init__(self):
        SimpleTask.__init__(self, 'MoveFiles')

    def process(self, item):
        # An uncompressed .warc left behind means wget could not gzip.
        if os.path.exists('%(item_dir)s/%(warc_file_base)s.warc' % item):
            raise Exception('Please compile wget with zlib support!')

        #os.rename('%(item_dir)s/%(warc_file_base)s-deduplicated.warc.gz' % item,
        #          '%(data_dir)s/%(warc_file_base)s-deduplicated.warc.gz' % item)
        os.rename('%(item_dir)s/%(warc_file_base)s.warc.gz' % item,
                  '%(data_dir)s/%(warc_file_base)s.warc.gz' % item)
        os.rename('%(item_dir)s/%(warc_file_base)s_data.txt' % item,
                  '%(data_dir)s/%(warc_file_base)s_data.txt' % item)

        shutil.rmtree('%(item_dir)s' % item)
def get_hash(filename):
    """Return the SHA-1 hex digest of the file's entire contents."""
    with open(filename, 'rb') as stream:
        contents = stream.read()
    digest = hashlib.sha1(contents)
    return digest.hexdigest()
# Hashes of this script and the lua helper, reported to the tracker for
# accountability (see stats_id_function below). Computed once at import,
# so both files must exist in the current working directory.
CWD = os.getcwd()
PIPELINE_SHA1 = get_hash(os.path.join(CWD, 'pipeline.py'))
LUA_SHA1 = get_hash(os.path.join(CWD, 'reddit.lua'))
def stats_id_function(item):
    """Accountability info attached to each item's tracker stats:
    hashes of the pipeline/lua scripts and the running Python version."""
    # NEW for 2014! Some accountability hashes and stats.
    return {
        'pipeline_hash': PIPELINE_SHA1,
        'lua_hash': LUA_SHA1,
        'python_version': sys.version,
    }
class WgetArgs(object):
    """Builds the wget-lua command line for an item at realize() time."""

    # Digits for base-36 reddit post IDs: 0-9 then a-z.
    post_chars = string.digits + string.ascii_lowercase

    def int_to_str(self, i):
        """Convert a non-negative integer to its base-36 representation."""
        d, m = divmod(i, 36)
        if d > 0:
            return self.int_to_str(d) + self.post_chars[m]
        return self.post_chars[m]

    def realize(self, item):
        wget_args = [
            WGET_LUA,
            '-U', USER_AGENT,
            '-nv',
            '--lua-script', 'reddit.lua',
            '--load-cookies', 'cookies.txt',
            '-o', ItemInterpolation('%(item_dir)s/wget.log'),
            '--no-check-certificate',
            '--output-document', ItemInterpolation('%(item_dir)s/wget.tmp'),
            '--truncate-output',
            '-e', 'robots=off',
            '--rotate-dns',
            '--recursive', '--level=inf',
            '--no-parent',
            '--page-requisites',
            '--timeout', '30',
            '--tries', 'inf',
            '--domains', 'reddit.com',
            '--span-hosts',
            '--waitretry', '30',
            '--warc-file', ItemInterpolation('%(item_dir)s/%(warc_file_base)s'),
            '--warc-header', 'operator: Archive Team',
            '--warc-header', 'reddit-dld-script-version: ' + VERSION,
            '--warc-header', ItemInterpolation('reddit-item: %(item_name)s')
        ]

        item_name = item['item_name']
        item_type, item_value = item_name.split(':', 1)

        item['item_type'] = item_type
        item['item_value'] = item_value

        # BUG FIX: the original tested `item_type in ('posts')`, but
        # ('posts') is just the string 'posts', so `in` performed a
        # substring match (e.g. 'post' or 's' would also pass). Use a
        # real one-element tuple for exact membership.
        if item_type in ('posts',):
            # item_value is an inclusive numeric ID range "start-end";
            # each ID is queued as its base-36 comments URL.
            start, end = item_value.split('-')
            for i in range(int(start), int(end)+1):
                post_id = self.int_to_str(i)
                wget_args.extend(['--warc-header', 'reddit-post: {}'.format(post_id)])
                wget_args.append('https://www.reddit.com/comments/{}'.format(post_id))
                #wget_args.append('https://old.reddit.com/comments/{}'.format(post_id))
        else:
            raise Exception('Unknown item')

        # The warrior runner may inject a 'bind_address' global.
        if 'bind_address' in globals():
            wget_args.extend(['--bind-address', globals()['bind_address']])
            print('')
            print('*** Wget will bind address at {0} ***'.format(
                globals()['bind_address']))
            print('')

        return realize(wget_args, item)
###########################################################################
# Initialize the project.
#
# This will be shown in the warrior management panel. The logo should not
# be too big. The deadline is optional.
project = Project(
    title='reddit',
    project_html='''
    <img class="project-logo" alt="Project logo" src="https://www.archiveteam.org/images/b/b5/Reddit_logo.png" height="50px" title=""/>
    <h2>reddit.com <span class="links"><a href="https://reddit.com/">Website</a> &middot; <a href="http://tracker.archiveteam.org/reddit/">Leaderboard</a></span></h2>
    <p>Archiving everything from reddit.</p>
    '''
)
# The ordered task list each item flows through: lease an item from the
# tracker, prepare directories, run wget-lua, report stats, move files,
# rsync-upload (bounded concurrency), and mark the item done.
# NOTE: `downloader` is injected as a global by the seesaw runner.
pipeline = Pipeline(
    CheckIP(),
    GetItemFromTracker('http://%s/%s' % (TRACKER_HOST, TRACKER_ID), downloader,
        VERSION),
    PrepareDirectories(warc_prefix='reddit'),
    WgetDownload(
        WgetArgs(),
        max_tries=2,
        # Wget exit codes 4 (network failure) and 8 (server error) are
        # accepted; the lua script decides what is actually fatal.
        accept_on_exit_code=[0, 4, 8],
        env={
            'item_dir': ItemValue('item_dir'),
            'item_value': ItemValue('item_value'),
            'item_type': ItemValue('item_type'),
            'warc_file_base': ItemValue('warc_file_base')
        }
    ),
    PrepareStatsForTracker(
        defaults={'downloader': downloader, 'version': VERSION},
        file_groups={
            'data': [
                ItemInterpolation('%(item_dir)s/%(warc_file_base)s.warc.gz')
                #ItemInterpolation('%(item_dir)s/%(warc_file_base)s-deduplicated.warc.gz')
            ]
        },
        id_function=stats_id_function,
    ),
    MoveFiles(),
    LimitConcurrent(
        NumberConfigValue(min=1, max=20, default='20',
            name='shared:rsync_threads', title='Rsync threads',
            description='The maximum number of concurrent uploads.'),
        UploadWithTracker(
            'http://%s/%s' % (TRACKER_HOST, TRACKER_ID),
            downloader=downloader,
            version=VERSION,
            files=[
                ItemInterpolation('%(data_dir)s/%(warc_file_base)s.warc.gz'),
                #ItemInterpolation('%(data_dir)s/%(warc_file_base)s-deduplicated.warc.gz'),
                ItemInterpolation('%(data_dir)s/%(warc_file_base)s_data.txt')
            ],
            rsync_target_source_path=ItemInterpolation('%(data_dir)s/'),
            rsync_extra_args=[
                '--sockopts=SO_SNDBUF=8388608,SO_RCVBUF=8388608',
                '--recursive',
                '--partial',
                '--partial-dir', '.rsync-tmp',
                '--min-size', '1',
                '--no-compress',
                '--compress-level', '0'
            ]
        ),
    ),
    SendDoneToTracker(
        tracker_url='http://%s/%s' % (TRACKER_HOST, TRACKER_ID),
        stats=ItemValue('stats')
    )
)