函数:返回GPU

def try_gpu(i=0):  #@save"""如果存在,则返回gpu(i),否则返回cpu()"""if torch.cuda.device_count() >= i + 1:  # 如果存在第 i 个 GPUreturn torch.device(f'cuda:{i}')    # 返回第 i 个 GPU 设备return torch.device('cpu') # 若系统中无足够的GPU设备(即GPU数量<i+1),则返回CPU设备def try_all_gpus():  #@save"""返回所有可用的GPU,如果没有GPU,则返回[cpu(),]"""devices = [torch.device(f'cuda:{i}')for i in range(torch.cuda.device_count())]# 如果存在可用的 GPU,则返回一个包含所有 GPU 设备的列表return devices if devices else [torch.device('cpu')]
# from common import try_gpu, try_all_gpus
print(f"{try_gpu(), try_gpu(10), try_all_gpus()}")
print(f"默认尝试返回第1个GPU设备:{try_gpu()}")
print(f"尝试返回第11个GPU设备:{try_gpu(10)}")
print(f"返回所有可用的GPU:{try_all_gpus()}")

函数:生成数据集(生成 “符合线性关系 y=Xw+b+噪声” 的合成数据集)synthetic_data

'''(与 线性神经网络 的一样)
# 生成 “符合线性关系 y=Xw+b+噪声” 的合成数据集
# w: 权重向量(决定线性关系的斜率)
# b: 偏置项(决定线性关系的截距)
# num_examples: 要生成的样本数量
在指定正态分布中随机生成特征矩阵X,
然后根据传入的权重和偏置再加上随机生成的噪声计算得到标签向量y。
'''
def synthetic_data(w, b, num_examples):  # @save"""生成y=Xw+b+噪声"""# 生成一个形状为 (num_examples, len(w)) 的矩阵,每个元素从均值为0、标准差为1的正态分布中随机采样X = torch.normal(0, 1, (num_examples, len(w)))print(f"X的形状{X.shape}")y = torch.matmul(X, w) + b  # 计算线性部分 Xw + by += torch.normal(0, 0.01, y.shape)  # 添加噪声(均值为0,标准差为0.01的正态分布)使数据更接近真实场景(避免完全线性可分)return X, y.reshape((-1, 1))  # 返回特征矩阵X和标签向量y, y.reshape((-1, 1)) 确保y是列向量(形状为 (num_examples, 1))

使用示例:

# 定义真实的权重 w = [2, -3.4] 和偏置 b = 4.2
true_w = torch.tensor([2, -3.4])
true_b = 4.2# features: 形状为 (1000, 2) 的矩阵,每行皆包含一个二维数据样本
# labels: 形状为 (1000, 1) 的向量,每行皆包含一维数据标签值(一个标量)
# 标签由线性关系 y = 2*x1 - 3.4*x2 + 4.2 + 噪声 生成
features, labels = synthetic_data(true_w, true_b, 1000) # 生成1000个样本
print('features:', features[0],'\nlabel:', labels[0])
# 生成数据
n_train, n_test, num_inputs, batch_size = 20, 100, 200, 5
true_w, true_b = torch.ones((num_inputs, 1)) * 0.01, 0.05train_data = common.synthetic_data(true_w, true_b, n_train)
train_iter = common.load_array(train_data, batch_size)test_data = common.synthetic_data(true_w, true_b, n_test)
test_iter = common.load_array(test_data, batch_size, is_train=False)

函数:绘图函数plot & 设置轴属性set_axes

# 封装了 Matplotlib 轴属性的常用设置
def set_axes(axes, xlabel=None, ylabel=None, xlim=None, ylim=None,xscale='linear', yscale='linear', legend=None):"""设置绘图的轴属性"""if xlabel: axes.set_xlabel(xlabel)  # 设置x轴标签(如果提供)if ylabel: axes.set_ylabel(ylabel)  # 设置y轴标签(如果提供)if xlim: axes.set_xlim(xlim)        # 设置x轴范围(如 [0, 10])(如果提供)if ylim: axes.set_ylim(ylim)        # 设置y轴范围(如 [0, 10])(如果提供)axes.set_xscale(xscale)         # 设置x轴刻度类型(线性linear或对数log)axes.set_yscale(yscale)         # 设置y轴刻度类型(线性linear或对数log)if legend: axes.legend(legend)  # 添加图例文本列表(如 ['train', 'test'])(如果提供)axes.grid(True)                 # 显示背景网格线,提升可读性# 绘图函数
def plot(X, Y=None, xlabel=None, ylabel=None, legend=None,xlim=None, ylim=None, xscale='linear', yscale='linear',fmts=('-', 'm--', 'g-.', 'r:', 'c-.', 'y-', 'k:'), figsize=(5, 2.5), axes=None):"""绘制数据点"""if legend is None: legend = [] # 默认图例为空列表(避免后续判断报错)# 创建画布(如果未提供外部axes)plt.figure(figsize=figsize)axes = axes if axes is not None else plt.gca()  # 获取当前轴# 如果X有一个轴,输出True。判断输入数据是否为一维(列表或一维数组)def has_one_axis(X):return (hasattr(X, "ndim") and X.ndim == 1 or isinstance(X, list)and not hasattr(X[0], "__len__"))# 标准化X和Y的形状:确保X和Y都是列表的列表(支持多条曲线)if has_one_axis(X):X = [X]  # 将一维X转换为二维(单条曲线)if Y is None: # 如果未提供Y,则X是Y的值,X轴为索引(如 plot(y))X, Y = [[]] * len(X), Xelif has_one_axis(Y):Y = [Y] # 将一维Y转换为二维if len(X) != len(Y): # 如果X和Y数量不匹配,复制X以匹配Y的数量X = X * len(Y)axes.clear() # 清空当前轴(避免重叠绘图)for x, y, fmt in zip(X, Y, fmts):if len(x): axes.plot(x, y, fmt) # 如果提供了x和y,绘制xy曲线else: axes.plot(y, fmt) # 如果未提供x,绘制y关于索引的曲线(如 plot(y))set_axes(axes, xlabel, ylabel, xlim, ylim, xscale, yscale, legend) # 设置轴属性# 自动调整布局并显示图像plt.tight_layout()  # 自动调整子图参数,使之填充整个图像区域,防止标签溢出plt.show()

使用示例:

T = 1000  # 总共产生1000个点
time = torch.arange(1, T + 1, dtype=torch.float32)
x = torch.sin(0.01 * time) + torch.normal(0, 0.2, (T,))
common.plot(time, [x], 'time', 'x', xlim=[1, 1000], figsize=(6, 3))
freqs = [freq for token, freq in vocab.token_freqs] # 词频(降序)
# 对比 三种模型中的词元频率:一元语法、二元语法和三元语法
bigram_freqs = [freq for token, freq in bigram_vocab.token_freqs]
trigram_freqs = [freq for token, freq in trigram_vocab.token_freqs]
common.plot([freqs, bigram_freqs, trigram_freqs], xlabel='token: x',ylabel='frequency: n(x)', xscale='log', yscale='log',legend=['unigram', 'bigram', 'trigram'])

函数:加载数据集 load_array

from torch.utils.data import DataLoader, TensorDataset
import torch
def load_array(data_arrays, batch_size, is_train=True):dataset = TensorDataset(*data_arrays)return DataLoader(dataset, batch_size, shuffle=is_train)

使用示例:

    train_iter = common.load_array((train_features, train_labels.reshape(-1,1)),batch_size) # 将数据加载为可迭代的批量数据test_iter = common.load_array((test_features, test_labels.reshape(-1,1)),batch_size, is_train=False) # is_train=False表示测试集不需要打乱数据

函数:自定义优化算法 sgd

