Move try_rm to test helpers

This commit is contained in:
Philipp Hagemeister 2013-10-06 05:47:17 +02:00
parent 226113c880
commit f4aac741d5
2 changed files with 26 additions and 19 deletions

View File

@ -1,3 +1,4 @@
import errno
import io import io
import json import json
import os.path import os.path
@ -22,18 +23,33 @@ PARAMETERS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "para
with io.open(PARAMETERS_FILE, encoding='utf-8') as pf: with io.open(PARAMETERS_FILE, encoding='utf-8') as pf:
parameters = json.load(pf) parameters = json.load(pf)
def try_rm(filename):
""" Remove a file if it exists """
try:
os.remove(filename)
except OSError as ose:
if ose.errno != errno.ENOENT:
raise
class FakeYDL(YoutubeDL): class FakeYDL(YoutubeDL):
def __init__(self): def __init__(self):
self.result = []
# Different instances of the downloader can't share the same dictionary # Different instances of the downloader can't share the same dictionary
# some test set the "sublang" parameter, which would break the md5 checks. # some test set the "sublang" parameter, which would break the md5 checks.
self.params = dict(parameters) params = dict(parameters)
def to_screen(self, s): super(FakeYDL, self).__init__(params)
self.result = []
def to_screen(self, s, skip_eol=None):
print(s) print(s)
def trouble(self, s, tb=None): def trouble(self, s, tb=None):
raise Exception(s) raise Exception(s)
def download(self, x): def download(self, x):
self.result.append(x) self.result.append(x)
def expect_warning(self, regex): def expect_warning(self, regex):
# Silence an expected warning matching a regex # Silence an expected warning matching a regex
old_report_warning = self.report_warning old_report_warning = self.report_warning

View File

@ -1,6 +1,5 @@
#!/usr/bin/env python #!/usr/bin/env python
import errno
import hashlib import hashlib
import io import io
import os import os
@ -28,14 +27,6 @@ opener = compat_urllib_request.build_opener(proxy_handler, cookie_processor, You
compat_urllib_request.install_opener(opener) compat_urllib_request.install_opener(opener)
socket.setdefaulttimeout(10) socket.setdefaulttimeout(10)
def _try_rm(filename):
""" Remove a file if it exists """
try:
os.remove(filename)
except OSError as ose:
if ose.errno != errno.ENOENT:
raise
md5 = lambda s: hashlib.md5(s.encode('utf-8')).hexdigest() md5 = lambda s: hashlib.md5(s.encode('utf-8')).hexdigest()
class YoutubeDL(youtube_dl.YoutubeDL): class YoutubeDL(youtube_dl.YoutubeDL):
@ -54,7 +45,7 @@ def _file_md5(fn):
with open(fn, 'rb') as f: with open(fn, 'rb') as f:
return hashlib.md5(f.read()).hexdigest() return hashlib.md5(f.read()).hexdigest()
from helper import get_testcases from helper import get_testcases, try_rm
defs = get_testcases() defs = get_testcases()
with io.open(PARAMETERS_FILE, encoding='utf-8') as pf: with io.open(PARAMETERS_FILE, encoding='utf-8') as pf:
@ -97,9 +88,9 @@ def generator(test_case):
test_cases = test_case.get('playlist', [test_case]) test_cases = test_case.get('playlist', [test_case])
for tc in test_cases: for tc in test_cases:
_try_rm(tc['file']) try_rm(tc['file'])
_try_rm(tc['file'] + '.part') try_rm(tc['file'] + '.part')
_try_rm(tc['file'] + '.info.json') try_rm(tc['file'] + '.info.json')
try: try:
for retry in range(1, RETRIES + 1): for retry in range(1, RETRIES + 1):
try: try:
@ -145,9 +136,9 @@ def generator(test_case):
self.assertTrue(key in info_dict.keys() and info_dict[key]) self.assertTrue(key in info_dict.keys() and info_dict[key])
finally: finally:
for tc in test_cases: for tc in test_cases:
_try_rm(tc['file']) try_rm(tc['file'])
_try_rm(tc['file'] + '.part') try_rm(tc['file'] + '.part')
_try_rm(tc['file'] + '.info.json') try_rm(tc['file'] + '.info.json')
return test_template return test_template