Merge pull request #44 from iclementine/reborn

hide models that are not updated yet
This commit is contained in:
Feiyu Chan 2020-12-03 19:06:06 +08:00 committed by GitHub
commit a6806389f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 2 additions and 1351 deletions

View File

@ -1,158 +0,0 @@
import paddle
from paddle import nn
from paddle.nn import functional as F
from paddle import distribution as D
from parakeet.models.wavenet import WaveNet, UpsampleNet, crop
__all__ = ["Clarinet"]
class ParallelWaveNet(nn.LayerList):
def __init__(self, n_loops, n_layers, residual_channels, condition_dim,
filter_size):
"""ParallelWaveNet, an inverse autoregressive flow model, it contains several flows(WaveNets).
Args:
n_loops (List[int]): `n_loop` for each flow.
n_layers (List[int]): `n_layer` for each flow.
residual_channels (int): `residual_channels` for every flow.
condition_dim (int): `condition_dim` for every flow.
filter_size (int): `filter_size` for every flow.
"""
super(ParallelWaveNet, self).__init__()
for n_loop, n_layer in zip(n_loops, n_layers):
# teacher's log_scale_min does not matter herem, -100 is a dummy value
self.append(
WaveNet(n_loop, n_layer, residual_channels, 3, condition_dim,
filter_size, "mog", -100.0))
def forward(self, z, condition=None):
"""Transform a random noise sampled from a standard Gaussian distribution into sample from the target distribution. And output the mean and log standard deviation of the output distribution.
Args:
z (Variable): shape(B, T), random noise sampled from a standard gaussian disribution.
condition (Variable, optional): shape(B, F, T), dtype float, the upsampled condition. Defaults to None.
Returns:
(z, out_mu, out_log_std)
z (Variable): shape(B, T), dtype float, transformed noise, it is the synthesized waveform.
out_mu (Variable): shape(B, T), dtype float, means of the output distributions.
out_log_std (Variable): shape(B, T), dtype float, log standard deviations of the output distributions.
"""
for i, flow in enumerate(self):
theta = flow(z, condition) # w, mu, log_std [0: T]
w, mu, log_std = paddle.chunk(theta, 3, axis=-1) # (B, T, 1) for each
mu = paddle.squeeze(mu, -1) #[0: T]
log_std = paddle.squeeze(log_std, -1) #[0: T]
z = z * paddle.exp(log_std) + mu #[0: T]
if i == 0:
out_mu = mu
out_log_std = log_std
else:
out_mu = out_mu * paddle.exp(log_std) + mu
out_log_std += log_std
return z, out_mu, out_log_std
# Gaussian IAF model
class Clarinet(nn.Layer):
def __init__(self, encoder, teacher, student, stft,
min_log_scale=-6.0, lmd=4.0):
"""Clarinet model. Conditional Parallel WaveNet.
Args:
encoder (UpsampleNet): an UpsampleNet to upsample mel spectrogram.
teacher (WaveNet): a WaveNet, the teacher.
student (ParallelWaveNet): a ParallelWaveNet model, the student.
stft (STFT): a STFT model to perform differentiable stft transform.
min_log_scale (float, optional): used only for computing loss, the minimal value of log standard deviation of the output distribution of both the teacher and the student . Defaults to -6.0.
lmd (float, optional): weight for stft loss. Defaults to 4.0.
"""
super(Clarinet, self).__init__()
self.encoder = encoder
self.teacher = teacher
self.student = student
self.stft = stft
self.lmd = lmd
self.min_log_scale = min_log_scale
def forward(self, audio, mel, audio_start, clip_kl=True):
"""Compute loss of Clarinet model.
Args:
audio (Variable): shape(B, T_audio), dtype flaot32, ground truth waveform.
mel (Variable): shape(B, F, T_mel), dtype flaot32, condition(mel spectrogram here).
audio_start (Variable): shape(B, ), dtype int64, audio starts positions.
clip_kl (bool, optional): whether to clip kl_loss by maximum=100. Defaults to True.
Returns:
Dict(str, Variable)
loss (Variable): shape(1, ), dtype flaot32, total loss.
kl (Variable): shape(1, ), dtype flaot32, kl divergence between the teacher's output distribution and student's output distribution.
regularization (Variable): shape(1, ), dtype flaot32, a regularization term of the KL divergence.
spectrogram_frame_loss (Variable): shape(1, ), dytpe: float, stft loss, the L1-distance of the magnitudes of the spectrograms of the ground truth waveform and synthesized waveform.
"""
batch_size, audio_length = audio.shape # audio clip's length
z = paddle.randn(audio.shape)
condition = self.encoder(mel) # (B, C, T)
condition_slice = crop(condition, audio_start, audio_length)
x, s_means, s_scales = self.student(z, condition_slice) # all [0: T]
s_means = s_means[:, 1:] # (B, T-1), time steps [1: T]
s_scales = s_scales[:, 1:] # (B, T-1), time steps [1: T]
s_clipped_scales = paddle.clip(s_scales, self.min_log_scale, 100.)
# teacher outputs single gaussian
y = self.teacher(x[:, :-1], condition_slice[:, :, 1:])
_, t_means, t_scales = paddle.chunk(y, 3, axis=-1) # time steps [1: T]
t_means = paddle.squeeze(t_means, [-1]) # (B, T-1), time steps [1: T]
t_scales = paddle.squeeze(t_scales, [-1]) # (B, T-1), time steps [1: T]
t_clipped_scales = paddle.clip(t_scales, self.min_log_scale, 100.)
s_distribution = D.Normal(s_means, paddle.exp(s_clipped_scales))
t_distribution = D.Normal(t_means, paddle.exp(t_clipped_scales))
# kl divergence loss, so we only need to sample once? no MC
kl = s_distribution.kl_divergence(t_distribution)
if clip_kl:
kl = paddle.clip(kl, -100., 10.)
# context size dropped
kl = paddle.reduce_mean(kl[:, self.teacher.context_size:])
# major diff here
regularization = F.mse_loss(t_scales[:, self.teacher.context_size:],
s_scales[:, self.teacher.context_size:])
# introduce information from real target
spectrogram_frame_loss = F.mse_loss(
self.stft.magnitude(audio), self.stft.magnitude(x))
loss = kl + self.lmd * regularization + spectrogram_frame_loss
loss_dict = {
"loss": loss,
"kl_divergence": kl,
"regularization": regularization,
"stft_loss": spectrogram_frame_loss
}
return loss_dict
@paddle.no_grad()
def synthesis(self, mel):
"""Synthesize waveform using the encoder and the student network.
Args:
mel (Variable): shape(B, F, T_mel), the condition(mel spectrogram here).
Returns:
Variable: shape(B, T_audio), the synthesized waveform. (T_audio = T_mel * upscale_factor, where upscale_factor is the `upscale_factor` of the encoder.)
"""
condition = self.encoder(mel)
samples_shape = (condition.shape[0], condition.shape[-1])
z = paddle.randn(samples_shape)
x, s_means, s_scales = self.student(z, condition)
return x
# TODO(chenfeiyu): ClariNetLoss

