@ -5,6 +5,7 @@ import hashlib
import os . path
import random
from seesaw . config import realize , NumberConfigValue
from seesaw . externalprocess import ExternalProcess
from seesaw . item import ItemInterpolation , ItemValue
from seesaw . task import SimpleTask , LimitConcurrent
from seesaw . tracker import GetItemFromTracker , PrepareStatsForTracker , \
@ -15,8 +16,15 @@ import subprocess
import sys
import time
import string
import requests
import re
import random
# Fail fast with an actionable message when warcio is not installed.
# Catch only ImportError: the original bare `except:` would also have
# swallowed unrelated errors raised while importing warcio.
try:
    import warcio
    from warcio.archiveiterator import ArchiveIterator
    from warcio.warcwriter import WARCWriter
except ImportError:
    raise Exception("Please install warcio with 'sudo pip install warcio --upgrade'.")
import seesaw
from seesaw . externalprocess import WgetDownload
@ -24,10 +32,12 @@ from seesaw.pipeline import Pipeline
from seesaw . project import Project
from seesaw . util import find_executable
from tornado import httpclient
# check the seesaw version
# Refuse to run on older seesaw releases that lack features this
# pipeline depends on.
if StrictVersion(seesaw.__version__) < StrictVersion('0.8.5'):
    raise Exception('This pipeline needs seesaw version 0.8.5 or higher.')
###########################################################################
@ -37,21 +47,21 @@ if StrictVersion(seesaw.__version__) < StrictVersion("0.8.5"):
# Locate a Wget+Lua executable that
# 1. does not crash with --version, and
# 2. prints one of the required version strings.
WGET_LUA = find_executable(
    'Wget+Lua',
    ['GNU Wget 1.14.lua.20130523-9a5c', 'GNU Wget 1.14.lua.20160530-955376b'],
    [
        './wget-lua',
        './wget-lua-warrior',
        './wget-lua-local',
        '../wget-lua',
        '../../wget-lua',
        '/home/warrior/wget-lua',
        '/usr/bin/wget-lua'
    ]
)

if not WGET_LUA:
    raise Exception('No usable Wget+Lua found.')
###########################################################################
@ -59,7 +69,7 @@ if not WGET_LUA:
#
# Update this each time you make a non-cosmetic change.
# It will be added to the WARC files and reported to the tracker.
# Version of this script.  Bump on every non-cosmetic change: it is
# written into the WARC headers and reported to the tracker.
VERSION = '20190222.01'
USER_AGENT = 'ArchiveTeam'
TRACKER_ID = 'reddit'
TRACKER_HOST = 'tracker.archiveteam.org'
@ -73,7 +83,7 @@ TRACKER_HOST = 'tracker.archiveteam.org'
# each item.
class CheckIP ( SimpleTask ) :
def __init__ ( self ) :
SimpleTask . __init__ ( self , " CheckIP " )
SimpleTask . __init__ ( self , ' CheckIP ' )
self . _counter = 0
def process ( self , item ) :
@ -106,39 +116,98 @@ class CheckIP(SimpleTask):
class PrepareDirectories(SimpleTask):
    """Create a clean per-item working directory and empty output files.

    The directory (and the WARC file base name) use the SHA-1 of the item
    name, so unusual characters in item names can never yield a bad path.
    """

    def __init__(self, warc_prefix):
        SimpleTask.__init__(self, 'PrepareDirectories')
        # Prefix for the generated WARC file names (e.g. 'reddit').
        self.warc_prefix = warc_prefix

    def process(self, item):
        item_name = item['item_name']
        item_hash = hashlib.sha1(item_name.encode('utf-8')).hexdigest()

        dirname = '/'.join((item['data_dir'], item_hash))

        # Start from an empty directory even if a previous attempt left files.
        if os.path.isdir(dirname):
            shutil.rmtree(dirname)
        os.makedirs(dirname)

        item['item_dir'] = dirname
        item['warc_file_base'] = '%s-%s-%s' % (self.warc_prefix, item_hash,
                                               time.strftime('%Y%m%d-%H%M%S'))

        # Pre-create the files that wget-lua will append to.
        open('%(item_dir)s/%(warc_file_base)s.warc.gz' % item, 'w').close()
        open('%(item_dir)s/%(warc_file_base)s_data.txt' % item, 'w').close()
