# ParakeetRebeccaRosario/parakeet/models/fastspeech/modules.py
#
# Listing metadata: 149 lines, 6.2 KiB, Python

import numpy as np
import math
import utils
import paddle.fluid.dygraph as dg
import paddle.fluid.layers as layers
import paddle.fluid as fluid
from parakeet.modules.layers import Conv, Linear
from parakeet.modules.multihead_attention import MultiheadAttention
from parakeet.modules.feed_forward import PositionwiseFeedForward
class FFTBlock(dg.Layer):
    """Feed Forward Transformer (FFT) block used in FastSpeech.

    Runs multi-head self-attention over the input and then a position-wise
    feed-forward network, optionally masking the output of each stage.
    """

    def __init__(self, d_model, d_inner, n_head, d_k, d_v, filter_size, padding, dropout=0.2):
        """
        Args:
            d_model: forwarded to both ``MultiheadAttention`` and
                ``PositionwiseFeedForward`` as their first argument.
            d_inner: forwarded to ``PositionwiseFeedForward``.
            n_head: forwarded to ``MultiheadAttention`` as ``num_head``.
            d_k: forwarded to ``MultiheadAttention``.
            d_v: forwarded to ``MultiheadAttention``.
            filter_size: forwarded to ``PositionwiseFeedForward``.
            padding: forwarded to ``PositionwiseFeedForward``.
            dropout: dropout value forwarded to both sub-layers.
        """
        super(FFTBlock, self).__init__()
        self.slf_attn = MultiheadAttention(
            d_model, d_k, d_v, num_head=n_head, dropout=dropout)
        self.pos_ffn = PositionwiseFeedForward(
            d_model,
            d_inner,
            filter_size=filter_size,
            padding=padding,
            dropout=dropout)

    def forward(self, enc_input, non_pad_mask=None, slf_attn_mask=None):
        """
        Feed Forward Transformer block in FastSpeech.

        Args:
            enc_input (Variable): Shape(B, T, C), dtype: float32. The embedding characters input.
                T means the timesteps of input.
            non_pad_mask (Variable, optional): Shape(B, T, 1), dtype: int64. The mask of sequence.
                When None (the default), no masking is applied.
            slf_attn_mask (Variable, optional): Shape(B, len_q, len_k), dtype: int64. The mask of
                self attention. len_q means the sequence length of query, len_k means the
                sequence length of key.

        Returns:
            output (Variable), Shape(B, T, C), the output after self-attention & ffn.
            slf_attn (Variable), Shape(B * n_head, T, T), the self attention.
        """
        output, slf_attn = self.slf_attn(
            enc_input, enc_input, enc_input, mask=slf_attn_mask)
        # The mask defaults to None; multiplying by None would raise, so the
        # masking steps are skipped when no mask is supplied.
        if non_pad_mask is not None:
            output *= non_pad_mask

        output = self.pos_ffn(output)
        if non_pad_mask is not None:
            output *= non_pad_mask

        return output, slf_attn
