first pass at complete DALL-E2 + Latent Diffusion integration: latent diffusion on any layer(s) of the cascading DDPM in the decoder.

Phil Wang
2022-04-22 13:53:13 -07:00
parent f2d5b87677
commit 76b32f18b3
4 changed files with 204 additions and 30 deletions
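
For orientation before the hunks: the decoder changes below rely on each vae exposing encoded_dim, layers, copy_for_eval(), encode() and decode(). Those names are taken from the diff itself; the class below is a hypothetical stand-in to make the contract concrete, not code from this commit.

# hypothetical stand-in for the vae this commit expects; only the attribute
# and method names come from the diff, the conv internals are illustrative
import copy
import torch
from torch import nn

class ToyLatentVAE(nn.Module):
    def __init__(self, channels = 3, encoded_dim = 4, layers = 2):
        super().__init__()
        self.encoded_dim = encoded_dim   # latent channels fed to the unet
        self.layers = layers             # spatial size shrinks by 2 ** layers
        factor = 2 ** layers
        self.enc = nn.Conv2d(channels, encoded_dim, factor, stride = factor)
        self.dec = nn.ConvTranspose2d(encoded_dim, channels, factor, stride = factor)

    def copy_for_eval(self):
        # the decoder stores a frozen eval-mode copy of each vae
        vae_copy = copy.deepcopy(self)
        vae_copy.eval()
        return vae_copy

    def encode(self, images):
        return self.enc(images)

    def decode(self, latents):
        return self.dec(latents)

vae = ToyLatentVAE()
images = torch.randn(1, 3, 256, 256)
assert vae.encode(images).shape == (1, 4, 64, 64)  # 256 / 2 ** 2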


@@ -48,6 +48,12 @@ def is_list_str(x):
         return False
     return all([type(el) == str for el in x])
 
+def pad_tuple_to_length(t, length):
+    remain_length = length - len(t)
+    if remain_length <= 0:
+        return t
+    return (*t, *((None,) * remain_length))
+
 # for controlling freezing of CLIP
 
 def set_module_requires_grad_(module, requires_grad):
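
A quick illustration of the new helper (values are illustrative): it broadcasts a possibly-shorter vae tuple across all unets, leaving None for the stages that stay in pixel space.

assert pad_tuple_to_length(('vae1',), 3) == ('vae1', None, None)
assert pad_tuple_to_length((None, 'vae2'), 2) == (None, 'vae2')  # already full length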
@@ -540,12 +546,14 @@ class DiffusionPrior(nn.Module):
         self.register_buffer('posterior_mean_coef1', betas * torch.sqrt(alphas_cumprod_prev) / (1. - alphas_cumprod))
         self.register_buffer('posterior_mean_coef2', (1. - alphas_cumprod_prev) * torch.sqrt(alphas) / (1. - alphas_cumprod))
 
+    @torch.no_grad()
     def get_image_embed(self, image):
         image_encoding = self.clip.visual_transformer(image)
         image_cls = image_encoding[:, 0]
         image_embed = self.clip.to_visual_latent(image_cls)
         return l2norm(image_embed)
 
+    @torch.no_grad()
     def get_text_cond(self, text):
         text_encodings = self.clip.text_transformer(text)
         text_cls, text_encodings = text_encodings[:, 0], text_encodings[:, 1:]
@@ -940,11 +948,16 @@ class Unet(nn.Module):
     # if the current settings for the unet are not correct
     # for cascading DDPM, then reinit the unet with the right settings
 
-    def force_lowres_cond(self, lowres_cond):
-        if lowres_cond == self.lowres_cond:
+    def cast_model_parameters(
+        self,
+        *,
+        lowres_cond,
+        channels
+    ):
+        if lowres_cond == self.lowres_cond and channels == self.channels:
             return self
 
-        updated_kwargs = {**self._locals, 'lowres_cond': lowres_cond}
+        updated_kwargs = {**self._locals, 'lowres_cond': lowres_cond, 'channels': channels}
         return self.__class__(**updated_kwargs)
 
     def forward_with_cond_scale(
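
The rename from force_lowres_cond reflects that a unet may now also need its channels recast to a vae's latent dimension. A minimal sketch of the reinstantiation pattern in isolation, assuming (as the diff implies) that __init__ stashes its own kwargs in self._locals; this toy class is not the repo's Unet.

class ReinitableUnet:
    # toy stand-in; the real Unet stores its init kwargs the same way
    def __init__(self, *, lowres_cond = False, channels = 3):
        self._locals = dict(lowres_cond = lowres_cond, channels = channels)
        self.lowres_cond = lowres_cond
        self.channels = channels

    def cast_model_parameters(self, *, lowres_cond, channels):
        if lowres_cond == self.lowres_cond and channels == self.channels:
            return self  # already configured correctly, reuse in place
        updated_kwargs = {**self._locals, 'lowres_cond': lowres_cond, 'channels': channels}
        return self.__class__(**updated_kwargs)

unet = ReinitableUnet()
latent_unet = unet.cast_model_parameters(lowres_cond = True, channels = 4)
assert latent_unet is not unet and latent_unet.channels == 4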
@@ -1100,6 +1113,7 @@ class Decoder(nn.Module):
         unet,
         *,
         clip,
+        vae = None,
         timesteps = 1000,
         cond_drop_prob = 0.2,
         loss_type = 'l1',
@@ -1120,11 +1134,25 @@ class Decoder(nn.Module):
         # automatically take care of ensuring that first unet is unconditional
         # while the rest of the unets are conditioned on the low resolution image produced by previous unet
 
+        unets = cast_tuple(unet)
+        vaes = pad_tuple_to_length(cast_tuple(vae), len(unets))
+
         self.unets = nn.ModuleList([])
-        for ind, one_unet in enumerate(cast_tuple(unet)):
+        self.vaes = nn.ModuleList([])
+
+        for ind, (one_unet, one_vae) in enumerate(zip(unets, vaes)):
             is_first = ind == 0
-            one_unet = one_unet.force_lowres_cond(not is_first)
 
+            latent_dim = one_vae.encoded_dim if exists(one_vae) else None
+            unet_channels = default(latent_dim, self.channels)
+
+            one_unet = one_unet.cast_model_parameters(
+                lowres_cond = not is_first,
+                channels = unet_channels
+            )
+
             self.unets.append(one_unet)
+            self.vaes.append(one_vae.copy_for_eval() if exists(one_vae) else None)
 
         # unet image sizes
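
Putting the constructor changes together, a hedged usage sketch: unet1, unet2, vae and clip are presumed already built, and the Decoder kwargs besides vae follow the signature visible in the hunk above; only the per-unet vae hookup is new in this commit.

# first stage diffuses in pixel space (None), second in the vae's latent space
decoder = Decoder(
    unet = (unet1, unet2),
    vae = (None, vae),
    clip = clip,
    timesteps = 1000,
    cond_drop_prob = 0.2
)
# internally, unet2 is recast with channels = vae.encoded_dim, and the
# decoder holds vae.copy_for_eval() so the vae's weights stay frozen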
@@ -1219,10 +1247,12 @@ class Decoder(nn.Module):
             yield
             unet.cpu()
 
+    @torch.no_grad()
     def get_text_encodings(self, text):
         text_encodings = self.clip.text_transformer(text)
         return text_encodings[:, 1:]
 
+    @torch.no_grad()
     def get_image_embed(self, image):
         image = resize_image_to(image, self.clip_image_size)
         image_encoding = self.clip.visual_transformer(image)
@@ -1324,25 +1354,43 @@ class Decoder(nn.Module):
         img = None
 
-        for unet, channel, image_size in tqdm(zip(self.unets, self.sample_channels, self.image_sizes)):
+        for unet, vae, channel, image_size in tqdm(zip(self.unets, self.vaes, self.sample_channels, self.image_sizes)):
+
             with self.one_unet_in_gpu(unet = unet):
-                lowres_cond_img = self.to_lowres_cond(
-                    img,
-                    target_image_size = image_size
-                ) if unet.lowres_cond else None
+                lowres_cond_img = None
+                shape = (batch_size, channel, image_size, image_size)
+
+                if unet.lowres_cond:
+                    lowres_cond_img = self.to_lowres_cond(img, target_image_size = image_size)
+
+                if exists(vae):
+                    image_size //= (2 ** vae.layers)
+                    shape = (batch_size, vae.encoded_dim, image_size, image_size)
+
+                    if exists(lowres_cond_img):
+                        lowres_cond_img = vae.encode(lowres_cond_img)
 
                 img = self.p_sample_loop(
                     unet,
-                    (batch_size, channel, image_size, image_size),
+                    shape,
                     image_embed = image_embed,
                     text_encodings = text_encodings,
                     cond_scale = cond_scale,
                     lowres_cond_img = lowres_cond_img
                 )
 
+                if exists(vae):
+                    img = vae.decode(img)
+
         return img
 
-    def forward(self, image, text = None, image_embed = None, text_encodings = None, unet_number = None):
+    def forward(
+        self,
+        image,
+        text = None,
+        image_embed = None,
+        text_encodings = None,
+        unet_number = None
+    ):
         assert not (len(self.unets) > 1 and not exists(unet_number)), f'you must specify which unet you want trained, from a range of 1 to {len(self.unets)}, if you are training cascading DDPM (multiple unets)'
 
         unet_number = default(unet_number, 1)
         unet_index = unet_number - 1
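
The shape bookkeeping in the sampling loop, worked through with illustrative numbers: a 256 px stage paired with a 2-layer vae of encoded_dim 4 denoises a 64 x 64 latent and only decodes back to pixels at the end of that stage.

batch_size, channel, image_size = 4, 3, 256
vae_layers, vae_encoded_dim = 2, 4            # illustrative vae settings

shape = (batch_size, channel, image_size, image_size)   # pixel-space default
image_size //= 2 ** vae_layers                          # 256 -> 64
shape = (batch_size, vae_encoded_dim, image_size, image_size)
assert shape == (4, 4, 64, 64)
# p_sample_loop denoises at this latent shape; vae.decode then maps the
# result back to (4, 3, 256, 256) before it conditions the next stage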
@@ -1350,6 +1398,7 @@ class Decoder(nn.Module):
         unet = self.get_unet(unet_number)
 
         target_image_size = self.image_sizes[unet_index]
+        vae = self.vaes[unet_index]
 
         b, c, h, w, device, = *image.shape, image.device
@@ -1364,8 +1413,17 @@ class Decoder(nn.Module):
         text_encodings = self.get_text_encodings(text) if exists(text) and not exists(text_encodings) else None
 
         lowres_cond_img = self.to_lowres_cond(image, target_image_size = target_image_size, downsample_image_size = self.image_sizes[unet_index - 1]) if unet_number > 1 else None
 
-        ddpm_image = resize_image_to(image, target_image_size)
-        return self.p_losses(unet, ddpm_image, times, image_embed = image_embed, text_encodings = text_encodings, lowres_cond_img = lowres_cond_img)
+        image = resize_image_to(image, target_image_size)
+
+        if exists(vae):
+            vae.eval()
+            with torch.no_grad():
+                image = vae.encode(image)
+
+                if exists(lowres_cond_img):
+                    lowres_cond_img = vae.encode(lowres_cond_img)
+
+        return self.p_losses(unet, image, times, image_embed = image_embed, text_encodings = text_encodings, lowres_cond_img = lowres_cond_img)
 
 # main class
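
And the training-side contract from forward(), in isolation: both the target image and the low-resolution conditioning image are pushed through a frozen vae before the diffusion loss, so no gradients reach the vae. A minimal sketch reusing the hypothetical ToyLatentVAE stand-in from the top of this page.

import torch

vae = ToyLatentVAE()                   # hypothetical stand-in from above
image = torch.randn(2, 3, 256, 256)
lowres_cond_img = torch.randn(2, 3, 256, 256)

vae.eval()
with torch.no_grad():                  # vae stays frozen during decoder training
    image = vae.encode(image)
    lowres_cond_img = vae.encode(lowres_cond_img)

# p_losses now receives latents instead of pixels
assert image.shape == lowres_cond_img.shape == (2, 4, 64, 64)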