import torch
import torch.nn as nn
from packaging import version

# Dotted import path used by configs to instantiate the wrapper by string.
OPENAIUNETWRAPPER = "sgm.modules.diffusionmodules.wrappers.OpenAIWrapper"


class IdentityWrapper(nn.Module):
    """Wrap a diffusion model and forward calls to it unchanged.

    Args:
        diffusion_model: the underlying denoiser network (a callable module).
        compile_model: if True and PyTorch >= 2.0.0, wrap the model with
            ``torch.compile``; on older versions the flag is silently ignored.
        dual_concat: stored flag consumed by subclasses (see ``OpenAIWrapper``)
            to decide how channel conditioning is concatenated onto the input.
    """

    def __init__(
        self,
        diffusion_model,
        compile_model: bool = False,
        dual_concat: bool = False,
    ):
        super().__init__()
        # ``torch.compile`` only exists from PyTorch 2.0 onwards, so fall back
        # to the identity on older versions. Named ``maybe_compile`` to avoid
        # shadowing the ``compile`` builtin.
        if compile_model and version.parse(torch.__version__) >= version.parse(
            "2.0.0"
        ):
            maybe_compile = torch.compile
        else:

            def maybe_compile(model):
                return model

        self.diffusion_model = maybe_compile(diffusion_model)
        self.dual_concat = dual_concat

    def forward(self, *args, **kwargs):
        """Delegate straight to the wrapped diffusion model."""
        return self.diffusion_model(*args, **kwargs)


class OpenAIWrapper(IdentityWrapper):
    """Wrapper that maps a conditioning dict onto OpenAI-UNet-style kwargs."""

    def forward(
        self, x: torch.Tensor, t: torch.Tensor, c: dict, **kwargs
    ) -> torch.Tensor:
        """Concatenate channel conditioning onto ``x`` and run the model.

        Args:
            x: noised model input; with ``dual_concat`` the channel dim (dim 1)
                holds two stacked streams that each receive the conditioning.
            t: diffusion timesteps, passed through as ``timesteps``.
            c: conditioning dict; recognized keys are ``"concat"``,
                ``"crossattn"``, ``"vector"``, ``"cond_view"`` and
                ``"cond_motion"``.

        Returns:
            The wrapped diffusion model's output tensor.
        """
        # Default is a 1-D empty tensor, which torch.cat treats as a no-op
        # (legacy special case), so a missing "concat" key leaves x untouched.
        concat = c.get("concat", torch.Tensor([]).type_as(x))
        if self.dual_concat:
            # Split the two stacked streams along channels, give each its own
            # copy of the conditioning, then re-stack along channels.
            half = x.shape[1] // 2
            x = torch.cat(
                (
                    torch.cat((x[:, :half], concat), dim=1),
                    torch.cat((x[:, half:], concat), dim=1),
                ),
                dim=1,
            )
        else:
            x = torch.cat((x, concat), dim=1)

        # Only pass view/motion conditioning when present, so models whose
        # forward() lacks those parameters keep working.
        if "cond_view" in c:
            return self.diffusion_model(
                x,
                timesteps=t,
                context=c.get("crossattn", None),
                y=c.get("vector", None),
                cond_view=c.get("cond_view", None),
                cond_motion=c.get("cond_motion", None),
                **kwargs,
            )
        return self.diffusion_model(
            x,
            timesteps=t,
            context=c.get("crossattn", None),
            y=c.get("vector", None),
            **kwargs,
        )