# 定义优化算法
'''
# 实现小批量随机梯度下降(Stochastic Gradient Descent, SGD)优化算法params: 需更新的参数列表。通常为神经网络的可训练权重w和偏置blr: 学习率(learning rate),是一个标量,用于控制每次参数更新的步长
batch_size: 批量大小,用于调整梯度更新的幅度
'''
def sgd(params, lr, batch_size):  #@save"""小批量随机梯度下降"""with torch.no_grad(): # 禁用梯度计算,所有的操作都不会被记录到计算图中,因此不会影响自动微分的过程。参数更新操作时必须的,因为参数更新本身不应该被微分for param in params:# 计算参数更新的步长, /batch_size 是为了对小批量数据的梯度进行平均param -= lr * param.grad / batch_size # (param -= ...是将计算出的更新步长应用到参数上,从而更新参数)param.grad.zero_() # 将参数的梯度手动清零(因为梯度是累积的,以免影响下一次的梯度计算)
sgd([w, b], lr, batch_size)  # 使用参数的梯度更新参数 # 具体使用见线性回归的从0开始实现
sgd(net.params, lr, batch_size) # 具体使用见train_ch8函数定义

函数:预测函数—生成prefix之后的新字符 predict_ch8

# 预测函数:生成prefix之后的新字符
def predict_ch8(prefix, num_preds, net, vocab, device):  #@save"""在prefix后面生成新字符"""state = net.begin_state(batch_size=1, device=device) # 初始化隐藏状态,批量大小为1 (单序列预测)# 将prefix的第一个字符转换为索引# prefix[0]第一个字符,vocab[prefix[0]]获取第一个字符的索引outputs = [vocab[prefix[0]]]  # 存储生成的索引(用列表存储)# 辅助函数:获取当前输入 (形状为 (1, 1))# [outputs[-1]]获取索引列表中的最后一个,即 刚刚存进去的那个,也就是当前个get_input = lambda: torch.tensor([outputs[-1]], device=device).reshape((1, 1))# 预热期:处理前缀中的剩余字符# 模型自我更新(例如,更新隐状态),但不进行预测for y in prefix[1:]: # 遍历前缀中除第一个字符外的所有字符_, state = net(get_input(), state)  # 前向传播(忽略输出,只更新隐藏状态)(把类当作函数使用,调用__call__)outputs.append(vocab[y])            # 将当前字符添加到输出列表# 预测阶段:生成新字符,预测num_preds步# 预热期结束后,隐状态的值比刚开始的初始值更适合预测,现在开始预测字符并输出for _ in range(num_preds):  # 预测指定数量的字符# y 形状为 (1, vocab_size)(批量大小=1)y, state = net(get_input(), state)          # 前向传播,获取预测输出# argmax(dim=1) 获取概率最高的词索引# int(y.argmax(dim=1).reshape(1)) 转换为 Python整数pred_idx = int(y.argmax(dim=1).reshape(1))  # 从输出中选择概率最高的索引outputs.append(pred_idx)                    # 将预测索引添加到输出列表# 将索引序列 转换回 字符序列# ''.join(...)将字符列表中的所有字符串(每个字符是一个长度为1的字符串)连接成一个字符串return ''.join([vocab.idx_to_token[i] for i in outputs])
print(f"未训练网络的情况下,测试函数基于time traveller这个前缀生成10个后续字符:\n"f"{common.predict_ch8('time traveller ', 10, net, vocab, common.try_gpu())}")# 具体使用见train_ch8函数定义 ↓# 4. 定义预测函数,用于生成 以给定前缀开头的文本(生成50个字符)predict = lambda prefix: predict_ch8(prefix, 50, net, vocab, device) # 设置预测函数

函数:计算正确预测数 accuracy

def accuracy(y_hat, y):  # @save"""计算预测正确的数量"""# len是查看矩阵的行数# y_hat.shape[1]就是取列数if len(y_hat.shape) > 1 and y_hat.shape[1] > 1:# 第2个维度为预测标签,取最大元素y_hat = y_hat.argmax(axis=1)  # 变成一列,列中每行元素为 行里的最大值下标# 将y_hat转换为y的数据类型然后作比较,cmp函数存储bool类型cmp = y_hat.type(y.dtype) == yreturn float(cmp.type(y.dtype).sum())  # 将正确预测的数量相加

使用示例: 单周期训练函数中有调用该函数。

函数:单周期训练 train_epoch_ch3

def train_epoch_ch3(net, train_iter, loss, updater):  # @save"""训练模型一个迭代周期(定义见第3章)"""# 判断net模型是否为深度学习类型,将模型设置为训练模式if isinstance(net, torch.nn.Module):net.train()  # 要计算梯度,启用训练模式(启用Dropout/BatchNorm等训练专用层)# Accumulator(3)创建3个变量:训练损失总和、训练准确度总和、样本数metric = Accumulator(3)  # 用于跟踪训练损失、准确率和样本数for X, y in train_iter:# 计算梯度并更新参数y_hat = net(X)  # 前向传播:模型预测l = loss(y_hat, y)  # 计算损失(向量形式,每个样本一个损失值)# 判断updater是否为优化器if isinstance(updater, torch.optim.Optimizer):  # 使用PyTorch内置优化器# 使用PyTorch内置的优化器和损失函数updater.zero_grad()  # 把梯度设置为0(清除之前的梯度,避免梯度累加)l.mean().backward()  # 计算梯度(反向传播:计算梯度(对损失取平均))l.mean()表示对批次损失取平均后再求梯度updater.step()  # 自更新(根据梯度更新模型参数)else:  # 使用自定义更新逻辑# 使用定制的优化器和损失函数# 自我实现的话,l出来是向量,先求和再求梯度l.sum().backward()updater(X.shape[0])metric.add(float(l.sum()), accuracy(y_hat, y), y.numel())# 返回训练损失(平均损失)和训练精度,metric的值由Accumulator得到return metric[0] / metric[2], metric[1] / metric[2]

使用示例: 绘制器类调用示例中已包含该函数调用。 

函数:梯度裁剪 grad_clipping

''' 梯度裁剪,目的:
防止梯度爆炸
稳定训练过程
特别适合 RNN 这类容易出现梯度问题的模型
'''
def grad_clipping(net, theta):  #@save"""裁剪梯度"""# 获取需要梯度的参数if isinstance(net, nn.Module): # PyTorch 模块:获取所有可训练参数params = [p for p in net.parameters() if p.requires_grad]else: # 自定义模型:使用模型自带的参数列表params = net.params# 计算所有参数的 梯度的 L2 范数norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))if norm > theta: # 如果范数超过阈值θ,进行裁剪(将所有梯度按比例缩放)for param in params: # 保持梯度方向不变,只缩小幅度param.grad[:] *= theta / norm # 按比例缩放梯度
grad_clipping(net, 1) # 梯度裁剪,具体使用见train_epoch_ch8函数定义

函数:单迭代周期训练—用困惑度做评估指标 train_epoch_ch8

