@@ -77,6 +77,11 @@ def cast_tuple(val, length = None):
def module_device(module):
    return next(module.parameters()).device

def zero_init_(m):
    nn.init.zeros_(m.weight)
    if exists(m.bias):
        nn.init.zeros_(m.bias)

@contextmanager
def null_context(*args, **kwargs):
    yield
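
# Illustrative sketch (not part of the diff): how the three helpers above behave. The helpers and a
# minimal `exists` are re-stated so the snippet runs standalone.
import torch
import torch.nn as nn
from contextlib import contextmanager

def exists(val):
    return val is not None

def module_device(module):
    return next(module.parameters()).device

def zero_init_(m):
    nn.init.zeros_(m.weight)
    if exists(m.bias):
        nn.init.zeros_(m.bias)

@contextmanager
def null_context(*args, **kwargs):
    yield

layer = nn.Linear(4, 4)
print(module_device(layer))          # cpu -- the device of the first parameter

zero_init_(layer)                    # weight and bias are zeroed in place
print(layer.weight.abs().sum())      # sums to zero

# null_context is a no-op drop-in wherever a context manager is expected,
# e.g. when conditionally disabling gradients
maybe_no_grad = null_context if True else torch.no_grad
with maybe_no_grad():
    out = layer(torch.randn(2, 4))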
@@ -220,6 +225,7 @@ class XClipAdapter(BaseClipAdapter):
        encoder_output = self.clip.text_transformer(text)
        text_cls, text_encodings = encoder_output[:, 0], encoder_output[:, 1:]
        text_embed = self.clip.to_text_latent(text_cls)
+       text_encodings = text_encodings.masked_fill(~text_mask[..., None], 0.)
        return EmbeddedText(l2norm(text_embed), text_encodings, text_mask)

    @torch.no_grad()
@@ -255,6 +261,7 @@ class CoCaAdapter(BaseClipAdapter):
        text = text[..., :self.max_text_len]
        text_mask = text != 0
        text_embed, text_encodings = self.clip.embed_text(text)
+       text_encodings = text_encodings.masked_fill(~text_mask[..., None], 0.)
        return EmbeddedText(text_embed, text_encodings, text_mask)

    @torch.no_grad()
@@ -314,6 +321,7 @@ class OpenAIClipAdapter(BaseClipAdapter):

        text_embed = self.clip.encode_text(text)
        text_encodings = self.text_encodings
+       text_encodings = text_encodings.masked_fill(~text_mask[..., None], 0.)
        del self.text_encodings
        return EmbeddedText(l2norm(text_embed.float()), text_encodings.float(), text_mask)
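
# Illustrative sketch (not part of the diff): the masked_fill line added to each adapter zeroes the
# per-token encodings at padded positions, so downstream conditioning never sees stale values there.
# Shapes below are made up.
import torch

batch, seq_len, dim = 2, 5, 8
text_encodings = torch.randn(batch, seq_len, dim)
text_mask = torch.tensor([[True, True, True, False, False],
                          [True, True, False, False, False]])

# broadcast the (batch, seq) mask over the feature dimension and zero the padded slots
text_encodings = text_encodings.masked_fill(~text_mask[..., None], 0.)

assert torch.all(text_encodings[0, 3:] == 0)
assert torch.all(text_encodings[1, 2:] == 0)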
@@ -922,11 +930,12 @@ class DiffusionPrior(nn.Module):
        loss_type = "l2",
        predict_x_start = True,
        beta_schedule = "cosine",
        condition_on_text_encodings = True, # the paper suggests this is needed, but you can turn it off for your CLIP preprocessed text embed -> image embed training
-       sampling_clamp_l2norm = False,
+       sampling_clamp_l2norm = False, # whether to l2norm clamp the image embed at each denoising iteration (analogous to -1 to 1 clipping for usual DDPMs)
+       sampling_final_clamp_l2norm = False, # whether to l2norm the final image embedding output (this is also done for images in ddpm)
        training_clamp_l2norm = False,
        init_image_embed_l2norm = False,
        image_embed_scale = None, # this is for scaling the l2-normed image embedding, so it is more suitable for gaussian diffusion, as outlined by Katherine (@crowsonkb) https://github.com/lucidrains/DALLE2-pytorch/issues/60#issue-1226116132
        clip_adapter_overrides = dict()
    ):
        super().__init__()
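
# Illustrative sketch (not part of the diff): the new *_clamp_l2norm flags are the image-embedding
# analogue of clamping pixels to [-1, 1] in a standard DDPM -- instead of a box constraint, the
# predicted embedding is renormalized onto a sphere of radius image_embed_scale.
import torch
import torch.nn.functional as F

dim = 512
image_embed_scale = dim ** 0.5                 # the default set a few lines further down in this diff
noisy_pred = torch.randn(4, dim) * 3.          # an off-manifold prediction

clamped = F.normalize(noisy_pred, dim = -1) * image_embed_scale
print(clamped.norm(dim = -1))                  # every row now has norm sqrt(512) ~ 22.6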
@@ -963,23 +972,32 @@ class DiffusionPrior(nn.Module):
        self.condition_on_text_encodings = condition_on_text_encodings

        # in paper, they do not predict the noise, but predict x0 directly for image embedding, claiming empirically better results. I'll just offer both.

        self.predict_x_start = predict_x_start

        # @crowsonkb 's suggestion - https://github.com/lucidrains/DALLE2-pytorch/issues/60#issue-1226116132

        self.image_embed_scale = default(image_embed_scale, self.image_embed_dim ** 0.5)

        # whether to force an l2norm, similar to clipping denoised, when sampling

        self.sampling_clamp_l2norm = sampling_clamp_l2norm
        self.sampling_final_clamp_l2norm = sampling_final_clamp_l2norm

        self.training_clamp_l2norm = training_clamp_l2norm
        self.init_image_embed_l2norm = init_image_embed_l2norm

        # device tracker

        self.register_buffer('_dummy', torch.tensor([True]), persistent = False)

    @property
    def device(self):
        return self._dummy.device

    def l2norm_clamp_embed(self, image_embed):
        return l2norm(image_embed) * self.image_embed_scale

    def p_mean_variance(self, x, t, text_cond, clip_denoised = False, cond_scale = 1.):
        assert not (cond_scale != 1. and not self.can_classifier_guidance), 'the model was not trained with conditional dropout, and thus one cannot use classifier free guidance (cond_scale anything other than 1)'
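
# Illustrative sketch (not part of the diff): the `_dummy` buffer is a zero-cost device tracker;
# `persistent = False` keeps it out of the state dict, and `.to()` / `.cuda()` move it along with
# the rest of the module, so the `device` property always reports the current device.
import torch
import torch.nn as nn

class DeviceTracked(nn.Module):
    def __init__(self):
        super().__init__()
        self.register_buffer('_dummy', torch.tensor([True]), persistent = False)

    @property
    def device(self):
        return self._dummy.device

m = DeviceTracked()
print(m.device)                      # cpu
print('_dummy' in m.state_dict())    # False -- non-persistent buffers are not serialized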
@@ -1020,6 +1038,9 @@ class DiffusionPrior(nn.Module):
            times = torch.full((batch,), i, device = device, dtype = torch.long)
            image_embed = self.p_sample(image_embed, times, text_cond = text_cond, cond_scale = cond_scale)

+       if self.sampling_final_clamp_l2norm and self.predict_x_start:
+           image_embed = self.l2norm_clamp_embed(image_embed)

        return image_embed

    @torch.no_grad()
@@ -1055,15 +1076,18 @@ class DiffusionPrior(nn.Module):
                x_start.clamp_(-1., 1.)

            if self.predict_x_start and self.sampling_clamp_l2norm:
-               x_start = l2norm(x_start) * self.image_embed_scale
+               x_start = self.l2norm_clamp_embed(x_start)

            c1 = eta * ((1 - alpha / alpha_next) * (1 - alpha_next) / (1 - alpha)).sqrt()
            c2 = ((1 - alpha_next) - torch.square(c1)).sqrt()
-           new_noise = torch.randn_like(image_embed)
+           noise = torch.randn_like(image_embed) if time_next > 0 else 0.

-           img = x_start * alpha_next.sqrt() + \
-                 c1 * new_noise + \
-                 c2 * pred_noise
+           image_embed = x_start * alpha_next.sqrt() + \
+                 c1 * noise + \
+                 c2 * pred_noise

+       if self.predict_x_start and self.sampling_final_clamp_l2norm:
+           image_embed = self.l2norm_clamp_embed(image_embed)

        return image_embed
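
# Illustrative sketch (not part of the diff): a single DDIM step as written above. With eta = 0 the
# update is deterministic (c1 = 0), and the final step (time_next == 0) adds no fresh noise. Scalar
# alphas stand in for the per-timestep cumulative products; names mirror the code, values are made up.
import torch

eta = 0.
alpha, alpha_next = torch.tensor(0.5), torch.tensor(0.7)    # cumulative alphas at t and the next (earlier) step
x_start = torch.randn(2, 512)                               # predicted clean image embedding
pred_noise = torch.randn(2, 512)                            # noise implied by that prediction
image_embed = torch.randn(2, 512)                           # current noisy embedding
time_next = 1

c1 = eta * ((1 - alpha / alpha_next) * (1 - alpha_next) / (1 - alpha)).sqrt()
c2 = ((1 - alpha_next) - torch.square(c1)).sqrt()
noise = torch.randn_like(image_embed) if time_next > 0 else 0.

image_embed = x_start * alpha_next.sqrt() + c1 * noise + c2 * pred_noise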
@@ -1091,7 +1115,7 @@ class DiffusionPrior(nn.Module):
        )

        if self.predict_x_start and self.training_clamp_l2norm:
-           pred = l2norm(pred) * self.image_embed_scale
+           pred = self.l2norm_clamp_embed(pred)

        target = noise if not self.predict_x_start else image_embed
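
# Illustrative sketch (not part of the diff): the two possible regression targets. With
# predict_x_start = True (the default in this diff) the prior regresses the clean image embedding
# itself; otherwise it regresses the injected noise, as in a standard DDPM.
import torch

image_embed = torch.randn(2, 512)         # clean embedding (x0)
noise = torch.randn_like(image_embed)     # noise used to corrupt it during training
predict_x_start = True

target = noise if not predict_x_start else image_embed
# the network output `pred` is then compared to `target` with the configured loss (l2 by default)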
@@ -1181,6 +1205,7 @@ class DiffusionPrior(nn.Module):

        if self.condition_on_text_encodings:
            assert exists(text_encodings), 'text encodings must be present for diffusion prior if specified'
+           text_mask = default(text_mask, lambda: torch.any(text_encodings != 0., dim = -1))
            text_cond = {**text_cond, 'text_encodings': text_encodings, 'mask': text_mask}

        # timestep conditioning from ddpm
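
# Illustrative sketch (not part of the diff): when no explicit text_mask is supplied, the added line
# infers one by treating all-zero token encodings as padding, which matches the adapters zeroing
# padded positions earlier in this diff.
import torch

text_encodings = torch.randn(1, 4, 8)
text_encodings[:, 2:] = 0.                            # pretend the last two tokens are padding

text_mask = torch.any(text_encodings != 0., dim = -1)
print(text_mask)                                      # tensor([[ True,  True, False, False]])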
@@ -1198,16 +1223,35 @@

# decoder

def ConvTransposeUpsample(dim, dim_out = None):
    dim_out = default(dim_out, dim)
    return nn.ConvTranspose2d(dim, dim_out, 4, 2, 1)

-def NearestUpsample(dim, dim_out = None):
-    dim_out = default(dim_out, dim)
-    return nn.Sequential(
-        nn.Upsample(scale_factor = 2, mode = 'nearest'),
-        nn.Conv2d(dim, dim_out, 3, padding = 1)
-    )

+class PixelShuffleUpsample(nn.Module):
+    """
+    code shared by @MalumaDev at DALLE2-pytorch for addressing checkerboard artifacts
+    https://arxiv.org/ftp/arxiv/papers/1707/1707.02937.pdf
+    """
+    def __init__(self, dim, dim_out = None):
+        super().__init__()
+        dim_out = default(dim_out, dim)
+        conv = nn.Conv2d(dim, dim_out * 4, 1)
+
+        self.net = nn.Sequential(
+            conv,
+            nn.SiLU(),
+            nn.PixelShuffle(2)
+        )
+
+        self.init_conv_(conv)
+
+    def init_conv_(self, conv):
+        o, i, h, w = conv.weight.shape
+        conv_weight = torch.empty(o // 4, i, h, w)
+        nn.init.kaiming_uniform_(conv_weight)
+        conv_weight = repeat(conv_weight, 'o ... -> (o 4) ...')
+
+        conv.weight.data.copy_(conv_weight)
+        nn.init.zeros_(conv.bias.data)
+
+    def forward(self, x):
+        return self.net(x)

def Downsample(dim, *, dim_out = None):
    dim_out = default(dim_out, dim)
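
# Illustrative sketch (not part of the diff): why the repeated-kernel init above avoids checkerboard
# artifacts. The 1x1 conv starts with groups of 4 identical output channels and zero bias, so after
# PixelShuffle every 2x2 sub-pixel block is constant -- the layer behaves like a smooth upsample at
# initialization and only later learns higher-frequency detail.
import torch
import torch.nn as nn
from einops import repeat

dim = dim_out = 8
conv = nn.Conv2d(dim, dim_out * 4, 1)

conv_weight = torch.empty(dim_out, dim, 1, 1)
nn.init.kaiming_uniform_(conv_weight)
conv.weight.data.copy_(repeat(conv_weight, 'o ... -> (o 4) ...'))
nn.init.zeros_(conv.bias.data)

x = torch.randn(1, dim, 16, 16)
out = nn.PixelShuffle(2)(nn.SiLU()(conv(x)))          # (1, 8, 32, 32)

# all four sub-pixel positions of each 2x2 block agree at init, i.e. no checkerboard
assert torch.allclose(out[..., ::2, ::2], out[..., 1::2, 1::2])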
@@ -1471,7 +1515,7 @@ class Unet(nn.Module):
        cross_embed_downsample_kernel_sizes = (2, 4),
        memory_efficient = False,
        scale_skip_connection = False,
-       nearest_upsample = False,
+       pixel_shuffle_upsample = True,
        final_conv_kernel_size = 1,
        **kwargs
    ):
@@ -1585,7 +1629,7 @@ class Unet(nn.Module):

        # upsample klass

-       upsample_klass = ConvTransposeUpsample if not nearest_upsample else NearestUpsample
+       upsample_klass = ConvTransposeUpsample if not pixel_shuffle_upsample else PixelShuffleUpsample

        # give memory efficient unet an initial resnet block
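
# Illustrative sketch (not part of the diff): both upsample choices double the spatial resolution, so
# the pixel_shuffle_upsample flag only changes *how* the 2x upsample is done. Simplified stand-ins for
# ConvTransposeUpsample / PixelShuffleUpsample (without the special init) are used so this runs standalone.
import torch
import torch.nn as nn

pixel_shuffle_upsample = True

ConvTransposeUpsample = lambda dim, dim_out: nn.ConvTranspose2d(dim, dim_out, 4, 2, 1)
PixelShuffleUpsample  = lambda dim, dim_out: nn.Sequential(nn.Conv2d(dim, dim_out * 4, 1), nn.SiLU(), nn.PixelShuffle(2))

upsample_klass = ConvTransposeUpsample if not pixel_shuffle_upsample else PixelShuffleUpsample

x = torch.randn(1, 64, 16, 16)
print(upsample_klass(64, 32)(x).shape)                # torch.Size([1, 32, 32, 32]) with either choice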
@@ -1649,6 +1693,8 @@ class Unet(nn.Module):
        self.final_resnet_block = ResnetBlock(dim * 2, dim, time_cond_dim = time_cond_dim, groups = top_level_resnet_group)
        self.to_out = nn.Conv2d(dim, self.channels_out, kernel_size = final_conv_kernel_size, padding = final_conv_kernel_size // 2)

+       zero_init_(self.to_out) # since both OpenAI and @crowsonkb are doing it

    # if the current settings for the unet are not correct
    # for cascading DDPM, then reinit the unet with the right settings
    def cast_model_parameters(
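
# Illustrative sketch (not part of the diff): zero-initializing the final conv means the unet's output
# starts at exactly zero, a common trick (attributed above to OpenAI and @crowsonkb) for stabilizing
# the early training of denoising networks.
import torch
import torch.nn as nn

def zero_init_(m):                    # same helper added at the top of this diff
    nn.init.zeros_(m.weight)
    if m.bias is not None:
        nn.init.zeros_(m.bias)

to_out = nn.Conv2d(64, 3, kernel_size = 1, padding = 0)
zero_init_(to_out)

print(to_out(torch.randn(1, 64, 8, 8)).abs().max())   # tensor(0.) at initialization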
@@ -2047,7 +2093,7 @@ class Decoder(nn.Module):
        self.noise_schedulers = nn.ModuleList([])

        for ind, (unet_beta_schedule, unet_p2_loss_weight_gamma, sample_timesteps) in enumerate(zip(beta_schedule, p2_loss_weight_gamma, self.sample_timesteps)):
-           assert sample_timesteps <= timesteps, f'sampling timesteps {sample_timesteps} must be less than or equal to the number of training timesteps {timesteps} for unet {ind + 1}'
+           assert not exists(sample_timesteps) or sample_timesteps <= timesteps, f'sampling timesteps {sample_timesteps} must be less than or equal to the number of training timesteps {timesteps} for unet {ind + 1}'

            noise_scheduler = NoiseScheduler(
                beta_schedule = unet_beta_schedule,
@@ -2275,9 +2321,10 @@ class Decoder(nn.Module):

            c1 = eta * ((1 - alpha / alpha_next) * (1 - alpha_next) / (1 - alpha)).sqrt()
            c2 = ((1 - alpha_next) - torch.square(c1)).sqrt()
+           noise = torch.randn_like(img) if time_next > 0 else 0.

            img = x_start * alpha_next.sqrt() + \
-                 c1 * torch.randn_like(img) + \
+                 c1 * noise + \
                  c2 * pred_noise

        img = self.unnormalize_img(img)
@@ -2393,6 +2440,9 @@ class Decoder(nn.Module):
        assert not (self.condition_on_text_encodings and not exists(text_encodings)), 'text or text encodings must be passed into decoder if specified'
        assert not (not self.condition_on_text_encodings and exists(text_encodings)), 'decoder specified not to be conditioned on text, yet it is presented'

+       if self.condition_on_text_encodings:
+           text_mask = default(text_mask, lambda: torch.any(text_encodings != 0., dim = -1))

        img = None
        is_cuda = next(self.parameters()).is_cuda
@@ -2476,6 +2526,9 @@ class Decoder(nn.Module):
        assert not (self.condition_on_text_encodings and not exists(text_encodings)), 'text or text encodings must be passed into decoder if specified'
        assert not (not self.condition_on_text_encodings and exists(text_encodings)), 'decoder specified not to be conditioned on text, yet it is presented'

+       if self.condition_on_text_encodings:
+           text_mask = default(text_mask, lambda: torch.any(text_encodings != 0., dim = -1))

        lowres_cond_img = self.to_lowres_cond(image, target_image_size = target_image_size, downsample_image_size = self.image_sizes[unet_index - 1]) if unet_number > 1 else None
        image = resize_image_to(image, target_image_size)