mirror of
https://github.com/brycedrennan/imaginAIry
synced 2024-10-31 03:20:40 +00:00
298 lines
10 KiB
Python
298 lines
10 KiB
Python
import logging
|
|
|
|
import numpy as np
|
|
import pytorch_lightning as pl
|
|
import torch
|
|
import torch.nn as nn
|
|
from einops import rearrange
|
|
|
|
from imaginairy.modules.diffusion.model import Encoder, Decoder
|
|
from imaginairy.modules.distributions import DiagonalGaussianDistribution
|
|
from imaginairy.utils import instantiate_from_config
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class VectorQuantizer(nn.Module):
    """
    Improved version over original VectorQuantizer, can be used as a drop-in replacement. Mostly
    avoids costly matrix multiplications and allows for post-hoc remapping of indices.

    https://github.com/CompVis/taming-transformers/blob/141eb746f567a731f71cd703796d4d53a323f45f/taming/modules/vqvae/quantize.py#L213

    Args:
        n_e: number of rows of the codebook embedding.
        e_dim: size of each codebook vector.
        beta: weight applied to one of the two loss terms (which one depends
            on ``legacy``, see the note on ``__init__``).
        remap: optional path handed to ``np.load``; the loaded array lists the
            codebook indices that are "used" and indices are remapped to
            positions in that array.
        unknown_index: how indices missing from the remap table are handled:
            ``"random"``, ``"extra"`` (one additional index is appended) or an
            integer that is used as-is.
        sane_index_shape: when true, ``forward`` returns the indices reshaped
            to ``(batch, height, width)``.
        legacy: keep the historical placement of ``beta`` in the loss.
    """

    # NOTE: due to a bug the beta term was applied to the wrong term. for
    # backwards compatibility we use the buggy version by default, but you can
    # specify legacy=False to fix it.
    def __init__(
        self,
        n_e,
        e_dim,
        beta,
        remap=None,
        unknown_index="random",
        sane_index_shape=False,
        legacy=True,
    ):
        super().__init__()
        self.n_e = n_e
        self.e_dim = e_dim
        self.beta = beta
        self.legacy = legacy

        self.embedding = nn.Embedding(self.n_e, self.e_dim)
        self.embedding.weight.data.uniform_(-1.0 / self.n_e, 1.0 / self.n_e)

        self.remap = remap
        if self.remap is not None:
            self.register_buffer("used", torch.tensor(np.load(self.remap)))
            self.re_embed = self.used.shape[0]
            self.unknown_index = unknown_index  # "random" or "extra" or integer
            if self.unknown_index == "extra":
                # Reserve one additional slot right after the used indices.
                self.unknown_index = self.re_embed
                self.re_embed = self.re_embed + 1
            # Use the module logger like the rest of this file instead of print.
            logger.info(
                "Remapping %s indices to %s indices. "
                "Using %s for unknown indices.",
                self.n_e,
                self.re_embed,
                self.unknown_index,
            )
        else:
            self.re_embed = n_e

        self.sane_index_shape = sane_index_shape

    def remap_to_used(self, inds):
        """Map codebook indices to their positions in the ``used`` table.

        ``inds`` must have at least two dimensions; the result has the same
        shape. Indices that do not occur in ``used`` are replaced by a random
        value in ``[0, re_embed)`` or by ``unknown_index``.
        """
        ishape = inds.shape
        assert len(ishape) > 1
        inds = inds.reshape(ishape[0], -1)
        used = self.used.to(inds)
        match = (inds[:, :, None] == used[None, None, ...]).long()
        # argmax yields a fresh tensor, so the writes below never touch `inds`.
        new = match.argmax(-1)
        unknown = match.sum(2) < 1
        if self.unknown_index == "random":
            new[unknown] = torch.randint(0, self.re_embed, size=new[unknown].shape).to(
                device=new.device
            )
        else:
            new[unknown] = self.unknown_index
        return new.reshape(ishape)

    def unmap_to_all(self, inds):
        """Inverse of ``remap_to_used``: turn table positions into codebook indices.

        ``inds`` must have at least two dimensions; the result has the same
        shape. The input tensor is left unmodified.
        """
        ishape = inds.shape
        assert len(ishape) > 1
        flat = inds.reshape(ishape[0], -1)
        used = self.used.to(flat)
        n_used = self.used.shape[0]
        if self.re_embed > n_used:  # extra token
            # Simply set to zero. Done out of place: `flat` may be a view of
            # the caller's tensor and must not be written through.
            flat = flat.masked_fill(flat >= n_used, 0)
        table = used.unsqueeze(0).expand(flat.shape[0], -1)
        back = torch.gather(table, 1, flat)
        return back.reshape(ishape)

    def forward(self, z, temp=None, rescale_logits=False, return_logits=False):
        """Quantize ``z`` (``b c h w``) to its nearest codebook vectors.

        ``temp``, ``rescale_logits`` and ``return_logits`` exist only for
        interface compatibility and must keep their neutral values.

        Returns:
            ``(z_q, loss, (perplexity, min_encodings, min_encoding_indices))``
            where ``z_q`` has the shape of ``z``; ``perplexity`` and
            ``min_encodings`` are always ``None``.
        """
        assert temp is None or temp == 1.0, "Only for interface compatible with Gumbel"
        assert not rescale_logits, "Only for interface compatible with Gumbel"
        assert not return_logits, "Only for interface compatible with Gumbel"
        # reshape z -> (batch, height, width, channel) and flatten
        z = z.permute(0, 2, 3, 1).contiguous()
        z_flattened = z.view(-1, self.e_dim)
        # distances from z to embeddings e_j (z - e)^2 = z^2 + e^2 - 2 e * z
        codebook = self.embedding.weight
        d = (
            torch.sum(z_flattened**2, dim=1, keepdim=True)
            + torch.sum(codebook**2, dim=1)
            - 2 * torch.einsum("bd,nd->bn", z_flattened, codebook)
        )

        min_encoding_indices = torch.argmin(d, dim=1)
        z_q = self.embedding(min_encoding_indices).view(z.shape)
        perplexity = None
        min_encodings = None

        # compute loss for embedding
        if not self.legacy:
            loss = self.beta * torch.mean((z_q.detach() - z) ** 2) + torch.mean(
                (z_q - z.detach()) ** 2
            )
        else:
            loss = torch.mean((z_q.detach() - z) ** 2) + self.beta * torch.mean(
                (z_q - z.detach()) ** 2
            )

        # preserve gradients
        z_q = z + (z_q - z).detach()

        # reshape back to match original input shape
        z_q = z_q.permute(0, 3, 1, 2).contiguous()

        if self.remap is not None:
            min_encoding_indices = min_encoding_indices.reshape(
                z.shape[0], -1
            )  # add batch axis
            min_encoding_indices = self.remap_to_used(min_encoding_indices)
            min_encoding_indices = min_encoding_indices.reshape(-1, 1)  # flatten

        if self.sane_index_shape:
            min_encoding_indices = min_encoding_indices.reshape(
                z_q.shape[0], z_q.shape[2], z_q.shape[3]
            )

        return z_q, loss, (perplexity, min_encodings, min_encoding_indices)

    def get_codebook_entry(self, indices, shape):
        """Look up codebook vectors for ``indices``.

        ``shape`` specifies ``(batch, height, width, channel)``; when given,
        the result is viewed to it and returned channel-first. With
        ``shape=None`` the raw embedding lookup is returned.
        """
        if self.remap is not None:
            # unmap_to_all needs a batch axis; when no shape is supplied a
            # single-row batch is sufficient because the mapping is elementwise.
            batch = shape[0] if shape is not None else 1
            indices = indices.reshape(batch, -1)  # add batch axis
            indices = self.unmap_to_all(indices)
            indices = indices.reshape(-1)  # flatten again

        # get quantized latent vectors
        z_q = self.embedding(indices)

        if shape is not None:
            z_q = z_q.view(shape)
            # reshape back to match original input shape
            z_q = z_q.permute(0, 3, 1, 2).contiguous()

        return z_q
