@@ -7,6 +7,7 @@ from contextlib import contextmanager
 import torch
 import torch.nn.functional as F
 from torch import nn, einsum
+import torchvision.transforms as T
 
 from einops import rearrange, repeat
 from einops.layers.torch import Rearrange
@@ -89,6 +90,59 @@ def resize_image_to(t, image_size, mode = 'bilinear'): # take a look at https://
 
     return F.interpolate(t, size = shape, mode = mode, align_corners = False)
 
+# clip related adapters
+
+class BaseClipAdapter(nn.Module):
+    def __init__(self, clip):
+        super().__init__()
+        self.clip = clip
+
+    @property
+    def dim_latent(self):
+        raise NotImplementedError
+
+    @property
+    def image_size(self):
+        raise NotImplementedError
+
+    @property
+    def image_channels(self):
+        raise NotImplementedError
+
+    def embed_text(self, text):
+        raise NotImplementedError
+
+    def embed_image(self, image):
+        raise NotImplementedError
+
+class XClipAdapter(BaseClipAdapter):
+    @property
+    def dim_latent(self):
+        return self.clip.dim_latent
+
+    @property
+    def image_size(self):
+        return self.clip.image_size
+
+    @property
+    def image_channels(self):
+        return self.clip.image_channels
+
+    @torch.no_grad()
+    def embed_text(self, text):
+        encoder_output = self.clip.text_transformer(text)
+        text_cls, text_encodings = encoder_output[:, 0], encoder_output[:, 1:]
+        text_embed = self.clip.to_text_latent(text_cls)
+        return l2norm(text_embed), text_encodings
+
+    @torch.no_grad()
+    def embed_image(self, image):
+        image = resize_image_to(image, self.image_size)
+        encoder_output = self.clip.visual_transformer(image)
+        image_cls, image_encodings = encoder_output[:, 0], encoder_output[:, 1:]
+        image_embed = self.clip.to_visual_latent(image_cls)
+        return l2norm(image_embed), image_encodings
+
 # classifier free guidance functions
 
 def prob_mask_like(shape, prob, device):
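
For orientation, a minimal sketch of how the new adapter is meant to be used, with XClipAdapter as defined in the hunk above. The CLIP constructor arguments follow the x-clip README and are illustrative only:

import torch
from x_clip import CLIP

clip = CLIP(
    dim_text = 512,
    dim_image = 512,
    dim_latent = 512,
    num_text_tokens = 49408,
    text_enc_depth = 6,
    text_seq_len = 256,
    text_heads = 8,
    visual_enc_depth = 6,
    visual_image_size = 256,
    visual_patch_size = 32,
    visual_heads = 8
)

adapter = XClipAdapter(clip)  # XClipAdapter as introduced in the hunk above

text = torch.randint(0, 49408, (4, 256))
images = torch.randn(4, 3, 256, 256)

# each embed_* method returns (l2-normalized latent, per-token encodings with the cls position split off)
text_embed, text_encodings = adapter.embed_text(text)
image_embed, image_encodings = adapter.embed_image(images)
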
@@ -593,7 +647,10 @@ class DiffusionPrior(BaseGaussianDiffusion):
         )
 
         if exists(clip):
-            assert isinstance(clip, CLIP)
+            if isinstance(clip, CLIP):
+                clip = XClipAdapter(clip)
+
+            assert isinstance(clip, BaseClipAdapter)
             freeze_model_and_make_eval_(clip)
             self.clip = clip
         else:
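
The wrapping above keeps existing call sites working: a raw x-clip CLIP is silently adapted, and the assert now only demands the BaseClipAdapter interface. A hypothetical adapter for some other CLIP implementation would just fill in that interface, roughly:

class CustomClipAdapter(BaseClipAdapter):
    # hypothetical: self.clip is whatever model was wrapped, not necessarily x-clip
    @property
    def dim_latent(self):
        return 512

    @property
    def image_size(self):
        return 224

    @property
    def image_channels(self):
        return 3

    @torch.no_grad()
    def embed_text(self, text):
        # must return (l2-normalized text latent, per-token text encodings)
        ...

    @torch.no_grad()
    def embed_image(self, image):
        # must return (l2-normalized image latent, per-patch image encodings)
        ...
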
@@ -610,29 +667,6 @@ class DiffusionPrior(BaseGaussianDiffusion):
         self.predict_x_start = predict_x_start
         # in paper, they do not predict the noise, but predict x0 directly for image embedding, claiming empirically better results. I'll just offer both.
 
-    @torch.no_grad()
-    def get_image_embed(self, image):
-        assert exists(self.clip)
-
-        image_encoding = self.clip.visual_transformer(image)
-        image_cls = image_encoding[:, 0]
-        image_embed = self.clip.to_visual_latent(image_cls)
-        return l2norm(image_embed)
-
-    @torch.no_grad()
-    def get_text_cond(self, text):
-        assert exists(self.clip)
-
-        text_encodings = self.clip.text_transformer(text)
-        text_cls, text_encodings = text_encodings[:, 0], text_encodings[:, 1:]
-        text_embed = self.clip.to_text_latent(text_cls)
-        text_embed = l2norm(text_embed)
-
-        if not self.condition_on_text_encodings:
-            return dict(text_embed = text_embed)
-
-        return dict(text_encodings = text_encodings, text_embed = text_embed, mask = text != 0)
-
     def p_mean_variance(self, x, t, text_cond, clip_denoised: bool):
         pred = self.net(x, t, **text_cond)
 
@@ -704,7 +738,12 @@ class DiffusionPrior(BaseGaussianDiffusion):
         batch_size = text.shape[0]
         image_embed_dim = self.image_embed_dim
 
-        text_cond = self.get_text_cond(text)
+        text_embed, text_encodings = self.clip.embed_text(text)
+
+        text_cond = dict(text_embed = text_embed)
+
+        if self.condition_on_text_encodings:
+            text_cond = {**text_cond, 'text_encodings': text_encodings, 'mask': text != 0}
 
         image_embeds = self.p_sample_loop((batch_size, image_embed_dim), text_cond = text_cond)
         text_embeds = text_cond['text_embed']
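
For clarity, the conditioning dict assembled here is later splatted into the prior network as keyword arguments (pred = self.net(x, t, **text_cond) in p_mean_variance). A standalone sketch with made-up shapes:

import torch

# illustrative shapes: batch 4, 255 token encodings, 512-dim latents
text_embed = torch.randn(4, 512)           # l2-normalized CLIP text latent
text_encodings = torch.randn(4, 255, 512)  # per-token encodings from the adapter
text = torch.randint(0, 49408, (4, 255))   # token ids, 0 assumed to be padding

condition_on_text_encodings = True

text_cond = dict(text_embed = text_embed)

if condition_on_text_encodings:
    # the mask marks real tokens; `text != 0` assumes padding uses token id 0
    text_cond = {**text_cond, 'text_encodings': text_encodings, 'mask': text != 0}
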
@@ -736,18 +775,19 @@ class DiffusionPrior(BaseGaussianDiffusion):
         assert not (self.condition_on_text_encodings and (not exists(text_encodings) and not exists(text))), 'text encodings must be present if you specified you wish to condition on it on initialization'
 
         if exists(image):
-            image_embed = self.get_image_embed(image)
+            image_embed, _ = self.clip.embed_image(image)
 
         # calculate text conditionings, based on what is passed in
 
         if exists(text):
-            text_cond = self.get_text_cond(text)
-        else:
-            text_cond = dict(
-                text_embed = text_embed,
-                text_encodings = text_encodings,
-                mask = text_mask
-            )
-
+            text_embed, text_encodings = self.clip.embed_text(text)
+            text_mask = text != 0
+
+        text_cond = dict(text_embed = text_embed)
+
+        if self.condition_on_text_encodings:
+            assert exists(text_encodings), 'text encodings must be present for diffusion prior if specified'
+            text_cond = {**text_cond, 'text_encodings': text_encodings, 'mask': text_mask}
+
         # timestep conditioning from ddpm
 
@@ -756,8 +796,7 @@ class DiffusionPrior(BaseGaussianDiffusion):
 
         # calculate forward loss
 
-        loss = self.p_losses(image_embed, times, text_cond = text_cond, *args, **kwargs)
-        return loss
+        return self.p_losses(image_embed, times, text_cond = text_cond, *args, **kwargs)
 
 # decoder
 
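
With the adapter wired in, training the prior looks the same from the outside. A sketch following the repository README, with illustrative hyperparameters and the clip / text / images variables from the earlier sketch:

from dalle2_pytorch import DiffusionPriorNetwork, DiffusionPrior

prior_network = DiffusionPriorNetwork(
    dim = 512,
    depth = 6,
    dim_head = 64,
    heads = 8
)

diffusion_prior = DiffusionPrior(
    net = prior_network,
    clip = clip,        # a raw x-clip CLIP is fine, it is wrapped in XClipAdapter internally
    timesteps = 100,
    cond_drop_prob = 0.2
)

loss = diffusion_prior(text, images)
loss.backward()         # forward now returns the loss tensor directly
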
@@ -1208,8 +1247,12 @@ class Decoder(BaseGaussianDiffusion):
             loss_type = loss_type
         )
 
-        assert isinstance(clip, CLIP)
+        if isinstance(clip, CLIP):
+            clip = XClipAdapter(clip)
+
         freeze_model_and_make_eval_(clip)
+        assert isinstance(clip, BaseClipAdapter)
+
         self.clip = clip
         self.clip_image_size = clip.image_size
         self.channels = clip.image_channels
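
Same adapter handling as in DiffusionPrior above, so the decoder likewise accepts either form; a trivial sketch:

clip_for_decoder = clip                 # a raw x-clip CLIP, wrapped into XClipAdapter internally
clip_for_decoder = XClipAdapter(clip)   # or any pre-built BaseClipAdapter subclass
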
@@ -1290,10 +1333,6 @@ class Decoder(BaseGaussianDiffusion):
             yield
             unet.cpu()
 
-    @torch.no_grad()
-    def get_text_encodings(self, text):
-        text_encodings = self.clip.text_transformer(text)
-        return text_encodings[:, 1:]
 
     @torch.no_grad()
     def get_image_embed(self, image):
@@ -1379,9 +1418,12 @@ class Decoder(BaseGaussianDiffusion):
     def sample(self, image_embed, text = None, cond_scale = 1.):
         batch_size = image_embed.shape[0]
 
-        text_encodings = self.get_text_encodings(text) if exists(text) else None
+        text_encodings = None
+        if exists(text):
+            _, text_encodings = self.clip.embed_text(text)
+
         assert not (self.condition_on_text_encodings and not exists(text_encodings)), 'text or text encodings must be passed into decoder if specified'
         assert not (not self.condition_on_text_encodings and exists(text_encodings)), 'decoder specified not to be conditioned on text, yet it is presented'
 
         img = None
 
@@ -1442,11 +1484,14 @@ class Decoder(BaseGaussianDiffusion):
         times = torch.randint(0, self.num_timesteps, (b,), device = device, dtype = torch.long)
 
         if not exists(image_embed):
-            image_embed = self.get_image_embed(image)
+            image_embed, _ = self.clip.embed_image(image)
 
-        text_encodings = self.get_text_encodings(text) if exists(text) and not exists(text_encodings) else None
+        text_encodings = None
+        if exists(text) and not exists(text_encodings):
+            _, text_encodings = self.clip.embed_text(text)
+
         assert not (self.condition_on_text_encodings and not exists(text_encodings)), 'text or text encodings must be passed into decoder if specified'
         assert not (not self.condition_on_text_encodings and exists(text_encodings)), 'decoder specified not to be conditioned on text, yet it is presented'
 
         lowres_cond_img = self.to_lowres_cond(image, target_image_size = target_image_size, downsample_image_size = self.image_sizes[unet_index - 1]) if unet_number > 1 else None
         image = resize_image_to(image, target_image_size)
@@ -1479,12 +1524,15 @@ class DALLE2(nn.Module):
         self.prior_num_samples = prior_num_samples
         self.decoder_need_text_cond = self.decoder.condition_on_text_encodings
 
+        self.to_pil = T.ToPILImage()
+
     @torch.no_grad()
     @eval_decorator
     def forward(
         self,
         text,
-        cond_scale = 1.
+        cond_scale = 1.,
+        return_pil_images = False
     ):
         device = next(self.parameters()).device
         one_text = isinstance(text, str) or (not is_list_str(text) and text.shape[0] == 1)
@@ -1498,7 +1546,11 @@ class DALLE2(nn.Module):
         text_cond = text if self.decoder_need_text_cond else None
         images = self.decoder.sample(image_embed, text = text_cond, cond_scale = cond_scale)
 
+        if return_pil_images:
+            images = list(map(self.to_pil, images.unbind(dim = 0)))
+
         if one_text:
             return images[0]
 
         return images
 
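
End to end, the new flag lets DALLE2 hand back PIL images instead of an image tensor. A sketch assuming a trained diffusion_prior and decoder:

from dalle2_pytorch import DALLE2

dalle2 = DALLE2(
    prior = diffusion_prior,
    decoder = decoder
)

images = dalle2(
    ['a butterfly trying to escape a tornado'],
    cond_scale = 2.,             # classifier free guidance strength
    return_pil_images = True     # new flag: each sampled image is converted with T.ToPILImage()
)

images[0].save('./butterfly.png')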