# imaginairy/utils.py
import importlib
import logging
import os.path
import platform
from contextlib import contextmanager, nullcontext
from functools import lru_cache
from typing import List, Optional

import requests
import torch
from torch import Tensor, autocast
from torch.nn import functional
from torch.overrides import handle_torch_function, has_torch_function_variadic
from transformers import cached_path

logger = logging.getLogger(__name__)


@lru_cache()
def get_device():
    if torch.cuda.is_available():
        return "cuda"
    if torch.backends.mps.is_available():
        return "mps:0"
    return "cpu"


@lru_cache()
def get_device_name(device_type):
    if device_type == "cuda":
        return torch.cuda.get_device_name(0)
    return platform.processor()


def log_params(model):
    total_params = sum(p.numel() for p in model.parameters())
    logger.debug(f"{model.__class__.__name__} has {total_params * 1.e-6:.2f} M params.")


def instantiate_from_config(config):
if "target" not in config:
2022-09-08 03:59:30 +00:00
if config == "__is_first_stage__":
return None
2022-09-16 16:24:24 +00:00
if config == "__is_unconditional__":
2022-09-08 03:59:30 +00:00
return None
raise KeyError("Expected key `target` to instantiate.")
2022-09-16 16:24:24 +00:00
return get_obj_from_str(config["target"])(**config.get("params", {}))
2022-09-08 03:59:30 +00:00
def get_obj_from_str(string, reload=False):
module, cls = string.rsplit(".", 1)
if reload:
module_imp = importlib.import_module(module)
importlib.reload(module_imp)
return getattr(importlib.import_module(module, package=None), cls)
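# Example (illustrative config, not from the original file): a config like
#     {"target": "torch.nn.Linear", "params": {"in_features": 8, "out_features": 2}}
# makes instantiate_from_config resolve "torch.nn.Linear" via get_obj_from_str
# and call it with the params, i.e. torch.nn.Linear(in_features=8, out_features=2).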
@contextmanager
def platform_appropriate_autocast(precision="autocast"):
    """
    Allow calculations to run in mixed precision, which can be faster.
    """
    precision_scope = nullcontext
    if precision == "autocast" and get_device() in ("cuda", "cpu"):
        precision_scope = autocast
    with precision_scope(get_device()):
        yield
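# Usage sketch (illustrative; `model` and `x` are hypothetical):
#
#     with platform_appropriate_autocast():
#         result = model(x)
#
# On cuda/cpu this enters torch.autocast for the detected device; on mps (or
# when precision != "autocast") nullcontext makes the block a no-op.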
def _fixed_layer_norm(
    input: Tensor,  # noqa
    normalized_shape: List[int],
    weight: Optional[Tensor] = None,
    bias: Optional[Tensor] = None,
    eps: float = 1e-5,
) -> Tensor:
    """
    Applies Layer Normalization over the last certain number of dimensions.
    See :class:`~torch.nn.LayerNorm` for details.
    """
    if has_torch_function_variadic(input, weight, bias):
        return handle_torch_function(
            _fixed_layer_norm,
            (input, weight, bias),
            input,
            normalized_shape,
            weight=weight,
            bias=bias,
            eps=eps,
        )
    # The fix: force the input contiguous before calling torch.layer_norm
    # (see the issue linked in fix_torch_nn_layer_norm below).
    return torch.layer_norm(
        input.contiguous(),
        normalized_shape,
        weight,
        bias,
        eps,
        torch.backends.cudnn.enabled,
    )
@contextmanager
def fix_torch_nn_layer_norm():
    """https://github.com/CompVis/stable-diffusion/issues/25#issuecomment-1221416526"""
    orig_function = functional.layer_norm
    functional.layer_norm = _fixed_layer_norm
    try:
        yield
    finally:
        functional.layer_norm = orig_function
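# Usage sketch (illustrative): wrap model calls so any layer_norm invoked
# inside sees the patched, contiguous-input version, then restore the original:
#
#     with fix_torch_nn_layer_norm():
#         result = model(x)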
@contextmanager
def fix_torch_group_norm():
    """
    Patch group_norm to cast the weights to the same dtype as the inputs.

    From what I can understand, all the other repos just switch to full
    precision instead of addressing this. I think this would make things
    slower, but I'm not sure.

    https://github.com/pytorch/pytorch/pull/81852
    """
    orig_group_norm = functional.group_norm

    def _group_norm_wrapper(
        input: Tensor,  # noqa
        num_groups: int,
        weight: Optional[Tensor] = None,
        bias: Optional[Tensor] = None,
        eps: float = 1e-5,
    ) -> Tensor:
        if weight is not None and weight.dtype != input.dtype:
            weight = weight.to(input.dtype)
        if bias is not None and bias.dtype != input.dtype:
            bias = bias.to(input.dtype)
        return orig_group_norm(
            input=input, num_groups=num_groups, weight=weight, bias=bias, eps=eps
        )

    functional.group_norm = _group_norm_wrapper
    try:
        yield
    finally:
        functional.group_norm = orig_group_norm
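# Why this exists (a sketch, assuming mixed-precision use): under autocast an
# input tensor can be float16 while a GroupNorm module's weights are float32;
# the wrapper casts weight/bias down to the input dtype rather than running the
# whole model in full precision:
#
#     with fix_torch_group_norm(), platform_appropriate_autocast():
#         result = model(x)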
def get_cache_dir():
    xdg_cache_home = os.getenv("XDG_CACHE_HOME", None)
    if xdg_cache_home is None:
        user_home = os.getenv("HOME", None)
        if user_home:
            xdg_cache_home = os.path.join(user_home, ".cache")
    if xdg_cache_home is not None:
        return os.path.join(xdg_cache_home, "imaginairy", "weights")
    return os.path.join(os.path.dirname(__file__), ".cached-downloads")
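# Resolution order (descriptive): $XDG_CACHE_HOME/imaginairy/weights, then
# $HOME/.cache/imaginairy/weights, then a .cached-downloads folder next to
# this module when neither environment variable is set.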
def get_cached_url_path(url):
    try:
        return cached_path(url)
    except OSError:
        pass
    filename = url.split("/")[-1]
    dest = get_cache_dir()
    os.makedirs(dest, exist_ok=True)
    dest_path = os.path.join(dest, filename)
    if os.path.exists(dest_path):
        return dest_path
    r = requests.get(url)  # noqa
    with open(dest_path, "wb") as f:
        f.write(r.content)
    return dest_path
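# Usage sketch (illustrative URL, not from the original file): the lookup goes
# through transformers' cached_path when possible, falling back to a plain
# requests download into get_cache_dir():
#
#     weights_path = get_cached_url_path("https://example.com/model.ckpt")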