# 单迭代周期训练,以困惑度(Perplexity)作为评估指标
# 返回困惑度 和 训练速度(每秒处理的词元数量,用于衡量训练效率)
def train_epoch_ch8(net, train_iter, loss, updater, device, use_random_iter):"""训练网络一个迭代周期(定义见第8章)"""state, timer = None, Timer() # 初始化状态和计时器metric = Accumulator(2)  # 训练损失之和,词元数量[loss_sum, token_count]for X, Y in train_iter: # 遍历数据批次# 状态初始化:如果是第一次迭代 或 使用随机抽样if state is None or use_random_iter:# 在第一次迭代 或 使用随机抽样时初始化state (创建全零的初始隐藏状态)state = net.begin_state(batch_size=X.shape[0], device=device)else:# 否则,分离状态,断开与历史计算图的连接,即 断开计算图(防止梯度传播到前一批次)# 训练循环中,对于非随机抽样(即顺序抽样),# 希望状态能跨批次传递,但又不希望梯度从当前批次反向传播到前一批次# (因为那样会导致计算图非常长,占用大量内存且可能梯度爆炸)。# 因此每次迭代开始时,需要将状态从计算图中分离出来if isinstance(net, nn.Module) and not isinstance(state, tuple):# 如果net是PyTorch模块,且状态非元组(例如GRU,它的状态是一个张量)# state对于nn.GRU是个张量state.detach_() # 用detach_()断开 状态与历史计算图 的连接(对张量进行原地分离操作)else:# 若状态是元组(例如LSTM的状态是两个张量,或者自定义的模型状态可能是元组)# state对于nn.LSTM 或 对于从零开始实现的模型 是个张量# LSTM 状态 或 自定义模型状态是元组for s in state:s.detach_()# 准备数据:转置标签并展平# 先转置再展平 是为了 让标签的顺序与模型输出的顺序一致,从而正确计算损失# Y 原始形状: (batch_size, num_steps)# 转置后: (num_steps, batch_size); 展平后: (num_steps * batch_size)# 与 y_hat 形状 (num_steps * batch_size, vocab_size) 匹配y = Y.T.reshape(-1) # 形状: (num_steps * batch_size)X, y = X.to(device), y.to(device)   # 将数据转移到设备y_hat, state = net(X, state)        # 前向传播l = loss(y_hat, y.long()).mean()    # 计算每个词元的损失后取平均=对整个批次的加权平均# 反向传播if isinstance(updater, torch.optim.Optimizer):updater.zero_grad()             # PyTorch 优化器l.backward()grad_clipping(net, 1)           # 梯度裁剪updater.step()else: # 自定义优化器l.backward()grad_clipping(net, 1)# 因为已经调用了mean函数updater(batch_size=1) # 因为损失已经取平均,batch_size=1metric.add(l * y.numel(), y.numel()) # 累积指标:损失 * 词元数,词元数perplexity = math.exp(metric[0] / metric[1]) # 计算困惑度 = exp(平均损失)speed = metric[1] / timer.stop() # 计算训练速度 = 词元数/秒return perplexity, speed
# 具体使用见train_ch8函数定义 ↓# 5. 主训练循环:训练和预测for epoch in range(num_epochs):# 5.1 训练一个epoch,返回困惑度(ppl)和训练速度(speed)(每秒处理多少个词元)# speed 表示每秒处理的词元数量,用于衡量训练效率ppl, speed = train_epoch_ch8(net, train_iter, loss, updater, device, use_random_iter)# 5.2 每10个epoch进行一次评估和可视化if (epoch + 1) % 10 == 0: # 每10个epochprint(predict('time traveller'))  # 使用前缀'time traveller'生成文本并打印animator.add(epoch + 1, [ppl]) # 将当前epoch的困惑度添加到动画中

函数:精度评估 evaluate_accuracy

def evaluate_accuracy(net, data_iter):  # @save"""计算在指定数据集上模型的精度"""if isinstance(net, torch.nn.Module):  # 判断模型是否为深度学习模型net.eval()  # 将模型设置为评估模式# Accumulator(2)创建2个变量:正确预测的样本数总和、样本数metric = Accumulator(2)  # metric:度量,累加正确预测数、预测总数with torch.no_grad():  # 梯度不需要反向传播for X, y in data_iter:  # 每次从迭代器中拿出一个X和y# net(X):X放在net模型中进行softmax操作# numel()函数:返回数组中元素的个数,在此可以求得样本数metric.add(accuracy(net(X), y), y.numel())# metric[0, 1]分别为网络预测正确数量和总预测数量return metric[0] / metric[1]

函数:精度评估GPU版 evaluate_accuracy_gpu

"""
# 评估函数定义精度评估函数:1、将数据集复制到显存中2、通过调用accuracy计算数据集的精度
"""
def evaluate_accuracy_gpu(net, data_iter, device=None): #@save"""使用GPU计算模型在数据集上的精度"""if isinstance(net, nn.Module):  # 判断net是否属于torch.nn.Module类(模型是否为深度学习模型)net.eval()  # 设置为评估模式(关闭Dropout和BatchNorm的随机性)if not device: # 如果没有指定设备,自动使用模型参数所在的设备(如GPU)device = next(iter(net.parameters())).device # 自动检测设备# 初始化计数器:累计 正确预测的数量 和 总预测的数量metric = Accumulator(2) # metric[0]=正确数, metric[1]=总数with torch.no_grad():  # 禁用梯度计算(加速评估并减少内存占用)for X, y in data_iter:  # 每次从迭代器中拿出一个X和y# 将数据X,y移动到指定设备(如GPU)if isinstance(X, list):# BERT微调所需的(之后将介绍)X = [x.to(device) for x in X]else:X = X.to(device)y = y.to(device)# 计算预测值和准确率,并累加到metric中metric.add(accuracy(net(X), y), y.numel()) # 累加准确率和样本数# metric[0, 1]分别为网络预测正确数量和总预测数量return metric[0] / metric[1] # 计算准确率

使用示例: 用GPU训练模型函数中已包含该函数调用。

函数:用GPU训练模型 train_ch6

"""定义GPU训练函数:1、为了使用gpu,首先需要将每一小批量数据移动到指定的设备(例如GPU)上;2、使用Xavier随机初始化模型参数;3、使用交叉熵损失函数和小批量随机梯度下降。
"""
def train_ch6(net, train_iter, test_iter, num_epochs, lr, device):"""用GPU训练模型(在第六章定义)"""def init_weights(m): # 定义初始化参数,对线性层和卷积层生效if type(m) == nn.Linear or type(m) == nn.Conv2d:nn.init.xavier_uniform_(m.weight) # Xavier初始化,保持输入输出的方差稳定net.apply(init_weights)  # 应用初始化到整个网络(初始化权重)# 在设备device上进行训练print('training on', device)net.to(device)  # 模型移至指定设备(如GPU)optimizer = torch.optim.SGD(net.parameters(), lr=lr) # 定义优化器:随机梯度下降(SGD),学习率为lrloss = nn.CrossEntropyLoss()  # 交叉熵损失# 初始化动画绘图器,用于动态绘制训练曲线animator = Animator(xlabel='epoch',xlim=[1, num_epochs],legend=['train loss', 'train acc', 'test acc'])# 初始化计时器和计算总批次数timer, num_batches = Timer(), len(train_iter) # 调用Timer函数统计时间# 开始训练循环for epoch in range(num_epochs):# Accumulator(3)创建3个变量:训练损失总和、训练准确度总和、样本数metric = Accumulator(3) # 用于跟踪训练损失、准确率和样本数net.train()  # 切换到训练模式(启用Dropout和BatchNorm的训练行为)for i, (X, y) in enumerate(train_iter):timer.start()           # 开始计时optimizer.zero_grad()   # 清空梯度X, y = X.to(device), y.to(device)   # 将数据移动到设备y_hat = net(X)          # 前向传播:模型预测l = loss(y_hat, y)      # 计算损失(向量形式,每个样本一个损失值)l.backward()            # 反向传播计算梯度optimizer.step()        # 更新参数with torch.no_grad(): # 禁用梯度计算后累计指标metric.add(l * X.shape[0], accuracy(y_hat, y), X.shape[0])timer.stop()            # 停止计时train_l = metric[0] / metric[2]     # 平均训练损失train_acc = metric[1] / metric[2]   # 平均训练准确率# 每训练完1/5的epoch 或 最后一个batch时,更新训练曲线if (i + 1) % (num_batches // 5) == 0 or i == num_batches - 1:animator.add(epoch + (i + 1) / num_batches,(train_l, train_acc, None))# 测试精度test_acc = evaluate_accuracy_gpu(net, test_iter)  # 测试集准确率animator.add(epoch + 1, (None, None, test_acc)) # 更新测试集准确率曲线print(f'最终结果:loss {train_l:.3f}, train acc {train_acc:.3f}, 'f'test acc {test_acc:.3f}') # 输出损失值、训练精度、测试精度print(f'训练速度(样本数/总时间):{metric[2] * num_epochs / timer.sum():.1f} examples/sec 'f'on {str(device)}') # 设备的计算能力

lr, num_epochs = 0.9, 10 # 学习率,训练轮数(训练10轮)
train_ch6(net, train_iter, test_iter, num_epochs, lr, common.try_gpu())

函数:训练字符级循环神经网络模型 train_ch8

