Compare commits

...

26 Commits

Author SHA1 Message Date
Phil Wang
301a97197f fix self conditioning shape in diffusion prior 2022-08-12 12:29:25 -07:00
Phil Wang
9440411954 make self conditioning technique work with diffusion prior 2022-08-12 12:20:51 -07:00
Phil Wang
981d407792 comment 2022-08-12 11:41:23 -07:00
Phil Wang
7c5477b26d bet on the new self-conditioning technique out of geoffrey hintons group 2022-08-12 11:36:08 -07:00
Phil Wang
be3bb868bf add gradient checkpointing for all resnet blocks 2022-08-02 19:21:44 -07:00
Phil Wang
451de34871 enforce clip anytorch version 2022-07-30 10:07:55 -07:00
Phil Wang
f22e8c8741 make open clip available for use with dalle2 pytorch 2022-07-30 09:02:31 -07:00
Phil Wang
87432e93ad quick fix for linear attention 2022-07-29 13:17:12 -07:00
Phil Wang
d167378401 add cosine sim for self attention as well, as a setting 2022-07-29 12:48:20 -07:00
Phil Wang
2d67d5821e change up epsilon in layernorm the case of using fp16, thanks to @Veldrovive for figuring out this stabilizes training 2022-07-29 12:41:02 -07:00
Phil Wang
748c7fe7af allow for cosine sim cross attention, modify linear attention in attempt to resolve issue on fp16 2022-07-29 11:12:18 -07:00
Phil Wang
80046334ad make sure entire readme runs without errors 2022-07-28 10:17:43 -07:00
Phil Wang
36fb46a95e fix readme and a small bug in DALLE2 class 2022-07-28 08:33:51 -07:00
Phil Wang
07abfcf45b rescale values in linear attention to mitigate overflows in fp16 setting 2022-07-27 12:27:38 -07:00
Phil Wang
2e35a9967d product management 2022-07-26 11:10:16 -07:00
Phil Wang
406e75043f add upsample combiner feature for the unets 2022-07-26 10:46:04 -07:00
Phil Wang
9646dfc0e6 fix path_or_state bug 2022-07-26 09:47:54 -07:00
Phil Wang
62043acb2f fix repaint 2022-07-24 15:29:06 -07:00
Phil Wang
417ff808e6 1.0.3 2022-07-22 13:16:57 -07:00
Aidan Dempster
f3d7e226ba Changed types to be generic instead of functions (#215)
This allows pylance to do proper type hinting and makes developing
extensions to the package much easier
2022-07-22 13:16:29 -07:00
Phil Wang
48a1302428 1.0.2 2022-07-20 23:01:51 -07:00
Aidan Dempster
ccaa46b81b Re-introduced change that was accidentally rolled back (#212) 2022-07-20 23:01:19 -07:00
Phil Wang
76d08498cc diffusion prior training updates from @nousr 2022-07-20 18:05:27 -07:00
zion
f9423d308b Prior updates (#211)
* update configs for prior

add prior warmup to config

update example prior config

* update prior trainer & script

add deepspeed amp & warmup

adopt full accelerator support

reload at sample point

finish epoch resume code

* update tracker save method for prior

* helper functions for prior_loader
2022-07-20 18:04:26 -07:00
Phil Wang
06c65b60d2 1.0.0 2022-07-19 19:08:17 -07:00
Aidan Dempster
4145474bab Improved upsampler training (#181)
Sampling is now possible without the first decoder unet

Non-training unets are deleted in the decoder trainer since they are never used and it is harder merge the models is they have keys in this state dict

Fixed a mistake where clip was not re-added after saving
2022-07-19 19:07:50 -07:00
12 changed files with 1272 additions and 554 deletions

View File

@@ -371,6 +371,7 @@ loss.backward()
unet1 = Unet(
dim = 128,
image_embed_dim = 512,
text_embed_dim = 512,
cond_dim = 128,
channels = 3,
dim_mults=(1, 2, 4, 8),
@@ -395,7 +396,7 @@ decoder = Decoder(
).cuda()
for unet_number in (1, 2):
loss = decoder(images, unet_number = unet_number) # this can optionally be decoder(images, text) if you wish to condition on the text encodings as well, though it was hinted in the paper it didn't do much
loss = decoder(images, text = text, unet_number = unet_number) # this can optionally be decoder(images, text) if you wish to condition on the text encodings as well, though it was hinted in the paper it didn't do much
loss.backward()
# do above for many steps
@@ -626,6 +627,18 @@ images = dalle2(
# save your image (in this example, of size 256x256)
```
Alternatively, you can also use <a href="https://github.com/mlfoundations/open_clip">Open Clip</a>
```bash
$ pip install open-clip-torch
```
```python
from dalle2_pytorch import OpenClipAdapter
clip = OpenClipAdapter()
```
Now you'll just have to worry about training the Prior and the Decoder!
## Inpainting
@@ -860,25 +873,23 @@ unet1 = Unet(
text_embed_dim = 512,
cond_dim = 128,
channels = 3,
dim_mults=(1, 2, 4, 8)
dim_mults=(1, 2, 4, 8),
cond_on_text_encodings = True,
).cuda()
unet2 = Unet(
dim = 16,
image_embed_dim = 512,
text_embed_dim = 512,
cond_dim = 128,
channels = 3,
dim_mults = (1, 2, 4, 8, 16),
cond_on_text_encodings = True
).cuda()
decoder = Decoder(
unet = (unet1, unet2),
image_sizes = (128, 256),
clip = clip,
timesteps = 1000,
condition_on_text_encodings = True
timesteps = 1000
).cuda()
decoder_trainer = DecoderTrainer(
@@ -903,8 +914,8 @@ for unet_number in (1, 2):
# after much training
# you can sample from the exponentially moving averaged unets as so
mock_image_embed = torch.randn(4, 512).cuda()
images = decoder_trainer.sample(mock_image_embed, text = text) # (4, 3, 256, 256)
mock_image_embed = torch.randn(32, 512).cuda()
images = decoder_trainer.sample(image_embed = mock_image_embed, text = text) # (4, 3, 256, 256)
```
### Diffusion Prior Training
@@ -1112,7 +1123,8 @@ For detailed information on training the diffusion prior, please refer to the [d
- [x] allow for unet to be able to condition non-cross attention style as well
- [x] speed up inference, read up on papers (ddim)
- [x] add inpainting ability using resampler from repaint paper https://arxiv.org/abs/2201.09865
- [ ] try out the nested unet from https://arxiv.org/abs/2005.09007 after hearing several positive testimonies from researchers, for segmentation anyhow
- [x] add the final combination of upsample feature maps, used in unet squared, seems to have an effect in local experiments
- [ ] consider elucidated dalle2 https://arxiv.org/abs/2206.00364
- [ ] interface out the vqgan-vae so a pretrained one can be pulled off the shelf to validate latent diffusion + DALL-E2
## Citations
@@ -1241,4 +1253,15 @@ For detailed information on training the diffusion prior, please refer to the [d
}
```
```bibtex
@misc{chen2022analog,
title = {Analog Bits: Generating Discrete Data using Diffusion Models with Self-Conditioning},
author = {Ting Chen and Ruixiang Zhang and Geoffrey Hinton},
year = {2022},
eprint = {2208.04202},
archivePrefix = {arXiv},
primaryClass = {cs.CV}
}
```
*Creating noise from data is easy; creating data from noise is generative modeling.* - <a href="https://arxiv.org/abs/2011.13456">Yang Song's paper</a>

View File

@@ -69,6 +69,7 @@ Settings for controlling the training hyperparameters.
| `wd` | No | `0.01` | The weight decay. |
| `max_grad_norm`| No | `0.5` | The grad norm clipping. |
| `save_every_n_samples` | No | `100000` | Samples will be generated and a checkpoint will be saved every `save_every_n_samples` samples. |
| `cond_scale` | No | `1.0` | Conditioning scale to use for sampling. Can also be an array of values, one for each unet. |
| `device` | No | `cuda:0` | The device to train on. |
| `epoch_samples` | No | `None` | Limits the number of samples iterated through in each epoch. This must be set if resampling. None means no limit. |
| `validation_samples` | No | `None` | The number of samples to use for validation. None mean the entire validation set. |

View File

@@ -1,18 +1,14 @@
{
"prior": {
"clip": {
"make": "x-clip",
"model": "ViT-L/14",
"base_model_kwargs": {
"dim_text": 768,
"dim_image": 768,
"dim_latent": 768
}
"make": "openai",
"model": "ViT-L/14"
},
"net": {
"dim": 768,
"depth": 12,
"num_timesteps": 1000,
"max_text_len": 77,
"num_time_embeds": 1,
"num_image_embeds": 1,
"num_text_embeds": 1,
@@ -20,8 +16,8 @@
"heads": 12,
"ff_mult": 4,
"norm_out": true,
"attn_dropout": 0.0,
"ff_dropout": 0.0,
"attn_dropout": 0.05,
"ff_dropout": 0.05,
"final_proj": true,
"normformer": true,
"rotary_emb": true
@@ -30,6 +26,7 @@
"image_size": 224,
"image_channels": 3,
"timesteps": 1000,
"sample_timesteps": 64,
"cond_drop_prob": 0.1,
"loss_type": "l2",
"predict_x_start": true,
@@ -37,34 +34,48 @@
"condition_on_text_encodings": true
},
"data": {
"image_url": "https://mystic.the-eye.eu/public/AI/cah/laion5b/embeddings/laion2B-en/img_emb/",
"text_url": "https://mystic.the-eye.eu/public/AI/cah/laion5b/embeddings/laion2B-en/text_emb/",
"meta_url": "https://mystic.the-eye.eu/public/AI/cah/laion5b/embeddings/laion2B-en/laion2B-en-metadata/",
"batch_size": 256,
"batch_size": 128,
"num_data_points": 100000,
"eval_every_seconds": 1600,
"image_url": "<path to your images>",
"meta_url": "<path to your metadata>",
"splits": {
"train": 0.9,
"val": 1e-7,
"test": 0.0999999
"train": 0.8,
"val": 0.1,
"test": 0.1
}
},
"train": {
"epochs": 1,
"epochs": 5,
"lr": 1.1e-4,
"wd": 6.02e-2,
"max_grad_norm": 0.5,
"use_ema": true,
"ema_beta": 0.9999,
"ema_update_after_step": 50,
"warmup_steps": 50,
"amp": false,
"save_every": 10000
},
"load": {
"source": null,
"resume": false
"save_every_seconds": 3600,
"eval_timesteps": [64, 1000],
"random_seed": 84513
},
"tracker": {
"tracker_type": "wandb",
"data_path": "./prior_checkpoints",
"wandb_entity": "laion",
"wandb_project": "diffusion-prior",
"verbose": true
"data_path": ".prior",
"overwrite_data_path": true,
"log": {
"log_type": "wandb",
"wandb_entity": "<your wandb username>",
"wandb_project": "prior_debugging",
"wandb_resume": false,
"verbose": true
},
"save": [
{
"save_to": "local",
"save_type": "checkpoint",
"save_latest_to": ".prior/latest_checkpoint.pth",
"save_best_to": ".prior/best_checkpoint.pth"
}
]
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -67,6 +67,15 @@ class PriorEmbeddingDataset(IterableDataset):
def __str__(self):
return f"<PriorEmbeddingDataset: start: {self.start}, stop: {self.stop}, len: {self.__len__()}>"
def set_start(self, start):
"""
Adjust the starting point within the reader, useful for resuming an epoch
"""
self.start = start
def get_start(self):
return self.start
def get_sample(self):
"""
pre-proocess data from either reader into a common format

View File

@@ -528,13 +528,16 @@ class Tracker:
elif save_type == 'model':
if isinstance(trainer, DiffusionPriorTrainer):
prior = trainer.ema_diffusion_prior.ema_model if trainer.use_ema else trainer.diffusion_prior
prior: DiffusionPrior = trainer.unwrap_model(prior)
prior: DiffusionPrior = trainer.accelerator.unwrap_model(prior)
# Remove CLIP if it is part of the model
original_clip = prior.clip
prior.clip = None
model_state_dict = prior.state_dict()
prior.clip = original_clip
elif isinstance(trainer, DecoderTrainer):
decoder: Decoder = trainer.accelerator.unwrap_model(trainer.decoder)
# Remove CLIP if it is part of the model
original_clip = decoder.clip
decoder.clip = None
if trainer.use_ema:
trainable_unets = decoder.unets
@@ -543,6 +546,7 @@ class Tracker:
decoder.unets = trainable_unets # Swap back
else:
model_state_dict = decoder.state_dict()
decoder.clip = original_clip
else:
raise NotImplementedError('Saving this type of model with EMA mode enabled is not yet implemented. Actually, how did you get here?')
state_dict = {

View File

@@ -1,7 +1,7 @@
import json
from torchvision import transforms as T
from pydantic import BaseModel, validator, root_validator
from typing import List, Iterable, Optional, Union, Tuple, Dict, Any
from typing import List, Optional, Union, Tuple, Dict, Any, TypeVar
from x_clip import CLIP as XCLIP
from coca_pytorch import CoCa
@@ -25,11 +25,9 @@ def exists(val):
def default(val, d):
return val if exists(val) else d
def ListOrTuple(inner_type):
return Union[List[inner_type], Tuple[inner_type]]
def SingularOrIterable(inner_type):
return Union[inner_type, ListOrTuple(inner_type)]
InnerType = TypeVar('InnerType')
ListOrTuple = Union[List[InnerType], Tuple[InnerType]]
SingularOrIterable = Union[InnerType, ListOrTuple[InnerType]]
# general pydantic classes
@@ -145,6 +143,9 @@ class DiffusionPriorNetworkConfig(BaseModel):
normformer: bool = False
rotary_emb: bool = True
class Config:
extra = "allow"
def create(self):
kwargs = self.dict()
return DiffusionPriorNetwork(**kwargs)
@@ -187,23 +188,26 @@ class DiffusionPriorTrainConfig(BaseModel):
use_ema: bool = True
ema_beta: float = 0.99
amp: bool = False
save_every: int = 10000 # what steps to save on
warmup_steps: int = None # number of warmup steps
save_every_seconds: int = 3600 # how often to save
eval_timesteps: List[int] = [64] # which sampling timesteps to evaluate with
best_validation_loss: float = 1e9 # the current best valudation loss observed
current_epoch: int = 0 # the current epoch
num_samples_seen: int = 0 # the current number of samples seen
random_seed: int = 0 # manual seed for torch
class DiffusionPriorDataConfig(BaseModel):
image_url: str # path to embeddings folder
meta_url: str # path to metadata (captions) for images
splits: TrainSplitConfig
batch_size: int = 64
class DiffusionPriorLoadConfig(BaseModel):
source: str = None
resume: bool = False
image_url: str # path to embeddings folder
meta_url: str # path to metadata (captions) for images
splits: TrainSplitConfig # define train, validation, test splits for your dataset
batch_size: int # per-gpu batch size used to train the model
num_data_points: int = 25e7 # total number of datapoints to train on
eval_every_seconds: int = 3600 # validation statistics will be performed this often
class TrainDiffusionPriorConfig(BaseModel):
prior: DiffusionPriorConfig
data: DiffusionPriorDataConfig
train: DiffusionPriorTrainConfig
load: DiffusionPriorLoadConfig
tracker: TrackerConfig
@classmethod
@@ -216,13 +220,13 @@ class TrainDiffusionPriorConfig(BaseModel):
class UnetConfig(BaseModel):
dim: int
dim_mults: ListOrTuple(int)
dim_mults: ListOrTuple[int]
image_embed_dim: int = None
text_embed_dim: int = None
cond_on_text_encodings: bool = None
cond_dim: int = None
channels: int = 3
self_attn: ListOrTuple(int)
self_attn: ListOrTuple[int]
attn_dim_head: int = 32
attn_heads: int = 16
init_cross_embed: bool = True
@@ -231,16 +235,16 @@ class UnetConfig(BaseModel):
extra = "allow"
class DecoderConfig(BaseModel):
unets: ListOrTuple(UnetConfig)
unets: ListOrTuple[UnetConfig]
image_size: int = None
image_sizes: ListOrTuple(int) = None
image_sizes: ListOrTuple[int] = None
clip: Optional[AdapterConfig] # The clip model to use if embeddings are not provided
channels: int = 3
timesteps: int = 1000
sample_timesteps: Optional[SingularOrIterable(int)] = None
sample_timesteps: Optional[SingularOrIterable[int]] = None
loss_type: str = 'l2'
beta_schedule: ListOrTuple(str) = 'cosine'
learned_variance: bool = True
beta_schedule: ListOrTuple[str] = None # None means all cosine
learned_variance: SingularOrIterable[bool] = True
image_cond_drop_prob: float = 0.1
text_cond_drop_prob: float = 0.5
@@ -299,20 +303,22 @@ class DecoderDataConfig(BaseModel):
class DecoderTrainConfig(BaseModel):
epochs: int = 20
lr: SingularOrIterable(float) = 1e-4
wd: SingularOrIterable(float) = 0.01
warmup_steps: Optional[SingularOrIterable(int)] = None
lr: SingularOrIterable[float] = 1e-4
wd: SingularOrIterable[float] = 0.01
warmup_steps: Optional[SingularOrIterable[int]] = None
find_unused_parameters: bool = True
max_grad_norm: SingularOrIterable(float) = 0.5
max_grad_norm: SingularOrIterable[float] = 0.5
save_every_n_samples: int = 100000
n_sample_images: int = 6 # The number of example images to produce when sampling the train and test dataset
cond_scale: Union[float, List[float]] = 1.0
device: str = 'cuda:0'
epoch_samples: int = None # Limits the number of samples per epoch. None means no limit. Required if resample_train is true as otherwise the number of samples per epoch is infinite.
validation_samples: int = None # Same as above but for validation.
save_immediately: bool = False
use_ema: bool = True
ema_beta: float = 0.999
amp: bool = False
unet_training_mask: ListOrTuple(bool) = None # If None, use all unets
unet_training_mask: ListOrTuple[bool] = None # If None, use all unets
class DecoderEvaluateConfig(BaseModel):
n_evaluation_samples: int = 1000
@@ -321,12 +327,6 @@ class DecoderEvaluateConfig(BaseModel):
KID: Dict[str, Any] = None
LPIPS: Dict[str, Any] = None
class DecoderLoadConfig(BaseModel):
source: str = None # Supports file and wandb
run_path: str = '' # Used only if source is wandb
file_path: str = '' # The local filepath if source is file. If source is wandb, the relative path to the model file in wandb.
resume: bool = False # If using wandb, whether to resume the run
class TrainDecoderConfig(BaseModel):
decoder: DecoderConfig
data: DecoderDataConfig

View File

@@ -174,26 +174,24 @@ class DiffusionPriorTrainer(nn.Module):
def __init__(
self,
diffusion_prior,
accelerator = None,
use_ema = True,
lr = 3e-4,
wd = 1e-2,
eps = 1e-6,
max_grad_norm = None,
amp = False,
group_wd_params = True,
device = None,
accelerator = None,
verbose = True,
warmup_steps = 1,
**kwargs
):
super().__init__()
assert isinstance(diffusion_prior, DiffusionPrior)
assert not exists(accelerator) or isinstance(accelerator, Accelerator)
ema_kwargs, kwargs = groupby_prefix_and_trim('ema_', kwargs)
accelerator_kwargs, kwargs = groupby_prefix_and_trim('accelerator_', kwargs)
# verbosity
self.verbose = verbose
if not exists(accelerator):
accelerator = Accelerator(**accelerator_kwargs)
# assign some helpful member vars
@@ -202,23 +200,31 @@ class DiffusionPriorTrainer(nn.Module):
# setting the device
if not exists(accelerator) and not exists(device):
diffusion_prior_device = next(diffusion_prior.parameters()).device
self.print(f'accelerator not given, and device not specified: defaulting to device of diffusion prior parameters - {diffusion_prior_device}')
self.device = diffusion_prior_device
else:
self.device = accelerator.device if exists(accelerator) else device
diffusion_prior.to(self.device)
self.device = accelerator.device
diffusion_prior.to(self.device)
# save model
self.diffusion_prior = diffusion_prior
# optimizer and mixed precision stuff
# mixed precision checks
self.amp = amp
if (
exists(self.accelerator)
and self.accelerator.distributed_type == DistributedType.DEEPSPEED
and self.diffusion_prior.clip is not None
):
# Then we need to make sure clip is using the correct precision or else deepspeed will error
cast_type_map = {
"fp16": torch.half,
"bf16": torch.bfloat16,
"no": torch.float
}
precision_type = cast_type_map[accelerator.mixed_precision]
assert precision_type == torch.float, "DeepSpeed currently only supports float32 precision when using on the fly embedding generation from clip"
self.diffusion_prior.clip.to(precision_type)
self.scaler = GradScaler(enabled = amp)
# optimizer stuff
self.optim_kwargs = dict(lr=lr, wd=wd, eps=eps, group_wd_params=group_wd_params)
@@ -227,17 +233,21 @@ class DiffusionPriorTrainer(nn.Module):
**self.optim_kwargs,
**kwargs
)
self.scheduler = LambdaLR(self.optimizer, lr_lambda = lambda _: 1.0)
self.warmup_scheduler = warmup.LinearWarmup(self.optimizer, warmup_period = warmup_steps) if exists(warmup_steps) else None
# distribute the model if using HFA
if exists(self.accelerator):
self.diffusion_prior, self.optimizer = self.accelerator.prepare(self.diffusion_prior, self.optimizer)
self.diffusion_prior, self.optimizer, self.scheduler = self.accelerator.prepare(self.diffusion_prior, self.optimizer, self.scheduler)
# exponential moving average stuff
self.use_ema = use_ema
if self.use_ema:
self.ema_diffusion_prior = EMA(self.unwrap_model(self.diffusion_prior), **ema_kwargs)
self.ema_diffusion_prior = EMA(self.accelerator.unwrap_model(self.diffusion_prior), **ema_kwargs)
# gradient clipping if needed
@@ -247,67 +257,24 @@ class DiffusionPriorTrainer(nn.Module):
self.register_buffer('step', torch.tensor([0], device = self.device))
# accelerator wrappers
def print(self, msg):
if not self.verbose:
return
if exists(self.accelerator):
self.accelerator.print(msg)
else:
print(msg)
def unwrap_model(self, model):
if exists(self.accelerator):
return self.accelerator.unwrap_model(model)
else:
return model
def wait_for_everyone(self):
if exists(self.accelerator):
self.accelerator.wait_for_everyone()
def is_main_process(self):
if exists(self.accelerator):
return self.accelerator.is_main_process
else:
return True
def clip_grad_norm_(self, *args):
if exists(self.accelerator):
return self.accelerator.clip_grad_norm_(*args)
else:
return torch.nn.utils.clip_grad_norm_(*args)
def backprop(self, x):
if exists(self.accelerator):
self.accelerator.backward(x)
else:
try:
x.backward()
except Exception as e:
self.print(f"Caught error in backprop call: {e}")
# utility
def save(self, path, overwrite = True, **kwargs):
# ensure we sync gradients before continuing
self.wait_for_everyone()
# only save on the main process
if self.is_main_process():
self.print(f"Saving checkpoint at step: {self.step.item()}")
if self.accelerator.is_main_process:
print(f"Saving checkpoint at step: {self.step.item()}")
path = Path(path)
assert not (path.exists() and not overwrite)
path.parent.mkdir(parents = True, exist_ok = True)
# FIXME: LambdaLR can't be saved due to pickling issues
save_obj = dict(
scaler = self.scaler.state_dict(),
optimizer = self.optimizer.state_dict(),
model = self.unwrap_model(self.diffusion_prior).state_dict(), # unwrap the model from distribution if applicable
warmup_scheduler = self.warmup_scheduler,
model = self.accelerator.unwrap_model(self.diffusion_prior).state_dict(),
version = version.parse(__version__),
step = self.step.item(),
step = self.step,
**kwargs
)
@@ -320,14 +287,14 @@ class DiffusionPriorTrainer(nn.Module):
torch.save(save_obj, str(path))
def load(self, path, overwrite_lr = True, strict = True):
def load(self, path_or_state, overwrite_lr = True, strict = True):
"""
Load a checkpoint of a diffusion prior trainer.
Will load the entire trainer, including the optimizer and EMA.
Params:
- path (str): a path to the DiffusionPriorTrainer checkpoint file
- path_or_state (str | torch): a path to the DiffusionPriorTrainer checkpoint file
- overwrite_lr (bool): wether or not to overwrite the stored LR with the LR specified in the new trainer
- strict (bool): kwarg for `torch.nn.Module.load_state_dict`, will force an exact checkpoint match
@@ -336,56 +303,56 @@ class DiffusionPriorTrainer(nn.Module):
"""
# all processes need to load checkpoint. no restriction here
path = Path(path)
assert path.exists()
if isinstance(path_or_state, str):
path = Path(path_or_state)
assert path.exists()
loaded_obj = torch.load(str(path), map_location=self.device)
loaded_obj = torch.load(str(path), map_location=self.device)
elif isinstance(path_or_state, dict):
loaded_obj = path_or_state
if version.parse(__version__) != loaded_obj['version']:
print(f'loading saved diffusion prior at version {loaded_obj["version"]} but current package version is at {__version__}')
# unwrap the model when loading from checkpoint
self.unwrap_model(self.diffusion_prior).load_state_dict(loaded_obj['model'], strict = strict)
self.step.copy_(torch.ones_like(self.step) * loaded_obj['step'])
self.scaler.load_state_dict(loaded_obj['scaler'])
self.accelerator.unwrap_model(self.diffusion_prior).load_state_dict(loaded_obj['model'], strict = strict)
self.step.copy_(torch.ones_like(self.step, device=self.device) * loaded_obj['step'].to(self.device))
self.optimizer.load_state_dict(loaded_obj['optimizer'])
# set warmupstep
if exists(self.warmup_scheduler):
self.warmup_scheduler.last_step = self.step.item()
# ensure new lr is used if different from old one
if overwrite_lr:
new_lr = self.optim_kwargs["lr"]
self.print(f"Overriding LR to be {new_lr}")
for group in self.optimizer.param_groups:
group["lr"] = new_lr
group["lr"] = new_lr if group["lr"] > 0.0 else 0.0
if self.use_ema:
assert 'ema' in loaded_obj
self.ema_diffusion_prior.load_state_dict(loaded_obj['ema'], strict = strict)
# below not be necessary, but I had a suspicion that this wasn't being loaded correctly
# below might not be necessary, but I had a suspicion that this wasn't being loaded correctly
self.ema_diffusion_prior.ema_model.load_state_dict(loaded_obj["ema_model"])
# sync and inform
self.wait_for_everyone()
self.print(f"Loaded model")
return loaded_obj
# model functionality
def update(self):
# only continue with updates until all ranks finish
self.wait_for_everyone()
if exists(self.max_grad_norm):
self.scaler.unscale_(self.optimizer)
# utilize HFA clipping where applicable
self.clip_grad_norm_(self.diffusion_prior.parameters(), self.max_grad_norm)
self.scaler.step(self.optimizer)
self.scaler.update()
self.accelerator.clip_grad_norm_(self.diffusion_prior.parameters(), self.max_grad_norm)
self.optimizer.step()
self.optimizer.zero_grad()
# accelerator will ocassionally skip optimizer steps in a "dynamic loss scaling strategy"
if not self.accelerator.optimizer_step_was_skipped:
with self.warmup_scheduler.dampening():
self.scheduler.step()
if self.use_ema:
self.ema_diffusion_prior.update()
@@ -414,7 +381,7 @@ class DiffusionPriorTrainer(nn.Module):
@cast_torch_tensor
@prior_sample_in_chunks
def embed_text(self, *args, **kwargs):
return self.unwrap_model(self.diffusion_prior).clip.embed_text(*args, **kwargs)
return self.accelerator.unwrap_model(self.diffusion_prior).clip.embed_text(*args, **kwargs)
@cast_torch_tensor
def forward(
@@ -426,16 +393,14 @@ class DiffusionPriorTrainer(nn.Module):
total_loss = 0.
for chunk_size_frac, (chunked_args, chunked_kwargs) in split_args_and_kwargs(*args, split_size = max_batch_size, **kwargs):
with autocast(enabled = self.amp):
with self.accelerator.autocast():
loss = self.diffusion_prior(*chunked_args, **chunked_kwargs)
loss = loss * chunk_size_frac
total_loss += loss.item()
# backprop with accelerate if applicable
if self.training:
self.backprop(self.scaler.scale(loss))
self.accelerator.backward(loss)
return total_loss
@@ -498,23 +463,27 @@ class DecoderTrainer(nn.Module):
warmup_schedulers = []
for unet, unet_lr, unet_wd, unet_eps, unet_warmup_steps in zip(decoder.unets, lr, wd, eps, warmup_steps):
optimizer = get_optimizer(
unet.parameters(),
lr = unet_lr,
wd = unet_wd,
eps = unet_eps,
group_wd_params = group_wd_params,
**kwargs
)
if isinstance(unet, nn.Identity):
optimizers.append(None)
schedulers.append(None)
warmup_schedulers.append(None)
else:
optimizer = get_optimizer(
unet.parameters(),
lr = unet_lr,
wd = unet_wd,
eps = unet_eps,
group_wd_params = group_wd_params,
**kwargs
)
optimizers.append(optimizer)
optimizers.append(optimizer)
scheduler = LambdaLR(optimizer, lr_lambda = lambda step: 1.0)
scheduler = LambdaLR(optimizer, lr_lambda = lambda step: 1.0)
warmup_scheduler = warmup.LinearWarmup(optimizer, warmup_period = unet_warmup_steps) if exists(unet_warmup_steps) else None
warmup_schedulers.append(warmup_scheduler)
warmup_scheduler = warmup.LinearWarmup(optimizer, warmup_period = unet_warmup_steps) if exists(unet_warmup_steps) else None
warmup_schedulers.append(warmup_scheduler)
schedulers.append(scheduler)
schedulers.append(scheduler)
if self.use_ema:
self.ema_unets.append(EMA(unet, **ema_kwargs))
@@ -590,7 +559,8 @@ class DecoderTrainer(nn.Module):
for ind in range(0, self.num_unets):
optimizer_key = f'optim{ind}'
optimizer = getattr(self, optimizer_key)
save_obj = {**save_obj, optimizer_key: self.accelerator.unwrap_model(optimizer).state_dict()}
state_dict = optimizer.state_dict() if optimizer is not None else None
save_obj = {**save_obj, optimizer_key: state_dict}
if self.use_ema:
save_obj = {**save_obj, 'ema': self.ema_unets.state_dict()}
@@ -612,8 +582,8 @@ class DecoderTrainer(nn.Module):
optimizer_key = f'optim{ind}'
optimizer = getattr(self, optimizer_key)
warmup_scheduler = self.warmup_schedulers[ind]
self.accelerator.unwrap_model(optimizer).load_state_dict(loaded_obj[optimizer_key])
if optimizer is not None:
optimizer.load_state_dict(loaded_obj[optimizer_key])
if exists(warmup_scheduler):
warmup_scheduler.last_step = last_step
@@ -714,23 +684,32 @@ class DecoderTrainer(nn.Module):
*args,
unet_number = None,
max_batch_size = None,
return_lowres_cond_image=False,
**kwargs
):
unet_number = self.validate_and_return_unet_number(unet_number)
total_loss = 0.
using_amp = self.accelerator.mixed_precision != 'no'
cond_images = []
for chunk_size_frac, (chunked_args, chunked_kwargs) in split_args_and_kwargs(*args, split_size = max_batch_size, **kwargs):
with self.accelerator.autocast():
loss = self.decoder(*chunked_args, unet_number = unet_number, **chunked_kwargs)
loss_obj = self.decoder(*chunked_args, unet_number = unet_number, return_lowres_cond_image=return_lowres_cond_image, **chunked_kwargs)
# loss_obj may be a tuple with loss and cond_image
if return_lowres_cond_image:
loss, cond_image = loss_obj
else:
loss = loss_obj
cond_image = None
loss = loss * chunk_size_frac
if cond_image is not None:
cond_images.append(cond_image)
total_loss += loss.item()
if self.training:
self.accelerator.backward(loss)
return total_loss
if return_lowres_cond_image:
return total_loss, torch.stack(cond_images)
else:
return total_loss

View File

@@ -1 +1 @@
__version__ = '0.26.2'
__version__ = '1.6.2'

View File

@@ -26,7 +26,7 @@ setup(
install_requires=[
'accelerate',
'click',
'clip-anytorch',
'clip-anytorch>=2.4.0',
'coca-pytorch>=0.0.5',
'ema-pytorch>=0.0.7',
'einops>=0.4',

View File

@@ -1,5 +1,6 @@
from pathlib import Path
from typing import List
from datetime import timedelta
from dalle2_pytorch.trainer import DecoderTrainer
from dalle2_pytorch.dataloaders import create_image_embedding_dataloader
@@ -11,11 +12,12 @@ from clip import tokenize
import torchvision
import torch
from torch import nn
from torchmetrics.image.fid import FrechetInceptionDistance
from torchmetrics.image.inception import InceptionScore
from torchmetrics.image.kid import KernelInceptionDistance
from torchmetrics.image.lpip import LearnedPerceptualImagePatchSimilarity
from accelerate import Accelerator, DistributedDataParallelKwargs
from accelerate import Accelerator, DistributedDataParallelKwargs, InitProcessGroupKwargs
from accelerate.utils import dataclasses as accelerate_dataclasses
import webdataset as wds
import click
@@ -132,7 +134,7 @@ def get_example_data(dataloader, device, n=5):
break
return list(zip(images[:n], img_embeddings[:n], text_embeddings[:n], captions[:n]))
def generate_samples(trainer, example_data, condition_on_text_encodings=False, text_prepend="", match_image_size=True):
def generate_samples(trainer, example_data, start_unet=1, end_unet=None, condition_on_text_encodings=False, cond_scale=1.0, device=None, text_prepend="", match_image_size=True):
"""
Takes example data and generates images from the embeddings
Returns three lists: real images, generated images, and captions
@@ -157,6 +159,13 @@ def generate_samples(trainer, example_data, condition_on_text_encodings=False, t
# Then we are using precomputed text embeddings
text_embeddings = torch.stack(text_embeddings)
sample_params["text_encodings"] = text_embeddings
sample_params["start_at_unet_number"] = start_unet
sample_params["stop_at_unet_number"] = end_unet
if start_unet > 1:
# If we are only training upsamplers
sample_params["image"] = torch.stack(real_images)
if device is not None:
sample_params["_device"] = device
samples = trainer.sample(**sample_params)
generated_images = list(samples)
captions = [text_prepend + txt for txt in txts]
@@ -165,15 +174,15 @@ def generate_samples(trainer, example_data, condition_on_text_encodings=False, t
real_images = [resize_image_to(image, generated_image_size, clamp_range=(0, 1)) for image in real_images]
return real_images, generated_images, captions
def generate_grid_samples(trainer, examples, condition_on_text_encodings=False, text_prepend=""):
def generate_grid_samples(trainer, examples, start_unet=1, end_unet=None, condition_on_text_encodings=False, cond_scale=1.0, device=None, text_prepend=""):
"""
Generates samples and uses torchvision to put them in a side by side grid for easy viewing
"""
real_images, generated_images, captions = generate_samples(trainer, examples, condition_on_text_encodings, text_prepend)
real_images, generated_images, captions = generate_samples(trainer, examples, start_unet, end_unet, condition_on_text_encodings, cond_scale, device, text_prepend)
grid_images = [torchvision.utils.make_grid([original_image, generated_image]) for original_image, generated_image in zip(real_images, generated_images)]
return grid_images, captions
def evaluate_trainer(trainer, dataloader, device, condition_on_text_encodings=False, n_evaluation_samples=1000, FID=None, IS=None, KID=None, LPIPS=None):
def evaluate_trainer(trainer, dataloader, device, start_unet, end_unet, condition_on_text_encodings=False, cond_scale=1.0, inference_device=None, n_evaluation_samples=1000, FID=None, IS=None, KID=None, LPIPS=None):
"""
Computes evaluation metrics for the decoder
"""
@@ -183,7 +192,7 @@ def evaluate_trainer(trainer, dataloader, device, condition_on_text_encodings=Fa
if len(examples) == 0:
print("No data to evaluate. Check that your dataloader has shards.")
return metrics
real_images, generated_images, captions = generate_samples(trainer, examples, condition_on_text_encodings)
real_images, generated_images, captions = generate_samples(trainer, examples, start_unet, end_unet, condition_on_text_encodings, cond_scale, inference_device)
real_images = torch.stack(real_images).to(device=device, dtype=torch.float)
generated_images = torch.stack(generated_images).to(device=device, dtype=torch.float)
# Convert from [0, 1] to [0, 255] and from torch.float to torch.uint8
@@ -259,11 +268,13 @@ def train(
evaluate_config=None,
epoch_samples = None, # If the training dataset is resampling, we have to manually stop an epoch
validation_samples = None,
save_immediately=False,
epochs = 20,
n_sample_images = 5,
save_every_n_samples = 100000,
unet_training_mask=None,
condition_on_text_encodings=False,
cond_scale=1.0,
**kwargs
):
"""
@@ -271,6 +282,21 @@ def train(
"""
is_master = accelerator.process_index == 0
if not exists(unet_training_mask):
# Then the unet mask should be true for all unets in the decoder
unet_training_mask = [True] * len(decoder.unets)
assert len(unet_training_mask) == len(decoder.unets), f"The unet training mask should be the same length as the number of unets in the decoder. Got {len(unet_training_mask)} and {trainer.num_unets}"
trainable_unet_numbers = [i+1 for i, trainable in enumerate(unet_training_mask) if trainable]
first_trainable_unet = trainable_unet_numbers[0]
last_trainable_unet = trainable_unet_numbers[-1]
def move_unets(unet_training_mask):
for i in range(len(decoder.unets)):
if not unet_training_mask[i]:
# Replace the unet from the module list with a nn.Identity(). This training script never uses unets that aren't being trained so this is fine.
decoder.unets[i] = nn.Identity().to(inference_device)
# Remove non-trainable unets
move_unets(unet_training_mask)
trainer = DecoderTrainer(
decoder=decoder,
accelerator=accelerator,
@@ -285,6 +311,7 @@ def train(
sample = 0
samples_seen = 0
val_sample = 0
step = lambda: int(trainer.num_steps_taken(unet_number=first_trainable_unet))
if tracker.can_recall:
start_epoch, validation_losses, next_task, recalled_sample, samples_seen = recall_trainer(tracker, trainer)
@@ -296,13 +323,6 @@ def train(
accelerator.print(f"Starting training from task {next_task} at sample {sample} and validation sample {val_sample}")
trainer.to(device=inference_device)
if not exists(unet_training_mask):
# Then the unet mask should be true for all unets in the decoder
unet_training_mask = [True] * trainer.num_unets
first_training_unet = min(index for index, mask in enumerate(unet_training_mask) if mask)
step = lambda: int(trainer.num_steps_taken(unet_number=first_training_unet+1))
assert len(unet_training_mask) == trainer.num_unets, f"The unet training mask should be the same length as the number of unets in the decoder. Got {len(unet_training_mask)} and {trainer.num_unets}"
accelerator.print(print_ribbon("Generating Example Data", repeat=40))
accelerator.print("This can take a while to load the shard lists...")
if is_master:
@@ -360,7 +380,7 @@ def train(
tokenized_texts = tokenize(txt, truncate=True)
assert tokenized_texts.shape[0] == len(img), f"The number of texts ({tokenized_texts.shape[0]}) should be the same as the number of images ({len(img)})"
forward_params['text'] = tokenized_texts
loss = trainer.forward(img, **forward_params, unet_number=unet)
loss = trainer.forward(img, **forward_params, unet_number=unet, _device=inference_device)
trainer.update(unet_number=unet)
unet_losses_tensor[i % TRAIN_CALC_LOSS_EVERY_ITERS, unet-1] = loss
@@ -373,10 +393,10 @@ def train(
unet_all_losses = accelerator.gather(unet_losses_tensor)
mask = unet_all_losses != 0
unet_average_loss = (unet_all_losses * mask).sum(dim=0) / mask.sum(dim=0)
loss_map = { f"Unet {index} Training Loss": loss.item() for index, loss in enumerate(unet_average_loss) if loss != 0 }
loss_map = { f"Unet {index} Training Loss": loss.item() for index, loss in enumerate(unet_average_loss) if unet_training_mask[index] }
# gather decay rate on each UNet
ema_decay_list = {f"Unet {index} EMA Decay": ema_unet.get_current_decay() for index, ema_unet in enumerate(trainer.ema_unets)}
ema_decay_list = {f"Unet {index} EMA Decay": ema_unet.get_current_decay() for index, ema_unet in enumerate(trainer.ema_unets) if unet_training_mask[index]}
log_data = {
"Epoch": epoch,
@@ -391,7 +411,7 @@ def train(
if is_master:
tracker.log(log_data, step=step())
if is_master and last_snapshot + save_every_n_samples < sample: # This will miss by some amount every time, but it's not a big deal... I hope
if is_master and (last_snapshot + save_every_n_samples < sample or (save_immediately and i == 0)): # This will miss by some amount every time, but it's not a big deal... I hope
# It is difficult to gather this kind of info on the accelerator, so we have to do it on the master
print("Saving snapshot")
last_snapshot = sample
@@ -399,7 +419,7 @@ def train(
save_trainer(tracker, trainer, epoch, sample, next_task, validation_losses, samples_seen)
if exists(n_sample_images) and n_sample_images > 0:
trainer.eval()
train_images, train_captions = generate_grid_samples(trainer, train_example_data, condition_on_text_encodings, "Train: ")
train_images, train_captions = generate_grid_samples(trainer, train_example_data, first_trainable_unet, last_trainable_unet, condition_on_text_encodings, cond_scale, inference_device, "Train: ")
tracker.log_images(train_images, captions=train_captions, image_section="Train Samples", step=step())
if epoch_samples is not None and sample >= epoch_samples:
@@ -449,8 +469,9 @@ def train(
else:
# Then we need to pass the text instead
tokenized_texts = tokenize(txt, truncate=True)
assert tokenized_texts.shape[0] == len(img), f"The number of texts ({tokenized_texts.shape[0]}) should be the same as the number of images ({len(img)})"
forward_params['text'] = tokenized_texts
loss = trainer.forward(img.float(), **forward_params, unet_number=unet)
loss = trainer.forward(img.float(), **forward_params, unet_number=unet, _device=inference_device)
average_val_loss_tensor[0, unet-1] += loss
if i % VALID_CALC_LOSS_EVERY_ITERS == 0:
@@ -477,7 +498,7 @@ def train(
if next_task == 'eval':
if exists(evaluate_config):
accelerator.print(print_ribbon(f"Starting Evaluation {epoch}", repeat=40))
evaluation = evaluate_trainer(trainer, dataloaders["val"], inference_device, **evaluate_config.dict(), condition_on_text_encodings=condition_on_text_encodings)
evaluation = evaluate_trainer(trainer, dataloaders["val"], inference_device, first_trainable_unet, last_trainable_unet, inference_device=inference_device, **evaluate_config.dict(), condition_on_text_encodings=condition_on_text_encodings, cond_scale=cond_scale)
if is_master:
tracker.log(evaluation, step=step())
next_task = 'sample'
@@ -488,15 +509,15 @@ def train(
# Generate examples and save the model if we are the master
# Generate sample images
print(print_ribbon(f"Sampling Set {epoch}", repeat=40))
test_images, test_captions = generate_grid_samples(trainer, test_example_data, condition_on_text_encodings, "Test: ")
train_images, train_captions = generate_grid_samples(trainer, train_example_data, condition_on_text_encodings, "Train: ")
test_images, test_captions = generate_grid_samples(trainer, test_example_data, first_trainable_unet, last_trainable_unet, condition_on_text_encodings, cond_scale, inference_device, "Test: ")
train_images, train_captions = generate_grid_samples(trainer, train_example_data, first_trainable_unet, last_trainable_unet, condition_on_text_encodings, cond_scale, inference_device, "Train: ")
tracker.log_images(test_images, captions=test_captions, image_section="Test Samples", step=step())
tracker.log_images(train_images, captions=train_captions, image_section="Train Samples", step=step())
print(print_ribbon(f"Starting Saving {epoch}", repeat=40))
is_best = False
if all_average_val_losses is not None:
average_loss = all_average_val_losses.mean(dim=0).item()
average_loss = all_average_val_losses.mean(dim=0).sum() / sum(unet_training_mask)
if len(validation_losses) == 0 or average_loss < min(validation_losses):
is_best = True
validation_losses.append(average_loss)
@@ -522,7 +543,8 @@ def initialize_training(config: TrainDecoderConfig, config_path):
# Set up accelerator for configurable distributed training
ddp_kwargs = DistributedDataParallelKwargs(find_unused_parameters=config.train.find_unused_parameters)
accelerator = Accelerator(kwargs_handlers=[ddp_kwargs])
init_kwargs = InitProcessGroupKwargs(timeout=timedelta(seconds=60*60))
accelerator = Accelerator(kwargs_handlers=[ddp_kwargs, init_kwargs])
if accelerator.num_processes > 1:
# We are using distributed training and want to immediately ensure all can connect

View File

@@ -1,31 +1,23 @@
# TODO: add start, num_data_points, eval_every and group to config
# TODO: switch back to repo's wandb
START = 0
NUM_DATA_POINTS = 250e6
EVAL_EVERY = 1000
GROUP = "distributed"
import os
import click
import wandb
import torch
from torch import nn
from torch.utils.data import DataLoader
import numpy as np
from typing import List
from accelerate import Accelerator
from accelerate.utils import set_seed
from torch.utils.data import DataLoader
from embedding_reader import EmbeddingReader
from accelerate.utils import dataclasses as accelerate_dataclasses
from dalle2_pytorch.dataloaders import get_reader, make_splits
from dalle2_pytorch.utils import Timer
from dalle2_pytorch.trackers import Tracker
from dalle2_pytorch import DiffusionPriorTrainer
from dalle2_pytorch.dataloaders import get_reader, make_splits
from dalle2_pytorch.train_configs import (
DiffusionPriorConfig,
DiffusionPriorTrainConfig,
TrainDiffusionPriorConfig,
)
from dalle2_pytorch.trackers import BaseTracker, WandbTracker
from dalle2_pytorch import DiffusionPriorTrainer
# helpers
@@ -38,8 +30,19 @@ def exists(val):
return val is not None
def all_between(values: list, lower_bound, upper_bound):
for value in values:
if value < lower_bound or value > upper_bound:
return False
return True
def make_model(
prior_config, train_config, device: str = None, accelerator: Accelerator = None
prior_config: DiffusionPriorConfig,
train_config: DiffusionPriorTrainConfig,
device: str = None,
accelerator: Accelerator = None,
):
# create model from config
diffusion_prior = prior_config.create()
@@ -54,71 +57,214 @@ def make_model(
use_ema=train_config.use_ema,
device=device,
accelerator=accelerator,
warmup_steps=train_config.warmup_steps,
)
return trainer
def create_tracker(
accelerator: Accelerator,
config: TrainDiffusionPriorConfig,
config_path: str,
dummy: bool = False,
) -> Tracker:
tracker_config = config.tracker
accelerator_config = {
"Distributed": accelerator.distributed_type
!= accelerate_dataclasses.DistributedType.NO,
"DistributedType": accelerator.distributed_type,
"NumProcesses": accelerator.num_processes,
"MixedPrecision": accelerator.mixed_precision,
}
tracker: Tracker = tracker_config.create(
config, accelerator_config, dummy_mode=dummy
)
tracker.save_config(config_path, config_name="prior_config.json")
return tracker
def pad_gather_reduce(trainer: DiffusionPriorTrainer, x, method="mean"):
"""
pad a value or tensor across all processes and gather
params:
- trainer: a trainer that carries an accelerator object
- x: a number or torch tensor to reduce
- method: "mean", "sum", "max", "min"
return:
- the average tensor after maskin out 0's
- None if the gather resulted in an empty tensor
"""
assert method in [
"mean",
"sum",
"max",
"min",
], "This function has limited capabilities [sum, mean, max, min]"
assert type(x) is not None, "Cannot reduce a None type object"
# wait for everyone to arrive here before gathering
if type(x) is not torch.Tensor:
x = torch.tensor([x])
# verify that the tensor is on the proper device
x = x.to(trainer.device)
# pad across processes
padded_x = trainer.accelerator.pad_across_processes(x, dim=0)
# gather across all procesess
gathered_x = trainer.accelerator.gather(padded_x)
# mask out zeros
masked_x = gathered_x[gathered_x != 0]
# if the tensor is empty, warn and return None
if len(masked_x) == 0:
click.secho(
f"The call to this method resulted in an empty tensor after masking out zeros. The gathered tensor was this: {gathered_x} and the original value passed was: {x}.",
fg="red",
)
return None
if method == "mean":
return torch.mean(masked_x)
elif method == "sum":
return torch.sum(masked_x)
elif method == "max":
return torch.max(masked_x)
elif method == "min":
return torch.min(masked_x)
def save_trainer(
tracker: Tracker,
trainer: DiffusionPriorTrainer,
is_latest: bool,
is_best: bool,
epoch: int,
samples_seen: int,
best_validation_loss: float,
):
"""
Logs the model with an appropriate method depending on the tracker
"""
trainer.accelerator.wait_for_everyone()
if trainer.accelerator.is_main_process:
click.secho(
f"RANK:{trainer.accelerator.process_index} | Saving Model | Best={is_best} | Latest={is_latest}",
fg="magenta",
)
tracker.save(
trainer=trainer,
is_best=is_best,
is_latest=is_latest,
epoch=int(epoch),
samples_seen=int(samples_seen),
best_validation_loss=best_validation_loss,
)
def recall_trainer(tracker: Tracker, trainer: DiffusionPriorTrainer):
"""
Loads the model with an appropriate method depending on the tracker
"""
if trainer.accelerator.is_main_process:
click.secho(f"Loading model from {type(tracker.loader).__name__}", fg="yellow")
state_dict = tracker.recall()
trainer.load(state_dict, strict=True)
return (
int(state_dict.get("epoch", 0)),
state_dict.get("best_validation_loss", 0),
int(state_dict.get("samples_seen", 0)),
)
# eval functions
def eval_model(
def report_validation_loss(
trainer: DiffusionPriorTrainer,
dataloader: DataLoader,
text_conditioned: bool,
use_ema: bool,
tracker: Tracker,
split: str,
tracker_folder: str,
loss_type: str,
tracker_context: str,
tracker: BaseTracker = None,
use_ema: bool = True,
):
trainer.eval()
if trainer.is_main_process():
click.secho(f"Measuring performance on {tracker_context}", fg="green", blink=True)
"""
Compute the validation loss on a given subset of data.
"""
with torch.no_grad():
total_loss = 0.0
total_samples = 0.0
if trainer.accelerator.is_main_process:
click.secho(
f"Measuring performance on {use_ema}-{split} split",
fg="green",
blink=True,
)
for image_embeddings, text_data in dataloader:
image_embeddings = image_embeddings.to(trainer.device)
text_data = text_data.to(trainer.device)
total_loss = torch.zeros(1, dtype=torch.float, device=trainer.device)
batches = image_embeddings.shape[0]
for image_embeddings, text_data in dataloader:
image_embeddings = image_embeddings.to(trainer.device)
text_data = text_data.to(trainer.device)
input_args = dict(image_embed=image_embeddings)
input_args = dict(image_embed=image_embeddings)
if text_conditioned:
input_args = dict(**input_args, text=text_data)
else:
input_args = dict(**input_args, text_embed=text_data)
if text_conditioned:
input_args = dict(**input_args, text=text_data)
else:
input_args = dict(**input_args, text_embed=text_data)
if use_ema:
loss = trainer.ema_diffusion_prior(**input_args)
else:
loss = trainer(**input_args)
if use_ema:
loss = trainer.ema_diffusion_prior(**input_args)
else:
loss = trainer(**input_args)
total_loss += loss * batches
total_samples += batches
total_loss += loss
avg_loss = total_loss / total_samples
# compute the average loss across all processes
stats = {f"{tracker_context}-{loss_type}": avg_loss}
trainer.print(stats)
avg_loss = pad_gather_reduce(trainer, total_loss, method="mean")
stats = {f"{tracker_folder}/{loss_type}-loss": avg_loss}
if exists(tracker):
tracker.log(stats, step=trainer.step.item() + 1)
# print and log results on main process
tracker.log(stats, step=trainer.step.item() + 1)
return avg_loss
def report_cosine_sims(
trainer: DiffusionPriorTrainer,
dataloader: DataLoader,
text_conditioned: bool,
tracker: BaseTracker,
tracker_context: str = "validation",
tracker: Tracker,
split: str,
timesteps: int,
tracker_folder: str,
):
trainer.eval()
if trainer.is_main_process():
click.secho("Measuring Cosine-Similarity", fg="green", blink=True)
if trainer.accelerator.is_main_process:
click.secho(
f"Measuring Cosine-Similarity on {split} split with {timesteps} timesteps",
fg="green",
blink=True,
)
for test_image_embeddings, text_data in dataloader:
test_image_embeddings = test_image_embeddings.to(trainer.device)
@@ -127,9 +273,7 @@ def report_cosine_sims(
# we are text conditioned, we produce an embedding from the tokenized text
if text_conditioned:
text_embedding, text_encodings = trainer.embed_text(text_data)
text_cond = dict(
text_embed=text_embedding, text_encodings=text_encodings
)
text_cond = dict(text_embed=text_embedding, text_encodings=text_encodings)
else:
text_embedding = text_data
text_cond = dict(text_embed=text_embedding)
@@ -150,8 +294,7 @@ def report_cosine_sims(
text_encodings_shuffled = None
text_cond_shuffled = dict(
text_embed=text_embed_shuffled,
text_encodings=text_encodings_shuffled
text_embed=text_embed_shuffled, text_encodings=text_encodings_shuffled
)
# prepare the text embedding
@@ -164,7 +307,9 @@ def report_cosine_sims(
# predict on the unshuffled text embeddings
predicted_image_embeddings = trainer.p_sample_loop(
test_image_embeddings.shape, text_cond
test_image_embeddings.shape,
text_cond,
timesteps=timesteps,
)
predicted_image_embeddings = (
@@ -174,7 +319,9 @@ def report_cosine_sims(
# predict on the shuffled embeddings
predicted_unrelated_embeddings = trainer.p_sample_loop(
test_image_embeddings.shape, text_cond_shuffled
test_image_embeddings.shape,
text_cond_shuffled,
timesteps=timesteps,
)
predicted_unrelated_embeddings = (
@@ -183,32 +330,97 @@ def report_cosine_sims(
)
# calculate similarities
original_similarity = cos(text_embed, test_image_embeddings).cpu().numpy()
predicted_similarity = cos(text_embed, predicted_image_embeddings).cpu().numpy()
unrelated_similarity = (
cos(text_embed, predicted_unrelated_embeddings).cpu().numpy()
orig_sim = pad_gather_reduce(
trainer, cos(text_embed, test_image_embeddings), method="mean"
)
predicted_img_similarity = (
cos(test_image_embeddings, predicted_image_embeddings).cpu().numpy()
pred_sim = pad_gather_reduce(
trainer, cos(text_embed, predicted_image_embeddings), method="mean"
)
unrel_sim = pad_gather_reduce(
trainer, cos(text_embed, predicted_unrelated_embeddings), method="mean"
)
pred_img_sim = pad_gather_reduce(
trainer,
cos(test_image_embeddings, predicted_image_embeddings),
method="mean",
)
stats = {
f"{tracker_context}/baseline similarity": np.mean(original_similarity),
f"{tracker_context}/similarity with text": np.mean(predicted_similarity),
f"{tracker_context}/similarity with original image": np.mean(
predicted_img_similarity
),
f"{tracker_context}/similarity with unrelated caption": np.mean(unrelated_similarity),
f"{tracker_context}/difference from baseline similarity": np.mean(
predicted_similarity - original_similarity
),
f"{tracker_folder}/baseline similarity [steps={timesteps}]": orig_sim,
f"{tracker_folder}/similarity with text [steps={timesteps}]": pred_sim,
f"{tracker_folder}/similarity with original image [steps={timesteps}]": pred_img_sim,
f"{tracker_folder}/similarity with unrelated caption [steps={timesteps}]": unrel_sim,
f"{tracker_folder}/difference from baseline similarity [steps={timesteps}]": pred_sim
- orig_sim,
}
for k, v in stats.items():
trainer.print(f"{tracker_context}/{k}: {v}")
tracker.log(stats, step=trainer.step.item() + 1)
if exists(tracker):
tracker.log(stats, step=trainer.step.item() + 1)
def eval_model(
trainer: DiffusionPriorTrainer,
dataloader: DataLoader,
text_conditioned: bool,
split: str,
tracker: Tracker,
use_ema: bool,
report_cosine: bool,
report_loss: bool,
timesteps: List[int],
loss_type: str = None,
):
"""
Run evaluation on a model and track metrics
returns: loss if requested
"""
trainer.eval()
use_ema = "ema" if use_ema else "online"
tracker_folder = f"metrics/{use_ema}-{split}"
# detemine if valid timesteps are passed
min_timesteps = trainer.accelerator.unwrap_model(
trainer.diffusion_prior
).sample_timesteps
max_timesteps = trainer.accelerator.unwrap_model(
trainer.diffusion_prior
).noise_scheduler.num_timesteps
assert all_between(
timesteps, lower_bound=min_timesteps, upper_bound=max_timesteps
), f"all timesteps values must be between {min_timesteps} and {max_timesteps}: got {timesteps}"
# measure cosine metrics across various eta and timesteps
if report_cosine:
for timestep in timesteps:
report_cosine_sims(
trainer,
dataloader=dataloader,
text_conditioned=text_conditioned,
tracker=tracker,
split=split,
timesteps=timestep,
tracker_folder=tracker_folder,
)
# measure loss on a seperate split of data
if report_loss:
loss = report_validation_loss(
trainer=trainer,
dataloader=dataloader,
text_conditioned=text_conditioned,
use_ema=use_ema,
tracker=tracker,
split=split,
tracker_folder=tracker_folder,
loss_type=loss_type,
)
return loss
# training script
@@ -216,182 +428,327 @@ def report_cosine_sims(
def train(
trainer: DiffusionPriorTrainer,
tracker: Tracker,
train_loader: DataLoader,
eval_loader: DataLoader,
test_loader: DataLoader,
config: DiffusionPriorTrainConfig,
):
# distributed tracking with wandb
if trainer.accelerator.num_processes > 1:
os.environ["WANDB_START_METHOD"] = "thread"
# init timers
save_timer = Timer() # when to save
samples_timer = Timer() # samples/sec
validation_profiler = Timer() # how long is validation taking
validation_countdown = Timer() # when to perform evalutation
tracker = wandb.init(
name=f"RANK:{trainer.device}",
entity=config.tracker.wandb_entity,
project=config.tracker.wandb_project,
config=config.dict(),
group=GROUP,
)
# keep track of best validation loss
# sync after tracker init
trainer.wait_for_everyone()
# init a timer
timer = Timer()
best_validation_loss = config.train.best_validation_loss
samples_seen = config.train.num_samples_seen
# do training
for img, txt in train_loader:
trainer.train()
current_step = trainer.step.item() + 1
# place data on device
img = img.to(trainer.device)
txt = txt.to(trainer.device)
start_epoch = config.train.current_epoch
# pass to model
loss = trainer(text=txt, image_embed=img)
for epoch in range(start_epoch, config.train.epochs):
# if we finished out an old epoch, reset the distribution to be a full epoch
tracker.log({"tracking/epoch": epoch}, step=trainer.step.item())
# display & log loss (will only print from main process)
trainer.print(f"Step {current_step}: Loss {loss}")
if train_loader.dataset.get_start() > 0 and epoch == start_epoch+1:
if trainer.accelerator.is_main_process:
click.secho(f"Finished resumed epoch...resetting dataloader.")
train_loader.dataset.set_start(0)
# perform backprop & apply EMA updates
trainer.update()
for img, txt in train_loader:
# setup things every step
# track samples/sec/rank
samples_per_sec = img.shape[0] / timer.elapsed()
trainer.train()
current_step = trainer.step.item()
samples_timer.reset()
# samples seen
samples_seen = (
config.data.batch_size * trainer.accelerator.num_processes * current_step
)
# place data on device
# ema decay
ema_decay = trainer.ema_diffusion_prior.get_current_decay()
img = img.to(trainer.device)
txt = txt.to(trainer.device)
# Log on all processes for debugging
tracker.log(
{
"tracking/samples-sec": samples_per_sec,
"tracking/samples-seen": samples_seen,
"tracking/ema-decay": ema_decay,
"metrics/training-loss": loss,
},
step=current_step,
)
# pass to model
# Metric Tracking & Checkpointing (outside of timer's scope)
if current_step % EVAL_EVERY == 0:
eval_model(
trainer=trainer,
dataloader=eval_loader,
text_conditioned=config.prior.condition_on_text_encodings,
loss_type=config.prior.loss_type,
tracker_context="metrics/online-model-validation",
tracker=tracker,
use_ema=False,
loss = trainer(text=txt, image_embed=img)
# perform backprop & apply EMA updates
trainer.update()
# gather info about training step
all_loss = pad_gather_reduce(trainer, loss, method="mean")
num_samples = pad_gather_reduce(trainer, len(txt), method="sum")
samples_per_sec = num_samples / samples_timer.elapsed()
samples_seen += num_samples
ema_decay = trainer.ema_diffusion_prior.get_current_decay()
# log
tracker.log(
{
"tracking/samples-sec": samples_per_sec,
"tracking/samples-seen": samples_seen,
"tracking/ema-decay": ema_decay,
f"tracking/training-{config.prior.loss_type}": all_loss,
},
step=current_step,
)
eval_model(
trainer=trainer,
dataloader=eval_loader,
text_conditioned=config.prior.condition_on_text_encodings,
loss_type=config.prior.loss_type,
tracker_context="metrics/ema-model-validation",
tracker=tracker,
use_ema=True,
# Metric Tracking @ Timed Intervals
eval_delta = pad_gather_reduce(
trainer, validation_countdown.elapsed(), method="min"
)
report_cosine_sims(
trainer=trainer,
dataloader=eval_loader,
text_conditioned=config.prior.condition_on_text_encodings,
tracker=tracker,
tracker_context="metrics",
)
if eval_delta != None and eval_delta > config.data.eval_every_seconds:
# begin timing how long this takes
if current_step % config.train.save_every == 0:
trainer.save(f"{config.tracker.data_path}/chkpt_step_{current_step}.pth")
validation_profiler.reset()
# reset timer for next round
timer.reset()
# package kwargs for evaluation
eval_kwargs = {
"trainer": trainer,
"tracker": tracker,
"text_conditioned": config.prior.condition_on_text_encodings,
"timesteps": config.train.eval_timesteps,
}
# ONLINE MODEL : COSINE : LOSS : VALIDATION SPLIT
eval_model(
dataloader=eval_loader,
loss_type=config.prior.loss_type,
split="validation",
use_ema=False,
report_cosine=False,
report_loss=True,
**eval_kwargs,
)
# EMA MODEL : COSINE : LOSS : VALIDATION DATA
ema_val_loss = eval_model(
dataloader=eval_loader,
loss_type=config.prior.loss_type,
split="validation",
use_ema=True,
report_cosine=True,
report_loss=True,
**eval_kwargs,
)
tracker.log(
{
"tracking/validation length (minutes)": validation_profiler.elapsed()
/ 60
}
)
# check if the ema validation is the lowest seen yet
if ema_val_loss < best_validation_loss:
best_validation_loss = ema_val_loss
# go save the model as best
save_trainer(
trainer=trainer,
tracker=tracker,
is_best=True,
is_latest=False,
samples_seen=samples_seen,
epoch=epoch,
best_validation_loss=best_validation_loss,
)
# reset timer for validaiton
validation_countdown.reset()
elif eval_delta is None:
click.secho(
f"Error occured reading the eval time on rank: {trainer.device}",
fg="yellow",
)
# save as latest model on schedule
save_delta = pad_gather_reduce(trainer, save_timer.elapsed(), method="min")
if save_delta != None and save_delta >= config.train.save_every_seconds:
save_trainer(
trainer=trainer,
tracker=tracker,
is_best=False,
is_latest=True,
samples_seen=samples_seen,
epoch=epoch,
best_validation_loss=best_validation_loss,
)
save_timer.reset()
elif save_delta is None:
click.secho(
f"Error occured reading the save time on rank: {trainer.device}",
fg="yellow",
)
# evaluate on test data
eval_model(
if trainer.accelerator.is_main_process:
click.secho(f"Starting Test", fg="red")
# save one last time as latest before beginning validation
save_trainer(
tracker=tracker,
trainer=trainer,
is_best=False,
is_latest=True,
samples_seen=samples_seen,
epoch=epoch,
best_validation_loss=best_validation_loss,
)
test_loss = eval_model(
trainer=trainer,
dataloader=test_loader,
text_conditioned=config.prior.condition_on_text_encodings,
loss_type=config.prior.loss_type,
tracker_context="test",
split="test",
tracker=tracker,
use_ema=True,
report_cosine=False,
report_loss=True,
timesteps=config.train.eval_timesteps,
loss_type=config.prior.loss_type,
)
report_cosine_sims(
trainer,
test_loader,
config.prior.condition_on_text_encodings,
tracker,
tracker_context="test",
)
if test_loss < best_validation_loss:
best_validation_loss = test_loss
# go save the model as best
save_trainer(
trainer=trainer,
tracker=tracker,
is_best=True,
is_latest=False,
samples_seen=samples_seen,
epoch=epoch,
best_validation_loss=test_loss,
)
def initialize_training(config, accelerator=None):
def initialize_training(config_file, accelerator):
"""
Parse the configuration file, and prepare everything necessary for training
"""
# load the configuration file
if accelerator.is_main_process:
click.secho(f"Loading configuration from {config_file}", fg="green")
config = TrainDiffusionPriorConfig.from_json_path(config_file)
# seed
set_seed(config.train.random_seed)
# get a device
if accelerator:
device = accelerator.device
click.secho(f"Accelerating on: {device}", fg="yellow")
else:
if torch.cuda.is_available():
click.secho("GPU detected, defaulting to cuda:0", fg="yellow")
device = "cuda:0"
else:
click.secho("No GPU detected...using cpu", fg="yellow")
device = "cpu"
device = accelerator.device
# make the trainer (will automatically distribute if possible & configured)
trainer = make_model(config.prior, config.train, device, accelerator).to(device)
trainer: DiffusionPriorTrainer = make_model(
config.prior, config.train, device, accelerator
).to(device)
# create a tracker
tracker = create_tracker(
accelerator, config, config_file, dummy=accelerator.process_index != 0
)
# reload from chcekpoint
if config.load.resume == True:
click.secho(f"Loading checkpoint: {config.load.source}", fg="cyan")
trainer.load(config.load.source)
if tracker.can_recall:
current_epoch, best_validation_loss, samples_seen = recall_trainer(
tracker=tracker, trainer=trainer
)
# display best values
if trainer.accelerator.is_main_process:
click.secho(f"Current Epoch: {current_epoch} | Best Val Loss: {best_validation_loss} | Samples Seen: {samples_seen}", fg="yellow")
# update config to reflect recalled values
config.train.num_samples_seen = samples_seen
config.train.current_epoch = current_epoch
config.train.best_validation_loss = best_validation_loss
# fetch and prepare data
if trainer.is_main_process():
click.secho("Grabbing data from source", fg="blue", blink=True)
if trainer.accelerator.is_main_process:
click.secho("Grabbing data...", fg="blue", blink=True)
trainer.accelerator.wait_for_everyone()
img_reader = get_reader(
text_conditioned=trainer.text_conditioned,
img_url=config.data.image_url,
meta_url=config.data.meta_url,
)
# calculate start point within epoch
trainer.accelerator.wait_for_everyone()
train_loader, eval_loader, test_loader = make_splits(
text_conditioned=trainer.text_conditioned,
batch_size=config.data.batch_size,
num_data_points=NUM_DATA_POINTS,
num_data_points=config.data.num_data_points,
train_split=config.data.splits.train,
eval_split=config.data.splits.val,
image_reader=img_reader,
rank=accelerator.state.process_index if exists(accelerator) else 0,
world_size=accelerator.state.num_processes if exists(accelerator) else 1,
start=START,
rank=accelerator.state.process_index,
world_size=accelerator.state.num_processes,
start=0,
)
# wait for everyone to load data before continuing
trainer.wait_for_everyone()
# update the start point to finish out the epoch on a resumed run
if tracker.can_recall:
samples_seen = config.train.num_samples_seen
length = (
config.data.num_data_points
if samples_seen <= img_reader.count
else img_reader.count
)
scaled_samples = length * config.train.current_epoch
start_point = (
scaled_samples - samples_seen if scaled_samples > samples_seen else samples_seen
)
if trainer.accelerator.is_main_process:
click.secho(f"Resuming at sample: {start_point}", fg="yellow")
train_loader.dataset.set_start(start_point)
# start training
if trainer.accelerator.is_main_process:
click.secho(
f"Beginning Prior Training : Distributed={accelerator.state.distributed_type != accelerate_dataclasses.DistributedType.NO}",
fg="yellow",
)
train(
trainer=trainer,
tracker=tracker,
train_loader=train_loader,
eval_loader=eval_loader,
test_loader=test_loader,
@@ -400,23 +757,13 @@ def initialize_training(config, accelerator=None):
@click.command()
@click.option("--hfa", default=True)
@click.option("--config_path", default="configs/prior.json")
def main(hfa, config_path):
# start HFA if requested
if hfa:
accelerator = Accelerator()
else:
accelerator = None
@click.option("--config_file", default="configs/train_prior_config.example.json")
def main(config_file):
# start HFA
accelerator = Accelerator()
# load the configuration file on main process
if not exists(accelerator) or accelerator.is_main_process:
click.secho(f"Loading configuration from {config_path}", fg="green")
config = TrainDiffusionPriorConfig.from_json_path(config_path)
# send config to get processed
initialize_training(config, accelerator)
# setup training
initialize_training(config_file, accelerator)
if __name__ == "__main__":