You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
yt-dlp/yt_dlp/update.py

424 lines
15 KiB
Python

import atexit
import contextlib
import hashlib
import json
import os
import platform
import re
import subprocess
import sys
import urllib.error
from zipimport import zipimporter
from .compat import functools # isort: split
from .compat import compat_realpath, compat_shlex_quote
from .utils import (
Popen,
cached_method,
deprecation_warning,
remove_end,
remove_start,
sanitized_Request,
shell_quote,
system_identifier,
version_tuple,
)
from .version import CHANNEL, UPDATE_HINT, VARIANT, __version__
UPDATE_SOURCES = {
'stable': 'yt-dlp/yt-dlp',
'nightly': 'yt-dlp/yt-dlp-nightly-builds',
}
_VERSION_RE = re.compile(r'(\d+\.)*\d+')
API_BASE_URL = 'https://api.github.com/repos'
# Backwards compatibility variables for the current channel
REPOSITORY = UPDATE_SOURCES[CHANNEL]
API_URL = f'{API_BASE_URL}/{REPOSITORY}/releases'
@functools.cache
def _get_variant_and_executable_path():
"""@returns (variant, executable_path)"""
if getattr(sys, 'frozen', False):
path = sys.executable
if not hasattr(sys, '_MEIPASS'):
return 'py2exe', path
elif sys._MEIPASS == os.path.dirname(path):
return f'{sys.platform}_dir', path
elif sys.platform == 'darwin':
machine = '_legacy' if version_tuple(platform.mac_ver()[0]) < (10, 15) else ''
else:
machine = f'_{platform.machine().lower()}'
# Ref: https://en.wikipedia.org/wiki/Uname#Examples
if machine[1:] in ('x86', 'x86_64', 'amd64', 'i386', 'i686'):
machine = '_x86' if platform.architecture()[0][:2] == '32' else ''
return f'{remove_end(sys.platform, "32")}{machine}_exe', path
path = os.path.dirname(__file__)
if isinstance(__loader__, zipimporter):
return 'zip', os.path.join(path, '..')
elif (os.path.basename(sys.argv[0]) in ('__main__.py', '-m')
and os.path.exists(os.path.join(path, '../.git/HEAD'))):
return 'source', path
return 'unknown', path
def detect_variant():
return VARIANT or _get_variant_and_executable_path()[0]
@functools.cache
def current_git_head():
if detect_variant() != 'source':
return
with contextlib.suppress(Exception):
stdout, _, _ = Popen.run(
['git', 'rev-parse', '--short', 'HEAD'],
text=True, cwd=os.path.dirname(os.path.abspath(__file__)),
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if re.fullmatch('[0-9a-f]+', stdout.strip()):
return stdout.strip()
_FILE_SUFFIXES = {
'zip': '',
'py2exe': '_min.exe',
'win_exe': '.exe',
'win_x86_exe': '_x86.exe',
'darwin_exe': '_macos',
'darwin_legacy_exe': '_macos_legacy',
'linux_exe': '_linux',
'linux_aarch64_exe': '_linux_aarch64',
'linux_armv7l_exe': '_linux_armv7l',
}
_NON_UPDATEABLE_REASONS = {
**{variant: None for variant in _FILE_SUFFIXES}, # Updatable
**{variant: f'Auto-update is not supported for unpackaged {name} executable; Re-download the latest release'
for variant, name in {'win32_dir': 'Windows', 'darwin_dir': 'MacOS', 'linux_dir': 'Linux'}.items()},
'source': 'You cannot update when running from source code; Use git to pull the latest changes',
'unknown': 'You installed yt-dlp with a package manager or setup.py; Use that to update',
'other': 'You are using an unofficial build of yt-dlp; Build the executable again',
}
def is_non_updateable():
if UPDATE_HINT:
return UPDATE_HINT
return _NON_UPDATEABLE_REASONS.get(
detect_variant(), _NON_UPDATEABLE_REASONS['unknown' if VARIANT else 'other'])
def _sha256_file(path):
h = hashlib.sha256()
mv = memoryview(bytearray(128 * 1024))
with open(os.path.realpath(path), 'rb', buffering=0) as f:
for n in iter(lambda: f.readinto(mv), 0):
h.update(mv[:n])
return h.hexdigest()
class Updater:
_exact = True
def __init__(self, ydl, target=None):
self.ydl = ydl
self.target_channel, sep, self.target_tag = (target or CHANNEL).rpartition('@')
if not sep and self.target_tag in UPDATE_SOURCES: # stable => stable@latest
self.target_channel, self.target_tag = self.target_tag, None
elif not self.target_channel:
self.target_channel = CHANNEL
if not self.target_tag:
self.target_tag, self._exact = 'latest', False
elif self.target_tag != 'latest':
self.target_tag = f'tags/{self.target_tag}'
@property
def _target_repo(self):
try:
return UPDATE_SOURCES[self.target_channel]
except KeyError:
return self._report_error(
f'Invalid update channel {self.target_channel!r} requested. '
f'Valid channels are {", ".join(UPDATE_SOURCES)}', True)
def _version_compare(self, a, b, channel=CHANNEL):
if channel != self.target_channel:
return False
if _VERSION_RE.fullmatch(f'{a}.{b}'):
a, b = version_tuple(a), version_tuple(b)
return a == b if self._exact else a >= b
return a == b
@functools.cached_property
def _tag(self):
if self._version_compare(self.current_version, self.latest_version):
return self.target_tag
identifier = f'{detect_variant()} {self.target_channel} {system_identifier()}'
for line in self._download('_update_spec', 'latest').decode().splitlines():
if not line.startswith('lock '):
continue
_, tag, pattern = line.split(' ', 2)
if re.match(pattern, identifier):
if not self._exact:
return f'tags/{tag}'
elif self.target_tag == 'latest' or not self._version_compare(
tag, self.target_tag[5:], channel=self.target_channel):
self._report_error(
f'yt-dlp cannot be updated above {tag} since you are on an older Python version', True)
return f'tags/{self.current_version}'
return self.target_tag
@cached_method
def _get_version_info(self, tag):
url = f'{API_BASE_URL}/{self._target_repo}/releases/{tag}'
self.ydl.write_debug(f'Fetching release info: {url}')
return json.loads(self.ydl.urlopen(sanitized_Request(url, headers={
'Accept': 'application/vnd.github+json',
'User-Agent': 'yt-dlp',
'X-GitHub-Api-Version': '2022-11-28',
})).read().decode())
@property
def current_version(self):
"""Current version"""
return __version__
@staticmethod
def _label(channel, tag):
"""Label for a given channel and tag"""
return f'{channel}@{remove_start(tag, "tags/")}'
def _get_actual_tag(self, tag):
if tag.startswith('tags/'):
return tag[5:]
return self._get_version_info(tag)['tag_name']
@property
def new_version(self):
"""Version of the latest release we can update to"""
return self._get_actual_tag(self._tag)
@property
def latest_version(self):
"""Version of the target release"""
return self._get_actual_tag(self.target_tag)
@property
def has_update(self):
"""Whether there is an update available"""
return not self._version_compare(self.current_version, self.new_version)
@functools.cached_property
def filename(self):
"""Filename of the executable"""
return compat_realpath(_get_variant_and_executable_path()[1])
def _download(self, name, tag):
slug = 'latest/download' if tag == 'latest' else f'download/{tag[5:]}'
url = f'https://github.com/{self._target_repo}/releases/{slug}/{name}'
self.ydl.write_debug(f'Downloading {name} from {url}')
return self.ydl.urlopen(url).read()
@functools.cached_property
def release_name(self):
"""The release filename"""
return f'yt-dlp{_FILE_SUFFIXES[detect_variant()]}'
@functools.cached_property
def release_hash(self):
"""Hash of the latest release"""
hash_data = dict(ln.split()[::-1] for ln in self._download('SHA2-256SUMS', self._tag).decode().splitlines())
return hash_data[self.release_name]
def _report_error(self, msg, expected=False):
self.ydl.report_error(msg, tb=False if expected else None)
self.ydl._download_retcode = 100
def _report_permission_error(self, file):
self._report_error(f'Unable to write to {file}; Try running as administrator', True)
def _report_network_error(self, action, delim=';'):
self._report_error(
f'Unable to {action}{delim} visit '
f'https://github.com/{self._target_repo}/releases/{self.target_tag.replace("tags/", "tag/")}', True)
def check_update(self):
"""Report whether there is an update available"""
if not self._target_repo:
return False
try:
self.ydl.to_screen((
f'Available version: {self._label(self.target_channel, self.latest_version)}, ' if self.target_tag == 'latest' else ''
) + f'Current version: {self._label(CHANNEL, self.current_version)}')
except Exception:
return self._report_network_error('obtain version info', delim='; Please try again later or')
if not is_non_updateable():
self.ydl.to_screen(f'Current Build Hash: {_sha256_file(self.filename)}')
if self.has_update:
return True
if self.target_tag == self._tag:
self.ydl.to_screen(f'yt-dlp is up to date ({self._label(CHANNEL, self.current_version)})')
elif not self._exact:
self.ydl.report_warning('yt-dlp cannot be updated any further since you are on an older Python version')
return False
def update(self):
"""Update yt-dlp executable to the latest version"""
if not self.check_update():
return
err = is_non_updateable()
if err:
return self._report_error(err, True)
self.ydl.to_screen(f'Updating to {self._label(self.target_channel, self.new_version)} ...')
if (_VERSION_RE.fullmatch(self.target_tag[5:])
and version_tuple(self.target_tag[5:]) < (2023, 3, 2)):
self.ydl.report_warning('You are downgrading to a version without --update-to')
directory = os.path.dirname(self.filename)
if not os.access(self.filename, os.W_OK):
return self._report_permission_error(self.filename)
elif not os.access(directory, os.W_OK):
return self._report_permission_error(directory)
new_filename, old_filename = f'{self.filename}.new', f'{self.filename}.old'
if detect_variant() == 'zip': # Can be replaced in-place
new_filename, old_filename = self.filename, None
try:
if os.path.exists(old_filename or ''):
os.remove(old_filename)
except OSError:
return self._report_error('Unable to remove the old version')
try:
newcontent = self._download(self.release_name, self._tag)
except Exception as e:
if isinstance(e, urllib.error.HTTPError) and e.code == 404:
return self._report_error(
f'The requested tag {self._label(self.target_channel, self.target_tag)} does not exist', True)
return self._report_network_error(f'fetch updates: {e}')
try:
expected_hash = self.release_hash
except Exception:
self.ydl.report_warning('no hash information found for the release')
else:
if hashlib.sha256(newcontent).hexdigest() != expected_hash:
return self._report_network_error('verify the new executable')
try:
with open(new_filename, 'wb') as outf:
outf.write(newcontent)
except OSError:
return self._report_permission_error(new_filename)
if old_filename:
mask = os.stat(self.filename).st_mode
try:
os.rename(self.filename, old_filename)
except OSError:
return self._report_error('Unable to move current version')
try:
os.rename(new_filename, self.filename)
except OSError:
self._report_error('Unable to overwrite current version')
return os.rename(old_filename, self.filename)
variant = detect_variant()
if variant.startswith('win') or variant == 'py2exe':
atexit.register(Popen, f'ping 127.0.0.1 -n 5 -w 1000 & del /F "{old_filename}"',
shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
elif old_filename:
try:
os.remove(old_filename)
except OSError:
self._report_error('Unable to remove the old version')
try:
os.chmod(self.filename, mask)
except OSError:
return self._report_error(
f'Unable to set permissions. Run: sudo chmod a+rx {compat_shlex_quote(self.filename)}')
self.ydl.to_screen(f'Updated yt-dlp to {self._label(self.target_channel, self.new_version)}')
return True
@functools.cached_property
def cmd(self):
"""The command-line to run the executable, if known"""
# There is no sys.orig_argv in py < 3.10. Also, it can be [] when frozen
if getattr(sys, 'orig_argv', None):
return sys.orig_argv
elif getattr(sys, 'frozen', False):
return sys.argv
def restart(self):
"""Restart the executable"""
assert self.cmd, 'Must be frozen or Py >= 3.10'
self.ydl.write_debug(f'Restarting: {shell_quote(self.cmd)}')
_, _, returncode = Popen.run(self.cmd)
return returncode
def run_update(ydl):
"""Update the program file with the latest version from the repository
@returns Whether there was a successful update (No update = False)
"""
return Updater(ydl).update()
# Deprecated
def update_self(to_screen, verbose, opener):
import traceback
deprecation_warning(f'"{__name__}.update_self" is deprecated and may be removed '
f'in a future version. Use "{__name__}.run_update(ydl)" instead')
printfn = to_screen
class FakeYDL():
to_screen = printfn
def report_warning(self, msg, *args, **kwargs):
return printfn(f'WARNING: {msg}', *args, **kwargs)
def report_error(self, msg, tb=None):
printfn(f'ERROR: {msg}')
if not verbose:
return
if tb is None:
# Copied from YoutubeDL.trouble
if sys.exc_info()[0]:
tb = ''
if hasattr(sys.exc_info()[1], 'exc_info') and sys.exc_info()[1].exc_info[0]:
tb += ''.join(traceback.format_exception(*sys.exc_info()[1].exc_info))
tb += traceback.format_exc()
else:
tb_data = traceback.format_list(traceback.extract_stack())
tb = ''.join(tb_data)
if tb:
printfn(tb)
def write_debug(self, msg, *args, **kwargs):
printfn(f'[debug] {msg}', *args, **kwargs)
def urlopen(self, url):
return opener.open(url)
return run_update(FakeYDL())
__all__ = ['Updater']