''' 训练一个 字符级循环神经网络(RNN)模型
包含了训练循环、梯度裁剪、困惑度计算和文本生成预测
net             : 要训练的RNN模型(可以是PyTorch模块或自定义模型)
train_iter      : 训练数据迭代器
vocab           : 词汇表对象,用于索引和字符之间的转换
lr              : 学习率
num_epochs      : 训练的总轮数
device          : 训练设备(CPU或GPU)
use_random_iter : 是否使用随机采样(否则使用顺序分区)
'''
def train_ch8(net, train_iter, vocab, lr, num_epochs, device,use_random_iter=False):"""训练模型(定义见第8章)"""# 1. 初始化损失函数loss = nn.CrossEntropyLoss() # 使用交叉熵损失# 2. 初始化可视化工具:初始化动画器,用于绘制训练过程中的困惑度变化animator = Animator(xlabel='epoch', ylabel='perplexity',legend=['train'], xlim=[10, num_epochs])# 3. 初始化优化器:根据net的类型选择不同的优化器if isinstance(net, nn.Module): # 如果是PyTorch模块,使用SGD优化器updater = torch.optim.SGD(net.parameters(), lr) # 则使用PyTorch的SGD优化器else:  # 如果是自定义模型,使用自定义的SGD优化器# 注意:这里的sgd函数需要三个参数:参数列表、学习率和批量大小(通过闭包捕获net.params和lr)# lambda batch_size: sgd(...) 创建闭包函数# 固定学习率 lr,动态传入 batch_sizeupdater = lambda batch_size: sgd(net.params, lr, batch_size)# 4. 定义预测函数,用于生成 以给定前缀开头的文本(生成50个字符)predict = lambda prefix: predict_ch8(prefix, 50, net, vocab, device) # 设置预测函数# 5. 主训练循环:训练和预测for epoch in range(num_epochs):# 5.1 训练一个epoch,返回困惑度(ppl)和训练速度(speed)(每秒处理多少个词元)# speed 表示每秒处理的词元数量,用于衡量训练效率ppl, speed = train_epoch_ch8(net, train_iter, loss, updater, device, use_random_iter)# 5.2 每10个epoch进行一次评估和可视化if (epoch + 1) % 10 == 0: # 每10个epochprint(predict('time traveller'))  # 使用前缀'time traveller'生成文本并打印animator.add(epoch + 1, [ppl]) # 将当前epoch的困惑度添加到动画中# 6. 训练结束后的最终评估:打印最终的困惑度和速度# 困惑度:语言模型质量指标(越低越好)# 处理速度:每秒处理的词元数量print(f'困惑度 {ppl:.1f}, {speed:.1f} 词元/秒 {str(device)}')# 使用两个不同的前缀生成文本print(predict('time traveller'))print(predict('traveller'))
num_epochs, lr = 500, 1 # 迭代周期为500,即训练500轮;学习率为1
common.train_ch8(net, train_iter, vocab, lr, num_epochs, common.try_gpu())# 重新初始化一个RNN模型
net = RNNModelScratch(len(vocab), num_hiddens, common.try_gpu(), get_params,init_rnn_state, rnn)
# 使用随机抽样训练模型
common.train_ch8(net, train_iter, vocab, lr, num_epochs, common.try_gpu(),use_random_iter=True)

函数:加载文本数据《time_machine》read_time_machine

'''
加载文本数据
# 下载器与数据集配置
# 为 time_machine 数据集注册下载信息,包括文件路径和校验哈希值(用于验证文件完整性)
downloader = common.C_Downloader()
DATA_HUB = downloader.DATA_HUB  # 字典,存储数据集名称与下载信息
DATA_URL = downloader.DATA_URL  # 基础URL,指向数据集的存储位置
DATA_HUB['time_machine'] = (DATA_URL + 'timemachine.txt','090b5e7e70c295757f55df93cb0a180b9691891a')
'''
def read_time_machine(downloader):  #@save"""将时间机器数据集加载到文本行的列表中"""# 通过 downloader.download('time_machine') 获取文件路径with open(downloader.download('time_machine'), 'r') as f:lines = f.readlines() # 逐行读取文本文件# 用正则表达式 [^A-Za-z]+ 替换所有非字母字符为空格# 调用 strip() 去除首尾空格,lower() 转换为小写# 返回值:处理后的文本行列表(每行是纯字母组成的字符串)return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]
lines = common.read_time_machine(downloader)
print(f'# 文本总行数: {len(lines)}')
print(lines[0])     # 第1行内容
print(lines[10])    # 第11行内容tokens = common.tokenize(common.read_time_machine(downloader))
# 因为每个文本行不一定是一个句子或一个段落,因此我们把所有文本行拼接到一起
corpus = [token for line in tokens for token in line]
vocab = common.Vocab(corpus)
print(f"前10个最常用的(频率最高的)单词:\n{vocab.token_freqs[:10]}")

函数:词元化 (按单词或字符拆分文本) tokenize

# 词元化函数:支持按单词或字符拆分文本
# lines:预处理后的文本行列表
# token:词元类型,可选 'word'(默认)或 'char
# 返回值:嵌套列表,每行对应一个词元列表
def tokenize(lines, token='word'):  #@save"""将文本行拆分为单词或字符词元"""if token == 'word':return [line.split() for line in lines]  # 按空格分词elif token == 'char':return [list(line) for line in lines]   # 按字符拆分else:print('错误:未知词元类型:' + token)
tokens = common.tokenize(lines)
for i in range(11):print(f"第{i}行:{tokens[i]}")

函数:获取《time_machine》的词元索引序列和词表对象load_corpus_time_machine

# 获取《时光机器》的 词元索引序列和词表对象
# max_tokens:限制返回的词元索引序列的最大长度(默认 -1 表示不限制)
def load_corpus_time_machine(downloader, max_tokens=-1):  #@save"""返回时光机器数据集的词元索引列表和词表"""lines = read_time_machine(downloader) # 加载文本数据,得到文本行列表tokens = tokenize(lines, 'char') # 词元化:文本行列表→词元列表,按字符级拆分vocab = Vocab(tokens) # 构建词表# 因为时光机器数据集中的每个文本行不一定是一个句子或一个段落,# 所以将所有文本行展平到一个列表中# vocab[token] 查询词元的索引(若词元不存在,则返回0,即未知词索引)# corpus:list,每个元素为词元的对应索引corpus = [vocab[token] for line in tokens for token in line] # 展平词元并转换为索引if max_tokens > 0: # 限制词元序列长度corpus = corpus[:max_tokens] # 截断 corpus 到前 max_tokens 个词元# corpus:词元索引列表(如 [1, 2, 3, ...])# vocab:Vocab对象,用于管理词元与索引的映射return corpus, vocab
corpus, vocab = common.load_corpus_time_machine(downloader) # 加载数据
print(f"corpus词元索引列表的长度:{len(corpus)}")
print(f"词表大小:{len(vocab)}")
print(f"词频统计(降序):\n{vocab.token_freqs}")
# 索引 ↔ 词元转换
print(f"前10个索引对应的词元:\n{vocab.to_tokens(corpus[:10])}")
print(f"前10个词元对应的索引:\n{corpus[:10]}")
print(f"前10个词元对应的索引:\n{[idx for idx in corpus[:10]]}")

函数:【随机采样】(数据生成器) seq_data_iter_random

# 数据生成器:【随机采样】从长序列中随机抽取子序列,生成小批量数据
# batch_size:指定每个小批量中子序列样本的数目
# num_steps:每个子序列中预定义的时间步数(每个子序列长度)
def seq_data_iter_random(corpus, batch_size, num_steps):  #@save"""使用随机抽样生成一个小批量子序列"""# 从随机偏移量开始对序列进行分区,随机范围包括num_steps-1# 随机范围若超过[0,num_steps-1],则从num_steps开始,往后都会与已有的重复,且少了开头的部分子序列# random.randint(0, num_steps-1) 生成一个随机整数offset,范围是[0, num_steps-1]# corpus[random.randint(0, num_steps - 1):]截取从该偏移量到序列末尾的子序列corpus = corpus[random.randint(0, num_steps - 1):] # 随机偏移起始位置# 减去1,是因为需要考虑标签,标签是右移一位的序列num_subseqs = (len(corpus) - 1) // num_steps # 总可用 子序列数# 生成随机起始索引:长度为num_steps 的子序列 的起始索引initial_indices = list(range(0, num_subseqs * num_steps, num_steps)) # 起始索引列表# 在随机抽样的迭代过程中,来自两个相邻的、随机的、小批量中的子序列不一定在原始序列上相邻random.shuffle(initial_indices) # 随机打乱顺序def data(pos): # 返回从pos位置开始的长度为num_steps的序列return corpus[pos: pos + num_steps]# 序列长度35,时间步数5,则最多可有(35-1)/5=34/5=6个子序列# 批量大小2,则可生成批量数=6个子序列/批量大小2=3个小批量num_batches = num_subseqs // batch_size # 可生成的小批量数=总可用子序列数÷批量大小# 构造小批量数据(每次取batch_size个随机起始索引,生成输入X和标签Y)# i就是 当前批量在 总子序列中的第几批开头位置# 从已有的 打乱好的 起始索引list中,选出当前批量对应的那个下标位置上 的起始索引for i in range(0, batch_size * num_batches, batch_size):# 在这里,initial_indices包含子序列的随机起始索引initial_indices_per_batch = initial_indices[i: i + batch_size] # 每批次对应的起始索引X = [data(j) for j in initial_indices_per_batch]     # 输入子序列Y = [data(j + 1) for j in initial_indices_per_batch] # 标签(右移一位)yield torch.tensor(X), torch.tensor(Y) # 使用yield实现生成器,节省内存
my_seq = list(range(35)) # 生成一个从0到34的序列
# 批量大小为2,时间步数为5
for idx, (X, Y) in enumerate(common.seq_data_iter_random(my_seq, batch_size=2, num_steps=5)):print(f" 随机取样 —————— idx={idx} —————— \n"f"X: {X}\nY:{Y}")

