mirror of
https://github.com/brycedrennan/imaginAIry
synced 2024-10-31 03:20:40 +00:00
229 lines
6.3 KiB
Python
229 lines
6.3 KiB
Python
import contextlib
|
|
import csv
|
|
import gc
|
|
import logging
|
|
import os
|
|
import sys
|
|
from functools import partialmethod
|
|
from shutil import rmtree
|
|
|
|
import pytest
|
|
import responses
|
|
import torch.cuda
|
|
from tqdm import tqdm
|
|
from urllib3 import HTTPConnectionPool
|
|
|
|
from imaginairy import api
|
|
from imaginairy.api import imagine
|
|
from imaginairy.schema import ImaginePrompt
|
|
from imaginairy.utils import (
|
|
fix_torch_group_norm,
|
|
fix_torch_nn_layer_norm,
|
|
get_device,
|
|
platform_appropriate_autocast,
|
|
)
|
|
from imaginairy.utils.log_utils import (
|
|
configure_logging,
|
|
suppress_annoying_logs_and_warnings,
|
|
)
|
|
from tests import TESTS_FOLDER
|
|
|
|
if "pytest" in str(sys.argv):
|
|
suppress_annoying_logs_and_warnings()
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# SOLVERS_FOR_TESTING = SOLVER_TYPE_OPTIONS
|
|
# if get_device() == "mps:0":
|
|
# SOLVERS_FOR_TESTING = ["plms", "k_euler_a"]
|
|
# elif get_device() == "cpu":
|
|
# SOLVERS_FOR_TESTING = []
|
|
|
|
SOLVERS_FOR_TESTING = ["ddim", "dpmpp"]
|
|
|
|
|
|
@pytest.fixture(scope="session", autouse=True)
|
|
def _pre_setup():
|
|
api.IMAGINAIRY_SAFETY_MODE = "disabled"
|
|
suppress_annoying_logs_and_warnings()
|
|
test_output_folder = f"{TESTS_FOLDER}/test_output"
|
|
|
|
# delete the testoutput folder and recreate it
|
|
with contextlib.suppress(FileNotFoundError):
|
|
rmtree(test_output_folder)
|
|
os.makedirs(test_output_folder, exist_ok=True)
|
|
|
|
orig_urlopen = HTTPConnectionPool.urlopen
|
|
|
|
def urlopen_tattle(self, method, url, *args, **kwargs):
|
|
# traceback.print_stack()
|
|
# current_test = os.environ.get("PYTEST_CURRENT_TEST", "")
|
|
# print(f"{current_test} {method} {self.host}{url}")
|
|
result = orig_urlopen(self, method, url, *args, **kwargs)
|
|
|
|
# raise HTTPError("NO NETWORK CALLS")
|
|
return result
|
|
|
|
HTTPConnectionPool.urlopen = urlopen_tattle
|
|
tqdm.__init__ = partialmethod(tqdm.__init__, disable=True)
|
|
|
|
# real_randn = torch.randn
|
|
# def randn_tattle(*args, **kwargs):
|
|
# print("RANDN CALL RANDN CALL")
|
|
# traceback.print_stack()
|
|
# return real_randn(*args, **kwargs)
|
|
#
|
|
# torch.randn = randn_tattle
|
|
configure_logging("DEBUG")
|
|
|
|
with fix_torch_nn_layer_norm(), fix_torch_group_norm(), platform_appropriate_autocast():
|
|
yield
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _reset_get_device():
|
|
get_device.cache_clear()
|
|
|
|
|
|
@pytest.fixture()
|
|
def filename_base_for_outputs(request):
|
|
filename_base = f"{TESTS_FOLDER}/test_output/{request.node.name}_"
|
|
return filename_base
|
|
|
|
|
|
@pytest.fixture()
|
|
def filename_base_for_orig_outputs(request):
|
|
filename_base = f"{TESTS_FOLDER}/test_output/{request.node.originalname}_"
|
|
return filename_base
|
|
|
|
|
|
@pytest.fixture(params=SOLVERS_FOR_TESTING)
|
|
def solver_type(request):
|
|
return request.param
|
|
|
|
|
|
@pytest.fixture()
|
|
def mocked_responses():
|
|
with responses.RequestsMock() as rsps:
|
|
yield rsps
|
|
|
|
|
|
def pytest_addoption(parser):
|
|
parser.addoption(
|
|
"--subset",
|
|
action="store",
|
|
default=None,
|
|
help="Runs an exclusive subset of tests: '1/3', '2/3', '3/3'. Useful for distributed testing",
|
|
)
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def default_model_loaded():
|
|
"""
|
|
Just to make sure default weights are downloaded before the test runs
|
|
|
|
"""
|
|
prompt = ImaginePrompt(
|
|
"dogs lying on a hot pink couch",
|
|
size=64,
|
|
steps=2,
|
|
seed=1,
|
|
solver_type="ddim",
|
|
)
|
|
|
|
next(imagine(prompt))
|
|
|
|
|
|
cuda_tests_node_ids = []
|
|
cuda_test_tracker_filepath = f"{TESTS_FOLDER}/data/cuda-tests.csv"
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def detect_cuda_tests(request):
|
|
if torch.cuda.is_available():
|
|
torch.cuda.reset_peak_memory_stats()
|
|
start_memory = torch.cuda.max_memory_allocated()
|
|
yield
|
|
if torch.cuda.is_available():
|
|
end_memory = torch.cuda.max_memory_allocated()
|
|
memory_diff = end_memory - start_memory
|
|
if memory_diff > 0:
|
|
test_id = request.node.nodeid
|
|
print(f"Test {test_id} used {memory_diff} bytes of GPU memory")
|
|
cuda_tests_node_ids.append(test_id)
|
|
|
|
torch.cuda.empty_cache()
|
|
gc.collect()
|
|
|
|
|
|
@pytest.hookimpl()
|
|
def pytest_collection_modifyitems(config, items):
|
|
"""Only select a subset of tests to run, based on the --subset option."""
|
|
|
|
node_ids_to_mark = read_stored_cuda_test_nodes()
|
|
for item in items:
|
|
if item.nodeid in node_ids_to_mark:
|
|
item.add_marker(pytest.mark.gputest)
|
|
|
|
filtered_node_ids = set()
|
|
node_ids = [f.nodeid for f in items]
|
|
node_ids.sort()
|
|
subset = config.getoption("--subset")
|
|
|
|
if subset:
|
|
partition_no, total_partitions = subset.split("/")
|
|
partition_no, total_partitions = int(partition_no), int(total_partitions)
|
|
if partition_no < 1 or partition_no > total_partitions:
|
|
raise ValueError("Invalid subset")
|
|
for i, node_id in enumerate(node_ids):
|
|
if i % total_partitions == partition_no - 1:
|
|
filtered_node_ids.add(node_id)
|
|
|
|
items[:] = [i for i in items if i.nodeid in filtered_node_ids]
|
|
|
|
print(
|
|
f"Running subset {partition_no}/{total_partitions} {len(filtered_node_ids)} tests:"
|
|
)
|
|
filtered_node_ids = list(filtered_node_ids)
|
|
filtered_node_ids.sort()
|
|
for n in filtered_node_ids:
|
|
print(f" {n}")
|
|
|
|
|
|
def pytest_sessionstart(session):
|
|
from imaginairy.utils.debug_info import get_debug_info
|
|
|
|
debug_info = get_debug_info()
|
|
|
|
for k, v in debug_info.items():
|
|
if k == "nvidia_smi":
|
|
continue
|
|
k += ":"
|
|
print(f"{k: <30} {v}")
|
|
|
|
if "nvidia_smi" in debug_info:
|
|
print(debug_info["nvidia_smi"])
|
|
|
|
|
|
def pytest_sessionfinish(session, exitstatus):
|
|
existing_node_ids = read_stored_cuda_test_nodes()
|
|
updated_node_ids = existing_node_ids.union(set(cuda_tests_node_ids))
|
|
|
|
# Write updated, sorted list of node IDs to file
|
|
with open(cuda_test_tracker_filepath, "w", newline="") as file:
|
|
writer = csv.writer(file, lineterminator="\n")
|
|
for node_id in sorted(updated_node_ids):
|
|
writer.writerow([node_id])
|
|
|
|
|
|
def read_stored_cuda_test_nodes():
|
|
node_ids = set()
|
|
try:
|
|
with open(cuda_test_tracker_filepath, newline="") as file:
|
|
reader = csv.reader(file)
|
|
for row in reader:
|
|
node_ids.add(row[0])
|
|
except FileNotFoundError:
|
|
pass # File does not exist yet
|
|
return node_ids
|