|
|
|
|
|
|
class VQModel(pl.LightningModule):
    """Encoder/decoder pair with a ``VectorQuantizer`` bottleneck.

    Args:
        ddconfig: keyword arguments forwarded to both ``Encoder`` and
            ``Decoder``; must contain ``"z_channels"``.
        lossconfig: config passed to ``instantiate_from_config`` to build
            ``self.loss``.
        n_embed: number of codebook entries of the quantizer.
        embed_dim: dimension of each codebook entry.
        ckpt_path: optional checkpoint to restore weights from.
        ignore_keys: state-dict key prefixes to drop when restoring.
        image_key: stored as ``self.image_key``.
        colorize_nlabels: when set, a random ``(3, colorize_nlabels, 1, 1)``
            buffer named ``colorize`` is registered.
        monitor: stored as ``self.monitor`` only when not ``None``.
        batch_resize_range: stored and logged; not otherwise used here.
        scheduler_config: stored as ``self.scheduler_config``.
        lr_g_factor: stored as ``self.lr_g_factor``.
        remap: forwarded to ``VectorQuantizer``.
        sane_index_shape: forwarded to ``VectorQuantizer``.
    """

    def __init__(
        self,
        ddconfig,
        lossconfig,
        n_embed,
        embed_dim,
        ckpt_path=None,
        ignore_keys=(),
        image_key="image",
        colorize_nlabels=None,
        monitor=None,
        batch_resize_range=None,
        scheduler_config=None,
        lr_g_factor=1.0,
        remap=None,
        sane_index_shape=False,  # tell vector quantizer to return indices as bhw
    ):
        super().__init__()
        self.embed_dim = embed_dim
        self.n_embed = n_embed
        self.image_key = image_key
        self.encoder = Encoder(**ddconfig)
        self.decoder = Decoder(**ddconfig)
        self.loss = instantiate_from_config(lossconfig)
        self.quantize = VectorQuantizer(
            n_embed,
            embed_dim,
            beta=0.25,
            remap=remap,
            sane_index_shape=sane_index_shape,
        )
        # 1x1 convolutions translating between encoder channels and codebook dim.
        self.quant_conv = torch.nn.Conv2d(ddconfig["z_channels"], embed_dim, 1)
        self.post_quant_conv = torch.nn.Conv2d(embed_dim, ddconfig["z_channels"], 1)
        if colorize_nlabels is not None:
            assert isinstance(colorize_nlabels, int)
            self.register_buffer("colorize", torch.randn(3, colorize_nlabels, 1, 1))
        if monitor is not None:
            self.monitor = monitor
        self.batch_resize_range = batch_resize_range
        if self.batch_resize_range is not None:
            logger.info(
                f"{self.__class__.__name__}: Using per-batch resizing in range {batch_resize_range}."
            )

        if ckpt_path is not None:
            self.init_from_ckpt(ckpt_path, ignore_keys=ignore_keys)
        self.scheduler_config = scheduler_config
        self.lr_g_factor = lr_g_factor

    def init_from_ckpt(self, path, ignore_keys=()):
        """Load weights from the ``"state_dict"`` entry of the checkpoint at ``path``.

        Keys starting with any prefix in ``ignore_keys`` are dropped first and
        the remainder is loaded non-strictly. Without this method,
        constructing the model with ``ckpt_path`` raised ``AttributeError``.
        """
        # NOTE(review): torch.load unpickles the file; only use trusted checkpoints.
        sd = torch.load(path, map_location="cpu")["state_dict"]
        prefixes = tuple(ignore_keys)
        for key in list(sd.keys()):
            if prefixes and key.startswith(prefixes):
                logger.info("Deleting key %s from state_dict.", key)
                del sd[key]
        self.load_state_dict(sd, strict=False)
        logger.info(f"Restored from {path}")
