手撕 Transformer (5):模型构建

张开发
2026/4/12 16:58:57 15 分钟阅读

分享文章

手撕 Transformer (5):模型构建
1 编码器-解码器的代码实现class EncoderDecoder(nn.Module):def __init__(self, encoder, decoder, src_embed, tgt_embed, generator):encoder: 编码器模块用于处理源序列decoder: 解码器模块用于生成目标序列src_embed: 源序列嵌入层将源序列tokens转换为向量表示tgt_embed: 目标序列嵌入层将目标序列tokens转换为向量表示generator: 生成器模块将解码器输出转换为目标词汇表上的概率分布super(EncoderDecoder, self).__init__()self.encoder encoderself.decoder decoderself.src_embed src_embedself.tgt_embed tgt_embedself.generator generatordef forward(self, src, tgt, src_mask, tgt_mask):Args:src: 源序列形状为 [batch_size, src_seq_len]tgt: 目标序列形状为 [batch_size, tgt_seq_len]src_mask: 源序列的掩码形状为 [batch_size, 1, src_seq_len]用于屏蔽填充位置tgt_mask: 目标序列的掩码形状为 [batch_size, tgt_seq_len, tgt_seq_len]用于屏蔽填充位置和未来位置Return:解码器的输出形状为 [batch_size, tgt_seq_len, d_model]# 先对源序列进行编码得到编码器输出# 然后将编码器输出、源序列掩码、目标序列和目标序列掩码传递给解码器return self.decode(self.encode(src, src_mask), src_mask, tgt, tgt_mask)def encode(self, src, src_mask):Args:src: 源序列形状为 [batch_size, src_seq_len]src_mask: 源序列的掩码形状为 [batch_size, 1, src_seq_len]用于屏蔽填充位置Return:编码器的输出形状为 [batch_size, src_seq_len, d_model]作为解码器的输入# (1) 对源序列进行嵌入处理将tokens转换为向量表示# (2) 将嵌入结果和源序列掩码传递给编码器# (3) 返回编码器的输出作为解码器的“记忆”输入return self.encoder(self.src_embed(src), src_mask)def decode(self, memory, src_mask, tgt, tgt_mask):Args:memory: 编码器的输出src_mask: 源序列的掩码tgt: 目标序列tgt_mask: 目标序列的掩码# (1) 对目标序列进行嵌入处理将tokens转换为向量表示# (2) 将嵌入结果、编码器输出、源序列掩码、目标序列掩码传给解码器# (3) 返回解码器的输出return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask)实例化验证一下代码实现vocab_size 1000d_model 512encoder endecoder desource_embed nn.Embedding(vocab_size, d_model)target_embed nn.Embedding(vocab_size, d_model)generator gen# 假设源数据与目标数据相同实际中并不相同source target Variable(torch.LongTensor([[100, 2, 421, 508], [491, 998, 1, 221]]))# 假设src_mask与tgt_mask相同实际中并不相同source_mask target_mask Variable(torch.zeros(2, 4, 4))ed EncoderDecoder(encoder, decoder, source_embed, target_embed, generator)ed_result ed(source, target, source_mask, target_mask)print(ed_result)print(ed_result.shape)运行结果tensor([[[-0.6909, -0.9724, 0.6361, ..., 0.0525, 0.5336, -0.4723],[-1.7389, -1.2510, 0.7788, ..., -0.0156, 0.0258, -0.9280],[-1.7369, -1.2625, 0.7260, ..., -0.4470, -1.0672, 0.1064],[-1.3041, -1.3951, -0.0759, ..., -0.1239, -0.6066, -0.8090]],[[-0.0288, 0.2606, 1.1938, ..., 0.3468, -0.5532, -1.2409],[-0.2443, -0.1397, 0.7443, ..., -0.2610, 0.6653, -1.7663],[-0.9244, -0.9061, 1.7313, ..., 1.1313, 1.1599, -2.4360],[ 0.0872, -0.6195, 0.7045, ..., -0.0286, 0.4245, -0.6196]]],grad_fnAddBackward0)torch.Size([2, 4, 512])2 完整模型构建基于上述编码器-解码器结构构建用于训练的、完整的 Transformer 模型。def make_model(src_vocab, tgt_vocab, N6, d_model512, d_ff2048, h8, dropout0.1):Args:src_vocab: 源语言词汇表大小tgt_vocab: 目标语言词汇表大小N: 编码器和解码器的层数d_model: 模型的隐藏维度d_ff: 前馈网络的隐藏层维度h: 多头注意力的头数dropout: Dropout 丢弃率# 创建一个深拷贝函数用于复制组件实例避免共享参数c copy.deepcopy# 初始化多头注意力机制处理不同子空间的注意力计算attn MultiHeadedAttention(h, d_model)# 初始化位置前馈网络对每个位置的表示进行非线性变换ff PositionwiseFeedForward(d_model, d_ff, dropout)# 初始化位置编码为输入序列添加位置信息position PositionalEncoding(d_model, dropout)# 构建完整模型model EncoderDecoder(# 编码器由 N 个 EncoderLayer 组成每个层包含自注意力机制(attn)和前馈网络(ff)Encoder(EncoderLayer(d_model, c(attn), c(ff), dropout), N),# 解码器由 N 个 DecoderLayer 组成每个层包含自注意力机制(attn)、编码器-解码器注意力机制(attn)和前馈网络(ff)Decoder(DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout), N),# 源嵌入将源语言词汇转换为词向量并添加位置编码nn.Sequential(Embeddings(d_model, src_vocab), c(position)),# 目标嵌入将目标语言词汇转换为词向量并添加位置编码nn.Sequential(Embeddings(d_model, tgt_vocab), c(position)),# 生成器将解码器输出转换为目标词汇表的概率分布Generator(d_model, tgt_vocab),)# 参数初始化# 使用 Xavier 均匀分布初始化所有维度大于 1 的参数# 这有助于模型的稳定训练和收敛for p in model.parameters():if p.dim() 1:nn.init.xavier_uniform_(p)return model验证一下完整模型的代码实现source_vocab 11target_vocab 11N 6res make_model(source_vocab, target_vocab, N)print(res)运行结果EncoderDecoder((encoder): Encoder((layers): ModuleList((0-5): 6 x EncoderLayer((self_attn): MultiHeadedAttention((linears): ModuleList((0-3): 4 x Linear(in_features512, out_features512, biasTrue))(dropout): Dropout(p0.1, inplaceFalse))(feed_forward): PositionwiseFeedForward((w_1): Linear(in_features512, out_features2048, biasTrue)(w_2): Linear(in_features2048, out_features512, biasTrue)(dropout): Dropout(p0.1, inplaceFalse))(sublayer): ModuleList((0-1): 2 x SublayerConnection((norm): LayerNorm()(dropout): Dropout(p0.1, inplaceFalse)))))(norm): LayerNorm())(decoder): Decoder((layers): ModuleList((0-5): 6 x DecoderLayer((self_attn): MultiHeadedAttention((linears): ModuleList((0-3): 4 x Linear(in_features512, out_features512, biasTrue))(dropout): Dropout(p0.1, inplaceFalse))(src_attn): MultiHeadedAttention((linears): ModuleList((0-3): 4 x Linear(in_features512, out_features512, biasTrue))(dropout): Dropout(p0.1, inplaceFalse))(feed_forward): PositionwiseFeedForward((w_1): Linear(in_features512, out_features2048, biasTrue)(w_2): Linear(in_features2048, out_features512, biasTrue)(dropout): Dropout(p0.1, inplaceFalse))(sublayer): ModuleList((0-2): 3 x SublayerConnection((norm): LayerNorm()(dropout): Dropout(p0.1, inplaceFalse)))))(norm): LayerNorm())(src_embed): Sequential((0): Embeddings((lut): Embedding(11, 512))(1): PositionalEncoding((dropout): Dropout(p0.1, inplaceFalse)))(tgt_embed): Sequential((0): Embeddings((lut): Embedding(11, 512))(1): PositionalEncoding((dropout): Dropout(p0.1, inplaceFalse)))(generator): Generator((proj): Linear(in_features512, out_features11, biasTrue)))3 推理测试的代码实现def inference_test():# 创建一个小型 Transformer 模型词汇表大小为 11编码器和解码器各 2 层test_model make_model(11, 11, 2)# 将模型设置为评估模式test_model.eval()# 创建一个长度为 10 的源序列src torch.LongTensor([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]])# 创建一个全 1 的源掩码表示所有位置都是有效的非填充src_mask torch.ones(1, 1, 10)# 使用模型的 encode 方法对源序列进行编码得到编码器的输出 memorymemory test_model.encode(src, src_mask)# 初始化目标序列 ys开始时只包含一个起始标记0ys torch.zeros(1, 1).type_as(src)# 循环 9 次每次生成一个词for i in range(9):# 使用 decode 方法传入编码器输出、源掩码、目标序列、未来信息掩码out test_model.decode(memory, src_mask, ys, subsequent_mask(ys.size(1)).type_as(src.data))# 从解码器输出中取出最后一个位置的表示通过 generator 生成概率分布prob test_model.generator(out[:, -1])# 选择概率最大的词作为下一个词_, next_word torch.max(prob, dim1)next_word next_word.data[0]# 将新生成的词添加到 ys 中形成新的输入序列ys torch.cat([ys, torch.empty(1, 1).type_as(src.data).fill_(next_word)], dim1)# 打印最终生成的序列 ysprint(Example Untrained Model Prediction:, ys)# 重复执行测试def run_tests():for _ in range(10):inference_test()运行结果Example Untrained Model Prediction: tensor([[ 0, 3, 10, 10, 10, 10, 10, 10, 3, 5]])Example Untrained Model Prediction: tensor([[0, 2, 6, 2, 6, 2, 6, 2, 6, 2]])Example Untrained Model Prediction: tensor([[0, 5, 3, 2, 2, 2, 2, 2, 2, 2]])Example Untrained Model Prediction: tensor([[ 0, 10, 7, 10, 7, 10, 7, 10, 7, 10]])Example Untrained Model Prediction: tensor([[ 0, 3, 10, 5, 10, 5, 10, 5, 10, 5]])Example Untrained Model Prediction: tensor([[0, 7, 2, 4, 7, 2, 0, 7, 2, 3]])Example Untrained Model Prediction: tensor([[ 0, 1, 2, 3, 4, 2, 3, 4, 10, 2]])Example Untrained Model Prediction: tensor([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]])Example Untrained Model Prediction: tensor([[ 0, 10, 7, 4, 6, 10, 7, 4, 4, 4]])Example Untrained Model Prediction: tensor([[0, 3, 6, 9, 3, 6, 9, 3, 6, 9]])

更多文章