class Deduplicate(SimpleTask):
    """Rewrite the WARC, replacing repeated response payloads with revisits.

    A response whose WARC-Payload-Digest was already seen earlier in the
    same WARC is converted into a 'revisit' record pointing at the first
    copy, which can shrink the uploaded file considerably.
    """

    def __init__(self):
        SimpleTask.__init__(self, 'Deduplicate')

    def process(self, item):
        # digest -> (WARC-Record-ID, WARC-Date, WARC-Target-URI) of first copy
        digests = {}
        input_filename = '%(item_dir)s/%(warc_file_base)s.warc.gz' % item
        output_filename = '%(item_dir)s/%(warc_file_base)s-deduplicated.warc.gz' % item
        with open(input_filename, 'rb') as f_in, \
                open(output_filename, 'wb') as f_out:
            writer = WARCWriter(filebuf=f_out, gzip=True)
            for record in ArchiveIterator(f_in):
                url = record.rec_headers.get_header('WARC-Target-URI')
                if url is not None and url.startswith('<'):
                    # Some wget builds write the URI as '<url>'; strip the
                    # angle brackets so headers stay consistent.
                    url = re.search('^<(.+)>$', url).group(1)
                    record.rec_headers.replace_header('WARC-Target-URI', url)
                if record.rec_headers.get_header('WARC-Type') == 'response':
                    digest = record.rec_headers.get_header('WARC-Payload-Digest')
                    # Guard against a missing digest: keying on None would
                    # make unrelated records alias each other.
                    if digest is not None and digest in digests:
                        writer.write_record(
                            self._record_response_to_revisit(writer, record,
                                                             digests[digest])
                        )
                    else:
                        if digest is not None:
                            digests[digest] = (
                                record.rec_headers.get_header('WARC-Record-ID'),
                                record.rec_headers.get_header('WARC-Date'),
                                record.rec_headers.get_header('WARC-Target-URI')
                            )
                        writer.write_record(record)
                elif record.rec_headers.get_header('WARC-Type') == 'warcinfo':
                    # Point the warcinfo record at the new, deduplicated file.
                    record.rec_headers.replace_header('WARC-Filename',
                                                      output_filename)
                    writer.write_record(record)
                else:
                    writer.write_record(record)

    def _record_response_to_revisit(self, writer, record, duplicate):
        """Build a 'revisit' record referring to *duplicate* (id, date, uri)."""
        warc_headers = record.rec_headers
        warc_headers.replace_header('WARC-Refers-To', duplicate[0])
        warc_headers.replace_header('WARC-Refers-To-Date', duplicate[1])
        warc_headers.replace_header('WARC-Refers-To-Target-URI', duplicate[2])
        warc_headers.replace_header('WARC-Type', 'revisit')
        warc_headers.replace_header('WARC-Truncated', 'length')
        warc_headers.replace_header('WARC-Profile',
                                    'http://netpreserve.org/warc/1.0/'
                                    'revisit/identical-payload-digest')
        warc_headers.remove_header('WARC-Block-Digest')
        warc_headers.remove_header('Content-Length')
        return writer.create_warc_record(
            record.rec_headers.get_header('WARC-Target-URI'),
            'revisit',
            warc_headers=warc_headers,
            http_headers=record.http_headers
        )
class MoveFiles(SimpleTask):
    """Move the finished, deduplicated WARC and data file to data_dir."""

    def __init__(self):
        SimpleTask.__init__(self, 'MoveFiles')

    def process(self, item):
        # An uncompressed .warc means wget was built without zlib support;
        # refuse to continue rather than upload an uncompressed file.
        if os.path.exists('%(item_dir)s/%(warc_file_base)s.warc' % item):
            raise Exception('Please compile wget with zlib support!')

        os.rename('%(item_dir)s/%(warc_file_base)s-deduplicated.warc.gz' % item,
                  '%(data_dir)s/%(warc_file_base)s-deduplicated.warc.gz' % item)
        os.rename('%(item_dir)s/%(warc_file_base)s_data.txt' % item,
                  '%(data_dir)s/%(warc_file_base)s_data.txt' % item)

        shutil.rmtree('%(item_dir)s' % item)