View File

@ -1,465 +0,0 @@
import math
import numpy as np
import paddle
from paddle import nn
from paddle.nn import functional as F
from paddle.nn import initializer as I
from parakeet.modules import positional_encoding as pe
__all__ = ["SpectraNet"]
class ConvBlock(nn.Layer):
def __init__(self, in_channel, kernel_size, causal=False, has_bias=False,
bias_dim=None, keep_prob=1.):
super(ConvBlock, self).__init__()
self.causal = causal
self.keep_prob = keep_prob
self.in_channel = in_channel
self.has_bias = has_bias
std = math.sqrt(4 * keep_prob / (kernel_size * in_channel))
padding = "valid" if causal else "same"
conv = nn.Conv1D(in_channel, 2 * in_channel, (kernel_size, ),
padding=padding,
data_format="NLC",
weight_attr=I.Normal(scale=std))
self.conv = nn.utils.weight_norm(conv)
if has_bias:
std = math.sqrt(1 / bias_dim)
self.bias_affine = nn.Linear(bias_dim, 2 * in_channel,
weight_attr=I.Normal(scale=std))
def forward(self, input, bias=None, padding=None):
"""
input: input feature (B, T, C)
padding: only used when using causal conv, we pad mannually
"""
input_dropped = F.dropout(input, 1. - self.keep_prob, training=self.training)
if self.causal:
assert padding is not None
input_dropped = paddle.concat([padding, input_dropped], axis=1)
hidden = self.conv(input_dropped)
if self.has_bias:
assert bias is not None
transformed_bias = F.softsign(self.bias_affine(bias))
hidden_embedded = hidden + paddle.unsqueeze(transformed_bias, 1)
else:
hidden_embedded = hidden
# glu
content, gate = paddle.chunk(hidden, 2, axis=-1)
content = hidden_embedded[:, :, :self.in_channel]
hidden = F.sigmoid(gate) * content
# # residual
hidden = paddle.scale(input + hidden, math.sqrt(0.5))
return hidden
class AffineBlock1(nn.Layer):
def __init__(self, in_channel, out_channel, has_bias=False, bias_dim=0):
super(AffineBlock1, self).__init__()
std = math.sqrt(1.0 / in_channel)
affine = nn.Linear(in_channel, out_channel, weight_attr=I.Normal(scale=std))
self.affine = nn.utils.weight_norm(affine, dim=-1)
if has_bias:
std = math.sqrt(1 / bias_dim)
self.bias_affine = nn.Linear(bias_dim, out_channel,
weight_attr=I.Normal(scale=std))
self.has_bias = has_bias
self.bias_dim = bias_dim
def forward(self, input, bias=None):
"""
input -> (affine + weight_norm) ->hidden
bias -> (affine) -> softsign -> transformed_bis
hidden += transformed_bias
"""
hidden = self.affine(input)
if self.has_bias:
assert bias is not None
transformed_bias = F.softsign(self.bias_affine(bias))
hidden += paddle.unsqueeze(transformed_bias, 1)
return hidden
class AffineBlock2(nn.Layer):
def __init__(self, in_channel, out_channel,
has_bias=False, bias_dim=0, dropout=False, keep_prob=1.):
super(AffineBlock2, self).__init__()
if has_bias:
std = math.sqrt(1 / bias_dim)
self.bias_affine = nn.Linear(bias_dim, in_channel, weight_attr=I.Normal(scale=std))
std = math.sqrt(1.0 / in_channel)
affine = nn.Linear(in_channel, out_channel, weight_attr=I.Normal(scale=std))
self.affine = nn.utils.weight_norm(affine, dim=-1)
self.has_bias = has_bias
self.bias_dim = bias_dim
self.dropout = dropout
self.keep_prob = keep_prob
def forward(self, input, bias=None):
"""
input -> (dropout) ->hidden
bias -> (affine) -> softsign -> transformed_bis
hidden += transformed_bias
hidden -> (affine + weight_norm) -> relu -> hidden
"""
hidden = input
if self.dropout:
hidden = F.dropout(hidden, 1. - self.keep_prob, training=self.training)
if self.has_bias:
assert bias is not None
transformed_bias = F.softsign(self.bias_affine(bias))
hidden += paddle.unsqueeze(transformed_bias, 1)
hidden = F.relu(self.affine(hidden))
return hidden
class Encoder(nn.Layer):
def __init__(self, layers, in_channels, encoder_dim, kernel_size,
has_bias=False, bias_dim=0, keep_prob=1.):
super(Encoder, self).__init__()
self.pre_affine = AffineBlock1(in_channels, encoder_dim, has_bias, bias_dim)
self.convs = nn.LayerList([
ConvBlock(encoder_dim, kernel_size, False, has_bias, bias_dim, keep_prob) \
for _ in range(layers)])
self.post_affine = AffineBlock1(encoder_dim, in_channels, has_bias, bias_dim)
def forward(self, char_embed, speaker_embed=None):
hidden = self.pre_affine(char_embed, speaker_embed)
for layer in self.convs:
hidden = layer(hidden, speaker_embed)
hidden = self.post_affine(hidden, speaker_embed)
keys = hidden
values = paddle.scale(char_embed + hidden, math.sqrt(0.5))
return keys, values
class AttentionBlock(nn.Layer):
def __init__(self, attention_dim, input_dim, position_encoding_weight=1.,
position_rate=1., reduction_factor=1, has_bias=False, bias_dim=0,
keep_prob=1.):
super(AttentionBlock, self).__init__()
# positional encoding
omega_default = position_rate / reduction_factor
self.omega_default = omega_default
# multispeaker case
if has_bias:
std = math.sqrt(1.0 / bias_dim)
self.q_pos_affine = nn.Linear(bias_dim, 1, weight_attr=I.Normal(scale=std))
self.k_pos_affine = nn.Linear(bias_dim, 1, weight_attr=I.Normal(scale=std))
self.omega_initial = self.create_parameter(shape=[1],
attr=I.Constant(value=omega_default))
# mind the fact that q, k, v have the same feature dimension
# so we can init k_affine and q_affine's weight as the same matrix
# to get a better init attention
dtype = self.omega_initial.numpy().dtype
init_weight = np.random.normal(size=(input_dim, attention_dim),
scale=np.sqrt(1. / input_dim)).astype(dtype)
# TODO(chenfeiyu): to report an issue, there is no such initializer
#initializer = paddle.fluid.initializer.NumpyArrayInitializer(init_weight)
# 3 affine transformation to project q, k, v into attention_dim
q_affine = nn.Linear(input_dim, attention_dim)
self.q_affine = nn.utils.weight_norm(q_affine, dim=-1)
k_affine = nn.Linear(input_dim, attention_dim)
self.k_affine = nn.utils.weight_norm(k_affine, dim=-1)
# better to use this, since NumpyInitializer does not support float64
self.q_affine.weight.set_value(init_weight)
self.k_affine.weight.set_value(init_weight)
std = np.sqrt(1.0 / input_dim)
v_affine = nn.Linear(input_dim, attention_dim, weight_attr=I.Normal(scale=std))
self.v_affine = nn.utils.weight_norm(v_affine, dim=-1)
std = np.sqrt(1.0 / attention_dim)
out_affine = nn.Linear(attention_dim, input_dim, weight_attr=I.Normal(scale=std))
self.out_affine = nn.utils.weight_norm(out_affine, dim=-1)
self.keep_prob = keep_prob
self.has_bias = has_bias
self.bias_dim = bias_dim
self.attention_dim = attention_dim
self.position_encoding_weight = position_encoding_weight
def forward(self, q, k, v, lengths, speaker_embed, start_index,
force_monotonic=False, prev_coeffs=None, window=None):
dtype = self.omega_initial.dtype
# add position encoding as an inductive bias
if self.has_bias: # multi-speaker model
omega_q = 2 * F.sigmoid(
paddle.squeeze(self.q_pos_affine(speaker_embed), -1))
omega_k = 2 * self.omega_initial * F.sigmoid(paddle.squeeze(
self.k_pos_affine(speaker_embed), -1))
else: # single-speaker case
batch_size = q.shape[0]
omega_q = paddle.ones((batch_size, ), dtype=dtype)
omega_k = paddle.ones((batch_size, ), dtype=dtype) * self.omega_default
q += self.position_encoding_weight * pe.scalable_positional_encoding(start_index, q.shape[1], q.shape[-1], omega_q)
k += self.position_encoding_weight * pe.scalable_positional_encoding(0, k.shape[1], k.shape[-1], omega_k)
q, k, v = self.q_affine(q), self.k_affine(k), self.v_affine(v)
activations = paddle.matmul(q, k, transpose_y=True)
activations /= math.sqrt(self.attention_dim)
if self.training:
# mask the <pad> parts from the encoder
mask = paddle.fluid.layers.sequence_mask(lengths, dtype=dtype)
attn_bias = paddle.scale(1. - mask, -1000)
activations += paddle.unsqueeze(attn_bias, 1)
elif force_monotonic:
assert window is not None
backward_step, forward_step = window
T_enc = k.shape[1]
batch_size, T_dec, _ = q.shape
# actually T_dec = 1 here
alpha = paddle.fill_constant((batch_size, T_dec), value=0, dtype="int64") \
if prev_coeffs is None \
else paddle.argmax(prev_coeffs, axis=-1)
backward = paddle.fluid.layers.sequence_mask(alpha - backward_step, maxlen=T_enc, dtype="bool")
forward = paddle.fluid.layers.sequence_mask(alpha + forward_step, maxlen=T_enc, dtype="bool")
mask = paddle.cast(paddle.logical_xor(backward, forward), activations.dtype)
# print("mask's shape:", mask.shape)
attn_bias = paddle.scale(1. - mask, -1000)
activations += attn_bias
# softmax
coefficients = F.softmax(activations, axis=-1)
# context vector
coefficients = F.dropout(coefficients, 1. - self.keep_prob, training=self.training)
contexts = paddle.matmul(coefficients, v)
# context normalization
enc_lengths = paddle.cast(paddle.unsqueeze(lengths, axis=[1, 2]), contexts.dtype)
contexts *= paddle.sqrt(enc_lengths)
# out affine
contexts = self.out_affine(contexts)
return contexts, coefficients
class Decoder(nn.Layer):
def __init__(self, in_channels, reduction_factor, prenet_sizes,
layers, kernel_size, attention_dim,
position_encoding_weight=1., omega=1.,
has_bias=False, bias_dim=0, keep_prob=1.):
super(Decoder, self).__init__()
# prenet-mind the difference of AffineBlock2 and AffineBlock1
c_in = in_channels
self.prenet = nn.LayerList()
for i, c_out in enumerate(prenet_sizes):
affine = AffineBlock2(c_in, c_out, has_bias, bias_dim, dropout=(i!=0), keep_prob=keep_prob)
self.prenet.append(affine)
c_in = c_out
# causal convolutions + multihop attention
decoder_dim = prenet_sizes[-1]
self.causal_convs = nn.LayerList()
self.attention_blocks = nn.LayerList()
for i in range(layers):
conv = ConvBlock(decoder_dim, kernel_size, True, has_bias, bias_dim, keep_prob)
attn = AttentionBlock(attention_dim, decoder_dim, position_encoding_weight, omega, reduction_factor, has_bias, bias_dim, keep_prob)
self.causal_convs.append(conv)
self.attention_blocks.append(attn)
# output mel spectrogram
output_dim = reduction_factor * in_channels # r * mel_dim
std = math.sqrt(1.0 / decoder_dim)
out_affine = nn.Linear(decoder_dim, output_dim, weight_attr=I.Normal(scale=std))
self.out_affine = nn.utils.weight_norm(out_affine, dim=-1)
if has_bias:
std = math.sqrt(1 / bias_dim)
self.out_sp_affine = nn.Linear(bias_dim, output_dim, weight_attr=I.Normal(scale=std))
self.has_bias = has_bias
self.kernel_size = kernel_size
self.in_channels = in_channels
self.decoder_dim = decoder_dim
self.reduction_factor = reduction_factor
self.out_channels = output_dim
def forward(self, inputs, keys, values, lengths, start_index, speaker_embed=None,
state=None, force_monotonic_attention=None, coeffs=None, window=(0, 4)):
hidden = inputs
for layer in self.prenet:
hidden = layer(hidden, speaker_embed)
attentions = [] # every layer of (B, T_dec, T_enc) attention
final_state = [] # layers * (B, (k-1)d, C_dec)
batch_size = inputs.shape[0]
causal_padding_shape = (batch_size, self.kernel_size - 1, self.decoder_dim)
for i in range(len(self.causal_convs)):
if state is None:
padding = paddle.zeros(causal_padding_shape, dtype=inputs.dtype)
else:
padding = state[i]
new_state = paddle.concat([padding, hidden], axis=1) # => to be used next step
# causal conv, (B, T, C)
hidden = self.causal_convs[i](hidden, speaker_embed, padding=padding)
# attn
prev_coeffs = None if coeffs is None else coeffs[i]
force_monotonic = False if force_monotonic_attention is None else force_monotonic_attention[i]
context, attention = self.attention_blocks[i](
hidden, keys, values, lengths, speaker_embed,
start_index, force_monotonic, prev_coeffs, window)
# residual connextion (B, T_dec, C_dec)
hidden = paddle.scale(hidden + context, math.sqrt(0.5))
attentions.append(attention) # layers * (B, T_dec, T_enc)
# new state: shift a step, layers * (B, T, C)
new_state = new_state[:, -(self.kernel_size - 1):, :]
final_state.append(new_state)
# predict mel spectrogram (B, 1, T_dec, r * C_in)
decoded = self.out_affine(hidden)
if self.has_bias:
decoded *= F.sigmoid(paddle.unsqueeze(self.out_sp_affine(speaker_embed), 1))
return decoded, hidden, attentions, final_state
class PostNet(nn.Layer):
def __init__(self, layers, in_channels, postnet_dim, kernel_size, out_channels, upsample_factor, has_bias=False, bias_dim=0, keep_prob=1.):
super(PostNet, self).__init__()
self.pre_affine = AffineBlock1(in_channels, postnet_dim, has_bias, bias_dim)
self.convs = nn.LayerList([
ConvBlock(postnet_dim, kernel_size, False, has_bias, bias_dim, keep_prob) for _ in range(layers)
])
std = math.sqrt(1.0 / postnet_dim)
post_affine = nn.Linear(postnet_dim, out_channels, weight_attr=I.Normal(scale=std))
self.post_affine = nn.utils.weight_norm(post_affine, dim=-1)
self.upsample_factor = upsample_factor
def forward(self, hidden, speaker_embed=None):
hidden = self.pre_affine(hidden, speaker_embed)
batch_size, time_steps, channels = hidden.shape # pylint: disable=unused-variable
# NOTE: paddle.expand can only expand dimension whose size is 1
hidden = paddle.expand(paddle.unsqueeze(hidden, 2), [-1, -1, self.upsample_factor, -1])
hidden = paddle.reshape(hidden, [batch_size, -1, channels])
for layer in self.convs:
hidden = layer(hidden, speaker_embed)
spec = self.post_affine(hidden)
return spec
class SpectraNet(nn.Layer):
def __init__(self, char_embedding, speaker_embedding, encoder, decoder, postnet):
super(SpectraNet, self).__init__()
self.char_embedding = char_embedding
self.speaker_embedding = speaker_embedding
self.encoder = encoder
self.decoder = decoder
self.postnet = postnet
def forward(self, text, text_lengths, speakers=None, mel=None, frame_lengths=None,
force_monotonic_attention=None, window=None):
# encode
text_embed = self.char_embedding(text)# no stress embedding here
speaker_embed = F.softsign(self.speaker_embedding(speakers)) if self.speaker_embedding is not None else None
keys, values = self.encoder(text_embed, speaker_embed)
if mel is not None:
return self.teacher_forced_train(keys, values, text_lengths, speaker_embed, mel)
else:
return self.inference(keys, values, text_lengths, speaker_embed, force_monotonic_attention, window)
def teacher_forced_train(self, keys, values, text_lengths, speaker_embed, mel):
# build decoder inputs by shifting over by one frame and add all zero <start> frame
# the mel input is downsampled by a reduction factor
batch_size = mel.shape[0]
mel_input = paddle.reshape(mel, (batch_size, -1, self.decoder.reduction_factor, self.decoder.in_channels))
zero_frame = paddle.zeros((batch_size, 1, self.decoder.in_channels), dtype=mel.dtype)
# downsample mel input as a regularization
mel_input = paddle.concat([zero_frame, mel_input[:, :-1, -1, :]], axis=1)
# decoder
decoded, hidden, attentions, final_state = self.decoder(mel_input, keys, values, text_lengths, 0, speaker_embed)
attentions = paddle.stack(attentions) # (N, B, T_dec, T_encs)
# unfold frames
decoded = paddle.reshape(decoded, (batch_size, -1, self.decoder.in_channels))
# postnet
refined = self.postnet(hidden, speaker_embed)
return decoded, refined, attentions, final_state
def spec_loss(self, decoded, input, num_frames=None):
if num_frames is None:
l1_loss = paddle.mean(paddle.abs(decoded - input))
else:
# mask the <pad> part of the decoder
num_channels = decoded.shape[-1]
l1_loss = paddle.abs(decoded - input)
mask = paddle.fluid.layers.sequence_mask(num_frames, dtype=decoded.dtype)
l1_loss *= paddle.unsqueeze(mask, axis=-1)
l1_loss = paddle.sum(l1_loss) / paddle.scale(paddle.sum(mask), num_channels)
return l1_loss
@paddle.no_grad()
def inference(self, keys, values, text_lengths, speaker_embed,
force_monotonic_attention, window):
MAX_STEP = 500
# layer index of the first monotonic attention
num_monotonic_attention_layers = sum(force_monotonic_attention)
first_mono_attention_layer = 0
if num_monotonic_attention_layers > 0:
for i, item in enumerate(force_monotonic_attention):
if item:
first_mono_attention_layer = i
break
# stop cond (if would be more complicated to support minibatch autoregressive decoding)
# so we only supports batch_size == 0 in inference
def should_continue(i, mel_input, outputs, hidden, attention, state, coeffs):
T_enc = coeffs.shape[-1]
attn_peak = paddle.argmax(coeffs[first_mono_attention_layer, 0, 0]) \
if num_monotonic_attention_layers > 0 \
else paddle.fill_constant([1], "int64", value=0)
return i < MAX_STEP and paddle.reshape(attn_peak, [1]) < T_enc - 1
def loop_body(i, mel_input, outputs, hiddens, attentions, state=None, coeffs=None):
# state is None coeffs is None for the first step
decoded, hidden, new_coeffs, new_state = self.decoder(
mel_input, keys, values, text_lengths, i, speaker_embed,
state, force_monotonic_attention, coeffs, window)
new_coeffs = paddle.stack(new_coeffs) # (N, B, T_dec=1, T_enc)
attentions.append(new_coeffs) # (N, B, T_dec=1, T_enc)
outputs.append(decoded) # (B, T_dec=1, rC_mel)
hiddens.append(hidden) # (B, T_dec=1, C_dec)
# slice the last frame out of r generated frames to be used as the input for the next step
batch_size = mel_input.shape[0]
frames = paddle.reshape(decoded, [batch_size, -1, self.decoder.reduction_factor, self.decoder.in_channels])
input_frame = frames[:, :, -1, :]
return (i + 1, input_frame, outputs, hiddens, attentions, new_state, new_coeffs)
i = 0
batch_size = keys.shape[0]
input_frame = paddle.zeros((batch_size, 1, self.decoder.in_channels), dtype=keys.dtype)
outputs = []
hiddens = []
attentions = []
loop_state = loop_body(i, input_frame, outputs, hiddens, attentions)
while should_continue(*loop_state):
loop_state = loop_body(*loop_state)
outputs, hiddens, attention = loop_state[2], loop_state[3], loop_state[4]
# concat decoder timesteps
outputs = paddle.concat(outputs, axis=1)
hiddens = paddle.concat(hiddens, axis=1)
attention = paddle.concat(attention, axis=2)
# unfold frames
outputs = paddle.reshape(outputs, (batch_size, -1, self.decoder.in_channels))
refined = self.postnet(hiddens, speaker_embed)
return outputs, refined, attention

