文章目录
- 前言
- 核心工具函数
- 广义优势估计 (Generalized Advantage Estimation, GAE)
- 案例一:TRPO 解决离散动作问题 (CartPole-v1)
- 1. 环境初始化
- 2. 网络结构定义
- 3. TRPO 智能体实现
- 4. 训练与可视化
- 5. 训练主程序与结果
- 案例二:TRPO 解决连续动作问题 (Pendulum-v1)
- 1. 环境与工具函数
- 2. 网络结构的关键区别
- 3. TRPO 智能体实现 (连续版)
- 4. 训练与可视化
- 5. 训练主程序与结果
- 总结
前言
欢迎来到深度强化学习(Deep Reinforcement Learning, DRL)的世界!DRL 是深度学习与强化学习的强大结合,它让智能体(Agent)能够在复杂的、高维度的环境中通过试错来学习最优策略。近年来,DRL 在游戏、机器人控制、自然语言处理等领域取得了举世瞩目的成就。
在众多 DRL 算法中,策略梯度(Policy Gradient, PG)方法是一类直接对策略进行优化的算法。然而,传统的 PG 方法存在一个显著的痛点:训练过程不稳定。由于策略更新的步长难以确定,过大的步长可能导致新策略性能急剧下降,使得训练过程崩溃;过小的步长则会导致收敛速度过慢。
为了解决这个问题,来自 UC Berkeley 的 John Schulman 等人提出了**信赖域策略优化(Trust Region Policy Optimization, TRPO)**算法。TRPO 的核心思想是在每次策略更新时,施加一个约束,确保新旧策略之间的差异(用 KL 散度衡量)在一个“信赖域”内。这就像我们下山时,不会一步迈出太远,而是先在脚边找一个可靠的落脚点,小步快走,从而保证了每一步都是在稳健地提升策略性能。
TRPO 通过复杂的数学推导,将这个带约束的优化问题转化为通过**共轭梯度法(Conjugate Gradient)和线性搜索(Line Search)**高效求解的问题,从而在保证稳定性的同时,实现了较高的样本效率。
本篇博客旨在通过 PyTorch 代码,深入浅出地剖析 TRPO 算法的实现细节。我们将从零开始,构建 TRPO 智能体,并分别在两个经典的强化学习环境中进行实战:
- CartPole-v1:一个经典的离散动作空间环境,目标是控制小车移动来保持杆子竖直不倒。
- Pendulum-v1:一个经典的连续动作空间环境,目标是施加力矩来将摆杆竖直向上并保持。
通过这两个案例,我们将全面掌握 TRPO 在不同动作空间下的实现差异与核心思想。无论您是 DRL 的初学者还是有一定经验的实践者,相信这篇代码驱动的博客都能为您带来深刻的理解和启发。
完整代码:下载链接
核心工具函数
在正式进入算法实现之前,我们先介绍两个在策略优化算法中非常实用的工具函数:广义优势估计(GAE)和移动平均。
广义优势估计 (Generalized Advantage Estimation, GAE)
在 Actor-Critic 框架中,优势函数 A(s, a) = Q(s, a) - V(s) 用来评估在状态 s 下采取动作 a 相对于平均水平的好坏。GAE 是一种先进的优势函数估计方法,它通过引入一个参数 λ
来平衡 TD(0) 估计(偏差低,方差高)和蒙特卡洛估计(偏差高,方差低)之间的权衡,从而有效降低策略梯度的方差,提升训练稳定性。
下面是 GAE 的实现代码,包含了详尽的维度分析和注释。
"""
强化学习工具函数集
包含广义优势估计(GAE)和数据平滑处理功能
"""import torch
import numpy as npdef compute_advantage(gamma, lmbda, td_delta):"""计算广义优势估计(Generalized Advantage Estimation,GAE)GAE是一种在强化学习中用于减少策略梯度方差的技术,通过对时序差分误差进行指数加权平均来估计优势函数,平衡偏差和方差的权衡。参数:gamma (float): 折扣因子,维度: 标量取值范围[0,1],决定未来奖励的重要性lmbda (float): GAE参数,维度: 标量 取值范围[0,1],控制偏差-方差权衡lmbda=0时为TD(0)单步时间差分,lmbda=1时为蒙特卡洛方法用采样到的奖励-状态价值估计td_delta (torch.Tensor): 时序差分误差序列,维度: [时间步数]包含每个时间步的TD误差值返回:torch.Tensor: 广义优势估计值,维度: [时间步数]与输入td_delta维度相同的优势函数估计数学公式:A_t^GAE(γ,λ) = Σ_{l=0}^∞ (γλ)^l * δ_{t+l}其中 δ_t = r_t + γV(s_{t+1}) - V(s_t) 是TD误差"""# 将PyTorch张量转换为NumPy数组进行计算# td_delta维度: [时间步数] -> [时间步数]td_delta = td_delta.detach().numpy() # 因为A用来求g的,需要梯度,防止梯度向下传播# 初始化优势值列表,用于存储每个时间步的优势估计# advantage_list维度: 最终为[时间步数]advantage_list = []# 初始化当前优势值,从序列末尾开始反向计算# advantage维度: 标量advantage = 0.0# 从时间序列末尾开始反向遍历TD误差# 反向计算是因为GAE需要利用未来的信息# delta维度: 标量(td_delta中的单个元素)for delta in td_delta[::-1]: # [::-1]实现序列反转# GAE递归公式:A_t = δ_t + γλA_{t+1}# gamma * lmbda * advantage: 来自未来时间步的衰减优势值# delta: 当前时间步的TD误差# advantage维度: 标量advantage = gamma * lmbda * advantage + delta# 将计算得到的优势值添加到列表中# advantage_list维度: 逐步增长到[时间步数]advantage_list.append(advantage)# 由于是反向计算,需要将结果列表反转回正确的时间顺序# advantage_list维度: [时间步数](时间顺序已恢复)advantage_list.reverse()# 将NumPy列表转换回PyTorch张量并返回# 返回值维度: [时间步数]return torch.tensor(advantage_list, dtype=torch.float)def moving_average(data, window_size):"""计算移动平均值,用于平滑奖励曲线该函数通过滑动窗口的方式对时间序列数据进行平滑处理,可以有效减少数据中的噪声,使曲线更加平滑美观。常用于强化学习中对训练过程的奖励曲线进行可视化优化。参数:data (list): 原始数据序列,维度: [num_episodes]包含需要平滑处理的数值数据(如每轮训练的奖励值)window_size (int): 移动窗口大小,维度: 标量决定了平滑程度,窗口越大平滑效果越明显但也会导致更多的数据点丢失返回:list: 移动平均后的数据,维度: [len(data) - window_size + 1]返回的数据长度会比原数据少 window_size - 1 个元素这是因为需要足够的数据点来计算第一个移动平均值示例:>>> data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] # 维度: [10]>>> smoothed = moving_average(data, 3) # window_size = 3>>> print(smoothed) # 输出: [2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] 维度: [8]"""# 边界检查:如果数据长度小于窗口大小,直接返回原数据# 这种情况下无法计算移动平均值# data维度: [num_episodes], window_size维度: 标量if len(data) < window_size:return data# 初始化移动平均值列表# moving_avg维度: 最终为[len(data) - window_size + 1]moving_avg = []# 遍历数据,计算每个窗口的移动平均值# i的取值范围: 0 到 len(data) - window_size# 循环次数: len(data) - window_size + 1# 每次循环处理一个滑动窗口位置for i in range(len(data) - window_size + 1):# 提取当前窗口内的数据切片# window_data维度: [window_size]# 包含从索引i开始的连续window_size个元素# 例如:当i=0, window_size=3时,提取data[0:3]window_data = data[i:i + window_size]# 计算当前窗口内数据的算术平均值# np.mean(window_data)维度: 标量# 将平均值添加到结果列表中moving_avg.append(np.mean(window_data))# 返回移动平均后的数据列表# moving_avg维度: [len(data) - window_size + 1]return moving_avg
案例一:TRPO 解决离散动作问题 (CartPole-v1)
我们首先从相对简单的离散动作环境 CartPole-v1 开始。
1. 环境初始化
我们使用 OpenAI Gym 库来创建 CartPole-v1 环境。该环境的状态是4维连续向量,动作是2维离散值(向左或向右推小车)。
"""
强化学习环境初始化模块
用于创建和配置OpenAI Gym环境
"""import gym# 环境配置
# 定义要使用的强化学习环境名称
# CartPole-v1是经典的平衡杆控制问题:
# - 状态空间:4维连续空间(车位置、车速度、杆角度、杆角速度)
# - 动作空间:2维离散空间(向左推车、向右推车)
# - 目标:保持杆子平衡尽可能长的时间
# env_name维度: 标量(字符串)
env_name = 'CartPole-v1'# 创建强化学习环境实例
# gym.make()函数根据环境名称创建对应的环境对象
# 该环境对象包含了状态空间、动作空间、奖励函数等定义
# env维度: gym.Env对象(包含状态空间[4]和动作空间[2]的环境实例)
# env.observation_space.shape: (4,) - 观测状态维度
# env.action_space.n: 2 - 离散动作数量
env = gym.make(env_name)
2. 网络结构定义
TRPO 属于 Actor-Critic 算法家族,因此我们需要定义两个网络:策略网络(Actor)和价值网络(Critic)。
PolicyNet
(策略网络):输入是环境状态,输出是每个离散动作的选择概率分布。我们使用Softmax
函数来确保输出是合法的概率分布。ValueNet
(价值网络):输入是环境状态,输出是一个标量,代表该状态的价值估计 V(s)。
"""
TRPO(Trust Region Policy Optimization)智能体实现
包含策略网络、价值网络和TRPO算法的完整实现
"""import torch
import torch.nn.functional as F
import numpy as np
import copyclass PolicyNet(torch.nn.Module):"""策略网络,面向离散动作空间使用神经网络来学习状态到动作概率分布的映射,输出每个动作的选择概率,用于策略梯度算法。"""def __init__(self, state_dim, hidden_dim, action_dim):"""初始化策略网络参数:state_dim (int): 状态空间维度,维度: 标量表示环境状态向量的长度hidden_dim (int): 隐藏层神经元数量,维度: 标量控制网络的表达能力action_dim (int): 动作空间维度,维度: 标量表示可选择的离散动作数量"""super(PolicyNet, self).__init__()# 第一层全连接层:状态输入到隐藏层# 权重维度: [hidden_dim, state_dim]# 偏置维度: [hidden_dim]self.fc1 = torch.nn.Linear(state_dim, hidden_dim)# 第二层全连接层:隐藏层到动作输出# 权重维度: [action_dim, hidden_dim] # 偏置维度: [action_dim]self.fc2 = torch.nn.Linear(hidden_dim, action_dim)def forward(self, x):"""前向传播计算动作概率分布参数:x (torch.Tensor): 输入状态,维度: [batch_size, state_dim]返回:torch.Tensor: 动作概率分布,维度: [batch_size, action_dim]每行是一个概率分布,所有元素和为1"""# 第一层线性变换后应用ReLU激活函数# x维度: [batch_size, state_dim] -> [batch_size, hidden_dim]x = F.relu(self.fc1(x))# 第二层线性变换后应用Softmax得到概率分布# x维度: [batch_size, hidden_dim] -> [batch_size, action_dim]# dim=1表示在动作维度上进行softmax归一化return F.softmax(self.fc2(x), dim=1)class ValueNet(torch.nn.Module):"""价值网络,对状态进行价值评估学习状态价值函数V(s),用于计算优势函数和TD误差,优势函数用于计算g是Actor-Critic算法中的Critic部分。"""def __init__(self, state_dim, hidden_dim):"""初始化价值网络参数:state_dim (int): 状态空间维度,维度: 标量hidden_dim (int): 隐藏层神经元数量,维度: 标量"""super(ValueNet, self).__init__()# 第一层全连接层:状态输入到隐藏层# 权重维度: [hidden_dim, state_dim]# 偏置维度: [hidden_dim]self.fc1 = torch.nn.Linear(state_dim, hidden_dim)# 第二层全连接层:隐藏层到价值输出(单个标量值)# 权重维度: [1, hidden_dim]# 偏置维度: [1]self.fc2 = torch.nn.Linear(hidden_dim, 1)def forward(self, x):"""前向传播计算状态价值参数:x (torch.Tensor): 输入状态,维度: [batch_size, state_dim]返回:torch.Tensor: 状态价值估计,维度: [batch_size, 1]每个状态对应一个价值估计"""# 第一层线性变换后应用ReLU激活函数# x维度: [batch_size, state_dim] -> [batch_size, hidden_dim]x = F.relu(self.fc1(x))# 第二层线性变换得到价值估计(无激活函数)# x维度: [batch_size, hidden_dim] -> [batch_size, 1]return self.fc2(x)
3. TRPO 智能体实现
这是整个项目的核心。TRPO
类封装了算法的所有逻辑。
__init__
: 初始化 Actor 和 Critic 网络、Critic 的优化器(注意,Actor 的参数不通过传统优化器更新)以及 TRPO 的各项超参数。take_action
: 根据策略网络输出的概率分布,采样一个动作。hessian_matrix_vector_product
: 这是 TRPO 的一个关键技巧。为了避免直接计算和求逆复杂的 Hessian 矩阵(KL 散度对策略参数的二阶导数矩阵 H),我们只计算它与一个向量v
的乘积Hv
。这可以通过两次自动微分高效完成。conjugate_gradient
: 共轭梯度法。这是一个迭代算法,用于高效地求解线性方程组Hx = g
,从而找到策略更新的“自然梯度”方向x = H⁻¹g
。compute_surrogate_obj
: 计算策略优化的代理目标函数,即带重要性采样修正的优势函数期望。line_search
: 在共轭梯度法找到的更新方向上,通过线性搜索(回溯法)找到一个既能提升目标函数,又满足 KL 散度约束的最佳步长。policy_learn
: 整合上述方法,完成一次策略网络的更新。update
: 算法的整体更新流程。它接收一个回合的经验数据,计算 TD 误差、GAE 优势函数,然后分别更新价值网络(通过梯度下降)和策略网络(通过policy_learn
)。
class TRPO:"""TRPO(Trust Region Policy Optimization)算法实现TRPO是一种策略优化算法,通过限制策略更新的步长来保证训练稳定性,——KL散度用来引出H使用信任域约束和共轭梯度法来求解优化问题。"""def __init__(self, hidden_dim, state_space, action_space, lmbda,kl_constraint, alpha, critic_lr, gamma, device):"""初始化TRPO算法参数:hidden_dim (int): 隐藏层维度,维度: 标量state_space (gym.Space): 状态空间,维度: shape为[state_dim]action_space (gym.Space): 动作空间,维度: n为动作数量lmbda (float): GAE参数,维度: 标量,取值[0,1]kl_constraint (float): KL散度约束,维度: 标量alpha (float): 线性搜索参数,维度: 标量critic_lr (float): 价值网络学习率,维度: 标量gamma (float): 折扣因子,维度: 标量,取值[0,1]device (torch.device): 计算设备,维度: 标量"""# 从环境空间中提取维度信息# state_dim维度: 标量,表示状态向量长度state_dim = state_space.shape[0]# action_dim维度: 标量,表示离散动作数量action_dim = action_space.n# 初始化策略网络(Actor)# 策略网络参数不使用传统优化器,而是通过TRPO算法更新# self.actor维度: PolicyNet对象,参数总数取决于网络结构self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)# 初始化价值网络(Critic)# self.critic维度: ValueNet对象self.critic = ValueNet(state_dim, hidden_dim).to(device)# 价值网络优化器,使用Adam算法# self.critic_optimizer维度: 优化器对象self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),lr=critic_lr)# 算法超参数# gamma维度: 标量,未来奖励折扣因子self.gamma = gamma# lmbda维度: 标量,GAE中的偏差-方差权衡参数self.lmbda = lmbda# kl_constraint维度: 标量,KL散度的最大允许值self.kl_constraint = kl_constraint# alpha维度: 标量,线性搜索中的步长缩减因子self.alpha = alpha# device维度: 设备对象,指定计算设备self.device = devicedef take_action(self, state):"""根据当前状态选择动作参数:state (np.ndarray): 当前状态,维度: [state_dim]返回:int: 选择的动作索引,维度: 标量"""# 将状态转换为批量格式并转换为张量# state维度: [state_dim] -> [1, state_dim]state = np.array([state])# state维度: [1, state_dim],torch.Tensor类型state = torch.tensor(state, dtype=torch.float).to(self.device)# 通过策略网络计算动作概率分布# probs维度: [1, action_dim],每个动作的选择概率probs = self.actor(state)# 创建分类分布对象用于采样# action_dist维度: Categorical分布对象action_dist = torch.distributions.Categorical(probs)# 从概率分布中采样动作# action维度: [1],包含选择的动作索引action = action_dist.sample()# 返回标量形式的动作索引# 返回值维度: 标量(整数)return action.item()def hessian_matrix_vector_product(self, states, old_action_dists, vector):"""计算Hessian矩阵与向量的乘积(用于共轭梯度法)在TRPO中,需要计算KL散度Hessian矩阵与向量的乘积,这里使用自动微分的技巧来高效计算。参数:states (torch.Tensor): 状态批次,维度: [batch_size, state_dim]old_action_dists (torch.distributions.Categorical): 旧策略分布,维度: batch_size个分布vector (torch.Tensor): 要相乘的向量,维度: [param_dim],param_dim为策略网络参数总数返回:torch.Tensor: Hessian-向量乘积,维度: [param_dim]"""# 计算新策略的动作分布# new_action_dists维度: batch_size个Categorical分布new_action_dists = torch.distributions.Categorical(self.actor(states))# 计算新旧策略之间的平均KL散度# kl维度: 标量,表示策略更新的大小kl = torch.mean(torch.distributions.kl.kl_divergence(old_action_dists,new_action_dists))# 计算KL散度对策略参数的梯度# kl_grad维度: 与策略网络参数结构相同的梯度元组kl_grad = torch.autograd.grad(kl,self.actor.parameters(),create_graph=True)# 将梯度展平为向量形式# kl_grad_vector维度: [param_dim],策略参数的梯度向量kl_grad_vector = torch.cat([grad.view(-1) for grad in kl_grad])# 计算梯度向量与输入向量的点积# kl_grad_vector_product维度: 标量kl_grad_vector_product = torch.dot(kl_grad_vector, vector)# 计算二阶梯度(Hessian-向量乘积)# grad2维度: 与策略网络参数结构相同的二阶梯度元组grad2 = torch.autograd.grad(kl_grad_vector_product,self.actor.parameters())# 将二阶梯度展平为向量形式# grad2_vector维度: [param_dim],即Hv的结果grad2_vector = torch.cat([grad.view(-1) for grad in grad2