函数:【顺序分区】(数据生成器) seq_data_iter_sequential

# 数据生成器:【顺序分区】按顺序划分长序列,生成小批量数据,保证完整覆盖序列
def seq_data_iter_sequential(corpus, batch_size, num_steps):  #@save"""使用顺序分区生成一个小批量子序列"""# 从随机偏移量开始划分序列offset = random.randint(0, num_steps) # 随机偏移起始位置# 确保能整除 batch_size,避免最后一个小批量不足# (len(corpus) - offset - 1) 起始位置偏移后,剩余右侧 所需的最少长度num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size # 有效词元数# 重构为批量优先格式:将序列重塑为 (batch_size批量大小, sequence_length序列长度) 的张量,便于批量处理# sequence_length序列长度:每个样本(序列)的时间步数(或词元数)Xs = torch.tensor(corpus[offset: offset + num_tokens]) # 截取有效词元区域,这里得到的向量形式的张量Ys = torch.tensor(corpus[offset + 1: offset + 1 + num_tokens]) # 可作为标签的有效词元区域# 重塑张量形状,每列皆为一个批量,每行皆为单批量的序列长度 即总词元数大小Xs, Ys = Xs.reshape(batch_size, -1), Ys.reshape(batch_size, -1)num_batches = Xs.shape[1] // num_steps # 批量数=/每个小批量的时间步数 即序列长度# 按步长分割小批量:沿序列长度维度(axis=1)滑动窗口,生成连续的小批量# 将单次批量的总序列大小分割为多个子序列for i in range(0, num_steps * num_batches, num_steps):# 从第i列开始,取num_steps列X = Xs[:, i: i + num_steps] # 输入子序列Y = Ys[:, i: i + num_steps] # 标签yield X, Y # 使用yield实现生成器
my_seq = list(range(35)) # 生成一个从0到34的序列
for idx, (X, Y) in enumerate(common.seq_data_iter_sequential(my_seq, batch_size=2, num_steps=5)):print(f" 顺序分区 —————— idx={idx} —————— \n"f"X: {X}\nY:{Y}")

函数:数据加载load_data_time_machine,同时返回数据迭代器和词表