View File

@ -1,13 +0,0 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -1,113 +0,0 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid.dygraph as dg
import paddle.fluid as fluid
from parakeet.models.transformer_tts.utils import *
from parakeet.models.fastspeech.fft_block import FFTBlock
class Decoder(dg.Layer):
def __init__(self,
len_max_seq,
n_layers,
n_head,
d_k,
d_q,
d_model,
d_inner,
fft_conv1d_kernel,
fft_conv1d_padding,
dropout=0.1):
"""Decoder layer of FastSpeech.
Args:
len_max_seq (int): the max mel len of sequence.
n_layers (int): the layers number of FFTBlock.
n_head (int): the head number of multihead attention.
d_k (int): the dim of key in multihead attention.
d_q (int): the dim of query in multihead attention.
d_model (int): the dim of hidden layer in multihead attention.
d_inner (int): the dim of hidden layer in ffn.
fft_conv1d_kernel (int): the conv kernel size in FFTBlock.
fft_conv1d_padding (int): the conv padding size in FFTBlock.
dropout (float, optional): dropout probability of FFTBlock. Defaults to 0.1.
"""
super(Decoder, self).__init__()
n_position = len_max_seq + 1
self.n_head = n_head
self.pos_inp = get_sinusoid_encoding_table(
n_position, d_model, padding_idx=0)
self.position_enc = dg.Embedding(
size=[n_position, d_model],
padding_idx=0,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.NumpyArrayInitializer(
self.pos_inp),
trainable=False))
self.layer_stack = [
FFTBlock(
d_model,
d_inner,
n_head,
d_k,
d_q,
fft_conv1d_kernel,
fft_conv1d_padding,
dropout=dropout) for _ in range(n_layers)
]
for i, layer in enumerate(self.layer_stack):
self.add_sublayer('fft_{}'.format(i), layer)
def forward(self, enc_seq, enc_pos):
"""
Compute decoder outputs.
Args:
enc_seq (Variable): shape(B, T_mel, C), dtype float32,
the output of length regulator, where T_mel means the timesteps of input spectrum.
enc_pos (Variable): shape(B, T_mel), dtype int64,
the spectrum position.
Returns:
dec_output (Variable): shape(B, T_mel, C), the decoder output.
dec_slf_attn_list (list[Variable]): len(n_layers), the decoder self attention list.
"""
dec_slf_attn_list = []
if fluid.framework._dygraph_tracer()._train_mode:
slf_attn_mask = get_dec_attn_key_pad_mask(enc_pos, self.n_head,
enc_seq.dtype)
else:
len_q = enc_seq.shape[1]
slf_attn_mask = layers.triu(
layers.ones(
shape=[len_q, len_q], dtype=enc_seq.dtype),
diagonal=1)
slf_attn_mask = layers.cast(
slf_attn_mask != 0, dtype=enc_seq.dtype) * -1e30
non_pad_mask = get_non_pad_mask(enc_pos, 1, enc_seq.dtype)
# -- Forward
dec_output = enc_seq + self.position_enc(enc_pos)
for dec_layer in self.layer_stack:
dec_output, dec_slf_attn = dec_layer(
dec_output,
non_pad_mask=non_pad_mask,
slf_attn_mask=slf_attn_mask)
dec_slf_attn_list += [dec_slf_attn]
return dec_output, dec_slf_attn_list

