# Parakeet/examples/deepvoice3/utils.py
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
import os
import numpy as np
import matplotlib
matplotlib.use("agg")  # non-interactive backend; figures are only written to files
from matplotlib import cm
import matplotlib.pyplot as plt
import librosa
from scipy import signal
from librosa import display
import soundfile as sf
from paddle import fluid
import paddle.fluid.dygraph as dg
from parakeet.g2p import en


def get_place(device_id):
    """Return a fluid place from device_id; -1 stands for CPU."""
if device_id == -1:
place = fluid.CPUPlace()
else:
place = fluid.CUDAPlace(device_id)
return place


def add_options(parser):
    parser.add_argument("--config", type=str, help="experiment config")
parser.add_argument(
"--data",
type=str,
default="/workspace/datasets/LJSpeech-1.1/",
help="The path of the LJSpeech dataset.")
parser.add_argument("--device", type=int, default=-1, help="device to use")
g = parser.add_mutually_exclusive_group()
g.add_argument("--checkpoint", type=str, help="checkpoint to resume from.")
g.add_argument(
"--iteration",
type=int,
help="the iteration of the checkpoint to load from output directory")
    # nargs="?" makes the positional optional so that the default is used;
    # without it, argparse ignores the default and requires the argument.
    parser.add_argument(
        "output",
        type=str,
        nargs="?",
        default="experiment",
        help="path to save results")


def make_evaluator(config, text_sequences, output_dir, writer=None):
    c = config["transform"]
    p_replace = 0.0  # no pronunciation replacement at evaluation time
sample_rate = c["sample_rate"]
preemphasis = c["preemphasis"]
win_length = c["win_length"]
hop_length = c["hop_length"]
min_level_db = c["min_level_db"]
ref_level_db = c["ref_level_db"]
synthesis_config = config["synthesis"]
power = synthesis_config["power"]
n_iter = synthesis_config["n_iter"]
return Evaluator(
text_sequences,
p_replace,
sample_rate,
preemphasis,
win_length,
hop_length,
min_level_db,
ref_level_db,
power,
n_iter,
output_dir=output_dir,
writer=writer)


class Evaluator(object):
def __init__(self,
text_sequences,
p_replace,
sample_rate,
preemphasis,
win_length,
hop_length,
min_level_db,
ref_level_db,
power,
n_iter,
output_dir,
writer=None):
self.text_sequences = text_sequences
self.output_dir = output_dir
self.writer = writer
self.p_replace = p_replace
self.sample_rate = sample_rate
self.preemphasis = preemphasis
self.win_length = win_length
self.hop_length = hop_length
self.min_level_db = min_level_db
self.ref_level_db = ref_level_db
self.power = power
self.n_iter = n_iter

    def process_a_sentence(self, model, text):
        """Synthesize one sentence; return (waveform, alignments)."""
text = np.array(
en.text_to_sequence(
text, p=self.p_replace), dtype=np.int64)
length = len(text)
text_positions = np.arange(1, 1 + length, dtype=np.int64)
text = np.expand_dims(text, 0)
text_positions = np.expand_dims(text_positions, 0)
model.eval()
if isinstance(model, dg.DataParallel):
_model = model._layers
else:
_model = model
mel_outputs, linear_outputs, alignments, done = _model.transduce(
dg.to_variable(text), dg.to_variable(text_positions))
linear_outputs_np = linear_outputs.numpy()[0].T # (C, T)
wav = spec_to_waveform(linear_outputs_np, self.min_level_db,
self.ref_level_db, self.power, self.n_iter,
self.win_length, self.hop_length,
self.preemphasis)
alignments_np = alignments.numpy()[0] # batch_size = 1
return wav, alignments_np

    def __call__(self, model, iteration):
writer = self.writer
for i, seq in enumerate(self.text_sequences):
print("[Eval] synthesizing sentence {}".format(i))
wav, alignments_np = self.process_a_sentence(model, seq)
wav_path = os.path.join(
self.output_dir,
"eval_sample_{}_step_{:09d}.wav".format(i, iteration))
sf.write(wav_path, wav, self.sample_rate)
if writer is not None:
writer.add_audio(
"eval_sample_{}".format(i),
wav,
iteration,
sample_rate=self.sample_rate)
attn_path = os.path.join(
self.output_dir,
"eval_sample_{}_step_{:09d}.png".format(i, iteration))
plot_alignment(alignments_np, attn_path)
if writer is not None:
writer.add_image(
"eval_sample_attn_{}".format(i),
cm.viridis(alignments_np),
iteration,
dataformats="HWC")
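
# A minimal sketch of wiring the evaluator into a training loop; `config`,
# `model`, `eval_sentences` and `eval_interval` are assumptions for
# illustration:
#
#     evaluator = make_evaluator(config, eval_sentences, "experiment/eval")
#     if iteration % eval_interval == 0:
#         evaluator(model, iteration)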


def make_state_saver(config, output_dir, writer=None):
c = config["transform"]
p_replace = c["replace_pronunciation_prob"]
sample_rate = c["sample_rate"]
preemphasis = c["preemphasis"]
win_length = c["win_length"]
hop_length = c["hop_length"]
min_level_db = c["min_level_db"]
ref_level_db = c["ref_level_db"]
synthesis_config = config["synthesis"]
power = synthesis_config["power"]
n_iter = synthesis_config["n_iter"]
return StateSaver(p_replace, sample_rate, preemphasis, win_length,
hop_length, min_level_db, ref_level_db, power, n_iter,
output_dir, writer)


class StateSaver(object):
def __init__(self,
p_replace,
sample_rate,
preemphasis,
win_length,
hop_length,
min_level_db,
ref_level_db,
power,
n_iter,
output_dir,
writer=None):
self.output_dir = output_dir
self.writer = writer
self.p_replace = p_replace
self.sample_rate = sample_rate
self.preemphasis = preemphasis
self.win_length = win_length
self.hop_length = hop_length
self.min_level_db = min_level_db
self.ref_level_db = ref_level_db
self.power = power
self.n_iter = n_iter

    def __call__(self, outputs, inputs, iteration):
mel_output, lin_output, alignments, done_output = outputs
mel_input, lin_input = inputs
writer = self.writer

        # mel spectrogram
mel_input = mel_input[0].numpy().T
mel_output = mel_output[0].numpy().T
path = os.path.join(self.output_dir, "mel_spec")
plt.figure(figsize=(10, 3))
display.specshow(mel_input)
plt.colorbar()
plt.title("mel_input")
plt.savefig(
os.path.join(path, "target_mel_spec_step_{:09d}.png".format(
iteration)))
plt.close()
if writer is not None:
writer.add_image(
"target/mel_spec",
cm.viridis(mel_input),
iteration,
dataformats="HWC")
plt.figure(figsize=(10, 3))
display.specshow(mel_output)
plt.colorbar()
plt.title("mel_output")
plt.savefig(
os.path.join(path, "predicted_mel_spec_step_{:09d}.png".format(
iteration)))
plt.close()
if writer is not None:
writer.add_image(
"predicted/mel_spec",
cm.viridis(mel_output),
iteration,
dataformats="HWC")

        # linear spectrogram
lin_input = lin_input[0].numpy().T
lin_output = lin_output[0].numpy().T
path = os.path.join(self.output_dir, "lin_spec")
plt.figure(figsize=(10, 3))
display.specshow(lin_input)
plt.colorbar()
plt.title("mel_input")
plt.savefig(
os.path.join(path, "target_lin_spec_step_{:09d}.png".format(
iteration)))
plt.close()
if writer is not None:
writer.add_image(
"target/lin_spec",
cm.viridis(lin_input),
iteration,
dataformats="HWC")
plt.figure(figsize=(10, 3))
display.specshow(lin_output)
plt.colorbar()
plt.title("mel_input")
plt.savefig(
os.path.join(path, "predicted_lin_spec_step_{:09d}.png".format(
iteration)))
plt.close()
if writer is not None:
writer.add_image(
"predicted/lin_spec",
cm.viridis(lin_output),
iteration,
dataformats="HWC")

        # alignment
path = os.path.join(self.output_dir, "alignments")
alignments = alignments[:, 0, :, :].numpy()
for idx, attn_layer in enumerate(alignments):
            save_path = os.path.join(
                path, "train_attn_layer_{}_step_{:09d}.png".format(
                    idx, iteration))
plot_alignment(attn_layer, save_path)
if writer is not None:
writer.add_image(
"train_attn/layer_{}".format(idx),
cm.viridis(attn_layer),
iteration,
dataformats="HWC")

        # synthesize waveform
wav = spec_to_waveform(
lin_output, self.min_level_db, self.ref_level_db, self.power,
self.n_iter, self.win_length, self.hop_length, self.preemphasis)
path = os.path.join(self.output_dir, "waveform")
save_path = os.path.join(
path, "train_sample_step_{:09d}.wav".format(iteration))
sf.write(save_path, wav, self.sample_rate)
if writer is not None:
writer.add_audio(
"train_sample", wav, iteration, sample_rate=self.sample_rate)


def spec_to_waveform(spec, min_level_db, ref_level_db, power, n_iter,
                     win_length, hop_length, preemphasis):
    """Convert a predicted linear spectrogram to a waveform with the
    Griffin-Lim vocoder.

    Args:
        spec (ndarray): normalized linear spectrogram, shape(C, T), where C
            is the number of frequency bins (1 + n_fft // 2) and T is the
            number of frames.

    Returns:
        ndarray: the reconstructed waveform, clipped to [-1.0, 1.0].
    """
    # map the normalized spectrogram in [0, 1] back to decibels, then from
    # decibels to linear amplitude: amp = 10 ** ((db + ref_level_db) / 20)
    denormalized = np.clip(spec, 0, 1) * (-min_level_db) + min_level_db
    lin_scaled = np.exp((denormalized + ref_level_db) / 20 * np.log(10))
wav = librosa.griffinlim(
lin_scaled**power,
n_iter=n_iter,
hop_length=hop_length,
win_length=win_length)
    if preemphasis > 0:
        # de-emphasis: invert the pre-emphasis filter y[n] = x[n] - p * x[n-1]
        wav = signal.lfilter([1.], [1., -preemphasis], wav)
wav = np.clip(wav, -1.0, 1.0)
return wav
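
# A worked example with illustrative values (not necessarily the project's
# config defaults): a normalized 513-bin spectrogram (n_fft = 1024) is
# converted back to a waveform.
#
#     spec = np.random.uniform(size=(513, 200)).astype(np.float32)  # (C, T)
#     wav = spec_to_waveform(spec, min_level_db=-100, ref_level_db=20,
#                            power=1.4, n_iter=32, win_length=1024,
#                            hop_length=256, preemphasis=0.97)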


def make_output_tree(output_dir):
print("creating output tree: {}".format(output_dir))
ckpt_dir = os.path.join(output_dir, "checkpoints")
state_dir = os.path.join(output_dir, "states")
eval_dir = os.path.join(output_dir, "eval")
for x in [ckpt_dir, state_dir, eval_dir]:
if not os.path.exists(x):
os.makedirs(x)
for x in ["alignments", "waveform", "lin_spec", "mel_spec"]:
p = os.path.join(state_dir, x)
if not os.path.exists(p):
os.makedirs(p)
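
# The resulting layout, given output_dir = "experiment":
#
#     experiment/
#         checkpoints/
#         eval/
#         states/
#             alignments/
#             lin_spec/
#             mel_spec/
#             waveform/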


def plot_alignment(alignment, path):
    """Plot an attention layer's alignment for a sentence and save it.

    Args:
        alignment (ndarray): attention weights, shape(T_dec, T_enc).
        path (str): path to save the figure.
    """
plt.figure()
plt.imshow(alignment)
plt.colorbar()
plt.xlabel('Encoder timestep')
plt.ylabel('Decoder timestep')
plt.savefig(path)
plt.close()
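
# A minimal usage sketch (random weights stand in for a real attention map):
#
#     attn = np.random.uniform(size=(120, 45))  # (T_dec, T_enc)
#     plot_alignment(attn, "alignment.png")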