def get_hash ( filename ) :
@ -163,62 +232,54 @@ def stats_id_function(item):
class WgetArgs ( object ) :
post_chars = string . digits + string . ascii_lowercase
def int_to_str ( self , i ) :
d , m = divmod ( i , 36 )
if d > 0 :
return self . int_to_str ( d ) + self . post_chars [ m ]
return self . post_chars [ m ]
def realize ( self , item ) :
wget_args = [
WGET_LUA ,
" -U " , USER_AGENT ,
" -nv " ,
" --lua-script " , " reddit.lua " ,
" --load-cookies " , " cookies " ,
" -o " , ItemInterpolation ( " %(item_dir)s /wget.log " ) ,
" --no-check-certificate " ,
" --output-document " , ItemInterpolation ( " %(item_dir)s /wget.tmp " ) ,
" --truncate-output " ,
" -e " , " robots=off " ,
" --rotate-dns " ,
" --recursive " , " --level=inf " ,
" --no-parent " ,
" --page-requisites " ,
" --timeout " , " 30 " ,
" --tries " , " inf " ,
" --domains " , " reddit.com,redditmedia.com " ,
" --span-hosts " ,
" --waitretry " , " 30 " ,
" --warc-file " , ItemInterpolation ( " %(item_dir)s / %(warc_file_base)s " ) ,
" --warc-header " , " operator: Archive Team " ,
" --warc-header " , " reddit-dld-script-version: " + VERSION ,
" --warc-header " , ItemInterpolation ( " reddit-user: %(item_name)s " ) ,
' -U ' , USER_AGENT ,
' -nv ' ,
' --lua-script ' , ' reddit.lua ' ,
' --load-cookies ' , ' cookies ' ,
' -o ' , ItemInterpolation ( ' %(item_dir)s /wget.log ' ) ,
' --no-check-certificate ' ,
' --output-document ' , ItemInterpolation ( ' %(item_dir)s /wget.tmp ' ) ,
' --truncate-output ' ,
' -e ' , ' robots=off ' ,
' --rotate-dns ' ,
' --recursive ' , ' --level=inf ' ,
' --no-parent ' ,
' --page-requisites ' ,
' --timeout ' , ' 30 ' ,
' --tries ' , ' inf ' ,
' --domains ' , ' reddit.com ' ,
' --span-hosts ' ,
' --waitretry ' , ' 30 ' ,
' --warc-file ' , ItemInterpolation ( ' %(item_dir)s / %(warc_file_base)s ' ) ,
' --warc-header ' , ' operator: Archive Team ' ,
' --warc-header ' , ' reddit-dld-script-version: ' + VERSION ,
' --warc-header ' , ItemInterpolation ( ' reddit-item: %(item_name)s ' )
]
item_name = item [ ' item_name ' ]
assert ' : ' in item_name
item_type , item_value = item_name . split ( ' : ' , 1 )
item [ ' item_type ' ] = item_type
item [ ' item_value ' ] = item_value
assert item_type in ( ' 36comments ' )
if item_type == ' 36comments ' :
suffixes = string . digits + string . ascii_lowercase
for url in [ ' http://redd.it/ {0} {1} ' . format ( item_value , a ) for a in suffixes ] :
wget_args . append ( url )
# for suffix in suffixes:
# commenturl = 'https://www.reddit.com/comments/{0}{1}/'.format(item_value, suffix)
# html = requests.get(commenturl, headers={'User-Agent': 'ArchiveTeam'})
# print('Downloaded', html.status_code, getattr(html, 'reason'))
# sys.stdout.flush()
# if html.status_code == 200:
# if not html.text:
# raise Exception('Something went wrong during the download. ({0})'.format(html.status_code))
# else:
# for origurl in re.findall(r'href="(https?:\/\/www\.reddit\.com\/r\/[^/]+\/comments\/{0}{1}\/[^"]+)"'.format(item_value, suffix), html.text):
# if (re.search(r'https?:\/\/www\.reddit\.com\/r\/[^/]+\/comments\/[^/]+\/[^/]+\/', origurl) or re.search(r'https?:\/\/www\.reddit\.com\/r\/[^/]+\/comments\/[^/]+\/', origurl)) and not re.search(r'https?:\/\/www\.reddit\.com\/r\/[^/]+\/comments\/[^/]+\/[^/]+\/.', origurl):
# wget_args.append(origurl)
# elif html.status_code == 404:
# print('This url is 404.')
# else:
# raise Exception('Something went wrong during the download. ({0})'.format(html.status_code))
if item_type in ( ' posts ' ) :
start , end = item_value . split ( ' - ' )
for i in range ( int ( start ) , int ( end ) + 1 ) :
post_id = self . int_to_str ( i )
wget_args . extend ( [ ' --warc-header ' , ' reddit-post: {} ' . format ( post_id ) ] )
wget_args . append ( ' https://www.reddit.com/comments/ {} ' . format ( post_id ) )
wget_args . append ( ' https://old.reddit.com/comments/ {} ' . format ( post_id ) )
else :
raise Exception ( ' Unknown item ' )
@ -237,59 +298,67 @@ class WgetArgs(object):
# This will be shown in the warrior management panel. The logo should not
# be too big. The deadline is optional.
project = Project(
    title='reddit',
    project_html='''
    <img class="project-logo" alt="Project logo" src="https://www.archiveteam.org/images/b/b5/Reddit_logo.png" height="50px" title=""/>
    <h2>reddit.com <span class="links"><a href="https://reddit.com/">Website</a> &middot; <a href="http://tracker.archiveteam.org/reddit/">Leaderboard</a></span></h2>
    <p>Archiving everything from reddit.</p>
    '''
)
# The ordered list of tasks each item flows through: fetch item name from
# the tracker, download with wget-lua, deduplicate the WARC, report stats,
# move files, rsync-upload (bounded concurrency), then confirm completion.
pipeline = Pipeline(
    CheckIP(),
    GetItemFromTracker('http://%s/%s' % (TRACKER_HOST, TRACKER_ID), downloader,
                       VERSION),
    PrepareDirectories(warc_prefix='reddit'),
    WgetDownload(
        WgetArgs(),
        max_tries=2,
        # 4 (network failure) and 8 (server error) are acceptable wget exits.
        accept_on_exit_code=[0, 4, 8],
        env={
            'item_dir': ItemValue('item_dir'),
            'item_value': ItemValue('item_value'),
            'item_type': ItemValue('item_type'),
            'warc_file_base': ItemValue('warc_file_base')
        }
    ),
    Deduplicate(),
    PrepareStatsForTracker(
        defaults={'downloader': downloader, 'version': VERSION},
        file_groups={
            'data': [
                ItemInterpolation(
                    '%(item_dir)s/%(warc_file_base)s-deduplicated.warc.gz')
            ]
        },
        id_function=stats_id_function,
    ),
    MoveFiles(),
    LimitConcurrent(
        NumberConfigValue(min=1, max=20, default='20',
                          name='shared:rsync_threads', title='Rsync threads',
                          description='The maximum number of concurrent uploads.'),
        UploadWithTracker(
            'http://%s/%s' % (TRACKER_HOST, TRACKER_ID),
            downloader=downloader,
            version=VERSION,
            files=[
                ItemInterpolation(
                    '%(data_dir)s/%(warc_file_base)s-deduplicated.warc.gz'),
                ItemInterpolation('%(data_dir)s/%(warc_file_base)s_data.txt')
            ],
            rsync_target_source_path=ItemInterpolation('%(data_dir)s/'),
            rsync_extra_args=[
                '--sockopts=SO_SNDBUF=8388608,SO_RCVBUF=8388608',
                '--recursive',
                '--partial',
                '--partial-dir', '.rsync-tmp',
                '--min-size', '1',
                '--no-compress',
                '--compress-level', '0'
            ]
        ),
    ),
    SendDoneToTracker(
        tracker_url='http://%s/%s' % (TRACKER_HOST, TRACKER_ID),
        stats=ItemValue('stats')
    )
)