View File

@ -1,109 +0,0 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid.dygraph as dg
import paddle.fluid as fluid
from parakeet.models.transformer_tts.utils import *
from parakeet.models.fastspeech.fft_block import FFTBlock
class Encoder(dg.Layer):
def __init__(self,
n_src_vocab,
len_max_seq,
n_layers,
n_head,
d_k,
d_q,
d_model,
d_inner,
fft_conv1d_kernel,
fft_conv1d_padding,
dropout=0.1):
"""Encoder layer of FastSpeech.
Args:
n_src_vocab (int): the number of source vocabulary.
len_max_seq (int): the max mel len of sequence.
n_layers (int): the layers number of FFTBlock.
n_head (int): the head number of multihead attention.
d_k (int): the dim of key in multihead attention.
d_q (int): the dim of query in multihead attention.
d_model (int): the dim of hidden layer in multihead attention.
d_inner (int): the dim of hidden layer in ffn.
fft_conv1d_kernel (int): the conv kernel size in FFTBlock.
fft_conv1d_padding (int): the conv padding size in FFTBlock.
dropout (float, optional): dropout probability of FFTBlock. Defaults to 0.1.
"""
super(Encoder, self).__init__()
n_position = len_max_seq + 1
self.n_head = n_head
self.src_word_emb = dg.Embedding(
size=[n_src_vocab, d_model],
padding_idx=0,
param_attr=fluid.initializer.Normal(
loc=0.0, scale=1.0))
self.pos_inp = get_sinusoid_encoding_table(
n_position, d_model, padding_idx=0)
self.position_enc = dg.Embedding(
size=[n_position, d_model],
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.NumpyArrayInitializer(
self.pos_inp),
trainable=False))
self.layer_stack = [
FFTBlock(
d_model,
d_inner,
n_head,
d_k,
d_q,
fft_conv1d_kernel,
fft_conv1d_padding,
dropout=dropout) for _ in range(n_layers)
]
for i, layer in enumerate(self.layer_stack):
self.add_sublayer('fft_{}'.format(i), layer)
def forward(self, character, text_pos):
"""
Encode text sequence.
Args:
character (Variable): shape(B, T_text), dtype float32, the input text characters,
where T_text means the timesteps of input characters,
text_pos (Variable): shape(B, T_text), dtype int64, the input text position.
Returns:
enc_output (Variable): shape(B, T_text, C), the encoder output.
enc_slf_attn_list (list[Variable]): len(n_layers), the encoder self attention list.
"""
enc_slf_attn_list = []
# -- Forward
enc_output = self.src_word_emb(character) + self.position_enc(
text_pos) #(N, T, C)
slf_attn_mask = get_attn_key_pad_mask(text_pos, self.n_head,
enc_output.dtype)
non_pad_mask = get_non_pad_mask(text_pos, 1, enc_output.dtype)
for enc_layer in self.layer_stack:
enc_output, enc_slf_attn = enc_layer(
enc_output,
non_pad_mask=non_pad_mask,
slf_attn_mask=slf_attn_mask)
enc_slf_attn_list += [enc_slf_attn]
return enc_output, enc_slf_attn_list

