Parakeet/tests/unit/test_pwg.py

244 lines
7.5 KiB
Python

# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import torch
from parallel_wavegan.layers import residual_block
from parallel_wavegan.layers import upsample
from parallel_wavegan.models import parallel_wavegan as pwgan
from timer import timer
from parakeet.models.parallel_wavegan import ConvInUpsampleNet
from parakeet.models.parallel_wavegan import PWGDiscriminator
from parakeet.models.parallel_wavegan import PWGGenerator
from parakeet.models.parallel_wavegan import ResidualBlock
from parakeet.models.parallel_wavegan import ResidualPWGDiscriminator
from parakeet.utils.layer_tools import summary
from parakeet.utils.profile import synchronize
paddle.set_device("gpu:0")
device = torch.device("cuda:0")
def test_convin_upsample_net():
net = ConvInUpsampleNet(
[4, 4, 4, 4],
"LeakyReLU", {"negative_slope": 0.2},
freq_axis_kernel_size=3,
aux_context_window=0)
net2 = upsample.ConvInUpsampleNetwork(
[4, 4, 4, 4],
nonlinear_activation="LeakyReLU",
nonlinear_activation_params={"negative_slope": 0.2},
freq_axis_kernel_size=3,
aux_context_window=0).to(device)
summary(net)
for k, v in net2.named_parameters():
print(k, v.shape)
net.state_dict()[k].set_value(v.data.cpu().numpy())
c = paddle.randn([4, 80, 180])
synchronize()
with timer(unit='s') as t:
out = net(c)
synchronize()
print(f"paddle conv_in_upsample_net forward takes {t.elapse}s.")
with timer(unit='s') as t:
out.sum().backward()
synchronize()
print(f"paddle conv_in_upsample_net backward takes {t.elapse}s.")
c_torch = torch.as_tensor(c.numpy()).to(device)
torch.cuda.synchronize()
with timer(unit='s') as t:
out2 = net2(c_torch)
print(f"torch conv_in_upsample_net forward takes {t.elapse}s.")
with timer(unit='s') as t:
out2.sum().backward()
print(f"torch conv_in_upsample_net backward takes {t.elapse}s.")
print("forward check")
print(out.numpy()[0])
print(out2.data.cpu().numpy()[0])
print("backward check")
print(net.conv_in.weight.grad.numpy()[0])
print(net2.conv_in.weight.grad.data.cpu().numpy()[0])
def test_residual_block():
net = ResidualBlock(dilation=9)
net2 = residual_block.ResidualBlock(dilation=9)
summary(net)
summary(net2)
for k, v in net2.named_parameters():
net.state_dict()[k].set_value(v.data.cpu().numpy())
x = paddle.randn([4, 64, 180])
c = paddle.randn([4, 80, 180])
res, skip = net(x, c)
res2, skip2 = net2(torch.as_tensor(x.numpy()), torch.as_tensor(c.numpy()))
print("forward:")
print(res.numpy()[0])
print(res2.data.cpu().numpy()[0])
print(skip.numpy()[0])
print(skip2.data.cpu().numpy()[0])
(res.sum() + skip.sum()).backward()
(res2.sum() + skip2.sum()).backward()
print("backward:")
print(net.conv.weight.grad.numpy().squeeze()[0])
print(net2.conv.weight.grad.data.cpu().numpy().squeeze()[0])
def test_pwg_generator():
net = PWGGenerator(
layers=9,
stacks=3,
upsample_scales=[4, 4, 4, 4],
nonlinear_activation="LeakyReLU",
nonlinear_activation_params={"negative_slope": 0.5},
use_weight_norm=True)
net2 = pwgan.ParallelWaveGANGenerator(
layers=9,
stacks=3,
upsample_params={
"upsample_scales": [4, 4, 4, 4],
"nonlinear_activation": "LeakyReLU",
"nonlinear_activation_params": {
"negative_slope": 0.5
}
},
use_weight_norm=True).to(device)
summary(net)
summary(net2)
for k, v in net2.named_parameters():
p = net.state_dict()[k]
if k.endswith("_g"):
p.set_value(v.data.cpu().numpy().reshape([-1]))
else:
p.set_value(v.data.cpu().numpy())
x = paddle.randn([4, 1, 80 * 256])
c = paddle.randn([4, 80, 80 + 4])
synchronize()
with timer(unit='s') as t:
out = net(x, c)
synchronize()
print(f"paddle generator forward takes {t.elapse}s.")
synchronize()
with timer(unit='s') as t:
out.sum().backward()
synchronize()
print(f"paddle generator backward takes {t.elapse}s.")
x_torch = torch.as_tensor(x.numpy()).to(device)
c_torch = torch.as_tensor(c.numpy()).to(device)
torch.cuda.synchronize()
with timer(unit='s') as t:
out2 = net2(x_torch, c_torch)
torch.cuda.synchronize()
print(f"torch generator forward takes {t.elapse}s.")
torch.cuda.synchronize()
with timer(unit='s') as t:
out2.sum().backward()
torch.cuda.synchronize()
print(f"torch generator backward takes {t.elapse}s.")
print("test forward:")
print(out.numpy()[0])
print(out2.data.cpu().numpy()[0])
print("test backward:")
print("wv")
print(net.first_conv.weight_v.grad.numpy().squeeze())
print(net2.first_conv.weight_v.grad.data.cpu().numpy().squeeze())
print("wg")
print(net.first_conv.weight_g.grad.numpy().squeeze())
print(net2.first_conv.weight_g.grad.data.cpu().numpy().squeeze())
# print(out.shape)
def test_pwg_discriminator():
net = PWGDiscriminator()
net2 = pwgan.ParallelWaveGANDiscriminator().to(device)
summary(net)
summary(net2)
for k, v in net2.named_parameters():
p = net.state_dict()[k]
if k.endswith("_g"):
p.set_value(v.data.cpu().numpy().reshape([-1]))
else:
p.set_value(v.data.cpu().numpy())
x = paddle.randn([4, 1, 180 * 256])
synchronize()
with timer() as t:
y = net(x)
synchronize()
print(f"forward takes {t.elapse}s.")
synchronize()
with timer() as t:
y.sum().backward()
synchronize()
print(f"backward takes {t.elapse}s.")
x_torch = torch.as_tensor(x.numpy()).to(device)
torch.cuda.synchronize()
with timer() as t:
y2 = net2(x_torch)
torch.cuda.synchronize()
print(f"forward takes {t.elapse}s.")
torch.cuda.synchronize()
with timer() as t:
y2.sum().backward()
torch.cuda.synchronize()
print(f"backward takes {t.elapse}s.")
print("test forward:")
print(y.numpy()[0])
print(y2.data.cpu().numpy()[0])
print("test backward:")
print(net.conv_layers[0].weight_v.grad.numpy().squeeze())
print(net2.conv_layers[0].weight_v.grad.data.cpu().numpy().squeeze())
def test_residual_pwg_discriminator():
net = ResidualPWGDiscriminator()
net2 = pwgan.ResidualParallelWaveGANDiscriminator()
summary(net)
summary(net2)
for k, v in net2.named_parameters():
p = net.state_dict()[k]
if k.endswith("_g"):
p.set_value(v.data.cpu().numpy().reshape([-1]))
else:
p.set_value(v.data.cpu().numpy())
x = paddle.randn([4, 1, 180 * 256])
y = net(x)
y2 = net2(torch.as_tensor(x.numpy()))
print(y.numpy()[0])
print(y2.data.cpu().numpy()[0])
print(y.shape)