'''
数据加载函数:同时返回数据迭代器和词表
batch_size     :每小批量的子序列数量
num_steps      :每个子序列的时间步数(词元数)
use_random_iter:是否使用随机采样(默认顺序分区)
max_tokens     :限制语料库的最大词元数返回值
data_iter:SeqDataLoader 实例(可迭代)
vocab    :词表对象(用于词元与索引的映射)
'''
def load_data_time_machine(downloader, batch_size, num_steps,  #@saveuse_random_iter=False, max_tokens=10000):"""返回时光机器数据集的迭代器和词表"""data_iter = SeqDataLoader(downloader, batch_size, num_steps, use_random_iter, max_tokens)return data_iter, data_iter.vocab
batch_size, num_steps = 32, 35 # 每个小批量包含32个子序列,每个子序列的词元数为35
train_iter, vocab = common.load_data_time_machine(downloader, batch_size, num_steps) # 词表对象

 函数:

计时器类 Timer

可用于进行 运行时间的基准测试

import time
import numpy as npclass Timer:  # @save"""记录多次运行时间"""def __init__(self):self.times = []self.start()def start(self):"""启动计时器"""self.tik = time.time()def stop(self):"""停止计时器并将时间记录在列表中"""self.times.append(time.time() - self.tik)return self.times[-1]def avg(self):"""返回平均时间"""return sum(self.times) / len(self.times)def sum(self):"""返回时间总和"""return sum(self.times)def cumsum(self):"""返回累计时间"""return np.array(self.times).cumsum().tolist()

 使用示例:

from common import Timer
import torch# 实例化两个全为1的10000维向量
n = 10000
a = torch.ones([n])
b = torch.ones([n])# 开始对工作负载进行基准测试
c = torch.zeros(n)
timer = Timer()for i in range(n):c[i] = a[i] + b[i]
print(f"方法一:使用循环遍历向量,耗时:{timer.stop():.5f} sec")timer.start()
d = a + b
print(f"方法二:使用重载的+运算符来计算按元素的和,耗时:{timer.stop():.5f} sec")

累加器类:记录正确预测数和预测总数 Accumulator

定义一个实用程序类Accumulator,用于对多个变量进行累加,Accumulator实例中创建了2个变量, 分别用于存储正确预测的数量和预测的总数量。

  • __init__():创建一个类,初始化类实例时就会自动执行__init__()方法。该方法的第一个参数为self,表示的就是类的实例。self后面跟随的其他参数就是创建类实例时要传入的参数。
  • zip():将多个可迭代对象作为参数,依次将对象中对应的元素打包成一个个元组,然后返回由这些元组组成的对象,里面的每个元素大概为(self.data,  args)的形式。
  • reset();重新设置空间大小并初始化。
  • __getitem__():接收一个idx参数,这个参数就是自己给的索引值,返回self.data[idx],实现类似数组的取操作。
# 实用程序类,示例中创建两个变量:正确预测的数量 和 预测总数
class Accumulator:  # @save"""在n个变量上累加"""# 初始化根据传进来n的大小来创建n个空间,全部初始化为0.0def __init__(self, n):self.data = [0.0] * n# 把原来类中对应位置的data和新传入的args做a + float(b)加法操作然后重新赋给该位置的data,从而达到累加器的累加效果def add(self, *args):self.data = [a + float(b) for a, b in zip(self.data, args)]# 重新设置空间大小并初始化。def reset(self):self.data = [0.0] * len(self.data)# 实现类似数组的取操作def __getitem__(self, idx):return self.data[idx]

使用示例:

def evaluate_accuracy(net, data_iter): #@save"""计算在指定数据集上模型的精度"""if isinstance(net, torch.nn.Module): # 判断模型是否为深度学习模型net.eval() # 将模型设置为评估模式# Accumulator(2)创建2个变量:正确预测的样本数总和、样本数metric = Accumulator(2) # metric:度量,累加正确预测数、预测总数with torch.no_grad(): # 梯度不需要反向传播for X, y in data_iter: # 每次从迭代器中拿出一个X和y# net(X):X放在net模型中进行softmax操作# numel()函数:返回数组中元素的个数,在此可以求得样本数metric.add(accuracy(net(X), y), y.numel()) # metric[0, 1]分别为网络预测正确数量和总预测数量return metric[0] / metric[1]

 绘制器类:动态绘制数据 Animator

from IPython import display# import matplotlib
# # 强制使用 TkAgg 或 Qt5Agg 后端 (使用独立后端渲染)
# matplotlib.use('TkAgg')  # 或者使用 'Qt5Agg',根据你的系统安装情况
# # matplotlib.use('Qt5Agg')  # 或者使用 'Qt5Agg',根据你的系统安装情况
import matplotlib.pyplot as plt
# 实用程序类,动画绘制器,动态绘制数据
class Animator:  # @save"""在动画中绘制数据"""def __init__(self, xlabel=None, ylabel=None, legend=None, xlim=None,ylim=None, xscale='linear', yscale='linear',fmts=('-', 'm--', 'g-.', 'r:'), nrows=1, ncols=1,figsize=(3.5, 2.5)):# 增量地绘制多条线if legend is None:legend = []# 创建图形和坐标轴self.fig, self.axes = plt.subplots(nrows, ncols, figsize=figsize)if nrows * ncols == 1:self.axes = [self.axes, ]  # 确保axes是列表形式(即使只有1个子图)# 设置坐标轴配置的函数def set_axes(ax, xlabel, ylabel, xlim, ylim, xscale, yscale, legend):ax.set_xlabel(xlabel)ax.set_ylabel(ylabel)if xlim:ax.set_xlim(xlim)if ylim:ax.set_ylim(ylim)ax.set_xscale(xscale)ax.set_yscale(yscale)if legend:ax.legend(legend)ax.grid()# 使用lambda函数捕获参数self.config_axes = lambda: [set_axes(ax, xlabel, ylabel, xlim, ylim,xscale, yscale, legend) for ax in self.axes]self.X, self.Y, self.fmts = None, None, fmtsplt.ion()  # 开启交互模式,使图形可以实时更新def add(self, x, y):# 向图表中添加多个数据点# x: x值或x值列表# y: y值或y值列表# hasattr(y, "__len__"):检查 y 是否为多值(如列表或数组)if not hasattr(y, "__len__"):y = [y]  # 如果y不是列表/数组,转换为单元素列表n = len(y)if not hasattr(x, "__len__"):x = [x] * n  # 如果x是标量,扩展为与y长度相同的列表if not self.X:self.X = [[] for _ in range(n)]  # 初始化n条曲线的x数据存储if not self.Y:self.Y = [[] for _ in range(n)]  # 初始化n条曲线的y数据存储for i, (a, b) in enumerate(zip(x, y)):if a is not None and b is not None:self.X[i].append(a)  # 添加x数据self.Y[i].append(b)  # 添加y数据for ax in self.axes: # 清除并重新绘制所有子图ax.cla()for x, y, fmt in zip(self.X, self.Y, self.fmts):for ax in self.axes:ax.plot(x, y, fmt) # 重新绘制所有曲线self.config_axes()self.fig.canvas.draw()  # 更新画布self.fig.canvas.flush_events()  # 刷新事件time.sleep(0.1)  # 添加短暂延迟以模拟动画效果plt.show() # pycharm社区版没有科学模块,通过在循环里show来实现动画效果def close(self):"""关闭图形"""plt.ioff()  # 关闭交互模式plt.close(self.fig)

使用示例:(完整版调用在 学习多层感知机的多项式回归 的训练函数中,详情:动手学深度学习——多层感知机实现-CSDN博客)

    # 绘制训练过程中的损失曲线animator = common.Animator(xlabel='epoch', ylabel='loss', yscale='log', # yscale='log':使用对数刻度显示损失值xlim=[1, num_epochs], ylim=[1e-3, 1e2], # 设置坐标轴范围legend=['train', 'test'])  # 图例标签for epoch in range(num_epochs): # 循环训练common.train_epoch_ch3(net, train_iter, loss, trainer)if epoch == 0 or (epoch + 1) % 20 == 0:animator.add(epoch + 1, (evaluate_loss(net, train_iter, loss),evaluate_loss(net, test_iter, loss)))plt.show() # pycharm社区版没有科学模块,通过在循环里show来实现动画效果animator.close()  # 最后记得关闭图形

下载器类:下载和缓存数据集 C_Downloader

import hashlib
import os
import tarfile
import zipfile
import requests

3个功能函数:

  1. download:下载数据集,将数据集缓存在本地目录(默认为../data)中,并返回下载文件的名称(数据集只是表格格式可直接调用)
  2. download_extract:下载并解压一个zip或tar文件
  3. download_all:将使用的所有数据集从DATA_HUB下载到缓存目录中
# 下载器类:下载和缓存数据集
class C_Downloader:def __init__(self, data_url = 'http://d2l-data.s3-accelerate.amazonaws.com/'):# DATA_HUB字典,将数据集名称的字符串映射到数据集相关的二元组上# DATA_HUB为二元组:包含数据集的url和验证文件完整性的sha-1密钥self.DATA_HUB = dict()self.DATA_URL = data_url # 数据集托管在地址为DATA_URL的站点上''' download下载数据集,将数据集缓存在本地目录(默认为../data)中,并返回下载文件的名称若缓存目录中已存在此数据集文件,且其sha-1与存储在DATA_HUB中的相匹配,则使用缓存的文件,以避免重复的下载name:要下载的文件名,必须在DATA_HUB中存在cache_dir:缓存目录,默认为../datasha-1:安全散列算法1'''def download(self, name, cache_dir=os.path.join('..', 'data')):  # @save"""下载一个DATA_HUB中的文件,返回本地文件名"""# 检查指定的文件名是否存在于DATA_HUB中# 如果不存在,则抛出断言错误,提示用户该文件不存在# 断言检查:确保name在DATA_HUB中存在,避免下载不存在的文件assert name in self.DATA_HUB, f"{name} 不存在于 {self.DATA_HUB}"url, sha1_hash = self.DATA_HUB[name] # 从DATA_HUB中获取该文件的URL和SHA-1哈希值# 若目录不存在,则创建目录# exist_ok=True:若目录已存在,也不会抛出错误os.makedirs(cache_dir, exist_ok=True)# 构建本地文件路径# 从URL中提取文件名(通过分割URL字符串获取最后一个部分)# 并将该文件名与缓存目录组合成完整的本地文件路径fname = os.path.join(cache_dir, url.split('/')[-1])if os.path.exists(fname): # 检查本地文件是否已存在sha1 = hashlib.sha1() # 计算本地文件的SHA-1哈希值(shal.sha1():创建一个字符串hashlib_,并将其加密后传入)with open(fname, 'rb') as f:# 读取文件内容,每次读取1MB的数据块,以避免大文件占用过多内存while True:data = f.read(1048576) # 1048576 bytes = 1MBif not data:breaksha1.update(data) # 更新哈希值# 比较计算出的哈希值与DATA_HUB中存储的哈希值if sha1.hexdigest() == sha1_hash:# 若哈希值匹配,说明文件完整且未被篡改,直接返回本地文件路径(命中缓存)return fname  # 命中缓存# 如果本地文件不存在或哈希值不匹配,则从URL下载文件print(f'正在从{url}下载{fname}...')# 使用requests库发起HTTP GET请求,stream=True表示以流的方式下载大文件# verify=True表示验证SSL证书(确保下载的安全性)r = requests.get(url, stream=True, verify=True)# 将下载的内容写入到本地文件中with open(fname, 'wb') as f:f.write(r.content) # 将请求的内容写入文件return fname # 返回本地文件路径''' 下载并解压一个zip或tar文件name:要下载并解压的文件名,必须在DATA_HUB中存在folder:解压后的目标文件夹名(可选)'''def download_extract(self, name, folder=None):  # @save"""下载并解压zip/tar文件"""fname = self.download(name) # 调用download函数下载指定的文件,获取本地文件路径base_dir = os.path.dirname(fname) # 获取缓存目录路径(即下载文件所在的目录)data_dir, ext = os.path.splitext(fname) # 分离文件名和扩展名if ext == '.zip':               # 如果是zip文件,使用zipfile.ZipFile 打开文件fp = zipfile.ZipFile(fname, 'r')elif ext in ('.tar', '.gz'):    # 如果是tar或gz文件,使用tarfile.open 打开文件fp = tarfile.open(fname, 'r')else:                           # 如果文件扩展名不是zip、tar或gz,抛出断言错误assert False, '只有zip/tar文件可以被解压缩'fp.extractall(base_dir) # 将文件解压到缓存目录中# 返回解压后的路径# 如果指定了folder参数,返回解压后的目标文件夹路径# 否则返回解压后的文件路径(即去掉扩展名的文件名)return os.path.join(base_dir, folder) if folder else data_dir# 将使用的所有数据集从DATA_HUB下载到缓存目录中def download_all(self):  # @save"""下载DATA_HUB中的所有文件"""for name in self.DATA_HUB:self.download(name)

使用示例:

import os
import pandas as pddownloader = C_Downloader()
DATA_HUB = downloader.DATA_HUB
DATA_URL = downloader.DATA_URL# 下载并缓存Kaggle房屋数据集
DATA_HUB['kaggle_house_train'] = (  #@saveDATA_URL + 'kaggle_house_pred_train.csv','585e9cc93e70b39160e7921475f9bcd7d31219ce')DATA_HUB['kaggle_house_test'] = (  #@saveDATA_URL + 'kaggle_house_pred_test.csv','fa19780a7b011d9b009e8bff8e99922a8ee2eb90')# 调用download函数下载文件
cache_dir=os.path.join('.', 'data', 'kaggle_house') # 缓存路径为 .\data\kaggle_house
trainData_path = downloader.download('kaggle_house_train', cache_dir)
testData_path = downloader.download('kaggle_house_test', cache_dir)
print(f'【训练集】文件已下载到:{trainData_path}')
print(f'【测试集】文件已下载到:{testData_path}')# 使用pandas分别加载包含训练数据和测试数据的两个CSV文件
train_data = pd.read_csv(trainData_path)
test_data = pd.read_csv(testData_path)
# train_data = pd.read_csv(downloader.download('kaggle_house_train'))
# test_data = pd.read_csv(downloader.download('kaggle_house_test'))print(f"【训练集】:{train_data.shape} 包括1460个样本,每个样本80个特征和1个标")
print(f"【测试集】:{test_data.shape} 包含1459个样本,每个样本80个特征")
print(f"查看前四个和最后两个特征,以及相应标签(房价)\n{train_data.iloc[0:4, [0, 1, 2, 3, -3, -2, -1]]}")# 对于每个样本:删除第一个特征ID,因为其不携带任何用于预测的信息
all_features = pd.concat((train_data.iloc[:, 1:-1], test_data.iloc[:, 1:]))

Vocab类:构建文本词表 & 统计词元(tokens)的频率 count_corpus

'''
Vocab类:构建文本词表(Vocabulary),管理词元与索引的映射关系
功能:
构建词表,管理词元与索引的映射关系,支持:
词元 → 索引(token_to_idx)
索引 → 词元(idx_to_token)
过滤低频词
保留特殊词元(如 <unk>未知, <pad>填充符, <bos>起始符, <eos>结束符)
'''
class Vocab:  #@save"""文本词表"""# tokens:原始词元列表(一维或二维)# min_freq:最低词频阈值,低于此值的词会被过滤# reserved_tokens:预定义的特殊词元(如 ["<pad>", "<bos>"])def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):if tokens is None: tokens = []if reserved_tokens is None: reserved_tokens = []# 统计词频,按出现频率排序counter = count_corpus(tokens) # 统计词频# key=lambda x: x[1] 指定排序依据为第二个元素# reverse=True 降序排序# _var:弱私有(Protected),# 表示变量是内部使用的,提示开“不要从类外部直接访问”,但实际上仍然可访问(Python不会强制限制)self._token_freqs = sorted(counter.items(), key=lambda x: x[1],reverse=True) # 词元频率(词频),按频率降序排序# 初始化词表,未知词元的索引为0(<unk>)# idx_to_token:索引 → 词元,索引0 默认是 <unk>(未知词元),后面是reserved_tokens# token_to_idx:词元 → 索引,是 idx_to_token 的反向映射self.idx_to_token = ['<unk>'] + reserved_tokensself.token_to_idx = {token: idxfor idx, token in enumerate(self.idx_to_token)} # 字典# 按词频从高到低添加词元,过滤低频词for token, freq in self._token_freqs:if freq < min_freq: # 跳过低频词breakif token not in self.token_to_idx: # 若词元不在token_to_idx中,则添加到词表self.idx_to_token.append(token) # 压入新词元self.token_to_idx[token] = len(self.idx_to_token) - 1 # 新词元对应的索引# __len__用于定义对象的长度行为。对类的实例调用len()时,Python会自动调用该实例的__len__方法def __len__(self): # 词表大小(包括 <unk> 和 reserved_tokens)return len(self.idx_to_token) # 返回词表大小# 词表索引查询:支持单个词元或词元列表查询 ↓# vocab["hello"] → 返回索引(如 1)# vocab[["hello", "world"]] → 返回索引列表 [1, 2]# 若词元不存在,返回 unk 的索引(默认 0)# __getitem__定义对象如何响应obj[key]这样的索引操作,实现后 实例可像列表或字典一样通过方括号[]访问元素def __getitem__(self, tokens):if not isinstance(tokens, (list, tuple)): # 若传入参数不为列表或元组,而是单个# 字典的内置方法 dict.get(key, default) 用于安全地获取字典中某个键(key)对应的值# 若键不存在,则返回指定的默认值(default),而不会抛出 KeyError 异常return self.token_to_idx.get(tokens, self.unk)   # 单个词元返回索引,未知词返回0return [self.__getitem__(token) for token in tokens] # 递归处理列表# 索引转词元# 支持单个索引或索引列表转换:# vocab.to_tokens(1) → 返回词元(如 "hello")# vocab.to_tokens([1, 2]) → 返回词元列表 ["hello", "world"]def to_tokens(self, indices):if not isinstance(indices, (list, tuple)): # 若传入参数不为列表或元组,而是单个return self.idx_to_token[indices]  # 单个索引返回词元return [self.idx_to_token[index] for index in indices] # 递归处理列表@propertydef unk(self):  # 返回未知词元的索引(默认为0)return 0@propertydef token_freqs(self): # 返回原始词频统计结果(降序排列)return self._token_freqs # 返回词频统计结果# 辅助函数:统计词元(tokens)的频率,返回一个 Counter对象
def count_corpus(tokens):  #@save"""统计词元的频率"""# 这里的tokens是1D列表或2D列表# 如果tokens是空列表或二维列表(如句子列表),则展平为一维列表if len(tokens) == 0 or isinstance(tokens[0], list):# 将词元列表展平成一个列表tokens = [token for line in tokens for token in line]# collections.Counter统计每个词元的出现次数,返回类似{"hello":2, "world":1}的字典return collections.Counter(tokens)

使用示例:

vocab = common.Vocab(tokens) # 构建词表,管理词元与索引的映射关系
print(f"前几个高频词及其索引:\n{list(vocab.token_to_idx.items())[:10]}")for i in [0, 10]: # 将每一条文本行转换成一个数字索引列表print(f"第{i}行信息:")print('文本:', tokens[i])print('索引:', vocab[tokens[i]])print(f"词表大小:{len(vocab)}")
print(f"词频统计(降序):\n{vocab.token_freqs}")

数据加载器类:整合随机采样和顺序分区,并用作数据迭代器 SeqDataLoader

# 数据加载器类:将随机采样和顺序分区包装到一个类中,以便稍后可以将其用作数据迭代器
class SeqDataLoader:  #@save"""加载序列数据的迭代器"""# max_tokens:限制返回的词元索引序列的最大长度(默认 -1 表示不限制)def __init__(self, downloader, batch_size, num_steps, use_random_iter, max_tokens):# 初始化选择采样方式if use_random_iter:self.data_iter_fn = seq_data_iter_random     # 随机取样else:self.data_iter_fn = seq_data_iter_sequential # 顺序分区self.corpus, self.vocab = load_corpus_time_machine(downloader, max_tokens) # 加载语料和词表self.batch_size, self.num_steps = batch_size, num_steps# __iter__实现迭代器协议:使对象可迭代,直接用于for循环# 从语料库(self.corpus)中 按指定的batch_size和num_step(即sequence_length) 生成批量数据def __iter__(self):return self.data_iter_fn(self.corpus, self.batch_size, self.num_steps)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/91997.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/91997.shtml
英文地址,请注明出处:http://en.pswp.cn/web/91997.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

南太平洋金融基建革命:斐济-巴新交易所联盟的技术破局之路 ——从关税动荡到离岸红利,跨境科技如何重塑太平洋资本生态

一、今日焦点&#xff1a;全球关税震荡与南太平洋的“技术联盟”机遇 1. 特朗普关税大限引爆亚太市场波动&#xff0c;小经济体承压寻路 2025年8月1日&#xff0c;特朗普正式签署行政令&#xff0c;对多国征收10%-41%的“对等关税”。韩国首当其冲&#xff0c;综合指数暴跌近4%…

python爬取豆瓣电影评论通用代码

最近在自学python爬虫&#xff0c;今天闲来无事&#xff0c;爬了一下豆瓣数据 这个网站对于初学者来说还是很友好的注意&#xff1a;有python环境的朋友运行的时候&#xff0c;要把cookie换成自己的 通用性&#xff1a;可以自己换不同的电影id进行数据爬取 Tip&#xff1a;slee…

构建属于自己的第一个 MCP 服务器:初学者教程

为什么需要 MCP 服务器&#xff1f; 你是否遇到过这样的场景&#xff1a;向 AI 助手&#xff08;比如 GitHub Copilot&#xff09;询问 “北京今天的天气”&#xff0c;得到的回复却是 “我无法访问实时天气数据”&#xff1f; 这是因为大多数 AI 模型本身 “与世隔绝”—— 它…

个人项目介绍:语音识别小助手

一、项目内容 基于STM32F103RCT6制作了一款集语音识别、按键控制、信息显示、温湿度监测等多功能于一体的智能设备&#xff0c;满足多样化的交互需求。 二、个人工作内容 依据项目需求&#xff0c;选定 STM32F103RCT6 单片机、SU-03T语音识别模组、AHT25 温湿度传感器等核心元件…

【Django】-1- 开发项目搭建

一、PDM Django 搭建项目&#x1f447;&#x1f3af; 核心目标用 PDM&#xff08;更现代的 Python 包管理工具&#xff09;&#xff0c;快速创建并管理 Django 项目&#xff08;Web 框架&#xff09;&#xff0c;让开发流程更丝滑✨&#x1f9e9; 分步拆解1. 创建项目用 PDM 初…

c++:设计模式训练

写一个鸟类&#xff1a;有一个多态函数&#xff1a;run 写一个企鹅类&#xff0c;继承自鸟类&#xff1a;重写 run 写一个鸵鸟类&#xff0c;继承自鸟类&#xff0c;重写 run 写一个老鹰类&#xff0c;继承自鸟类&#xff0c;重写run 写一个鸟笼&#xff0c;能够存放 不同的鸟…

配置Mybatis环境

配置Mybatis环境MyBatis是什么配置Mybatis环境MyBatis是什么 MyBatis 一个支持普通 SQL 查询、存储过程以及高级映射的持久层框架。MyBatis 免除了几乎所有的 JDBC 代码以及设置参数和获取结果集的工作&#xff0c;使得开发者可以更专注于 SQL 本身&#xff0c;而不必花费过多…

生产环境中基于Istio的Kubernetes多集群灰度发布架构实战经验分享

生产环境中基于Istio的Kubernetes多集群灰度发布架构实战经验分享 在大规模分布式微服务架构中&#xff0c;如何在多集群环境下平滑、安全地发布新版本&#xff0c;一直是保证高可用、高可靠的关键需求。本文以真实生产环境案例为基础&#xff0c;分享我们团队基于Istio Servic…

Kubernetes(k8s)之认识Pod

01了解Pod Pod是Kubernetes创建或部署的最小/最简单的基本单位,一个Pod代表集群上正在运行的一个进程。 一个Pod封装一个应用容器(也可以有多个容器),存储资源、一个独立的网络IP以及管理控制容器运行方式的策略选项。它可能由单个容器或多个容器共享组成的资源。 Kubern…

Nginx服务做负载均衡网关

1. 概述 内部Nginx服务器做服务网关&#xff0c;代理后端应用服务&#xff0c;卸载ssl域名证书&#xff0c;将接收的https请求&#xff0c;转发至后端http服务。华为防火墙负责NAT&#xff0c;启用服务器负载均衡功能&#xff0c;将公网虚拟IP端口映射到内部多台Nginx服务器上…

十三、请求响应-请求:日期参数和JSON参数

日期参数代码&#xff1a;日期参数 //日期时间参数RequestMapping("/dataParam")public String dataParam(DateTimeFormat(pattern "yyyy-MM-dd HH:mm:ss") LocalDateTime updateTime){System.out.println(updateTime);return "OK";}结果JSON参…

可信数据库大会现场,TDengine 时序数据库展示核电场景下的高性能与 AI 创新

设备在升级&#xff0c;场站在扩建&#xff0c;但数据系统却还在“跟不上”。这正是许多核电企业在推进数字化转型过程中最真实的感受。高频采集、长周期存储、精度要求高……这些构成了对数据库系统的“炼狱级考验”。在这样一个背景下&#xff0c;国产数据库的能力边界正在被…

ctflearn-POST practice

靶场地址&#xff1a;165.227.106.113/post.php 解题&#xff1a; 一.分析题目 提示&#xff1a; 知道要用POST请求提交表单&#xff0c;看一下源码信息 得到可能需要用post请求方式去提交表单&#xff0c;并且传数据admin和password&#xff0c;这边提供两种方式 方法一&…

FPGA实现OV7670摄像头图像处理至VGA显示器

本文还有配套的精品资源&#xff0c;点击获取 简介&#xff1a;本项目基于FPGA技术&#xff0c;结合OV7670摄像头传感器进行视频捕获&#xff0c;经SDRAM存储&#xff0c;并通过VGA显示器展示。同时&#xff0c;集成了中值滤波算法提高图像清晰度。该项目涉及数字图像处理系…

使用python写一套完整的智能体小程序

创建一个简单的智能体&#xff08;Agent&#xff09;程序在人工智能和自动化任务中&#xff0c;智能体&#xff08;Agent&#xff09;是指能够感知环境并通过决策和行动来实现目标的实体。Python 提供了丰富的库和框架&#xff0c;可以用于构建智能体程序&#xff0c;例如使用 …

电商项目_性能优化_海量数据读写、存储、检索

海量数据读写方式选择高并发读写场景分析无论任何业务系统&#xff0c;无非就是两个操作&#xff1a;写和读。 在海量数据和高并发的场景下&#xff0c;写和读就会成为系统性能的瓶颈。下面分析不同业务场景下面临的问题&#xff1a;侧重“高并发读”的系统场景1&#xff1a;搜…

RabbitMQ面试精讲 Day 9:优先级队列与惰性队列

【RabbitMQ面试精讲 Day 9】优先级队列与惰性队列 文章标签 RabbitMQ,优先级队列,惰性队列,消息队列,面试技巧,系统架构 文章简述 本文是"RabbitMQ面试精讲"系列第9天&#xff0c;深入解析优先级队列与惰性队列的实现原理与实战应用。文章详细讲解优先级队列的排…

[硬件电路-121]:模拟电路 - 信号处理电路 - 模拟电路中常见的难题

模拟电路设计是电子工程中极具挑战性的领域&#xff0c;其核心难题源于信号的连续性、元件的非理想特性以及环境干扰的复杂性。以下是模拟电路中常见的难题及其技术本质与解决方案&#xff1a;1. 噪声与干扰&#xff1a;信号的“隐形杀手”技术本质&#xff1a;模拟信号对微小电…

Java 大视界 -- Java 大数据在智能交通智能停车诱导与车位共享优化中的应用(381)

Java 大视界 -- Java 大数据在智能交通智能停车诱导与车位共享优化中的应用&#xff08;381&#xff09;引言&#xff1a;正文&#xff1a;一、智能停车的 “老大难”&#xff1a;不只是 “车位少” 那么简单1.1 车主与车位的 “错位困境”1.1.1 信息滞后的 “睁眼瞎”1.1.2 车…

基于落霞归雁思维框架的自动化测试实践与探索

基于落霞归雁思维框架的自动化测试实践与探索 在当今快速发展的软件开发领域&#xff0c;自动化测试已成为提高软件质量和开发效率的关键环节。本文将结合落霞归雁的思维框架——“观察现象 → 找规律 → 应用规律 → 实践验证”&#xff0c;探讨如何将其应用于自动化测试领域&…