View File

@ -1,133 +0,0 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import numpy as np
import paddle.fluid.dygraph as dg
import paddle.fluid as fluid
from parakeet.g2p.text.symbols import symbols
from parakeet.models.transformer_tts.utils import *
from parakeet.models.transformer_tts.post_convnet import PostConvNet
from parakeet.models.fastspeech.length_regulator import LengthRegulator
from parakeet.models.fastspeech.encoder import Encoder
from parakeet.models.fastspeech.decoder import Decoder
class FastSpeech(dg.Layer):
def __init__(self, cfg, num_mels=80):
"""FastSpeech model.
Args:
cfg: the yaml configs used in FastSpeech model.
num_mels (int, optional): the number of mel bands when calculating mel spectrograms. Defaults to 80.
"""
super(FastSpeech, self).__init__()
self.encoder = Encoder(
n_src_vocab=len(symbols) + 1,
len_max_seq=cfg['max_seq_len'],
n_layers=cfg['encoder_n_layer'],
n_head=cfg['encoder_head'],
d_k=cfg['hidden_size'] // cfg['encoder_head'],
d_q=cfg['hidden_size'] // cfg['encoder_head'],
d_model=cfg['hidden_size'],
d_inner=cfg['encoder_conv1d_filter_size'],
fft_conv1d_kernel=cfg['fft_conv1d_filter'],
fft_conv1d_padding=cfg['fft_conv1d_padding'],
dropout=0.1)
self.length_regulator = LengthRegulator(
input_size=cfg['hidden_size'],
out_channels=cfg['duration_predictor_output_size'],
filter_size=cfg['duration_predictor_filter_size'],
dropout=cfg['dropout'])
self.decoder = Decoder(
len_max_seq=cfg['max_seq_len'],
n_layers=cfg['decoder_n_layer'],
n_head=cfg['decoder_head'],
d_k=cfg['hidden_size'] // cfg['decoder_head'],
d_q=cfg['hidden_size'] // cfg['decoder_head'],
d_model=cfg['hidden_size'],
d_inner=cfg['decoder_conv1d_filter_size'],
fft_conv1d_kernel=cfg['fft_conv1d_filter'],
fft_conv1d_padding=cfg['fft_conv1d_padding'],
dropout=0.1)
self.weight = fluid.ParamAttr(
initializer=fluid.initializer.XavierInitializer())
k = math.sqrt(1.0 / cfg['hidden_size'])
self.bias = fluid.ParamAttr(initializer=fluid.initializer.Uniform(
low=-k, high=k))
self.mel_linear = dg.Linear(
cfg['hidden_size'],
num_mels * cfg['outputs_per_step'],
param_attr=self.weight,
bias_attr=self.bias, )
self.postnet = PostConvNet(
n_mels=num_mels,
num_hidden=512,
filter_size=5,
padding=int(5 / 2),
num_conv=5,
outputs_per_step=cfg['outputs_per_step'],
use_cudnn=True,
dropout=0.1,
batchnorm_last=True)
def forward(self,
character,
text_pos,
mel_pos=None,
length_target=None,
alpha=1.0):
"""
Compute mel output from text character.
Args:
character (Variable): shape(B, T_text), dtype float32, the input text characters,
where T_text means the timesteps of input characters,
text_pos (Variable): shape(B, T_text), dtype int64, the input text position.
mel_pos (Variable, optional): shape(B, T_mel), dtype int64, the spectrum position,
where T_mel means the timesteps of input spectrum,
length_target (Variable, optional): shape(B, T_text), dtype int64,
the duration of phoneme compute from pretrained transformerTTS. Defaults to None.
alpha (float32, optional): The hyperparameter to determine the length of the expanded sequence
mel, thereby controlling the voice speed. Defaults to 1.0.
Returns:
mel_output (Variable): shape(B, T_mel, C), the mel output before postnet.
mel_output_postnet (Variable): shape(B, T_mel, C), the mel output after postnet.
duration_predictor_output (Variable): shape(B, T_text), the duration of phoneme compute with duration predictor.
enc_slf_attn_list (List[Variable]): len(enc_n_layers), the encoder self attention list.
dec_slf_attn_list (List[Variable]): len(dec_n_layers), the decoder self attention list.
"""
encoder_output, enc_slf_attn_list = self.encoder(character, text_pos)
if fluid.framework._dygraph_tracer()._train_mode:
length_regulator_output, duration_predictor_output = self.length_regulator(
encoder_output, target=length_target, alpha=alpha)
decoder_output, dec_slf_attn_list = self.decoder(
length_regulator_output, mel_pos)
mel_output = self.mel_linear(decoder_output)
mel_output_postnet = self.postnet(mel_output) + mel_output
return mel_output, mel_output_postnet, duration_predictor_output, enc_slf_attn_list, dec_slf_attn_list
else:
length_regulator_output, decoder_pos = self.length_regulator(
encoder_output, alpha=alpha)
decoder_output, _ = self.decoder(length_regulator_output,
decoder_pos)
mel_output = self.mel_linear(decoder_output)
mel_output_postnet = self.postnet(mel_output) + mel_output
return mel_output, mel_output_postnet