|
|
|
|
|
|
class VQModelInterface(VQModel):
    """``VQModel`` whose ``encode`` skips quantization and whose ``decode`` applies it."""

    def __init__(self, embed_dim, *args, **kwargs):
        super().__init__(*args, embed_dim=embed_dim, **kwargs)
        self.embed_dim = embed_dim

    def encode(self, x):
        """Run the encoder followed by ``quant_conv``; no quantization happens here."""
        return self.quant_conv(self.encoder(x))

    def decode(self, h, force_not_quantize=False):
        """Decode ``h``, passing it through the quantizer unless ``force_not_quantize``."""
        if force_not_quantize:
            latent = h
        else:
            # The quantizer also returns a loss and index info; both are unused here.
            latent, _emb_loss, _info = self.quantize(h)
        return self.decoder(self.post_quant_conv(latent))
|
|
|
|
|
|
class AutoencoderKL(pl.LightningModule):
    """Autoencoder whose encoder output parameterizes a ``DiagonalGaussianDistribution``.

    Args:
        ddconfig: keyword arguments forwarded to both ``Encoder`` and
            ``Decoder``; must contain ``"z_channels"`` and a truthy
            ``"double_z"``.
        lossconfig: config passed to ``instantiate_from_config`` to build
            ``self.loss``.
        embed_dim: channel count of the latent handed to ``post_quant_conv``.
        ckpt_path: optional checkpoint to restore weights from.
        ignore_keys: state-dict key prefixes to drop when restoring.
        image_key: stored as ``self.image_key``.
        colorize_nlabels: when set, a random ``(3, colorize_nlabels, 1, 1)``
            buffer named ``colorize`` is registered.
        monitor: stored as ``self.monitor`` only when not ``None``.
    """

    def __init__(
        self,
        ddconfig,
        lossconfig,
        embed_dim,
        ckpt_path=None,
        ignore_keys=(),
        image_key="image",
        colorize_nlabels=None,
        monitor=None,
    ):
        super().__init__()
        self.image_key = image_key
        self.encoder = Encoder(**ddconfig)
        self.decoder = Decoder(**ddconfig)
        self.loss = instantiate_from_config(lossconfig)
        assert ddconfig["double_z"]
        # The encoder side carries twice the channels (the "moments").
        self.quant_conv = torch.nn.Conv2d(2 * ddconfig["z_channels"], 2 * embed_dim, 1)
        self.post_quant_conv = torch.nn.Conv2d(embed_dim, ddconfig["z_channels"], 1)
        self.embed_dim = embed_dim
        if colorize_nlabels is not None:
            assert isinstance(colorize_nlabels, int)
            self.register_buffer("colorize", torch.randn(3, colorize_nlabels, 1, 1))
        if monitor is not None:
            self.monitor = monitor
        if ckpt_path is not None:
            self.init_from_ckpt(ckpt_path, ignore_keys=ignore_keys)

    def init_from_ckpt(self, path, ignore_keys=()):
        """Load weights from the ``"state_dict"`` entry of the checkpoint at ``path``.

        Keys starting with any prefix in ``ignore_keys`` are dropped first and
        the remainder is loaded non-strictly.
        """
        # NOTE(review): torch.load unpickles the file; only use trusted checkpoints.
        sd = torch.load(path, map_location="cpu")["state_dict"]
        prefixes = tuple(ignore_keys)
        for key in list(sd.keys()):
            # A single tuple test deletes each key at most once; deleting per
            # matching prefix raised KeyError when two prefixes matched a key.
            if prefixes and key.startswith(prefixes):
                logger.info("Deleting key %s from state_dict.", key)
                del sd[key]
        self.load_state_dict(sd, strict=False)
        logger.info(f"Restored from {path}")

    def encode(self, x):
        """Encode ``x`` and wrap the resulting moments in a ``DiagonalGaussianDistribution``."""
        moments = self.quant_conv(self.encoder(x))
        return DiagonalGaussianDistribution(moments)

    def decode(self, z):
        """Apply ``post_quant_conv`` to ``z`` and run the decoder."""
        return self.decoder(self.post_quant_conv(z))

    def forward(self, input, sample_posterior=True):
        """Encode ``input``, take a latent from the posterior and decode it.

        The latent is ``posterior.sample()`` when ``sample_posterior`` is
        true, otherwise ``posterior.mode()``.

        Returns:
            ``(reconstruction, posterior)``.
        """
        posterior = self.encode(input)
        z = posterior.sample() if sample_posterior else posterior.mode()
        return self.decode(z), posterior

    def get_input(self, batch, k):
        """Fetch ``batch[k]`` and return it as a channel-first float tensor.

        A 3-d entry gets a trailing singleton axis first; the permutation
        ``(0, 3, 1, 2)`` then moves the last axis to position 1.
        """
        x = batch[k]
        if len(x.shape) == 3:
            x = x[..., None]
        return x.permute(0, 3, 1, 2).to(memory_format=torch.contiguous_format).float()
|