class LengthRegulator(dg.Layer):
    """Length regulator of FastSpeech.

    Repeats every time step of the encoder output according to a per-step
    duration, so the sequence is stretched from text length to mel length.
    Durations come from ``target`` in train mode and from the internal
    ``DurationPredictor`` otherwise.
    """

    def __init__(self, input_size, out_channels, filter_size, dropout=0.1):
        """
        Args:
            input_size: forwarded to ``DurationPredictor``.
            out_channels: forwarded to ``DurationPredictor``.
            filter_size: forwarded to ``DurationPredictor``.
            dropout: forwarded to ``DurationPredictor``.
        """
        super(LengthRegulator, self).__init__()
        self.duration_predictor = DurationPredictor(
            input_size=input_size,
            out_channels=out_channels,
            filter_size=filter_size,
            dropout=dropout)

    def LR(self, x, duration_predictor_output, alpha=1.0):
        """Expand every sample of the batch and pad the results to one length.

        Args:
            x (Variable): Shape(B, T, C). The sequences to expand.
            duration_predictor_output (Variable): Shape(B, T). Repeat count of
                every time step.
            alpha (float): scale applied to every repeat count.

        Returns:
            Variable: the expanded samples, zero-padded to the longest one and
            stacked along a new leading axis.
        """
        expanded = [
            # Slicing with i:i+1 keeps the leading batch axis for `expand`.
            self.expand(x[i:i + 1], duration_predictor_output[i:i + 1], alpha)
            for i in range(x.shape[0])
        ]
        return self.pad(expanded)

    def pad(self, input_ele):
        """Zero-pad a list of 2-D Variables along axis 0 and stack them.

        Args:
            input_ele (list[Variable]): per-sample expanded sequences.

        Returns:
            Variable: the stacked, padded sequences.
        """
        max_len = max(ele.shape[0] for ele in input_ele)
        padded = [
            # Paddings are [before, after] per axis: only pad the end of axis 0.
            layers.pad(ele, [0, max_len - ele.shape[0], 0, 0], pad_value=0.0)
            for ele in input_ele
        ]
        return layers.stack(padded)

    def expand(self, batch, predicted, alpha):
        """Repeat each time step of one sample by its (scaled) duration.

        Args:
            batch (Variable): Shape(1, T, C). A single sample.
            predicted (Variable): Shape(1, T). Duration of every time step.
            alpha (float): scale applied to every duration before it is
                rounded to an integer repeat count; 1.0 leaves integer
                durations unchanged.

        Returns:
            Variable: Shape(sum of repeat counts, C).
        """
        time_steps = batch.shape[1]
        fertilities = predicted.numpy()
        batch = layers.squeeze(batch, [0])

        out = []
        for i in range(time_steps):
            # alpha was previously accepted but ignored; scaling the duration
            # here is what lets it control the length of the expanded output.
            repeats = int(round(float(fertilities[0, i]) * alpha))
            if repeats <= 0:
                # Steps with no duration contribute no frames.
                continue
            out.append(layers.expand(batch[i:i + 1, :], [repeats, 1]))
        # NOTE(review): if every duration is zero `out` is empty and the
        # concat below presumably fails -- confirm whether callers can hit it.
        return layers.concat(out, axis=0)

    def forward(self, x, alpha=1.0, target=None):
        """
        Length Regulator block in FastSpeech.

        Args:
            x (Variable): Shape(B, T, C), dtype: float32. The encoder output.
            alpha (Constant): dtype: float32. The hyperparameter to determine the length of
                the expanded sequence mel, thereby controlling the voice speed.
                Only used outside train mode.
            target (Variable, optional): Shape(B, T_text), dtype: int64. The duration of
                phoneme compute from pretrained transformerTTS. Required in train mode.

        Returns:
            output (Variable), the encoder output after expand.
            In train mode: duration_predictor_output (Variable), Shape(B, T),
                the output of duration predictor.
            Otherwise: mel_pos (Variable), dtype: int64, the positions
                1..T_mel of the expanded sequence.

        Raises:
            ValueError: if called in train mode without ``target``.
        """
        duration_predictor_output = self.duration_predictor(x)

        if fluid.framework._dygraph_tracer()._train_mode:
            if target is None:
                raise ValueError(
                    "LengthRegulator requires `target` durations in train mode.")
            output = self.LR(x, target)
            return output, duration_predictor_output

        duration_predictor_output = layers.round(duration_predictor_output)
        output = self.LR(x, duration_predictor_output, alpha)
        # Build the positions as an ndarray: to_variable may reject a plain
        # Python list on the Paddle versions this API targets.
        mel_pos = dg.to_variable(
            np.arange(1, output.shape[1] + 1, dtype=np.int64))
        return output, mel_pos
class DurationPredictor(dg.Layer):
    """Duration predictor of FastSpeech.

    Two Conv -> LayerNorm -> ReLU -> dropout stages followed by a linear
    projection to one value per time step.
    """

    def __init__(self, input_size, out_channels, filter_size, dropout=0.1):
        """
        Args:
            input_size: number of input channels of the first convolution.
            out_channels: number of channels produced by both convolutions.
            filter_size: kernel size of both convolutions.
            dropout: dropout probability applied after each conv stage.
        """
        super(DurationPredictor, self).__init__()
        self.input_size = input_size
        self.out_channels = out_channels
        self.filter_size = filter_size
        self.dropout = dropout

        # Derive the padding from the kernel size instead of hard-coding 1,
        # so odd kernels other than 3 also keep the time dimension unchanged
        # (identical to the previous value for filter_size == 3).
        # NOTE(review): assumes Conv pads `padding` frames on both sides of
        # the T axis -- confirm against parakeet.modules.layers.Conv.
        padding = (self.filter_size - 1) // 2

        self.conv1 = Conv(
            in_channels=self.input_size,
            out_channels=self.out_channels,
            filter_size=self.filter_size,
            padding=padding,
            data_format='NTC')
        self.conv2 = Conv(
            in_channels=self.out_channels,
            out_channels=self.out_channels,
            filter_size=self.filter_size,
            padding=padding,
            data_format='NTC')
        self.layer_norm1 = dg.LayerNorm(self.out_channels)
        self.layer_norm2 = dg.LayerNorm(self.out_channels)
        self.linear = Linear(self.out_channels, 1)

    def forward(self, encoder_output):
        """
        Duration Predictor block in FastSpeech.

        Args:
            encoder_output (Variable): Shape(B, T, C), dtype: float32. The encoder output.

        Returns:
            out (Variable), Shape(B, T), the output of duration predictor
                (the trailing size-1 axis of the linear output is squeezed).
        """
        # encoder_output.shape(N, T, C)
        hidden = layers.relu(self.layer_norm1(self.conv1(encoder_output)))
        hidden = layers.dropout(hidden, self.dropout)

        hidden = layers.relu(self.layer_norm2(self.conv2(hidden)))
        hidden = layers.dropout(hidden, self.dropout)

        # ReLU keeps the predicted values non-negative.
        out = layers.relu(self.linear(hidden))
        return layers.squeeze(out, axes=[-1])