View File

@ -1,84 +0,0 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import math
import paddle.fluid.dygraph as dg
import paddle.fluid.layers as layers
import paddle.fluid as fluid
from parakeet.modules.multihead_attention import MultiheadAttention
from parakeet.modules.ffn import PositionwiseFeedForward
class FFTBlock(dg.Layer):
def __init__(self,
d_model,
d_inner,
n_head,
d_k,
d_q,
filter_size,
padding,
dropout=0.2):
"""Feed forward structure based on self-attention.
Args:
d_model (int): the dim of hidden layer in multihead attention.
d_inner (int): the dim of hidden layer in ffn.
n_head (int): the head number of multihead attention.
d_k (int): the dim of key in multihead attention.
d_q (int): the dim of query in multihead attention.
filter_size (int): the conv kernel size.
padding (int): the conv padding size.
dropout (float, optional): dropout probability. Defaults to 0.2.
"""
super(FFTBlock, self).__init__()
self.slf_attn = MultiheadAttention(
d_model,
d_k,
d_q,
num_head=n_head,
is_bias=True,
dropout=dropout,
is_concat=False)
self.pos_ffn = PositionwiseFeedForward(
d_model,
d_inner,
filter_size=filter_size,
padding=padding,
dropout=dropout)
def forward(self, enc_input, non_pad_mask, slf_attn_mask=None):
"""
Feed forward block of FastSpeech
Args:
enc_input (Variable): shape(B, T, C), dtype float32, the embedding characters input,
where T means the timesteps of input.
non_pad_mask (Variable): shape(B, T, 1), dtype int64, the mask of sequence.
slf_attn_mask (Variable, optional): shape(B, len_q, len_k), dtype int64, the mask of self attention,
where len_q means the sequence length of query and len_k means the sequence length of key. Defaults to None.
Returns:
output (Variable): shape(B, T, C), the output after self-attention & ffn.
slf_attn (Variable): shape(B * n_head, T, T), the self attention.
"""
output, slf_attn = self.slf_attn(
enc_input, enc_input, enc_input, mask=slf_attn_mask)
output *= non_pad_mask
output = self.pos_ffn(output)
output *= non_pad_mask
return output, slf_attn

