refactor: convert config from dict to Config object for better type hinting

pull/475/head
Misty 1 year ago
parent 0da98611b5
commit 2154ead2be
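
Editor's note: the commit replaces dictionary lookups such as config["delay"] with attribute access on a Config dataclass, so field names and types can be checked by editors and type checkers. A minimal before/after sketch (values are illustrative; newConfig is defined in wikiteam3/dumpgenerator/config.py further down in this diff):

    import time
    from wikiteam3.dumpgenerator.config import newConfig

    # before: a plain dict, so a mistyped key only fails at runtime
    config_dict = {"delay": 0.5, "retries": 3}
    time.sleep(config_dict["delay"])

    # after: a Config dataclass, so config.delay is a typed, checkable attribute
    config = newConfig({"delay": 0.5, "retries": 3})
    time.sleep(config.delay)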

@ -13,6 +13,7 @@ from wikiteam3.dumpgenerator.api.index_check import checkIndex
from wikiteam3.utils import getUserAgent
from wikiteam3.dumpgenerator.version import getVersion
from wikiteam3.dumpgenerator.api import getWikiEngine
from wikiteam3.dumpgenerator.config import Config, DefaultConfig, newConfig
def getParameters(params=[]):
@ -274,7 +275,7 @@ def getParameters(params=[]):
parser.print_help()
sys.exit(1)
config = {
config = newConfig({
"curonly": args.curonly,
"date": datetime.datetime.now().strftime("%Y%m%d"),
"api": api,
@ -291,7 +292,7 @@ def getParameters(params=[]):
"cookies": args.cookies or "",
"delay": args.delay,
"retries": int(args.retries),
}
})
other = {
"resume": args.resume,
@ -302,17 +303,17 @@ def getParameters(params=[]):
}
# calculating path, if not defined by user with --path=
if not config["path"]:
config["path"] = "./{}-{}-wikidump".format(
if not config.path:
config.path = "./{}-{}-wikidump".format(
domain2prefix(config=config, session=session),
config["date"],
config.date,
)
print("No --path argument provided. Defaulting to:")
print(" [working_directory]/[domain_prefix]-[date]-wikidump")
print("Which expands to:")
print(" " + config["path"])
print(" " + config.path)
if config["delay"] == 0.5:
if config.delay == 0.5:
print("--delay is the default value of 0.5")
print(
"There will be a 0.5 second delay between HTTP calls in order to keep the server from timing you out."

@ -3,6 +3,7 @@ import threading
import time
import sys
from wikiteam3.dumpgenerator.config import Config, DefaultConfig
class Delay:
@ -19,18 +20,18 @@ class Delay:
except KeyboardInterrupt:
sys.exit()
def __init__(self, config={}, session=None):
def __init__(self, config: Config=None, session=None):
"""Add a delay if configured for that"""
if config["delay"] > 0:
if config.delay > 0:
self.done = False
ellipses_animation = threading.Thread(target=self.animate)
ellipses_animation.start()
# sys.stdout.write("\rSleeping %.2f seconds..." % (config["delay"]))
# sys.stdout.write("\rSleeping %.2f seconds..." % (config.delay))
# sys.stdout.flush()
time.sleep(config["delay"])
time.sleep(config.delay)
self.done = True
sys.stdout.write("\r \r")
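
Editor's note: with the new signature, callers pass a Config object and the class reads config.delay directly. A minimal call sketch, assuming the refactored class above (the 0.5-second delay is illustrative):

    from wikiteam3.dumpgenerator.cli import Delay
    from wikiteam3.dumpgenerator.config import newConfig

    config = newConfig({"delay": 0.5})
    Delay(config=config, session=None)  # sleeps config.delay seconds, showing an ellipsis animation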

@ -1,26 +1,93 @@
import json
import sys
def loadConfig(config={}, configfilename=""):
"""Load config file"""
try:
with open(
"{}/{}".format(config["path"], configfilename), encoding="utf-8"
) as infile:
config = json.load(infile)
except:
print("There is no config file. we can't resume. Start a new dump.")
sys.exit()
return config
def saveConfig(config={}, configfilename=""):
"""Save config file"""
with open(
"{}/{}".format(config["path"], configfilename), "w", encoding="utf-8"
) as outfile:
json.dump(config, outfile)
import dataclasses
import json
import sys
from typing import *
def _dataclass_from_dict(klass_or_obj, d):
if isinstance(klass_or_obj, type): # klass
ret = klass_or_obj()
else:
ret = klass_or_obj
for k,v in d.items():
if hasattr(ret, k):
setattr(ret, k, v)
return ret
'''
config = {
"curonly": args.curonly,
"date": datetime.datetime.now().strftime("%Y%m%d"),
"api": api,
"failfast": args.failfast,
"http_method": "POST",
"index": index,
"images": args.images,
"logs": False,
"xml": args.xml,
"xmlrevisions": args.xmlrevisions,
"namespaces": namespaces,
"exnamespaces": exnamespaces,
"path": args.path and os.path.normpath(args.path) or "",
"cookies": args.cookies or "",
"delay": args.delay,
"retries": int(args.retries),
}
'''
@dataclasses.dataclass
class Config:
# General params
delay: float = 0.0
retries: int = 0
path: str = ''
logs: bool = False
date: str = False
# URL params
index: str = ''
api: str = ''
# Download params
xml: bool = False
curonly: bool = False
xmlrevisions: bool = False
images: bool = False
namespaces: List[int] = None
exnamespaces: List[int] = None
export: str = '' # Special:Export page name
http_method: str = ''
# Meta info params
failfast: bool = False
templates: bool = False
DefaultConfig = Config()
def newConfig(configDict):
return _dataclass_from_dict(Config, configDict)
def loadConfig(config: Config=None, configfilename=""):
"""Load config file"""
configDict = dataclasses.asdict(config)
if config.path:
try:
with open(
"{}/{}".format(config.path, configfilename), encoding="utf-8"
) as infile:
configDict.update(json.load(infile))
return newConfig(configDict)
except:
pass
print("There is no config file. we can't resume. Start a new dump.")
sys.exit()
def saveConfig(config: Config=None, configfilename=""):
"""Save config file"""
with open(
"{}/{}".format(config.path, configfilename), "w", encoding="utf-8"
) as outfile:
json.dump(dataclasses.asdict(config), outfile)
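
Editor's note: taken together, newConfig() builds a typed Config from a plain dict (keys the dataclass does not declare are ignored), and saveConfig()/loadConfig() round-trip it through JSON via dataclasses.asdict(). A usage sketch assuming the definitions above; the path and config file name are illustrative:

    import os
    from wikiteam3.dumpgenerator.config import newConfig, saveConfig, loadConfig

    # newConfig() copies matching keys onto a fresh Config; unknown keys (e.g. "bogus") are dropped
    config = newConfig({"api": "https://wiki.example.org/api.php", "delay": 1.5, "bogus": 1})
    assert config.delay == 1.5 and not hasattr(config, "bogus")

    config.path = "./example-wikidump"        # illustrative dump directory
    os.makedirs(config.path, exist_ok=True)
    saveConfig(config=config, configfilename="config.json")            # writes dataclasses.asdict(config) as JSON
    resumed = loadConfig(config=config, configfilename="config.json")  # merges the saved JSON back over current values
    assert resumed.api == config.api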

@ -21,6 +21,7 @@ except ImportError:
sys.exit(1)
from wikiteam3.dumpgenerator.config import loadConfig, saveConfig
from wikiteam3.dumpgenerator.config import Config, DefaultConfig
from wikiteam3.dumpgenerator.cli import getParameters, bye, welcome
from wikiteam3.utils import domain2prefix
from wikiteam3.utils import truncateFilename
@ -69,25 +70,25 @@ class DumpGenerator:
with (Tee(other["stdout_log_path"]) if other["stdout_log_path"] is not None else contextlib.nullcontext()):
print(welcome())
print("Analysing %s" % (config["api"] and config["api"] or config["index"]))
print("Analysing %s" % (config.api and config.api or config.index))
# creating path or resuming if desired
c = 2
# to avoid concat blabla-2, blabla-2-3, and so on...
originalpath = config["path"]
originalpath = config.path
# do not enter if resume is requested from begining
while not other["resume"] and os.path.isdir(config["path"]):
print('\nWarning!: "%s" path exists' % (config["path"]))
while not other["resume"] and os.path.isdir(config.path):
print('\nWarning!: "%s" path exists' % (config.path))
reply = ""
if config["failfast"]:
if config.failfast:
retry = "yes"
while reply.lower() not in ["yes", "y", "no", "n"]:
reply = input(
'There is a dump in "%s", probably incomplete.\nIf you choose resume, to avoid conflicts, the parameters you have chosen in the current session will be ignored\nand the parameters available in "%s/%s" will be loaded.\nDo you want to resume ([yes, y], [no, n])? '
% (config["path"], config["path"], configfilename)
% (config.path, config.path, configfilename)
)
if reply.lower() in ["yes", "y"]:
if not os.path.isfile("{}/{}".format(config["path"], configfilename)):
if not os.path.isfile("{}/{}".format(config.path, configfilename)):
print("No config file found. I can't resume. Aborting.")
sys.exit()
print("You have selected: YES")
@ -96,15 +97,15 @@ class DumpGenerator:
elif reply.lower() in ["no", "n"]:
print("You have selected: NO")
other["resume"] = False
config["path"] = "%s-%d" % (originalpath, c)
print('Trying to use path "%s"...' % (config["path"]))
config.path = "%s-%d" % (originalpath, c)
print('Trying to use path "%s"...' % (config.path))
c += 1
if other["resume"]:
print("Loading config file...")
config = loadConfig(config=config, configfilename=configfilename)
else:
os.mkdir(config["path"])
os.mkdir(config.path)
saveConfig(config=config, configfilename=configfilename)
if other["resume"]:
@ -118,36 +119,36 @@ class DumpGenerator:
bye()
@staticmethod
def createNewDump(config={}, other={}):
def createNewDump(config: Config=None, other={}):
images = []
print("Trying generating a new dump into a new directory...")
if config["xml"]:
if config.xml:
getPageTitles(config=config, session=other["session"])
titles = readTitles(config)
generateXMLDump(config=config, titles=titles, session=other["session"])
checkXMLIntegrity(config=config, titles=titles, session=other["session"])
if config["images"]:
if config.images:
images += Image.getImageNames(config=config, session=other["session"])
Image.saveImageNames(config=config, images=images, session=other["session"])
Image.generateImageDump(
config=config, other=other, images=images, session=other["session"]
)
if config["logs"]:
if config.logs:
saveLogs(config=config, session=other["session"])
@staticmethod
def resumePreviousDump(config={}, other={}):
def resumePreviousDump(config: Config=None, other={}):
images = []
print("Resuming previous dump process...")
if config["xml"]:
if config.xml:
titles = readTitles(config)
try:
with FileReadBackwards(
"%s/%s-%s-titles.txt"
% (
config["path"],
config.path,
domain2prefix(config=config, session=other["session"]),
config["date"],
config.date,
),
encoding="utf-8",
) as frb:
@ -172,10 +173,10 @@ class DumpGenerator:
with FileReadBackwards(
"%s/%s-%s-%s.xml"
% (
config["path"],
config.path,
domain2prefix(config=config, session=other["session"]),
config["date"],
config["curonly"] and "current" or "history",
config.date,
config.curonly and "current" or "history",
),
encoding="utf-8",
) as frb:
@ -210,13 +211,13 @@ class DumpGenerator:
titles = readTitles(config)
generateXMLDump(config=config, titles=titles, session=other["session"])
if config["images"]:
if config.images:
# load images
lastimage = ""
try:
f = open(
"%s/%s-%s-images.txt"
% (config["path"], domain2prefix(config=config), config["date"]),
% (config.path, domain2prefix(config=config), config.date),
encoding="utf-8",
)
lines = f.readlines()
@ -240,7 +241,7 @@ class DumpGenerator:
# checking images directory
listdir = []
try:
listdir = os.listdir("%s/images" % (config["path"]))
listdir = os.listdir("%s/images" % (config.path))
except OSError:
pass # probably directory does not exist
listdir.sort()
@ -274,6 +275,6 @@ class DumpGenerator:
session=other["session"],
)
if config["logs"]:
if config.logs:
# fix
pass

@ -2,18 +2,18 @@ import os
from wikiteam3.dumpgenerator.cli import Delay
from wikiteam3.utils import removeIP
from wikiteam3.dumpgenerator.config import Config, DefaultConfig
def saveIndexPHP(config={}, session=None):
def saveIndexPHP(config: Config=None, session=None):
"""Save index.php as .html, to preserve license details available at the botom of the page"""
if os.path.exists("%s/index.html" % (config["path"])):
if os.path.exists("%s/index.html" % (config.path)):
print("index.html exists, do not overwrite")
else:
print("Downloading index.php (Main Page) as index.html")
r = session.post(url=config["index"], params={}, timeout=10)
r = session.post(url=config.index, params={}, timeout=10)
raw = str(r.text)
Delay(config=config, session=session)
raw = removeIP(raw=raw)
with open("%s/index.html" % (config["path"]), "w", encoding="utf-8") as outfile:
with open("%s/index.html" % (config.path), "w", encoding="utf-8") as outfile:
outfile.write(raw)

@ -1,7 +1,7 @@
from wikiteam3.dumpgenerator.config import Config, DefaultConfig
from wikiteam3.dumpgenerator.cli import Delay
def saveLogs(config={}, session=None):
def saveLogs(config: Config=None, session=None):
"""Save Special:Log"""
# get all logs from Special:Log
"""parse

@ -2,22 +2,23 @@ import os
from wikiteam3.dumpgenerator.cli import Delay
from wikiteam3.utils import removeIP
from wikiteam3.dumpgenerator.config import Config, DefaultConfig
def saveSpecialVersion(config={}, session=None):
def saveSpecialVersion(config: Config=None, session=None):
"""Save Special:Version as .html, to preserve extensions details"""
if os.path.exists("%s/Special:Version.html" % (config["path"])):
if os.path.exists("%s/Special:Version.html" % (config.path)):
print("Special:Version.html exists, do not overwrite")
else:
print("Downloading Special:Version with extensions and other related info")
r = session.post(
url=config["index"], params={"title": "Special:Version"}, timeout=10
url=config.index, params={"title": "Special:Version"}, timeout=10
)
raw = str(r.text)
Delay(config=config, session=session)
raw = str(removeIP(raw=raw))
with open(
"%s/Special:Version.html" % (config["path"]), "w", encoding="utf-8"
"%s/Special:Version.html" % (config.path), "w", encoding="utf-8"
) as outfile:
outfile.write(str(raw))

@ -3,20 +3,21 @@ import os
from wikiteam3.dumpgenerator.cli import Delay
from wikiteam3.dumpgenerator.api import getJSON
from wikiteam3.dumpgenerator.config import Config, DefaultConfig
def saveSiteInfo(config={}, session=None):
def saveSiteInfo(config: Config=None, session=None):
"""Save a file with site info"""
if config["api"]:
if os.path.exists("%s/siteinfo.json" % (config["path"])):
if config.api:
if os.path.exists("%s/siteinfo.json" % (config.path)):
print("siteinfo.json exists, do not overwrite")
else:
print("Downloading site info as siteinfo.json")
# MediaWiki 1.13+
r = session.get(
url=config["api"],
url=config.api,
params={
"action": "query",
"meta": "siteinfo",
@ -29,7 +30,7 @@ def saveSiteInfo(config={}, session=None):
# MediaWiki 1.11-1.12
if not "query" in getJSON(r):
r = session.get(
url=config["api"],
url=config.api,
params={
"action": "query",
"meta": "siteinfo",
@ -41,7 +42,7 @@ def saveSiteInfo(config={}, session=None):
# MediaWiki 1.8-1.10
if not "query" in getJSON(r):
r = session.get(
url=config["api"],
url=config.api,
params={
"action": "query",
"meta": "siteinfo",
@ -53,6 +54,6 @@ def saveSiteInfo(config={}, session=None):
result = getJSON(r)
Delay(config=config, session=session)
with open(
"%s/siteinfo.json" % (config["path"]), "w", encoding="utf-8"
"%s/siteinfo.json" % (config.path), "w", encoding="utf-8"
) as outfile:
outfile.write(json.dumps(result, indent=4, sort_keys=True))

@ -12,12 +12,12 @@ from wikiteam3.dumpgenerator.log import logerror
from .page_xml import getXMLPage
from wikiteam3.utils import truncateFilename
from wikiteam3.utils import cleanHTML, undoHTMLEntities
from wikiteam3.dumpgenerator.config import Config, DefaultConfig
class Image:
def getXMLFileDesc(config={}, title="", session=None):
def getXMLFileDesc(config: Config=None, title="", session=None):
"""Get XML for image description page"""
config["curonly"] = 1 # tricky to get only the most recent desc
config.curonly = 1 # tricky to get only the most recent desc
return "".join(
[
x
@ -27,12 +27,12 @@ class Image:
]
)
def generateImageDump(config={}, other={}, images=[], start="", session=None):
def generateImageDump(config: Config=None, other={}, images=[], start="", session=None):
"""Save files and descriptions using a file list"""
# fix use subdirectories md5
print('Retrieving images from "%s"' % (start and start or "start"))
imagepath = "%s/images" % (config["path"])
imagepath = "%s/images" % (config.path)
if not os.path.isdir(imagepath):
print('Creating "%s" directory' % (imagepath))
os.makedirs(imagepath)
@ -98,12 +98,12 @@ class Image:
try:
title = "Image:%s" % (filename)
if (
config["xmlrevisions"]
and config["api"]
and config["api"].endswith("api.php")
config.xmlrevisions
and config.api
and config.api.endswith("api.php")
):
r = session.get(
config["api"]
config.api
+ "?action=query&export&exportnowrap&titles="
+ urllib.parse.quote(title)
)
@ -151,15 +151,15 @@ class Image:
print(f"\n-> Downloaded {c} images\n")
def getImageNames(config={}, session=None):
def getImageNames(config: Config=None, session=None):
"""Get list of image names"""
print(")Retrieving image filenames")
images = []
if "api" in config and config["api"]:
if config.api:
print("Using API to retrieve image names...")
images = Image.getImageNamesAPI(config=config, session=session)
elif "index" in config and config["index"]:
elif config.index:
print("Using index.php (Special:Imagelist) to retrieve image names...")
images = Image.getImageNamesScraper(config=config, session=session)
@ -170,7 +170,7 @@ class Image:
print("%d image names loaded" % (len(images)))
return images
def getImageNamesScraper(config={}, session=None):
def getImageNamesScraper(config: Config=None, session=None):
"""Retrieve file list: filename, url, uploader"""
# (?<! http://docs.python.org/library/re.html
@ -178,13 +178,13 @@ class Image:
images = []
offset = "29990101000000" # january 1, 2999
limit = 5000
retries = config["retries"]
retries = config.retries
while offset:
# 5000 overload some servers, but it is needed for sites like this with
# no next links
# http://www.memoryarchive.org/en/index.php?title=Special:Imagelist&sort=byname&limit=50&wpIlMatch=
r = session.post(
url=config["index"],
url=config.index,
params={"title": "Special:Imagelist", "limit": limit, "offset": offset},
timeout=30,
)
@ -277,7 +277,7 @@ class Image:
images.sort()
return images
def getImageNamesAPI(config={}, session=None):
def getImageNamesAPI(config: Config=None, session=None):
"""Retrieve file list: filename, url, uploader"""
oldAPI = False
# # Commented by @yzqzss:
@ -304,7 +304,7 @@ class Image:
"ailimit": 50,
}
# FIXME Handle HTTP Errors HERE
r = session.get(url=config["api"], params=params, timeout=30)
r = session.get(url=config.api, params=params, timeout=30)
handleStatusCode(r)
jsonimages = getJSON(r)
Delay(config=config, session=session)
@ -343,8 +343,8 @@ class Image:
# unquote() no longer supports bytes-like strings
# so unicode may require the following workaround:
# https://izziswift.com/how-to-unquote-a-urlencoded-unicode-string-in-python/
if "api" in config and (
".wikia." in config["api"] or ".fandom.com" in config["api"]
if (
".wikia." in config.api or ".fandom.com" in config.api
):
filename = urllib.parse.unquote(
re.sub("_", " ", url.split("/")[-3])
@ -388,7 +388,7 @@ class Image:
"format": "json",
}
# FIXME Handle HTTP Errors HERE
r = session.get(url=config["api"], params=params, timeout=30)
r = session.get(url=config.api, params=params, timeout=30)
handleStatusCode(r)
jsonimages = getJSON(r)
Delay(config=config, session=session)
@ -440,14 +440,14 @@ class Image:
return images
def saveImageNames(config={}, images=[], session=None):
def saveImageNames(config: Config=None, images=[], session=None):
"""Save image list in a file, including filename, url and uploader"""
imagesfilename = "{}-{}-images.txt".format(
domain2prefix(config=config), config["date"]
domain2prefix(config=config), config.date
)
imagesfile = open(
"{}/{}".format(config["path"], imagesfilename), "w", encoding="utf-8"
"{}/{}".format(config.path, imagesfilename), "w", encoding="utf-8"
)
imagesfile.write(
"\n".join(
@ -462,21 +462,21 @@ class Image:
print("Image filenames and URLs saved at...", imagesfilename)
def curateImageURL(config={}, url=""):
def curateImageURL(config: Config=None, url=""):
"""Returns an absolute URL for an image, adding the domain if missing"""
if "index" in config and config["index"]:
if config.index:
# remove from :// (http or https) until the first / after domain
domainalone = (
config["index"].split("://")[0]
config.index.split("://")[0]
+ "://"
+ config["index"].split("://")[1].split("/")[0]
+ config.index.split("://")[1].split("/")[0]
)
elif "api" in config and config["api"]:
elif config.api:
domainalone = (
config["api"].split("://")[0]
config.api.split("://")[0]
+ "://"
+ config["api"].split("://")[1].split("/")[0]
+ config.api.split("://")[1].split("/")[0]
)
else:
print("ERROR: no index nor API")

@ -7,20 +7,21 @@ import mwclient
from wikiteam3.dumpgenerator.cli import Delay
from wikiteam3.dumpgenerator.dump.xmlrev.namespaces import getNamespacesAPI, getNamespacesScraper
from wikiteam3.utils import domain2prefix, cleanHTML, undoHTMLEntities
from wikiteam3.dumpgenerator.config import Config, DefaultConfig
def getPageTitlesAPI(config={}, session=None):
def getPageTitlesAPI(config: Config=None, session=None):
"""Uses the API to get the list of page titles"""
titles = []
namespaces, namespacenames = getNamespacesAPI(config=config, session=session)
for namespace in namespaces:
if namespace in config["exnamespaces"]:
if namespace in config.exnamespaces:
print(" Skipping namespace = %d" % (namespace))
continue
c = 0
sys.stdout.write(" Retrieving titles in the namespace %d" % (namespace))
apiurl = urlparse(config["api"])
apiurl = urlparse(config.api)
site = mwclient.Site(
apiurl.netloc, apiurl.path.replace("api.php", ""), scheme=apiurl.scheme, pool=session
)
@ -41,14 +42,14 @@ def getPageTitlesAPI(config={}, session=None):
Delay(config=config, session=session)
def getPageTitlesScraper(config={}, session=None):
def getPageTitlesScraper(config: Config=None, session=None):
"""Scrape the list of page titles from Special:Allpages"""
titles = []
namespaces, namespacenames = getNamespacesScraper(config=config, session=session)
for namespace in namespaces:
print(" Retrieving titles in the namespace", namespace)
url = "{}?title=Special:Allpages&namespace={}".format(
config["index"], namespace
config.index, namespace
)
r = session.get(url=url, timeout=30)
raw = str(r.text)
@ -89,7 +90,7 @@ def getPageTitlesScraper(config={}, session=None):
to = i.group("to")
name = f"{fr}-{to}"
url = "{}?title=Special:Allpages&namespace={}&from={}&to={}".format(
config["index"],
config.index,
namespace,
fr,
to,
@ -101,7 +102,7 @@ def getPageTitlesScraper(config={}, session=None):
fr = fr.split("&amp;namespace=")[0]
name = fr
url = "{}?title=Special:Allpages/{}&namespace={}".format(
config["index"],
config.index,
name,
namespace,
)
@ -109,7 +110,7 @@ def getPageTitlesScraper(config={}, session=None):
fr = fr.split("&amp;namespace=")[0]
name = fr
url = "{}?title=Special:Allpages&from={}&namespace={}".format(
config["index"],
config.index,
name,
namespace,
)
@ -150,7 +151,7 @@ def getPageTitlesScraper(config={}, session=None):
return titles
def getPageTitles(config={}, session=None):
def getPageTitles(config: Config=None, session=None):
"""Get list of page titles"""
# http://en.wikipedia.org/wiki/Special:AllPages
# http://wiki.archiveteam.org/index.php?title=Special:AllPages
@ -158,35 +159,35 @@ def getPageTitles(config={}, session=None):
print(
"Loading page titles from namespaces = %s"
% (
config["namespaces"]
and ",".join([str(i) for i in config["namespaces"]])
config.namespaces
and ",".join([str(i) for i in config.namespaces])
or "None"
)
)
print(
"Excluding titles from namespaces = %s"
% (
config["exnamespaces"]
and ",".join([str(i) for i in config["exnamespaces"]])
config.exnamespaces
and ",".join([str(i) for i in config.exnamespaces])
or "None"
)
)
titles = []
if "api" in config and config["api"]:
if config.api:
try:
titles = getPageTitlesAPI(config=config, session=session)
except:
print("Error: could not get page titles from the API")
titles = getPageTitlesScraper(config=config, session=session)
elif "index" in config and config["index"]:
elif config.index:
titles = getPageTitlesScraper(config=config, session=session)
titlesfilename = "{}-{}-titles.txt".format(
domain2prefix(config=config), config["date"]
domain2prefix(config=config), config.date
)
titlesfile = open(
"{}/{}".format(config["path"], titlesfilename), "wt", encoding="utf-8"
"{}/{}".format(config.path, titlesfilename), "wt", encoding="utf-8"
)
c = 0
for title in titles:
@ -203,13 +204,13 @@ def getPageTitles(config={}, session=None):
return titlesfilename
def readTitles(config={}, start=None, batch=False):
def readTitles(config: Config=None, start=None, batch=False):
"""Read title list from a file, from the title "start" """
titlesfilename = "{}-{}-titles.txt".format(
domain2prefix(config=config), config["date"]
domain2prefix(config=config), config.date
)
titlesfile = open("{}/{}".format(config["path"], titlesfilename), encoding="utf-8")
titlesfile = open("{}/{}".format(config.path, titlesfilename), encoding="utf-8")
titlelist = []
seeking = False

@ -10,9 +10,10 @@ from wikiteam3.dumpgenerator.exceptions import ExportAbortedError, PageMissingEr
from wikiteam3.dumpgenerator.api import handleStatusCode
from wikiteam3.dumpgenerator.log import logerror
from wikiteam3.utils import uprint
from wikiteam3.dumpgenerator.config import Config, DefaultConfig
def getXMLPageCore(headers={}, params={}, config={}, session=None) -> str:
def getXMLPageCore(headers={}, params={}, config: Config=None, session=None) -> str:
""""""
# returns a XML containing params['limit'] revisions (or current only), ending in </mediawiki>
# if retrieving params['limit'] revisions fails, returns a current only version
@ -20,7 +21,7 @@ def getXMLPageCore(headers={}, params={}, config={}, session=None) -> str:
xml = ""
c = 0
maxseconds = 100 # max seconds to wait in a single sleeping
maxretries = config["retries"] # x retries and skip
maxretries = config.retries # x retries and skip
increment = 20 # increment every retry
while not re.search(r"</mediawiki>", str(xml)):
@ -43,15 +44,15 @@ def getXMLPageCore(headers={}, params={}, config={}, session=None) -> str:
' MediaWiki error for "%s", network error or whatever...'
% (params["pages"])
)
if config["failfast"]:
if config.failfast:
print("Exit, it will be for another time")
sys.exit()
# If it's not already what we tried: our last chance, preserve only the last revision...
# config['curonly'] means that the whole dump is configured to save only the last,
# config.curonly means that the whole dump is configured to save only the last,
# params['curonly'] should mean that we've already tried this
# fallback, because it's set by the following if and passed to
# getXMLPageCore
if not config["curonly"] and "curonly" not in params:
if not config.curonly and "curonly" not in params:
print(" Trying to save only the last revision for this page...")
params["curonly"] = 1
logerror(
@ -69,12 +70,12 @@ def getXMLPageCore(headers={}, params={}, config={}, session=None) -> str:
text='Error while retrieving the last revision of "%s". Skipping.'
% (params["pages"]),
)
raise ExportAbortedError(config["index"])
raise ExportAbortedError(config.index)
return "" # empty xml
# FIXME HANDLE HTTP Errors HERE
try:
r = session.post(
url=config["index"], params=params, headers=headers, timeout=10
url=config.index, params=params, headers=headers, timeout=10
)
handleStatusCode(r)
xml = fixBOM(r)
@ -89,7 +90,7 @@ def getXMLPageCore(headers={}, params={}, config={}, session=None) -> str:
return xml
def getXMLPage(config={}, title="", verbose=True, session=None):
def getXMLPage(config: Config=None, title="", verbose=True, session=None):
"""Get the full history (or current only) of a page"""
# if server errors occurs while retrieving the full page history, it may return [oldest OK versions] + last version, excluding middle revisions, so it would be partialy truncated
@ -101,22 +102,22 @@ def getXMLPage(config={}, title="", verbose=True, session=None):
title_ = re.sub(" ", "_", title_)
# do not convert & into %26, title_ = re.sub('&', '%26', title_)
try:
params = {"title": config["export"], "pages": title_, "action": "submit"}
params = {"title": config.export, "pages": title_, "action": "submit"}
except KeyError:
params = {"title": "Special:Export", "pages": title_, "action": "submit"}
if config["curonly"]:
if config.curonly:
params["curonly"] = 1
params["limit"] = 1
else:
params["offset"] = "1" # 1 always < 2000s
params["limit"] = limit
# in other case, do not set params['templates']
if "templates" in config and config["templates"]:
if config.templates:
params["templates"] = 1
xml = getXMLPageCore(params=params, config=config, session=session)
if xml == "":
raise ExportAbortedError(config["index"])
raise ExportAbortedError(config.index)
if "</page>" not in xml:
raise PageMissingError(params["title"], xml)
else:
@ -137,7 +138,7 @@ def getXMLPage(config={}, title="", verbose=True, session=None):
# search for timestamps in xml to avoid analysing empty pages like
# Special:Allpages and the random one
if not config["curonly"] and re.search(r_timestamp, xml):
if not config.curonly and re.search(r_timestamp, xml):
while not truncated and params["offset"]: # next chunk
# get the last timestamp from the acum XML
params["offset"] = re.findall(r_timestamp, xml)[-1]

@ -2,16 +2,16 @@ import re
from wikiteam3.dumpgenerator.cli import Delay
from wikiteam3.dumpgenerator.api import getJSON
from wikiteam3.dumpgenerator.config import Config, DefaultConfig
def getNamespacesScraper(config={}, session=None):
def getNamespacesScraper(config: Config=None, session=None):
"""Hackishly gets the list of namespaces names and ids from the dropdown in the HTML of Special:AllPages"""
"""Function called if no API is available"""
namespaces = config["namespaces"]
namespaces = config.namespaces
namespacenames = {0: ""} # main is 0, no prefix
if namespaces:
r = session.post(
url=config["index"], params={"title": "Special:Allpages"}, timeout=30
url=config.index, params={"title": "Special:Allpages"}, timeout=30
)
raw = r.text
Delay(config=config, session=session)
@ -43,13 +43,13 @@ def getNamespacesScraper(config={}, session=None):
return namespaces, namespacenames
def getNamespacesAPI(config={}, session=None):
def getNamespacesAPI(config: Config=None, session=None):
"""Uses the API to get the list of namespaces names and ids"""
namespaces = config["namespaces"]
namespaces = config.namespaces
namespacenames = {0: ""} # main is 0, no prefix
if namespaces:
r = session.get(
url=config["api"],
url=config.api,
params={
"action": "query",
"meta": "siteinfo",

@ -7,13 +7,13 @@ from wikiteam3.dumpgenerator.exceptions import PageMissingError
from wikiteam3.dumpgenerator.log import logerror
from wikiteam3.dumpgenerator.dump.page.page_titles import readTitles
from wikiteam3.dumpgenerator.dump.page.page_xml import getXMLPage
from wikiteam3.dumpgenerator.config import Config, DefaultConfig
from wikiteam3.utils import cleanXML, undoHTMLEntities
from .xml_header import getXMLHeader
from .xml_revisions import getXMLRevisions
from .xml_truncate import truncateXMLDump
def generateXMLDump(config={}, titles=[], start=None, session=None):
def generateXMLDump(config: Config=None, titles=[], start=None, session=None):
"""Generates a XML dump for a list of titles or from revision IDs"""
# TODO: titles is now unused.
@ -21,8 +21,8 @@ def generateXMLDump(config={}, titles=[], start=None, session=None):
footer = "</mediawiki>\n" # new line at the end
xmlfilename = "{}-{}-{}.xml".format(
domain2prefix(config=config),
config["date"],
config["curonly"] and "current" or "history",
config.date,
config.curonly and "current" or "history",
)
xmlfile = ""
lock = True
@ -33,18 +33,18 @@ def generateXMLDump(config={}, titles=[], start=None, session=None):
"Removing the last chunk of past XML dump: it is probably incomplete."
)
# truncate XML dump if it already exists
truncateXMLDump("{}/{}".format(config["path"], xmlfilename))
truncateXMLDump("{}/{}".format(config.path, xmlfilename))
if config["xmlrevisions"]:
if config.xmlrevisions:
if start:
print(f"WARNING: will try to start the download from title: {start}")
xmlfile = open(
"{}/{}".format(config["path"], xmlfilename), "a", encoding="utf-8"
"{}/{}".format(config.path, xmlfilename), "a", encoding="utf-8"
)
else:
print("\nRetrieving the XML for every page from the beginning\n")
xmlfile = open(
"{}/{}".format(config["path"], xmlfilename), "w", encoding="utf-8"
"{}/{}".format(config.path, xmlfilename), "w", encoding="utf-8"
)
xmlfile.write(header)
try:
@ -76,13 +76,13 @@ def generateXMLDump(config={}, titles=[], start=None, session=None):
# requested complete xml dump
lock = False
xmlfile = open(
"{}/{}".format(config["path"], xmlfilename), "w", encoding="utf-8"
"{}/{}".format(config.path, xmlfilename), "w", encoding="utf-8"
)
xmlfile.write(header)
xmlfile.close()
xmlfile = open(
"{}/{}".format(config["path"], xmlfilename), "a", encoding="utf-8"
"{}/{}".format(config.path, xmlfilename), "a", encoding="utf-8"
)
c = 1
for title in readTitles(config, start):

@ -8,22 +8,22 @@ import requests
from wikiteam3.dumpgenerator.exceptions import ExportAbortedError, PageMissingError
from wikiteam3.dumpgenerator.log import logerror
from wikiteam3.dumpgenerator.dump.page.page_xml import getXMLPage
from wikiteam3.dumpgenerator.config import Config, DefaultConfig
def getXMLHeader(config: dict = {}, session=None) -> Tuple[str, dict]:
def getXMLHeader(config: Config=None, session=None) -> Tuple[str, dict]:
"""Retrieve a random page to extract XML headers (namespace info, etc)"""
# get the header of a random page, to attach it in the complete XML backup
# similar to: <mediawiki xmlns="http://www.mediawiki.org/xml/export-0.3/"
# xmlns:x....
randomtitle = "Main_Page" # previously AMF5LKE43MNFGHKSDMRTJ
print(config["api"])
print(config.api)
xml = ""
if config["xmlrevisions"] and config["api"] and config["api"].endswith("api.php"):
if config.xmlrevisions and config.api and config.api.endswith("api.php"):
try:
print("Getting the XML header from the API")
# Export and exportnowrap exist from MediaWiki 1.15, allpages from 1.18
r = session.get(
config["api"]
config.api
+ "?action=query&export=1&exportnowrap=1&list=allpages&aplimit=1",
timeout=10,
)
@ -31,7 +31,7 @@ def getXMLHeader(config: dict = {}, session=None) -> Tuple[str, dict]:
# Otherwise try without exportnowrap, e.g. Wikia returns a blank page on 1.19
if not re.match(r"\s*<mediawiki", xml):
r = session.get(
config["api"]
config.api
+ "?action=query&export=1&list=allpages&aplimit=1&format=json",
timeout=10,
)
@ -42,7 +42,7 @@ def getXMLHeader(config: dict = {}, session=None) -> Tuple[str, dict]:
if not re.match(r"\s*<mediawiki", xml):
# Do without a generator, use our usual trick of a random page title
r = session.get(
config["api"]
config.api
+ "?action=query&export=1&exportnowrap=1&titles="
+ randomtitle,
timeout=10,
@ -51,7 +51,7 @@ def getXMLHeader(config: dict = {}, session=None) -> Tuple[str, dict]:
# Again try without exportnowrap
if not re.match(r"\s*<mediawiki", xml):
r = session.get(
config["api"]
config.api
+ "?action=query&export=1&format=json&titles="
+ randomtitle,
timeout=10,
@ -81,10 +81,10 @@ def getXMLHeader(config: dict = {}, session=None) -> Tuple[str, dict]:
# http://albens73.fr/wiki/api.php?action=query&meta=siteinfo&siprop=namespacealiases
except ExportAbortedError:
try:
if config["api"]:
if config.api:
print("Trying the local name for the Special namespace instead")
r = session.get(
url=config["api"],
url=config.api,
params={
"action": "query",
"meta": "siteinfo",
@ -93,7 +93,7 @@ def getXMLHeader(config: dict = {}, session=None) -> Tuple[str, dict]:
},
timeout=120,
)
config["export"] = (
config.export = (
json.loads(r.text)["query"]["namespaces"]["-1"]["*"] + ":Export"
)
xml = "".join(
@ -114,12 +114,12 @@ def getXMLHeader(config: dict = {}, session=None) -> Tuple[str, dict]:
header = xml.split("</mediawiki>")[0]
if not re.match(r"\s*<mediawiki", xml):
if config["xmlrevisions"]:
if config.xmlrevisions:
# Try again the old way
print(
"Export test via the API failed. Wiki too old? Trying without xmlrevisions."
)
config["xmlrevisions"] = False
config.xmlrevisions = False
header, config = getXMLHeader(config=config, session=session)
else:
print(xml)

@ -1,4 +1,6 @@
def checkXMLIntegrity(config={}, titles=[], session=None):
from wikiteam3.dumpgenerator.config import Config, DefaultConfig
def checkXMLIntegrity(config: Config=None, titles=[], session=None):
"""Check XML dump integrity, to detect broken XML chunks"""
return
@ -12,10 +14,10 @@ def checkXMLIntegrity(config={}, titles=[], session=None):
file(
"%s/%s-%s-%s.xml"
% (
config["path"],
config.path,
domain2prefix(config=config, session=session),
config["date"],
config["curonly"] and "current" or "history",
config.date,
config.curonly and "current" or "history",
),
"r",
)
@ -43,7 +45,7 @@ def checkXMLIntegrity(config={}, titles=[], session=None):
else:
print("XML dump seems to be corrupted.")
reply = ""
if config["failfast"]:
if config.failfast:
reply = "yes"
while reply.lower() not in ["yes", "y", "no", "n"]:
reply = raw_input("Regenerate a new dump ([yes, y], [no, n])? ")

@ -10,19 +10,19 @@ from wikiteam3.dumpgenerator.log import logerror
from .namespaces import getNamespacesAPI
from wikiteam3.dumpgenerator.dump.page.page_titles import readTitles
from wikiteam3.dumpgenerator.dump.page.page_xml import makeXmlFromPage, makeXmlPageFromRaw
from wikiteam3.dumpgenerator.config import Config, DefaultConfig
def getXMLRevisions(config={}, session=None, allpages=False, start=None):
def getXMLRevisions(config: Config=None, session=None, allpages=False, start=None):
# FIXME: actually figure out the various strategies for each MediaWiki version
apiurl = urlparse(config["api"])
apiurl = urlparse(config.api)
# FIXME: force the protocol we asked for! Or don't verify SSL if we asked HTTP?
# https://github.com/WikiTeam/wikiteam/issues/358
site = mwclient.Site(
apiurl.netloc, apiurl.path.replace("api.php", ""), scheme=apiurl.scheme, pool=session
)
if "all" not in config["namespaces"]:
namespaces = config["namespaces"]
if "all" not in config.namespaces:
namespaces = config.namespaces
else:
namespaces, namespacenames = getNamespacesAPI(config=config, session=session)
@ -40,7 +40,7 @@ def getXMLRevisions(config={}, session=None, allpages=False, start=None):
"arvlimit": 50,
"arvnamespace": namespace,
}
if not config["curonly"]:
if not config.curonly:
# We have to build the XML manually...
# Skip flags, presumably needed to add <minor/> which is in the schema.
# Also missing: parentid and contentformat.
@ -53,15 +53,15 @@ def getXMLRevisions(config={}, session=None, allpages=False, start=None):
while True:
try:
arvrequest = site.api(
http_method=config["http_method"], **arvparams
http_method=config.http_method, **arvparams
)
except requests.exceptions.HTTPError as e:
if (
e.response.status_code == 405
and config["http_method"] == "POST"
and config.http_method == "POST"
):
print("POST request to the API failed, retrying with GET")
config["http_method"] = "GET"
config.http_method = "GET"
continue
except requests.exceptions.ReadTimeout as err:
# Hopefully temporary, just wait a bit and continue with the same request.
@ -89,15 +89,15 @@ def getXMLRevisions(config={}, session=None, allpages=False, start=None):
arvparams["arvprop"] = "ids"
try:
arvrequest = site.api(
http_method=config["http_method"], **arvparams
http_method=config.http_method, **arvparams
)
except requests.exceptions.HTTPError as e:
if (
e.response.status_code == 405
and config["http_method"] == "POST"
and config.http_method == "POST"
):
print("POST request to the API failed, retrying with GET")
config["http_method"] = "GET"
config.http_method = "GET"
continue
exportparams = {
"action": "query",
@ -126,19 +126,19 @@ def getXMLRevisions(config={}, session=None, allpages=False, start=None):
exportparams["revids"] = revid
try:
exportrequest = site.api(
http_method=config["http_method"], **exportparams
http_method=config.http_method, **exportparams
)
except requests.exceptions.HTTPError as e:
if (
e.response.status_code == 405
and config["http_method"] == "POST"
and config.http_method == "POST"
):
print(
"POST request to the API failed, retrying with GET"
)
config["http_method"] = "GET"
config.http_method = "GET"
exportrequest = site.api(
http_method=config["http_method"], **exportparams
http_method=config.http_method, **exportparams
)
# This gives us a self-standing <mediawiki> element
@ -153,19 +153,19 @@ def getXMLRevisions(config={}, session=None, allpages=False, start=None):
arvparams["arvcontinue"] = arvrequest["continue"]["arvcontinue"]
try:
arvrequest = site.api(
http_method=config["http_method"], **arvparams
http_method=config.http_method, **arvparams
)
except requests.exceptions.HTTPError as e:
if (
e.response.status_code == 405
and config["http_method"] == "POST"
and config.http_method == "POST"
):
print(
"POST request to the API failed, retrying with GET"
)
config["http_method"] = "GET"
config.http_method = "GET"
arvrequest = site.api(
http_method=config["http_method"], **arvparams
http_method=config.http_method, **arvparams
)
except requests.exceptions.ReadTimeout as err:
# As above
@ -183,7 +183,7 @@ def getXMLRevisions(config={}, session=None, allpages=False, start=None):
print(e)
# TODO: check whether the KeyError was really for a missing arv API
print("Warning. Could not use allrevisions. Wiki too old?")
if config["curonly"]:
if config.curonly:
# The raw XML export in the API gets a title and gives the latest revision.
# We could also use the allpages API as generator but let's be consistent.
print("Getting titles to export the latest revision for each")
@ -201,17 +201,17 @@ def getXMLRevisions(config={}, session=None, allpages=False, start=None):
}
try:
exportrequest = site.api(
http_method=config["http_method"], **exportparams
http_method=config.http_method, **exportparams
)
except requests.exceptions.HTTPError as e:
if (
e.response.status_code == 405
and config["http_method"] == "POST"
and config.http_method == "POST"
):
print("POST request to the API failed, retrying with GET")
config["http_method"] = "GET"
config.http_method = "GET"
exportrequest = site.api(
http_method=config["http_method"], **exportparams
http_method=config.http_method, **exportparams
)
xml = str(exportrequest["query"]["export"]["*"])
@ -247,16 +247,16 @@ def getXMLRevisions(config={}, session=None, allpages=False, start=None):
"rvprop": "ids|timestamp|user|userid|size|sha1|contentmodel|comment|content",
}
try:
prequest = site.api(http_method=config["http_method"], **pparams)
prequest = site.api(http_method=config.http_method, **pparams)
except requests.exceptions.HTTPError as e:
if (
e.response.status_code == 405
and config["http_method"] == "POST"
and config.http_method == "POST"
):
print("POST request to the API failed, retrying with GET")
config["http_method"] = "GET"
config.http_method = "GET"
exportrequest = site.api(
http_method=config["http_method"], **exportparams
http_method=config.http_method, **exportparams
)
except mwclient.errors.InvalidResponse:
logerror(
@ -306,17 +306,17 @@ def getXMLRevisions(config={}, session=None, allpages=False, start=None):
try:
prequest = site.api(
http_method=config["http_method"], **pparams
http_method=config.http_method, **pparams
)
except requests.exceptions.HTTPError as e:
if (
e.response.status_code == 405
and config["http_method"] == "POST"
and config.http_method == "POST"
):
print("POST request to the API failed, retrying with GET")
config["http_method"] = "GET"
config.http_method = "GET"
prequest = site.api(
http_method=config["http_method"], **pparams
http_method=config.http_method, **pparams
)
# We're done iterating for this title or titles.

@ -1,10 +1,11 @@
import datetime
from wikiteam3.dumpgenerator.config import Config, DefaultConfig
def logerror(config={},to_stdout=False , text="") -> None:
def logerror(config: Config=None,to_stdout=False , text="") -> None:
"""Log error in errors.log"""
if text:
with open("%s/errors.log" % (config["path"]), "a", encoding="utf-8") as outfile:
with open("%s/errors.log" % (config.path), "a", encoding="utf-8") as outfile:
output = "{}: {}\n".format(
datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
text,

@ -27,6 +27,8 @@ from io import BytesIO
from pathlib import Path
from wikiteam3.utils import getUserAgent, domain2prefix
from wikiteam3.dumpgenerator.config import Config, DefaultConfig
import requests
from internetarchive import get_item
@ -79,7 +81,7 @@ def file_md5(path):
return digest.hexdigest()
def upload(wikis, logfile, config={}, uploadeddumps=[]):
def upload(wikis, logfile, config: Config=None, uploadeddumps=[]):
ia_keys = read_ia_keys(config)
headers = {"User-Agent": getUserAgent()}

@ -4,4 +4,4 @@ from .util import removeIP, cleanXML, cleanHTML, undoHTMLEntities
from .user_agent import getUserAgent
from .domain import domain2prefix
from .truncate import truncateFilename
from .wiki_avoid import avoidWikimediaProjects
from .wiki_avoid import avoidWikimediaProjects

@ -1,15 +1,16 @@
import re
from wikiteam3.dumpgenerator.config import Config, DefaultConfig
def domain2prefix(config={}, session=None):
def domain2prefix(config: Config=None, session=None):
"""Convert domain name to a valid prefix filename."""
# At this point, both api and index are supposed to be defined
domain = ""
if config["api"]:
domain = config["api"]
elif config["index"]:
domain = config["index"]
if config.api:
domain = config.api
elif config.index:
domain = config.index
domain = domain.lower()
domain = re.sub(r"(https?://|www\.|/index\.php.*|/api\.php.*)", "", domain)

@ -1,16 +1,17 @@
import re
import sys
from wikiteam3.dumpgenerator.config import Config, DefaultConfig
def avoidWikimediaProjects(config={}, other={}):
def avoidWikimediaProjects(config: Config=None, other={}):
"""Skip Wikimedia projects and redirect to the dumps website"""
# notice about wikipedia dumps
url = ""
if config["api"]:
url = url + config["api"]
if config["index"]:
url = url + config["index"]
if config.api:
url = url + config.api
if config.index:
url = url + config.index
if re.findall(
r"(?i)(wikipedia|wikisource|wiktionary|wikibooks|wikiversity|wikimedia|wikispecies|wikiquote|wikinews|wikidata|wikivoyage)\.org",
url,
