背景
这是“实例学PyTorch”系列的第4篇文章。在前三篇文章中
我们介绍了用PyTorch解决图片分类的问题。机器学习领域还有一个非常重要的问题是序列预测,与图片分类不同,序列预测需要考虑数据的前后关联性。能够处理数据的前后关联性的神经网络是循环神经网络(Recurrent Neural Network,RNN)。
PyTorch官方的示例代码https://github.com/pytorch/examples/中有一个使用长短期记忆网络(Long Short-Term Memory,LSTM)实现对sin函数预测的示例,我们暂时先不考虑LSTM,而是使用一个简单的循环神经网络(Recurrent Neural Network,RNN)来实现对正弦函数的预测。
本文的代码可以在我的GitHub仓库https://github.com/jin-li/pytorch-tutorial中的T04_series_rnn
文件夹中找到。
循环神经网络(RNN)简介
在之前的文章中,我们介绍了全连接神经网络和卷积神经网络,这两种神经网络都是前馈神经网络(Feedforward Neural Network)。前馈神经网络的一个缺点是不能处理序列数据,因为它没有存储之前的数据信息。循环神经网络(Recurrent Neural Network,RNN)是一种可以处理序列数据的神经网络,它在每个时间步都会保存之前的数据信息,从而可以处理序列数据。需要注意的是,这里的序列数据不一定是时间序列数据,也可以是空间序列数据。例如,自然语言处理中的句子就是一个空间序列数据,声音识别中的音频数据属于序列数据。这里为了方便,不管序列数据是不是时间序列,我们都将其中的每个元素称为时间步(time step)。
循环神经网络的结构并不复杂,它的核心有两点:
- 一是在训练时,需要将训练数据按照时间步展开,然后通过遍历每个时间步来计算损失函数,最后通过反向传播算法来更新权重。
- 二是对于每个时间步,不止有一个输出,还有一个隐藏状态(hidden state),这个隐藏状态会在下一个时间步被传递到下一个时间步,从而保留了之前的数据信息。
循环神经网络的具体知识可以参考斯坦福大学的CS230课程课件,这里我们用一个动图来简单说明循环神经网络的工作原理:
这里的$x$是输入,$h$是隐藏状态,$y$是输出,$t$是时间步。可以看到,隐藏状态$h$在每个时间步都会被传递到下一个时间步,从而保留了之前的数据信息。$x$, $h$和$y$各为神经网络的一个层,$h$是隐藏层,$x$是输入层,$y$是输出层,各个层的大小可以根据具体问题来确定。
正弦函数预测
正弦函数可以看作一个时间序列,在某些时刻,正弦函数的值也许是相同的,但其后的值可能不同。例如对于$y = \sin(x)$,当$x = 0$和$x = \pi$时,$y$都等于$0$,但在这两个时刻的后一个时间步(假设时间步的大小为0.01,则后两个时间步为$x = 0.01$和$x = \pi + 0.01$)时,$y$就不相同了。要想预测后一个时间步的值,我们不仅需要知道当前时间步的值,还要知道之前几个时间步的值。这正是循环神经网络的用途所在。
RNN模型设计
我们这个问题比较简单,只需要用一个循环神经网络就可以了。我们的循环神经网络的输入是一个序列,输出是这个序列的下一个值。我们的输入序列是正弦函数的值,输出是正弦函数的下一个值。我们的循环神经网络的结构如下:
- 输入层:输入层的大小为1,即每个时间步只有一个输入。
- 隐藏层:隐藏层的大小可以随意选择。考虑到我们这个问题比较简单,我们选择10个神经元作为隐藏层。
- 输出层:输出层的大小为1,即每个时间步只有一个输出。
这样我们的循环神经网络的结构就确定了。我们可以使用PyTorch的nn.RNN
类来实现这个循环神经网络:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
class SimpleRNN(nn.Module):
def __init__(self, rnn_type, input_size, hidden_size, num_layers):
super(SimpleRNN, self).__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
self.rnn = nn.RNN(input_size=input_size, hidden_size=hidden_size, dropout=(0 if num_layers == 1 else 0.05), num_layers=num_layers, batch_first=True)
self.out = nn.Linear(hidden_size, 1) # Linear layer is output of model
def forward(self, x, h_state):
# Define our forward pass, we take some input sequence and an initial hidden state.
r_out, h_state = self.rnn(x, h_state)
final_y = self.out(r_out[:, -1, :]) # Return only the last output of RNN.
return final_y, h_state
|
这里我们定义了一个SimpleRNN
类,它继承自PyTorch的nn.Module
类。在__init__
函数中,我们定义了一个nn.RNN
类的实例,这个实例就是我们的循环神经网络。在forward
函数中,我们定义了循环神经网络的前向传播过程,即我们如何计算输出。这里我们只需要返回最后一个时间步的输出即可。
数据准备
在本系列之前的三篇文章中,我们在训练神经网络时都是使用的别人已经准备好的数据集。但这里,我们需要自己来准备数据。对于本问题,数据集的准备还是非常简单的,我们只需要生成一些正弦函数的数据即可。注意,在生成了数据之后,我们需要将其封装成PyTorch的Dataset
类,这样才能方便地使用PyTorch的DataLoader
类来加载数据。
生成数据集的完整代码在本文对应的GitHub仓库https://github.com/jin-li/pytorch-tutorial中的T04_series_rnn
文件夹中的SineWaveDataset.py
文件中,具体代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
import numpy as np
import torch
from torch.utils.data import Dataset
class RNNDataset(Dataset):
def __init__(self, x, y=None):
self.data = x
self.labels = y
def __len__(self):
return self.data.shape[0]
def __getitem__(self, idx):
if self.labels is not None:
return self.data[idx], self.labels[idx]
else:
return self.data[idx]
def create_dataset(sequence_length, train_percent=0.8):
# Create sin wave at discrete time steps.
num_time_steps = 2000
time_steps = np.linspace(start=0, stop=1000, num=num_time_steps, dtype=np.float32)
discrete_sin_wave = (np.sin(time_steps * 2 * np.pi / 20)).reshape(-1, 1)
# Take (sequence_length + 1) elements & put as a row in sequence_data, extra element is value we want to predict.
# Move one time step and keep grabbing till we reach the end of our sampled sin wave.
sequence_data = []
for i in range(num_time_steps - sequence_length):
sequence_data.append(discrete_sin_wave[i: i + sequence_length + 1, 0])
sequence_data = np.array(sequence_data)
# Split for train/val.
num_total_samples = sequence_data.shape[0]
num_train_samples = int(train_percent * num_total_samples)
train_set = sequence_data[:num_train_samples, :]
test_set = sequence_data[num_train_samples:, :]
print('{} total sequence samples, {} used for training'.format(num_total_samples, num_train_samples))
# Take off the last element of each row and this will be our target value to predict.
x_train = train_set[:, :-1][:, :, np.newaxis]
y_train = train_set[:, -1][:, np.newaxis]
x_test = test_set[:, :-1][:, :, np.newaxis]
y_test = test_set[:, -1][:, np.newaxis]
train_data = RNNDataset(x_train, y_train)
test_data = RNNDataset(x_test, y_test)
torch.save(train_data, 'train_data.pt')
torch.save(test_data, 'test_data.pt')
if __name__ == '__main__':
create_dataset(sequence_length=80)
|
- 首先我们定义了一个
RNNDataset
类来存放训练数据,它继承自PyTorch的Dataset
类,这样我们就可以使用PyTorch的DataLoader
类来加载数据。
- 然后我们定义了一个
create_dataset
函数来生成正弦函数的数据。这个函数的输入参数有两个,一个是sequence_length
,表示我们要用多少个时间步的数据来预测下一个时间步的数据;另一个是train_percent
,表示我们将多少比例的数据用于训练,剩下的用于测试。这个函数的主要工作是生成正弦函数的数据,并将其封装成RNNDataset
类的实例,最后将训练数据和测试数据保存到文件中。
- 在
create_dataset
函数中,我们首先生成了2000个时间步的正弦函数数据,然后我们从这些正弦数据中生成一些序列数据用于训练。生成训练数据的方法是:
- 从第1个时间步开始,取连续50个时间步的数据作为一个序列,即$x_1, x_2, \cdots, x_{80}$。
- 这50个时间步的下一个时间步即为要预测的值$y = x_{81}$。
- 重复上述过程,直到取完所有的时间步。这里我们有2000个时间步,所以我们一共可以生成$2000 - 80 = 1920$个序列。
- 我们将这1950个序列数据分为两部分,其中80%用于训练($1920 \times 0.8 = 1536$,20%用于测试。
- 最后我们将训练数据和测试数据封装成
RNNDataset
类的实例,并保存到文件train_data.pt
和test_data.pt
中。
x_train
是一个三维张量,其形状为$1536 \times 80 \times 1$,其中$1536$是训练数据的个数,$80$是一个序列的长度,1是输入的维度。
y_train
是一个二维张量,其形状为$1536 \times 1$,其中$1950$是训练数据的个数,1是输出的维度。
y_train
是x_train
中每组训练数据的最后一个时间步之后那个时间步的值。
模型训练
和之前的文章一样,我们需要定义一个训练函数来训练我们的模型。这个训练函数的代码也很简单:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
def train(model, device, train_dataloader, loss_function, optimizer, epoch_idx, log_interval):
model.train()
trained_cnt = 0
for batch_idx, (x_batch, y_batch) in enumerate(train_dataloader):
x_batch, y_batch = x_batch.to(device), y_batch.to(device)
h_state = torch.zeros([model.num_layers, x_batch.size()[0], model.hidden_size]).to(device)
optimizer.zero_grad()
output, _ = model(x_batch, h_state)
loss = loss_function(output, y_batch)
loss.backward()
optimizer.step()
trained_cnt += len(x_batch)
if batch_idx % log_interval == 0:
print('Train Epoch: {:5d} [{:5d} / {:5d} ({:3.0f}%)]\tLoss: {:.3e}'.format(
epoch_idx, trained_cnt, len(train_dataloader.dataset),
100. * (batch_idx + 1) / len(train_dataloader), loss.item()))
|
这个训练函数的输入参数也和之前的训练函数类似,分别是模型、训练数据加载器、损失函数和优化器,这里不再赘述。
模型测试
除了训练函数,我们再定义一个测试函数,在训练完一个epoch之后,我们需要测试我们的模型在测试数据上的表现。测试函数的代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
def test(model, device, test_dataloader, loss_function):
model.eval()
loss_all = []
for x_batch, y_batch in test_dataloader:
x_batch, y_batch = x_batch.to(device), y_batch.to(device)
h_state = torch.zeros([model.num_layers, x_batch.size()[0], model.hidden_size]).to(device)
output, _ = model(x_batch, h_state)
loss = loss_function(output, y_batch)
loss_all.append(loss.cpu().data.numpy())
print('Test loss: ', np.mean(loss_all))
return np.mean(loss_all)
|
模型预测
最后,我们定义一个预测函数,用于测试我们的模型能否根据一个正弦函数的序列数据预测出之后一段的值。预测函数的代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
def predict(model, device, dataloader, prediction_steps):
model.eval()
h_state = torch.zeros([model.num_layers, 1, model.hidden_size]).to(device) # Adjusted to 3-D with batch size 1
initial_input = next(iter(dataloader))[1].to(device) # Grab one initial sequence of data for use in prediction.
if initial_input.dim() == 2:
initial_input = initial_input.unsqueeze(0)
initial = initial_input.squeeze().cpu().numpy().tolist()
predictions = []
for _ in range(prediction_steps): # Predict prediction_steps steps ahead
pred, h_state = model(initial_input, h_state)
predictions.append(pred.item())
initial_input = pred.unsqueeze(0) # Ensure pred has the same dimensions as test_input[:, 1:, :]
return initial, predictions
|
运行模型
我们把上述代码整合到一个文件中,在main()
函数中调用上述函数。和之前类似地,我们添加一些命令行参数来控制模型的训练和测试。完整的代码参考我的GitHub仓库https://github.com/jin-li/pytorch-tutorial中T04_series_rnn
文件夹里的time_series_rnn.py
。
-
首先我们生成正弦函数的数据集:
1
|
python SineWaveDataset.py
|
-
然后我们训练模型,用模型来预测一个正弦函数的序列,并把预测结果和真实结果画在一起:
1
|
python time_series_rnn.py --plot
|
这个代码在GPU上运行了约20秒,显存占用约208M;如果使用CPU,则运行时间增加到约1分30秒。运行完这个命令之后,我们可以看到模型预测的结果如下图:
其中前80个数据点是一个初始序列,后面的150个数据点是模型预测的结果。可以看到,模型的预测结果和真实结果非常接近。
总结
在本文中,我们介绍了如何使用PyTorch实现一个简单的循环神经网络(RNN)来预测正弦函数的序列。
本文中的例子我使用不同的随机数种子运行了多次,在给定的参数下,上面展示的结果是其中预测得比较好的一次,随机数种子是18(在GitHub上的代码中已将其设为默认值)。读者可以根据我们在本系列的第二篇文章“实例学PyTorch(2):MNIST手写数字识别(二)——神经网络中的参数选择”中介绍的参数选择方法来调整模型的参数,看看能否得到更好的结果。
有了本文这个简单的例子作为基础,我们将在下一篇文章中介绍如何使用其他的神经网络,例如长短期记忆网络(Long Short-Term Memory,LSTM)、门控循环单元(Gated Recurrent Unit,GRU)等来预测正弦函数的序列。