View File

@ -1,181 +0,0 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import math
import parakeet.models.fastspeech.utils
import paddle.fluid.dygraph as dg
import paddle.fluid.layers as layers
import paddle.fluid as fluid
from parakeet.modules.customized import Conv1D
class LengthRegulator(dg.Layer):
def __init__(self, input_size, out_channels, filter_size, dropout=0.1):
"""Length Regulator block in FastSpeech.
Args:
input_size (int): the channel number of input.
out_channels (int): the output channel number.
filter_size (int): the filter size of duration predictor.
dropout (float, optional): dropout probability. Defaults to 0.1.
"""
super(LengthRegulator, self).__init__()
self.duration_predictor = DurationPredictor(
input_size=input_size,
out_channels=out_channels,
filter_size=filter_size,
dropout=dropout)
def LR(self, x, duration_predictor_output):
output = []
batch_size = x.shape[0]
for i in range(batch_size):
output.append(
self.expand(x[i:i + 1], duration_predictor_output[i:i + 1]))
output = self.pad(output)
return output
def pad(self, input_ele):
max_len = max([input_ele[i].shape[0] for i in range(len(input_ele))])
out_list = []
for i in range(len(input_ele)):
pad_len = max_len - input_ele[i].shape[0]
one_batch_padded = layers.pad(input_ele[i], [0, pad_len, 0, 0],
pad_value=0.0)
out_list.append(one_batch_padded)
out_padded = layers.stack(out_list)
return out_padded
def expand(self, batch, predicted):
out = []
time_steps = batch.shape[1]
fertilities = predicted.numpy()
batch = layers.squeeze(batch, [0])
for i in range(time_steps):
if fertilities[0, i] == 0:
continue
out.append(
layers.expand(batch[i:i + 1, :], [int(fertilities[0, i]), 1]))
out = layers.concat(out, axis=0)
return out
def forward(self, x, alpha=1.0, target=None):
"""
Compute length of mel from encoder output use TransformerTTS attention
Args:
x (Variable): shape(B, T, C), dtype float32, the encoder output.
alpha (float32, optional): the hyperparameter to determine the length of
the expanded sequence mel, thereby controlling the voice speed. Defaults to 1.0.
target (Variable, optional): shape(B, T_text), dtype int64, the duration of phoneme compute from pretrained transformerTTS.
Defaults to None.
Returns:
output (Variable): shape(B, T, C), the output after exppand.
duration_predictor_output (Variable): shape(B, T, C), the output of duration predictor.
"""
duration_predictor_output = self.duration_predictor(x)
if fluid.framework._dygraph_tracer()._train_mode:
output = self.LR(x, target)
return output, duration_predictor_output
else:
duration_predictor_output = duration_predictor_output * alpha
duration_predictor_output = layers.ceil(duration_predictor_output)
output = self.LR(x, duration_predictor_output)
mel_pos = dg.to_variable(np.arange(1, output.shape[1] + 1)).astype(
np.int64)
mel_pos = layers.unsqueeze(mel_pos, [0])
return output, mel_pos
class DurationPredictor(dg.Layer):
def __init__(self, input_size, out_channels, filter_size, dropout=0.1):
"""Duration Predictor block in FastSpeech.
Args:
input_size (int): the channel number of input.
out_channels (int): the output channel number.
filter_size (int): the filter size.
dropout (float, optional): dropout probability. Defaults to 0.1.
"""
super(DurationPredictor, self).__init__()
self.input_size = input_size
self.out_channels = out_channels
self.filter_size = filter_size
self.dropout = dropout
k = math.sqrt(1.0 / self.input_size)
self.conv1 = Conv1D(
num_channels=self.input_size,
num_filters=self.out_channels,
filter_size=self.filter_size,
padding=1,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.XavierInitializer()),
bias_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform(
low=-k, high=k)))
#data_format='NTC')
k = math.sqrt(1.0 / self.out_channels)
self.conv2 = Conv1D(
num_channels=self.out_channels,
num_filters=self.out_channels,
filter_size=self.filter_size,
padding=1,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.XavierInitializer()),
bias_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform(
low=-k, high=k)))
#data_format='NTC')
self.layer_norm1 = dg.LayerNorm(self.out_channels)
self.layer_norm2 = dg.LayerNorm(self.out_channels)
self.weight = fluid.ParamAttr(
initializer=fluid.initializer.XavierInitializer())
k = math.sqrt(1.0 / self.out_channels)
self.bias = fluid.ParamAttr(initializer=fluid.initializer.Uniform(
low=-k, high=k))
self.linear = dg.Linear(
self.out_channels, 1, param_attr=self.weight, bias_attr=self.bias)
def forward(self, encoder_output):
"""
Predict the duration of each character.
Args:
encoder_output (Variable): shape(B, T, C), dtype float32, the encoder output.
Returns:
out (Variable): shape(B, T, C), the output of duration predictor.
"""
# encoder_output.shape(N, T, C)
out = layers.transpose(encoder_output, [0, 2, 1])
out = self.conv1(out)
out = layers.transpose(out, [0, 2, 1])
out = layers.dropout(
layers.relu(self.layer_norm1(out)),
self.dropout,
dropout_implementation='upscale_in_train')
out = layers.transpose(out, [0, 2, 1])
out = self.conv2(out)
out = layers.transpose(out, [0, 2, 1])
out = layers.dropout(
layers.relu(self.layer_norm2(out)),
self.dropout,
dropout_implementation='upscale_in_train')
out = layers.relu(self.linear(out))
out = layers.squeeze(out, axes=[-1])
return out

