import math
from typing import List, Union, Tuple

import numpy as np
import paddle
from paddle import nn
from paddle.nn import functional as F
from paddle.nn import initializer as I

from parakeet.utils import checkpoint
from parakeet.modules import geometry as geo

__all__ = ["WaveFlow", "ConditionalWaveFlow", "WaveFlowLoss"]


def fold(x, n_group):
    r"""Fold audio or spectrogram's temporal dimension into groups.

    Parameters
    ----------
    x : Tensor [shape=(\*, time_steps)]
        The input tensor.

    n_group : int
        The size of a group.

    Returns
    ---------
    Tensor : [shape=(\*, time_steps // n_group, n_group)]
        Folded tensor.
    """
    *spatial_shape, time_steps = x.shape
    new_shape = spatial_shape + [time_steps // n_group, n_group]
    return paddle.reshape(x, new_shape)


class UpsampleNet(nn.LayerList):
    """Layer to upsample mel spectrogram to the same temporal resolution with
    the corresponding waveform.

    It consists of several Conv2DTranspose layers which perform deconvolution
    on mel and time dimension.

    Parameters
    ----------
    upsample_factors : List[int]
        Time upsampling factors for each Conv2DTranspose Layer.

        The ``UpsampleNet`` contains ``len(upsample_factors)``
        Conv2DTranspose Layers. Each factor is used as the time ``stride``
        for the corresponding Conv2DTranspose, so the total upsampling factor
        is ``np.prod(upsample_factors)`` (e.g. ``[16, 16]`` gives 256).

    Notes
    ------
    ``np.prod(upsample_factors)`` should equal the ``hop_length`` of the stft
    transformation used to extract spectrogram features from audio.

    For example, ``16 * 16 = 256``, then the spectrogram extracted with a stft
    transformation whose ``hop_length`` equals 256 is suitable.

    See Also
    ---------
    ``librosa.core.stft``
    """

    def __init__(self, upsample_factors):
        super(UpsampleNet, self).__init__()
        for factor in upsample_factors:
            # uniform init with bound sqrt(1 / fan_in); the kernel is
            # (3, 2 * factor) over a single input channel
            std = math.sqrt(1 / (3 * 2 * factor))
            init = I.Uniform(-std, std)
            self.append(
                nn.utils.weight_norm(
                    nn.Conv2DTranspose(
                        1,
                        1, (3, 2 * factor),
                        padding=(1, factor // 2),
                        stride=(1, factor),
                        weight_attr=init,
                        bias_attr=init)))

        # upsample factors
        self.upsample_factor = np.prod(upsample_factors)
        self.upsample_factors = upsample_factors

    def forward(self, x, trim_conv_artifact=False):
        r"""Forward pass of the ``UpsampleNet``.

        Parameters
        -----------
        x : Tensor [shape=(batch_size, input_channels, time_steps)]
            The input spectrogram.

        trim_conv_artifact : bool, optional
            Trim deconvolution artifact at each layer. Defaults to False.

        Returns
        --------
        Tensor: [shape=(batch_size, input_channels, time_steps \* upsample_factor)]
            The upsampled spectrogram.

        Notes
        --------
        If trim_conv_artifact is ``True``, the output time steps is less than
        ``time_steps \* upsample_factor``.
        """
        x = paddle.unsqueeze(x, 1)  # (B, C, T) -> (B, 1, C, T)
        for layer in self:
            x = layer(x)
            if trim_conv_artifact:
                # drop the trailing (kernel - stride) frames on the time axis
                time_cutoff = layer._kernel_size[1] - layer._stride[1]
                x = x[:, :, :, :-time_cutoff]
            x = F.leaky_relu(x, 0.4)
        x = paddle.squeeze(x, 1)  # back to (B, C, T)
        return x


class ResidualBlock(nn.Layer):
    """ResidualBlock, the basic unit of ResidualNet used in WaveFlow.

    It has a conv2d layer, which has causal padding in height dimension and
    same padding in width dimension. It also has projection for the condition
    and output.

    Parameters
    ----------
    channels : int
        Feature size of the input.

    cond_channels : int
        Feature size of the condition.

    kernel_size : Union[int, Tuple[int]]
        Kernel size (height, width) of the Convolution2d applied to the
        input. An int is treated as a square kernel.

    dilations : Tuple[int]
        Dilations (height, width) of the Convolution2d applied to the input.
    """

    def __init__(self, channels, cond_channels, kernel_size, dilations):
        super(ResidualBlock, self).__init__()
        # The public API documents an int kernel size; normalize it so the
        # per-dimension receptive field below can be computed.
        if isinstance(kernel_size, int):
            kernel_size = (kernel_size, kernel_size)

        # input conv
        # uniform init with bound sqrt(1 / fan_in), where
        # fan_in = channels * prod(kernel_size); the parentheses matter,
        # `1 / channels * prod` would compute prod / channels instead.
        std = math.sqrt(1 / (channels * np.prod(kernel_size)))
        init = I.Uniform(-std, std)
        receptive_field = [
            1 + (k - 1) * d for (k, d) in zip(kernel_size, dilations)
        ]
        rh, rw = receptive_field
        paddings = [rh - 1, 0, rw // 2, (rw - 1) // 2]  # causal & same
        conv = nn.Conv2D(
            channels,
            2 * channels,
            kernel_size,
            padding=paddings,
            dilation=dilations,
            weight_attr=init,
            bias_attr=init)
        self.conv = nn.utils.weight_norm(conv)
        self.rh = rh
        self.rw = rw
        self.dilations = dilations

        # condition projection
        std = math.sqrt(1 / cond_channels)
        init = I.Uniform(-std, std)
        condition_proj = nn.Conv2D(
            cond_channels,
            2 * channels, (1, 1),
            weight_attr=init,
            bias_attr=init)
        self.condition_proj = nn.utils.weight_norm(condition_proj)

        # parametric residual & skip connection
        std = math.sqrt(1 / channels)
        init = I.Uniform(-std, std)
        out_proj = nn.Conv2D(
            channels,
            2 * channels, (1, 1),
            weight_attr=init,
            bias_attr=init)
        self.out_proj = nn.utils.weight_norm(out_proj)

    def forward(self, x, condition):
        """Compute output for a whole folded sequence.

        Parameters
        ----------
        x : Tensor [shape=(batch_size, channel, height, width)]
            The input.

        condition : Tensor [shape=(batch_size, condition_channel, height, width)]
            The local condition.

        Returns
        -------
        res : Tensor [shape=(batch_size, channel, height, width)]
            The residual output.

        skip : Tensor [shape=(batch_size, channel, height, width)]
            The skip output.
        """
        x_in = x
        x = self.conv(x)
        x += self.condition_proj(condition)

        # gated activation unit
        content, gate = paddle.chunk(x, 2, axis=1)
        x = paddle.tanh(content) * F.sigmoid(gate)

        x = self.out_proj(x)
        res, skip = paddle.chunk(x, 2, axis=1)
        res = x_in + res
        return res, skip

    def start_sequence(self):
        """Prepare the layer for incremental computation of causal
        convolution. Reset the buffer for causal convolution.

        Raises:
            ValueError: If not in evaluation mode.
        """
        if self.training:
            raise ValueError("Only use start sequence at evaluation mode.")
        self._conv_buffer = None

        # NOTE: call self.conv's weight norm hook explicitly since
        # its weight will be visited directly in `add_input` without
        # calling its `__call__` method. If we do not trigger the weight
        # norm hook, the weight may be outdated. e.g. after loading from
        # a saved checkpoint
        # see also: https://github.com/pytorch/pytorch/issues/47588
        for hook in self.conv._forward_pre_hooks.values():
            hook(self.conv, None)

    def add_input(self, x_row, condition_row):
        """Compute the output for a row and update the buffer.

        ``start_sequence`` must be called before the first row is added.

        Parameters
        ----------
        x_row : Tensor [shape=(batch_size, channel, 1, width)]
            A row of the input.

        condition_row : Tensor [shape=(batch_size, condition_channel, 1, width)]
            A row of the condition.

        Returns
        -------
        res : Tensor [shape=(batch_size, channel, 1, width)]
            A row of the residual output.

        skip : Tensor [shape=(batch_size, channel, 1, width)]
            A row of the skip output.
        """
        x_row_in = x_row
        if self._conv_buffer is None:
            self._init_buffer(x_row)
        self._update_buffer(x_row)

        # The buffer already holds the `rh` causal rows, so only the width
        # dimension needs (same) padding here.
        rw = self.rw
        x_row = F.conv2d(
            self._conv_buffer,
            self.conv.weight,
            self.conv.bias,
            padding=[0, 0, rw // 2, (rw - 1) // 2],
            dilation=self.dilations)
        x_row += self.condition_proj(condition_row)

        content, gate = paddle.chunk(x_row, 2, axis=1)
        x_row = paddle.tanh(content) * F.sigmoid(gate)

        x_row = self.out_proj(x_row)
        res, skip = paddle.chunk(x_row, 2, axis=1)
        res = x_row_in + res
        return res, skip

    def _init_buffer(self, input):
        # zero-filled history of `rh` rows, matching the causal zero padding
        batch_size, channels, _, width = input.shape
        self._conv_buffer = paddle.zeros(
            [batch_size, channels, self.rh, width], dtype=input.dtype)

    def _update_buffer(self, input):
        # slide the window: drop the oldest row, append the newest one
        self._conv_buffer = paddle.concat(
            [self._conv_buffer[:, :, 1:, :], input], axis=2)


class ResidualNet(nn.LayerList):
    """A stack of several ResidualBlocks. It merges condition at each layer.

    Parameters
    ----------
    n_layer : int
        Number of ResidualBlocks in the ResidualNet.

    residual_channels : int
        Feature size of each ResidualBlocks.

    condition_channels : int
        Feature size of the condition.

    kernel_size : Tuple[int]
        Kernel size of each ResidualBlock.

    dilations_h : List[int]
        Dilation in height dimension of every ResidualBlock.

    Raises
    ------
    ValueError
        If the length of dilations_h does not equal n_layer.
    """

    def __init__(self,
                 n_layer: int,
                 residual_channels: int,
                 condition_channels: int,
                 kernel_size: Tuple[int],
                 dilations_h: List[int]):
        if len(dilations_h) != n_layer:
            raise ValueError(
                "number of dilations_h should equals num of layers")
        super(ResidualNet, self).__init__()
        for i in range(n_layer):
            # height dilation is configured, width dilation doubles per layer
            dilation = (dilations_h[i], 2**i)
            layer = ResidualBlock(residual_channels, condition_channels,
                                  kernel_size, dilation)
            self.append(layer)

    def forward(self, x, condition):
        """Compute the output given the input and the condition.

        Parameters
        -----------
        x : Tensor [shape=(batch_size, channel, height, width)]
            The input.

        condition : Tensor [shape=(batch_size, condition_channel, height, width)]
            The local condition.

        Returns
        --------
        Tensor : [shape=(batch_size, channel, height, width)]
            The output, which is an aggregation of all the skip outputs.
        """
        skip_connections = []
        for layer in self:
            x, skip = layer(x, condition)
            skip_connections.append(skip)
        out = paddle.sum(paddle.stack(skip_connections, 0), 0)
        return out

    def start_sequence(self):
        """Prepare the layer for incremental computation.
        """
        for layer in self:
            layer.start_sequence()

    def add_input(self, x_row, condition_row):
        """Compute the output for a row and update the buffers.

        Parameters
        ----------
        x_row : Tensor [shape=(batch_size, channel, 1, width)]
            A row of the input.

        condition_row : Tensor [shape=(batch_size, condition_channel, 1, width)]
            A row of the condition.

        Returns
        -------
        Tensor : [shape=(batch_size, channel, 1, width)]
            A row of the output, which is the sum of the skip outputs of
            all the layers.
        """
        skip_connections = []
        for layer in self:
            x_row, skip = layer.add_input(x_row, condition_row)
            skip_connections.append(skip)
        out = paddle.sum(paddle.stack(skip_connections, 0), 0)
        return out


class Flow(nn.Layer):
    """A bijection (Reversible layer) that transforms a density of latent
    variables p(Z) into a complex data distribution p(X).

    It's an auto regressive flow. The `forward` method implements the
    probability density estimation. The `inverse` method implements the
    sampling.

    Parameters
    ----------
    n_layers : int
        Number of ResidualBlocks in the Flow.

    channels : int
        Feature size of the ResidualBlocks.

    mel_bands : int
        Feature size of the mel spectrogram (mel bands).

    kernel_size : Tuple[int]
        Kernel size of each ResidualBlock in the Flow.

    n_group : int
        Number of timesteps to be folded into a group. Must be one of the
        keys of ``dilations_dict``.
    """
    # height dilations of the residual blocks, keyed by n_group
    dilations_dict = {
        8: [1, 1, 1, 1, 1, 1, 1, 1],
        16: [1, 1, 1, 1, 1, 1, 1, 1],
        32: [1, 2, 4, 1, 2, 4, 1, 2],
        64: [1, 2, 4, 8, 16, 1, 2, 4],
        128: [1, 2, 4, 8, 16, 32, 64, 1]
    }

    def __init__(self, n_layers, channels, mel_bands, kernel_size, n_group):
        super(Flow, self).__init__()
        # input projection
        self.input_proj = nn.utils.weight_norm(
            nn.Conv2D(
                1,
                channels, (1, 1),
                weight_attr=I.Uniform(-1., 1.),
                bias_attr=I.Uniform(-1., 1.)))

        # residual net
        self.resnet = ResidualNet(n_layers, channels, mel_bands, kernel_size,
                                  self.dilations_dict[n_group])

        # output projection; zero-initialized so that logs = b = 0 at start
        self.output_proj = nn.Conv2D(
            channels,
            2, (1, 1),
            weight_attr=I.Constant(0.),
            bias_attr=I.Constant(0.))

        # specs
        self.n_group = n_group

    def _predict_parameters(self, x, condition):
        # predict the log scale and shift for a whole folded sequence
        x = self.input_proj(x)
        x = self.resnet(x, condition)
        bijection_params = self.output_proj(x)
        logs, b = paddle.chunk(bijection_params, 2, axis=1)
        return logs, b

    def _transform(self, x, logs, b):
        z_0 = x[:, :, :1, :]  # the first row, just copy it
        z_out = x[:, :, 1:, :] * paddle.exp(logs) + b
        z_out = paddle.concat([z_0, z_out], axis=2)
        return z_out

    def forward(self, x, condition):
        """Probability density estimation. It is done by inversely
        transforming a sample from p(X) into a sample from p(Z).

        Parameters
        -----------
        x : Tensor [shape=(batch, 1, height, width)]
            An input sample of the distribution p(X).

        condition : Tensor [shape=(batch, condition_channel, height, width)]
            The local condition.

        Returns
        --------
        z (Tensor): shape(batch, 1, height, width), the transformed sample.

        Tuple[Tensor, Tensor]
            The parameter of the transformation.

            logs (Tensor): shape(batch, 1, height - 1, width), the log scale
            of the transformation from x to z.

            b (Tensor): shape(batch, 1, height - 1, width), the shift of the
            transformation from x to z.
        """
        # (B, C, H-1, W)
        # rows [0, H-1) of x, with the conditions of rows [1, H), predict
        # the parameters for rows [1, H)
        logs, b = self._predict_parameters(x[:, :, :-1, :],
                                           condition[:, :, 1:, :])
        z = self._transform(x, logs, b)
        return z, (logs, b)

    def _predict_row_parameters(self, x_row, condition_row):
        # incremental counterpart of `_predict_parameters`
        x_row = self.input_proj(x_row)
        x_row = self.resnet.add_input(x_row, condition_row)
        bijection_params = self.output_proj(x_row)
        logs, b = paddle.chunk(bijection_params, 2, axis=1)
        return logs, b

    def _inverse_transform_row(self, z_row, logs, b):
        # inverse of z = x * exp(logs) + b
        x_row = (z_row - b) * paddle.exp(-logs)
        return x_row

    def _inverse_row(self, z_row, x_row, condition_row):
        logs, b = self._predict_row_parameters(x_row, condition_row)
        x_next_row = self._inverse_transform_row(z_row, logs, b)
        return x_next_row, (logs, b)

    def _start_sequence(self):
        self.resnet.start_sequence()

    def inverse(self, z, condition):
        """Sampling from the distribution p(X). It is done by sampling from
        p(Z) and transforming the sample. It is an auto regressive
        transformation.

        Parameters
        -----------
        z : Tensor [shape=(batch, 1, height, width)]
            A sample of the distribution p(Z).

        condition : Tensor [shape=(batch, condition_channel, height, width)]
            The local condition.

        Returns
        ---------
        x : Tensor [shape=(batch, 1, height, width)]
            The transformed sample.

        Tuple[Tensor, Tensor]
            The parameter of the transformation.

            logs (Tensor): shape(batch, 1, height - 1, width), the log scale
            of the transformation from x to z.

            b (Tensor): shape(batch, 1, height - 1, width), the shift of the
            transformation from x to z.
        """
        z_0 = z[:, :, :1, :]
        x = []
        logs_list = []
        b_list = []
        x.append(z_0)  # the first row is copied as is

        self._start_sequence()
        for i in range(1, self.n_group):
            x_row = x[-1]  # actually row i-1:i
            z_row = z[:, :, i:i + 1, :]
            condition_row = condition[:, :, i:i + 1, :]

            x_next_row, (logs, b) = self._inverse_row(z_row, x_row,
                                                      condition_row)
            x.append(x_next_row)
            logs_list.append(logs)
            b_list.append(b)

        x = paddle.concat(x, 2)
        logs = paddle.concat(logs_list, 2)
        b = paddle.concat(b_list, 2)
        return x, (logs, b)


class WaveFlow(nn.LayerList):
    """A deep reversible layer that is composed of several auto regressive
    flows.

    Parameters
    -----------
    n_flows : int
        Number of flows in the WaveFlow model.

    n_layers : int
        Number of ResidualBlocks in each Flow.

    n_group : int
        Number of timesteps to fold as a group.

    channels : int
        Feature size of each ResidualBlock.

    mel_bands : int
        Feature size of mel spectrogram (mel bands).

    kernel_size : Union[int, List[int]]
        Kernel size of the convolution layer in each ResidualBlock.

    Raises
    ------
    ValueError
        If ``n_group`` or ``n_flows`` is odd.
    """

    def __init__(self, n_flows, n_layers, n_group, channels, mel_bands,
                 kernel_size):
        if n_group % 2 or n_flows % 2:
            raise ValueError(
                "number of flows and number of group must be even "
                "since a permutation along group among flows is used.")
        super(WaveFlow, self).__init__()
        for _ in range(n_flows):
            self.append(
                Flow(n_layers, channels, mel_bands, kernel_size, n_group))

        # permutations in h
        self.perms = self._create_perm(n_group, n_flows)

        # specs
        self.n_group = n_group
        self.n_flows = n_flows

    def _create_perm(self, n_group, n_flows):
        # The first half of the flows reverse the whole height dimension;
        # the second half reverse each half of it separately.
        indices = list(range(n_group))
        half = n_group // 2
        perms = []
        for i in range(n_flows):
            if i < n_flows // 2:
                perms.append(indices[::-1])
            else:
                perm = list(reversed(indices[:half])) + list(
                    reversed(indices[half:]))
                perms.append(perm)
        return perms

    def _trim(self, x, condition):
        # Trim both tensors so that the time steps are a multiple of n_group.
        assert condition.shape[-1] >= x.shape[-1]
        pruned_len = int(x.shape[-1] // self.n_group * self.n_group)

        if x.shape[-1] > pruned_len:
            x = x[:, :pruned_len]
        if condition.shape[-1] > pruned_len:
            condition = condition[:, :, :pruned_len]
        return x, condition

    def forward(self, x, condition):
        """Probability density estimation of random variable x given the
        condition.

        Parameters
        -----------
        x : Tensor [shape=(batch_size, time_steps)]
            The audio.

        condition : Tensor [shape=(batch_size, condition channel, time_steps)]
            The local condition (mel spectrogram here).

        Returns
        --------
        z : Tensor [shape=(batch_size, time_steps)]
            The transformed random variable.

        log_det_jacobian: Tensor [shape=(1,)]
            The log determinant of the jacobian of the transformation from x
            to z.
        """
        # x: (B, T)
        # condition: (B, C, T) upsampled condition
        x, condition = self._trim(x, condition)

        # to (B, C, h, T//h) layout
        x = paddle.unsqueeze(
            paddle.transpose(fold(x, self.n_group), [0, 2, 1]), 1)
        condition = paddle.transpose(
            fold(condition, self.n_group), [0, 1, 3, 2])

        # flows
        logs_list = []
        for i, layer in enumerate(self):
            x, (logs, b) = layer(x, condition)
            logs_list.append(logs)
            # permute along the height dimension between flows
            # (paddle has no built-in shuffle dim)
            x = geo.shuffle_dim(x, 2, perm=self.perms[i])
            condition = geo.shuffle_dim(condition, 2, perm=self.perms[i])

        z = paddle.squeeze(x, 1)  # (B, H, W)
        batch_size = z.shape[0]
        z = paddle.reshape(paddle.transpose(z, [0, 2, 1]), [batch_size, -1])

        log_det_jacobian = paddle.sum(paddle.stack(logs_list))
        return z, log_det_jacobian

    def inverse(self, z, condition):
        """Sampling from the distribution p(X).

        It is done by sampling a ``z`` from p(Z) and transforming it into
        ``x``. Each Flow transforms ``z_{i-1}`` to ``z_{i}`` in an
        autoregressive manner.

        Parameters
        ----------
        z : Tensor [shape=(batch, time_steps)]
            A sample of the distribution p(Z).

        condition : Tensor [shape=(batch, condition_channel, time_steps)]
            The local condition.

        Returns
        --------
        x : Tensor [shape=(batch_size, time_steps)]
            The transformed sample (audio here).
        """

        z, condition = self._trim(z, condition)
        # to (B, C, h, T//h) layout
        z = paddle.unsqueeze(
            paddle.transpose(fold(z, self.n_group), [0, 2, 1]), 1)
        condition = paddle.transpose(
            fold(condition, self.n_group), [0, 1, 3, 2])

        # reverse it flow by flow
        for i in reversed(range(self.n_flows)):
            z = geo.shuffle_dim(z, 2, perm=self.perms[i])
            condition = geo.shuffle_dim(condition, 2, perm=self.perms[i])
            z, (logs, b) = self[i].inverse(z, condition)

        x = paddle.squeeze(z, 1)  # (B, H, W)
        batch_size = x.shape[0]
        x = paddle.reshape(paddle.transpose(x, [0, 2, 1]), [batch_size, -1])
        return x


# NOTE(review): this class derives from nn.LayerList but only holds two named
# sublayers and never uses list semantics; nn.Layer looks like the intended
# base. The base class is kept unchanged for backward compatibility.
class ConditionalWaveFlow(nn.LayerList):
    """ConditionalWaveFlow, a UpsampleNet with a WaveFlow model.

    Parameters
    ----------
    upsample_factors : List[int]
        Upsample factors for the upsample net.

    n_flows : int
        Number of flows in the WaveFlow model.

    n_layers : int
        Number of ResidualBlocks in each Flow.

    n_group : int
        Number of timesteps to fold as a group.

    channels : int
        Feature size of each ResidualBlock.

    n_mels : int
        Feature size of mel spectrogram (mel bands).

    kernel_size : Union[int, List[int]]
        Kernel size of the convolution layer in each ResidualBlock.
    """

    def __init__(self,
                 upsample_factors: List[int],
                 n_flows: int,
                 n_layers: int,
                 n_group: int,
                 channels: int,
                 n_mels: int,
                 kernel_size: Union[int, List[int]]):
        super(ConditionalWaveFlow, self).__init__()
        self.encoder = UpsampleNet(upsample_factors)
        self.decoder = WaveFlow(
            n_flows=n_flows,
            n_layers=n_layers,
            n_group=n_group,
            channels=channels,
            mel_bands=n_mels,
            kernel_size=kernel_size)

    def forward(self, audio, mel):
        """Compute the transformed random variable z (x to z) and the log of
        the determinant of the jacobian of the transformation from x to z.

        Parameters
        ----------
        audio : Tensor [shape=(B, T)]
            The audio.

        mel : Tensor [shape=(B, C_mel, T_mel)]
            The mel spectrogram.

        Returns
        -------
        z : Tensor [shape=(B, T)]
            The inversely transformed random variable z (x to z)

        log_det_jacobian: Tensor [shape=(1,)]
            the log of the determinant of the jacobian of the transformation
            from x to z.
        """
        condition = self.encoder(mel)
        z, log_det_jacobian = self.decoder(audio, condition)
        return z, log_det_jacobian

    @paddle.no_grad()
    def infer(self, mel):
        r"""Generate raw audio given mel spectrogram.

        Parameters
        ----------
        mel : Tensor [shape=(B, C_mel, T_mel)]
            Mel spectrogram (in log-magnitude).

        Returns
        -------
        Tensor : [shape=(B, T)]
            The synthesized audio, where ``T <= T_mel \* upsample_factor``.
        """
        condition = self.encoder(mel, trim_conv_artifact=True)  # (B, C, T)
        batch_size, _, time_steps = condition.shape
        z = paddle.randn([batch_size, time_steps], dtype=mel.dtype)
        x = self.decoder.inverse(z, condition)
        return x

    @paddle.no_grad()
    def predict(self, mel):
        """Generate raw audio given mel spectrogram.

        Parameters
        ----------
        mel : np.ndarray [shape=(C_mel, T_mel)]
            Mel spectrogram of an utterance (in log-magnitude).

        Returns
        -------
        np.ndarray [shape=(T,)]
            The synthesized audio.
        """
        mel = paddle.to_tensor(mel)
        mel = paddle.unsqueeze(mel, 0)  # add the batch dimension
        audio = self.infer(mel)
        audio = audio[0].numpy()
        return audio

    @classmethod
    def from_pretrained(cls, config, checkpoint_path):
        """Build a ConditionalWaveFlow model from a pretrained model.

        Parameters
        ----------
        config: yacs.config.CfgNode
            model configs

        checkpoint_path: Path or str
            the path of pretrained model checkpoint, without extension name

        Returns
        -------
        ConditionalWaveFlow
            The model built from pretrained result.
        """
        model = cls(upsample_factors=config.model.upsample_factors,
                    n_flows=config.model.n_flows,
                    n_layers=config.model.n_layers,
                    n_group=config.model.n_group,
                    channels=config.model.channels,
                    n_mels=config.data.n_mels,
                    kernel_size=config.model.kernel_size)
        checkpoint.load_parameters(model, checkpoint_path=checkpoint_path)
        return model


class WaveFlowLoss(nn.Layer):
    """Criterion of a WaveFlow model.

    Parameters
    ----------
    sigma : float
        The standard deviation of the gaussian noise used in WaveFlow, by
        default 1.0.
    """

    def __init__(self, sigma=1.0):
        super(WaveFlowLoss, self).__init__()
        self.sigma = sigma
        # per-element constant term of the gaussian negative log likelihood
        self.const = 0.5 * np.log(2 * np.pi) + np.log(self.sigma)

    def forward(self, z, log_det_jacobian):
        """Compute the loss given the transformed random variable z and the
        log_det_jacobian of transformation from x to z.

        Parameters
        ----------
        z : Tensor [shape=(B, T)]
            The transformed random variable (x to z).

        log_det_jacobian : Tensor [shape=(1,)]
            The log of the determinant of the jacobian matrix of the
            transformation from x to z.

        Returns
        -------
        Tensor [shape=(1,)]
            The loss, averaged over all the elements of ``z``.
        """
        loss = paddle.sum(z * z) / (2 * self.sigma * self.sigma
                                    ) - log_det_jacobian
        loss = loss / np.prod(z.shape)
        return loss + self.const