卷积神经网络可以有效地处理空间信息,那么本章的循环神经网络(recurrent neural network RNN)则可以更好地处理序列信息。循环神经网络通过引入状态变量存储过去的信息和当前的输入,从而可以确定当前的输出
序列模型
时间动力学
统计工具
处理序列数据需要统计工具和新的深度神经网络架构
以股票交易为例,一个交易员想在t日的股市中表现良好,于是通过以下途径预测价格$x_t$
$$
x_{t}\sim P(x_{t}\mid x_{t-1},\ldots,x_{1}).
$$
自回归模型
第一种策略,假设在现实情况下相当长的序列$x_{t−1}, . . . , x_1$可能是不必要的,因此我们只需要满足某个长度为τ的时间跨度,即使用观测序列$x_{t−1}, . . . , x_{t−\tau} $。当下获得的最直接的好处就是参数的数量总是不变的,至少在t > τ时如此,这就使我们能够训练一个上面提及的深度网络。这种模型被称为自回归模型(autoregressive models),因为它们是对自己执行回归
第二种策略,如图8.1.2所示,是保留一些对过去观测的总结ht,并且同时更新预测ˆxt和总结ht。这就产生了基于$\hat{x_t} = P(x_t | h_t)$估计xt,以及公式$h_t = g(h_t−1, x_t−1)$更新的模型。由于ht从未被观测到,这类模型也被称为隐变量自回归模型(latent autoregressive models)
一个常见的假设是虽然特定值xt可能会改变,但是序列本身的动力学不会改变
统计学家称不变的动力学为静止的(stationary)。因此,整个序列的估计值都将通过以下的方式获得:
$$
P(x_1,\ldots,x_T)=\prod_{t=1}^TP(x_t\mid x_{t-1},\ldots,x_1).
$$
马尔可夫模型
自回归模型法的近似是正确的,就可以说序列满足马尔可夫条件
$\tau=1$得到一个一阶马尔可夫模型(first‐order Markov model)
$$
P(x_1,\ldots,x_T)=\prod_{t=1}^TP(x_t\mid x_{t-1})\text{当}P(x_1\mid x_0)=P(x_1).
$$
$$
\begin{aligned}
P(x_{t+1}\mid x_{t-1})& =\frac{\sum_{x_{t}}P(x_{t+1},x_{t},x_{t-1})}{P(x_{t-1})} \
&=\frac{\sum_{x_t}P(x_{t+1}\mid x_t,x_{t-1})P(x_t,x_{t-1})}{P(x_{t-1})} \
&=\sum_{m}P(x_{t+1}\mid x_{t})P(x_{t}\mid x_{t-1})
\end{aligned}
$$
因果关系
虽然上面的公式可以倒序展开,但是时间不能反过来,时间是向前的
训练
def train(net,train_iter,loss,epochs,lr):
trainer=torch.optim.Adam(net.parameters(),lr)
for epoch in range(epochs):
print(f'epoch{epoch+1}')
ls=0
for X,y in train_iter:
trainer.zero_grad()
l = loss(net(X), y)
l.sum().backward()
trainer.step()
ls+=l.sum().item()
print(f'mean_loss:{ls/600}')
net=get_net()
预测
onestep_preds=net(features)
文本预处理
读取数据集
import requests
import re
url='https://d2l-data.s3-accelerate.amazonaws.com/timemachine.txt'
path='time_machine.txt'
response=requests.get(url,verify=False)
with open("time_machine.txt",'w') as f:
f.write(response.text)
f.close()
def read_time_machine():
with open("time_machine.txt",'r') as f:
lines=f.readlines()
return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]
lines=read_time_machine()
print(f'# 文本总行数: {len(lines)}')
print(lines[0])
print(lines[10])
# 文本总行数: 3221
the time machine by h g wells
twinkled and his usually pale face was flushed and animated the
词元化
def tokenize(lines,token='word'):
if token=='word':
return [line.split() for line in lines]
elif token=='char':
return [list(line) for line in lines]
else:
print('错误:未知词源类型:'+token)
tokens=tokenize(lines)
for i in range(11):
print(tokens[i])
词表
将训练集中的所有文档合并在一起,对它们的唯一词元进行统计,得到的统计结果称之为语料
class Vocab: #@save
"""文本词表"""
def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
if tokens is None:
tokens = []
if reserved_tokens is None:
reserved_tokens = []
# 按出现频率排序
counter = count_corpus(tokens)
self._token_freqs = sorted(counter.items(), key=lambda x: x[1],reverse=True)
# 未知词元的索引为0
self.idx_to_token = ['<unk>'] + reserved_tokens
self.token_to_idx = {token: idx
for idx, token in enumerate(self.idx_to_token)}
for token, freq in self._token_freqs:
if freq < min_freq:
break
if token not in self.token_to_idx:
self.idx_to_token.append(token)
self.token_to_idx[token] = len(self.idx_to_token) - 1
def __len__(self):
return len(self.idx_to_token)
def __getitem__(self, tokens):
if not isinstance(tokens, (list, tuple)):
return self.token_to_idx.get(tokens, self.unk)
return [self.__getitem__(token) for token in tokens]
def to_tokens(self, indices):
if not isinstance(indices, (list, tuple)):
return self.idx_to_token[indices]
return [self.idx_to_token[index] for index in indices]
@property
def unk(self): # 未知词元的索引为0
return 0
@property
def token_freqs(self):
return self._token_freqs
def count_corpus(tokens): #@save
"""统计词元的频率"""
# 这里的tokens是1D列表或2D列表
if len(tokens) == 0 or isinstance(tokens[0], list):
# 将词元列表展平成一个列表
tokens = [token for line in tokens for token in line]
return collections.Counter(tokens)
整合所有功能
使用字符(而不是单词)实现文本词元化
def load_corpus_time_machine(max_tokens=-1): #@save
"""返回时光机器数据集的词元索引列表和词表"""
lines = read_time_machine()
tokens = tokenize(lines, 'char')
vocab = Vocab(tokens)
# 因为时光机器数据集中的每个文本行不一定是一个句子或一个段落,
# 所以将所有文本行展平到一个列表中
corpus = [vocab[token] for line in tokens for token in line]
if max_tokens > 0:
corpus = corpus[:max_tokens]
return corpus, vocab
corpus, vocab = load_corpus_time_machine()
len(corpus), len(vocab)
语言模型和数据集
学习语言模型
概率模型出发
马尔可夫模型与n元语法
涉及一个、两个和三个变量的概率公式分别被称为一元语法(unigram)、二元语法(bigram)和三元语法(trigram)模型。
自然语言统计
最流行的词看起来很无聊,这些词通常被称为停用词
tokens = d2l.tokenize(d2l.read_time_machine())
# 因为每个文本行不一定是一个句子或一个段落,因此我们把所有文本行拼接到一起
corpus = [token for line in tokens for token in line]
vocab = d2l.Vocab(corpus)
vocab.token_freqs[:10]
[('the', 2261),
('i', 1267),
('and', 1245),
('of', 1155),
('a', 816),
('to', 695),
('was', 552),
('in', 541),
('that', 443),
('my', 440)]
还有个明显的问题是词频衰减的速度相当地快
freqs = [freq for token, freq in vocab.token_freqs]
d2l.plot(freqs, xlabel='token: x', ylabel='frequency: n(x)',
xscale='log', yscale='log')
单词的频率满足齐普夫定律
$$
n_{i}\propto\frac{1}{i^{\alpha}}
$$
读取长序列数据
从随机偏移量开始划分序列,以同时获得覆盖性(coverage)和随机性(randomness)
随机采样
在随机采样中,每个样本都是在原始的长序列上任意捕获的子序列。在迭代过程中,来自两个相邻的、随机的、小批量中的子序列不一定在原始序列上相邻。对于语言建模,目标是基于到目前为止看到的词元来预测下一个词元,因此标签是移位了一个词元的原始序列。
吃个栗子
从数据中随机生成一个小批量。在这里,参数batch_size
指定了每个小批量中子序列样
本的数目,参数num_steps
是每个子序列中预定义的时间步数。
def seq_data_iter_random(corpus,batch_size,num_steps):
corpus=corpus[random.randint(0,num_steps-1):]
num_subseqs=(len(corpus)-1)//num_steps
initial_indices=list(range(0,num_subseqs*num_steps,num_steps))
random.shuffle(initial_indices)
def data(pos):
return corpus[pos:pos+num_steps]
num_batches=num_subseqs//batch_size
for i in range(0,batch_size*num_batches,batch_size):
initial_indices_per_batch=initial_indices[i:i+batch_size]
X=[data(j) for j in initial_indices_per_batch]
Y=[data(j+1) for j in initial_indices_per_batch]
yield torch.Tensor(X),torch.Tensor(Y)
顺序分区
除了对原始序列可以随机抽样外,还可以保证两个相邻的小批量中的子序列在原始序列上也是相邻的
这种策略在基于小批量的迭代过程中保留了拆分的子序列的顺序,因此称为顺序分区
def seq_data_iter_sequential(corpus, batch_size, num_steps): #@save
"""使用顺序分区生成一个小批量子序列"""
# 从随机偏移量开始划分序列
offset = random.randint(0, num_steps)
num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size
Xs = torch.tensor(corpus[offset: offset + num_tokens])
Ys = torch.tensor(corpus[offset + 1: offset + 1 + num_tokens])
Xs, Ys = Xs.reshape(batch_size, -1), Ys.reshape(batch_size, -1)
num_batches = Xs.shape[1] // num_steps
for i in range(0, num_steps * num_batches, num_steps):
X = Xs[:, i: i + num_steps]
Y = Ys[:, i: i + num_steps]
yield X, Y
循环神经网络
循环神经网络(recurrent neural networks,RNNs)是具有隐状态的神经网络
无隐状态的神经网络
MLP
有隐状态的循环神经网络
当前时间步隐藏变量由当前时间步的输入与前一个时间步的隐藏变量一起计算得出
$$
\mathbf{H}{t}=\phi(\mathbf{X}{t}\mathbf{W}{xh}+\mathbf{H}{t-1}\mathbf{W}{hh}+\mathbf{b}{h})
$$
在任意时间步t,隐状态的计算可以被视为:
- 拼接当前时间步t的输入$X_t$和前一时间步$t − 1$的隐状态$H_{t−1}$;
- 将拼接的结果送入带有激活函数$\phi$的全连接层。全连接层的输出是当前时间步t的隐状态$H_t$
基于循环神经网络的字符级语言模型
使用当前的和先前的字符预测下一个字符
困惑度 Perplexity
通过一个序列中所有的n个词元的交叉熵损失的平均值来衡量
$$
{\frac{1}{n}}\sum_{t=1}^{n}-\log P(x_{t}\mid x_{t-1},\ldots,x_{1})
$$
困惑度
$$
\exp({\frac{1}{n}}\sum_{t=1}^{n}-\log P(x_{t}\mid x_{t-1},\ldots,x_{1}))
$$
循环神经网络的从零开始实现
one-hot编码(独热编码)
采样的小批量数据形状是二维张量:(批量大小,时间步数)
one_hot函数将这样一个小批量数据转换成三维张量,张量的最后一个维度等于词表大小(len(vocab))。我们经常转换输入的维度,以便获得形状为(时间步数,批量大小,词表大小)的输出。
初始化模型参数
隐藏单元数num_hiddens是一个可调的超参数。当训练语言模型时,输入和输出来自相同的词表
def get_params(vocab_size, num_hiddens, device):
num_inputs = num_outputs = vocab_size
def normal(shape):
return torch.randn(size=shape, device=device) * 0.01
# 隐藏层参数
W_xh = normal((num_inputs, num_hiddens))
W_hh = normal((num_hiddens, num_hiddens))
b_h = torch.zeros(num_hiddens, device=device)
# 输出层参数
W_hq = normal((num_hiddens, num_outputs))
b_q = torch.zeros(num_outputs, device=device)
# 附加梯度
params = [W_xh, W_hh, b_h, W_hq, b_q]
for param in params:
param.requires_grad_(True)
return params
循环神经网络
先需要一个init_rnn_state函数在初始化时返回隐状态
def init_rnn_state(batch_size, num_hiddens, device):
return (torch.zeros((batch_size, num_hiddens), device=device), )
定义RNN模块
def rnn(inputs, state, params):
# inputs的形状:(时间步数量,批量大小,词表大小)
W_xh, W_hh, b_h, W_hq, b_q = params
H, = state
outputs = []
# X的形状:(批量大小,词表大小)
for X in inputs:
H = torch.tanh(torch.mm(X, W_xh) + torch.mm(H, W_hh) + b_h)
Y = torch.mm(H, W_hq) + b_q
outputs.append(Y)
return torch.cat(outputs, dim=0), (H,)
class RNNModelScratch: #@save
"""从零开始实现的循环神经网络模型"""
def __init__(self, vocab_size, num_hiddens, device,
get_params, init_state, forward_fn):
self.vocab_size, self.num_hiddens = vocab_size, num_hiddens
self.params = get_params(vocab_size, num_hiddens, device)
self.init_state, self.forward_fn = init_state, forward_fn
def __call__(self, X, state):
X = F.one_hot(X.T, self.vocab_size).type(torch.float32)
return self.forward_fn(X, state, self.params)
def begin_state(self, batch_size, device):
return self.init_state(batch_size, self.num_hiddens, device)
预测
首先定义预测函数来生成prefix之后的新字符,其中的prefix是一个用户提供的包含多个字符的字符串。在循环遍历prefix中的开始字符时,不断地将隐状态传递到下一个时间步,但是不生成任何输出。这被称为预热(warm‐up)期,因为在此期间模型会自我更新(例如,更新隐状态),但不会进行预测。预热期结束后,隐状态的值通常比刚开始的初始值更适合预测,从而预测字符并输出它们
def predict_ch8(prefix, num_preds, net, vocab, device): #@save
"""在prefix后面生成新字符"""
state = net.begin_state(batch_size=1, device=device)
outputs = [vocab[prefix[0]]]
get_input = lambda: torch.tensor([outputs[-1]], device=device).reshape((1, 1))
for y in prefix[1:]: # 预热期
_, state = net(get_input(), state)
outputs.append(vocab[y])
for _ in range(num_preds): # 预测num_preds步
y, state = net(get_input(), state)
outputs.append(int(y.argmax(dim=1).reshape(1)))
return ''.join([vocab.idx_to_token[i] for i in outputs])
梯度裁剪
$$
\mathbf{g}\leftarrow\min\left(1,\frac{\theta}{|\mathbf{g}|}\right)\mathbf{g}
$$
一个流行的替代方案是通过将梯度g投影回给定半径(例如θ)的球来裁剪梯度g。
def grad_clipping(net, theta): #@save
"""裁剪梯度"""
if isinstance(net, nn.Module):
params = [p for p in net.parameters() if p.requires_grad]
else:
params = net.params
norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
if norm > theta:
for param in params:
param.grad[:] *= theta / norm
训练
#@save
def train_epoch_ch8(net, train_iter, loss, updater, device, use_random_iter):
"""训练网络一个迭代周期(定义见第8章)"""
state, timer = None, d2l.Timer()
metric = d2l.Accumulator(2) # 训练损失之和,词元数量
for X, Y in train_iter:
if state is None or use_random_iter:
# 在第一次迭代或使用随机抽样时初始化state
state = net.begin_state(batch_size=X.shape[0], device=device)
else:
if isinstance(net, nn.Module) and not isinstance(state, tuple):
# state对于nn.GRU是个张量
state.detach_()
else:
# state对于nn.LSTM或对于我们从零开始实现的模型是个张量
for s in state:
s.detach_()
y = Y.T.reshape(-1)
X, y = X.to(device), y.to(device)
y_hat, state = net(X, state)
l = loss(y_hat, y.long()).mean()
if isinstance(updater, torch.optim.Optimizer):
updater.zero_grad()
l.backward()
grad_clipping(net, 1)
updater.step()
else:
l.backward()
grad_clipping(net, 1)
# 因为已经调用了mean函数
updater(batch_size=1)
metric.add(l * y.numel(), y.numel())
return math.exp(metric[0] / metric[1]), metric[1] / timer.stop()
循环神经网络的简洁实现
定义模型
构造一个具有256个隐藏单元的单隐藏层的循环神经网络层rnn_layer
num_hiddens = 256
rnn_layer = nn.RNN(len(vocab), num_hiddens)
定义一个完整的循环神经网络模型
#@save
class RNNModel(nn.Module):
"""循环神经网络模型"""
def __init__(self, rnn_layer, vocab_size, **kwargs):
super(RNNModel, self).__init__(**kwargs)
self.rnn = rnn_layer
self.vocab_size = vocab_size
self.num_hiddens = self.rnn.hidden_size
# 如果RNN是双向的(之后将介绍),num_directions应该是2,否则应该是1
if not self.rnn.bidirectional:
self.num_directions = 1
self.linear = nn.Linear(self.num_hiddens, self.vocab_size)
else:
self.num_directions = 2
self.linear = nn.Linear(self.num_hiddens * 2, self.vocab_size)
def forward(self, inputs, state):
X = F.one_hot(inputs.T.long(), self.vocab_size)
X = X.to(torch.float32)
Y, state = self.rnn(X, state)
# 全连接层首先将Y的形状改为(时间步数*批量大小,隐藏单元数)
# 它的输出形状是(时间步数*批量大小,词表大小)。
output = self.linear(Y.reshape((-1, Y.shape[-1])))
return output, state
def begin_state(self, device, batch_size=1):
if not isinstance(self.rnn, nn.LSTM):
# nn.GRU以张量作为隐状态
return torch.zeros((self.num_directions * self.rnn.num_layers,
batch_size, self.num_hiddens),
device=device)
else:
# nn.LSTM以元组作为隐状态
return (torch.zeros((
self.num_directions * self.rnn.num_layers,
batch_size, self.num_hiddens), device=device),
torch.zeros((
self.num_directions * self.rnn.num_layers,
batch_size, self.num_hiddens), device=device))
训练与预测
num_epochs, lr = 500, 1
d2l.train_ch8(net, train_iter, vocab, lr, num_epochs, device)