View File

@ -1,46 +0,0 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
def get_alignment(attn_probs, mel_lens, n_head):
max_F = 0
assert attn_probs[0].shape[0] % n_head == 0
batch_size = int(attn_probs[0].shape[0] // n_head)
for i in range(len(attn_probs)):
multi_attn = attn_probs[i].numpy()
for j in range(n_head):
attn = multi_attn[j * batch_size:(j + 1) * batch_size]
F = score_F(attn)
if max_F < F:
max_F = F
max_attn = attn
alignment = compute_duration(max_attn, mel_lens)
return alignment, max_attn
def score_F(attn):
max = np.max(attn, axis=-1)
mean = np.mean(max)
return mean
def compute_duration(attn, mel_lens):
alignment = np.zeros([attn.shape[2]])
#for i in range(attn.shape[0]):
for j in range(mel_lens):
max_index = np.argmax(attn[0, j])
alignment[max_index] += 1
return alignment

View File

@ -1,48 +0,0 @@
import attrdict
import flatdict
import argparse
import yaml
class Config(attrdict.AttrDict):
def dump(self, path):
with open(path, 'wt') as f:
yaml.safe_dump(dict(self), f, default_flow_style=None)
def dumps(self):
return yaml.safe_dump(dict(self), default_flow_style=None)
@classmethod
def from_file(cls, path):
with open(path, 'rt') as f:
c = yaml.safe_load(f)
return cls(c)
def merge_file(self, path):
with open(path, 'rt') as f:
other = yaml.safe_load(f)
self.update(self + other)
def merge_args(self, args):
args_dict = vars(args)
args_dict.pop("config") # exclude config file path
args_dict = {k: v for k, v in args_dict.items() if v is not None}
nested_dict = flatdict.FlatDict(args_dict, delimiter=".").as_dict()
self.update(self + nested_dict)
def merge(self, other):
self.update(self + other)
def flatten(self):
flat = flatdict.FlatDict(self, delimiter='.')
return flat
def add_options_to_parser(self, parser):
parser.add_argument(
"--config", type=str,
help="extra config file to override the default config")
flat = self.flatten()
g = parser.add_argument_group("config file options")
for k, v in flat.items():
g.add_argument("--{}".format(k), type=type(v),
help="config file option: {}".format(k))

View File

@ -66,7 +66,8 @@ setup_info = dict(
'llvmlite==0.31.0',
'opencc',
'g2p_en',
'g2pM'
'g2pM',
'yacs',
],
# Package info