前言
我发现在csdn写完一篇文章越来越难了,有n篇写了一半没往下写。原来我觉得补完203篇,凑到一千篇是个很简单的事,没想到还挺难的。
我回想了一下,过去一年大模型领域继续发生这很剧烈的变化,这是一种新的模式 ,和原来慢慢写一篇技术文章其实是冲突的。而我在过去一年里,半是因为工作的压力,不得不去适应;另一半则是我也认同这种变化,正在适应和迁移。
写文档的本质是梳理自己的知识,并进行传播。
但是大模型太强了,随之产生的一系列生态使得再去慢慢写文章显得奢侈和过时。我觉得我还是会补完剩下的170篇的,但是进度可能会慢一些。而且更多的是关于算法领域的,这方面大模型还差很远,我暂时不会和大模型协同。过去的什么工具部署、使用啥的,我觉得个人已经完全没有必要那么详细的去写细节,而是去了解原理。
下面的文字部分全部手打,代码部分逐行扫过,所以说起来是挺奢侈的。(现在一般是让大模型写,然后测一下,给个报告)
强化学习
介绍啥的也不想说了,反正大模型都懂。这是我觉得非常关键的一步,但迟迟不得门而入。
所以先从QLearning开始,QLearning又从gym开始。这个项目最初是openai做的,当时的确挺open的。
下面这段代码是基于平衡车做的一个基准版本。env代表了环境,env的step方法会对每次的action作出评估:
- 1 next_state : 表示执行了 action 之后,环境当前的新状态。
- 2 reward: agent 执行该动作之后,环境给予的即时奖励。
- 3 terminated: 指示该回合是否自然结束(如达到目标、失败等)
- 4 truncated: 指示该回合是否被人为打断(如超过最大步数、时间限制等)。
- 5 info :包含调试信息或额外状态(例如奖励结构分解、中间结果、原始 observation)。
下面做了一个特别简单(机械)的方法,如果杆子向右倾斜,小车向右移动,如果杆子向左倾斜,则车子向左移动。
import gym# 创建环境(这里使用经典的 CartPole 平衡问题)
env = gym.make('CartPole-v1', render_mode='human') # render_mode='human' 表示可视化# 初始化环境,返回初始状态
state, _ = env.reset()
print("初始状态:", state)# 第一次执行随机动作获取初始状态
action = env.action_space.sample()
next_state, reward, terminated, truncated, info = env.step(action)
print(f"Step 0: 动作={action}, 奖励={reward}, 新状态={next_state}")# 基于状态的控制策略
for t in range(1, 200): # 最多运行 200 步# env.render() # 渲染当前环境(可视化)# 基于杆子角度的简单控制策略if next_state[2] > 0: # 杆子向右倾斜action = 1 # 向右移动else: # 杆子向左倾斜action = 0 # 向左移动# 执行动作,返回:新状态、奖励、是否终止、额外信息next_state, reward, terminated, truncated, info = env.step(action)print(f"Step {t}: 动作={action}, 奖励={reward}, 新状态={next_state}")if terminated or truncated:print("Episode 结束!原因:")if abs(next_state[2]) > 0.2095: # 约12度(0.2095弧度)print(f"- 杆子倾斜角度过大: {next_state[2]:.4f} 弧度(约{next_state[2]*180/3.14159:.1f}度)")if abs(next_state[0]) > 2.4:print(f"- 小车超出边界: 位置={next_state[0]:.4f}")break# 关闭环境
env.close()
这种方法大概过不了关
初始状态: (array([ 0.04398132, -0.01324874, 0.01681901, 0.01633218], dtype=float32), {})
Step 0: 动作=0, 奖励=1.0, 新状态=[ 0.04371634 -0.20860781 0.01714565 0.3142739 ]
Step 1: 动作=1, 奖励=1.0, 新状态=[ 0.03954419 -0.01373424 0.02343113 0.02704708]
Step 2: 动作=1, 奖励=1.0, 新状态=[ 0.0392695 0.181044 0.02397207 -0.25815195]
Step 3: 动作=1, 奖励=1.0, 新状态=[ 0.04289038 0.37581566 0.01880903 -0.5431784 ]
Step 4: 动作=1, 奖励=1.0, 新状态=[ 0.05040669 0.5706683 0.00794546 -0.8298761 ]
Step 5: 动作=1, 奖励=1.0, 新状态=[ 0.06182006 0.76568073 -0.00865206 -1.1200496 ]
Step 6: 动作=0, 奖励=1.0, 新状态=[ 0.07713367 0.57067335 -0.03105305 -0.83009315]
Step 7: 动作=0, 奖励=1.0, 新状态=[ 0.08854714 0.3759893 -0.04765491 -0.547336 ]
Step 8: 动作=0, 奖励=1.0, 新状态=[ 0.09606693 0.18156812 -0.05860163 -0.27004054]
Step 9: 动作=0, 奖励=1.0, 新状态=[ 0.09969829 -0.01267074 -0.06400245 0.00359858]
Step 10: 动作=0, 奖励=1.0, 新状态=[ 0.09944487 -0.20681919 -0.06393047 0.2754211 ]
Step 11: 动作=0, 奖励=1.0, 新状态=[ 0.09530849 -0.40097353 -0.05842205 0.54727495]
Step 12: 动作=0, 奖励=1.0, 新状态=[ 0.08728902 -0.59522814 -0.04747655 0.82099336]
Step 13: 动作=0, 奖励=1.0, 新状态=[ 0.07538446 -0.7896694 -0.03105668 1.0983738 ]
Step 14: 动作=0, 奖励=1.0, 新状态=[ 0.05959107 -0.98436904 -0.00908921 1.3811532 ]
Step 15: 动作=0, 奖励=1.0, 新状态=[ 0.03990369 -1.1793764 0.01853386 1.67098 ]
Step 16: 动作=1, 奖励=1.0, 新状态=[ 0.01631616 -0.9844746 0.05195345 1.3841261 ]
Step 17: 动作=1, 奖励=1.0, 新状态=[-0.00337333 -0.7900377 0.07963598 1.1081318 ]
Step 18: 动作=1, 奖励=1.0, 新状态=[-0.01917408 -0.5960475 0.10179861 0.84145695]
Step 19: 动作=1, 奖励=1.0, 新状态=[-0.03109504 -0.40245155 0.11862775 0.5824435 ]
Step 20: 动作=1, 奖励=1.0, 新状态=[-0.03914407 -0.20917389 0.13027662 0.32935935]
Step 21: 动作=1, 奖励=1.0, 新状态=[-0.04332754 -0.01612387 0.13686381 0.08043125]
Step 22: 动作=1, 奖励=1.0, 新状态=[-0.04365002 0.17679776 0.13847244 -0.16613266]
Step 23: 动作=1, 奖励=1.0, 新状态=[-0.04011406 0.3696939 0.13514978 -0.41212636]
Step 24: 动作=1, 奖励=1.0, 新状态=[-0.03272019 0.56266713 0.12690726 -0.65933347]
Step 25: 动作=1, 奖励=1.0, 新状态=[-0.02146685 0.7558158 0.11372058 -0.90951586]
Step 26: 动作=1, 奖励=1.0, 新状态=[-0.00635053 0.9492302 0.09553026 -1.1644017 ]
Step 27: 动作=1, 奖励=1.0, 新状态=[ 0.01263408 1.1429876 0.07224223 -1.4256694 ]
Step 28: 动作=1, 奖励=1.0, 新状态=[ 0.03549383 1.3371462 0.04372884 -1.694927 ]
Step 29: 动作=1, 奖励=1.0, 新状态=[ 0.06223675 1.5317371 0.0098303 -1.9736822 ]
Step 30: 动作=1, 奖励=1.0, 新状态=[ 0.09287149 1.7267541 -0.02964334 -2.2633033 ]
Step 31: 动作=0, 奖励=1.0, 新状态=[ 0.12740658 1.5319214 -0.0749094 -1.9798967 ]
Step 32: 动作=0, 奖励=1.0, 新状态=[ 0.15804501 1.3376632 -0.11450734 -1.7113292 ]
Step 33: 动作=0, 奖励=1.0, 新状态=[ 0.18479827 1.1440276 -0.14873391 -1.4563696 ]
Step 34: 动作=0, 奖励=1.0, 新状态=[ 0.20767882 0.9510109 -0.17786132 -1.2136078 ]
Step 35: 动作=0, 奖励=1.0, 新状态=[ 0.22669904 0.7585728 -0.20213346 -0.98152035]
Step 36: 动作=0, 奖励=1.0, 新状态=[ 0.2418705 0.5666487 -0.22176388 -0.7585189 ]
Episode 结束!原因:
- 杆子倾斜角度过大: -0.2218 弧度(约-12.7度)
条件 | 是否成功 |
---|---|
支撑 500 步未失败 | ✅ 是 |
中途杆倒或越界 | ❌ 否 |
使用QLearning 解决问题
结果:
...
Episode 19800: Total Reward = 366.0
Episode 19900: Total Reward = 1754.0
Episode 20000: Total Reward = 447.0
...
Step 480:
状态: 位置=-0.19, 速度=-0.03, 角度=-4.9度, 角速度=-0.03
动作: 左
----------------------------
Step 490:
状态: 位置=-0.21, 速度=-0.41, 角度=-4.9度, 角速度=0.29
动作: 左
----------------------------
Step 500:
状态: 位置=-0.31, 速度=-0.40, 角度=-0.9度, 角速度=0.13
动作: 右
----------------------------
最终结果: 平衡了 500 步
成功达到500步最大限制!
对代码进行一些剖析:
- 1 QLearning是一种离散化方法,而当前的状态数据是连续的(比如速度、角度)
用minmax方法以及指定的格子,将连续值分为若干离散状态。这种方法效率比较低,最后我会试图用矩阵计算方法改进一下。
# 离散化连续状态空间
def discretize_state(state, bins):discretized = []for i in range(len(state)):scale = (state[i] + abs(low[i])) / (high[i] - low[i])discretized.append(int(np.clip(scale * bins[i], 0, bins[i]-1)))return tuple(discretized)
环境中的状态可以这样获取(另外,数据规整化的第一步的确是映射为数值)
env = gym.make('CartPole-v1')
state = env.reset()[0]'''
获取到当前的环境的向量
array([-0.01736587, -0.04907 , -0.00341747, -0.01198332], dtype=float32)
'''
- 2 qtable的建立
QLearning的核心是QTable, QTable记录了状态 ~ 行动 ~ 价值的关联。因为名字叫Table,我也就老是想着普通的table格式。逻辑上当然没问题,
但实际上在这次探索时,我发现用tensor来记录这个表的方式更科学。
上面的表可以按下面的方式定义,其中bins列表是每个维度的离散化箱体。
# 初始化Q表
bins = [2, 2, 2, 2] # 切少一点一点,可以观察q表
q_table = np.zeros(bins + [env.action_space.n])
此时,qtable可以想象为一个高维空间,容纳了每一种可能性。这个高维空间的构造方式,则是按照每个维度的箱体数量,构造一个严格MECE的高维表,每个空间都有一个值。
所以上面环境状态和行动被归一化为了「维度」,这些维度都对应了一个「值」。这对于进一步走向底层研究还是比较好的,比如QKV,这些都采用Tensor的方式。虽然很早就知道了Tensor的概念,但是我一直用高维矩阵的方式去理解,在业务意义上立即还是不够。
下面的训练过程中,也就是在不断迭代qtable的过程。
# Q-learning训练
for episode in range(episodes):state = env.reset()[0]discretized_state = discretize_state(state, bins)done = Falsetotal_reward = 0while not done:# ε-greedy策略if np.random.random() < epsilon:action = env.action_space.sample()else:action = np.argmax(q_table[discretized_state])next_state, reward, done, _, _ = env.step(action)total_reward += rewarddiscretized_next_state = discretize_state(next_state, bins)# Q-learning更新current_q = q_table[discretized_state + (action,)]max_next_q = np.max(q_table[discretized_next_state])new_q = (1 - alpha) * current_q + alpha * (reward + gamma * max_next_q)q_table[discretized_state + (action,)] = new_q# 状态更新discretized_state = discretized_next_state# 打印每个回合的训练信息if (episode + 1) % 100 == 0:log_str = f"Episode {episode+1}: Total Reward = {total_reward}\n"print(log_str, end="")log_file.write(log_str)
原理上说:
- 1 每个回合,通过随机,或者历史经验,获取当前状态下将要采取的行动
- 2 采取改行动,获取下一步的状态,奖励,同时也要观察是否出现出现终止信号
- 3 根据贝尔曼方程,更新当前q值
- 4 将下一个状态离散化,然后开始下一次循环
技术细节上,
- 1 找最好的行动
action = np.argmax(q_table[discretized_state])
这个是把前面的维度都锁住,只留下"行动"的column,然后通过argmax方法找出最大使得值最大的动作。如果动作不止是一列,也是一组值呢?按照目前的框架,恐怕也是要映射到一列上才行。
- 2 检索当前的Q值
current_q = q_table[discretized_state + (action,)]
- 3 更新Q值
new_q = (1 - alpha) * current_q + alpha * (reward + gamma * max_next_q)
- 4 状态更新
discretized_state = discretized_next_state
对离散化方法进行一些改进
import numpy as npdef discretize_state_batch(states: np.ndarray, bins: List[int], low: np.ndarray, high: np.ndarray) -> np.ndarray:"""将一个批量的状态数组进行向量化离散化。参数:states: np.ndarray of shape (N, D),N 个状态,每个状态 D 维。bins: List[int],每一维要分多少个桶。low, high: 分别是每个维度的最小值和最大值(用于缩放)。返回:np.ndarray of shape (N, D),每一行是离散化后的状态。"""states = np.array(states, dtype=np.float32) # (N, D)bins = np.array(bins)low = np.array(low)high = np.array(high)# 归一化到 [0, 1]scale = (states - low) / (high - low + 1e-8)scale = np.clip(scale, 0, 1)# 缩放到 [0, bins[i]-1],并转 intdiscretized = (scale * bins).astype(int)discretized = np.clip(discretized, 0, bins - 1)return discretizedstate = np.array([[-0.017, -0.049, -0.003, -0.011]]) # shape (1, 4)
low = np.array([-2.4, -2.0, -0.21, -2.0])
high = np.array([2.4, 2.0, 0.21, 2.0])
bins = [6, 6, 12, 6]discrete = discretize_state_batch(state, bins, low, high)
print(discrete) # [[2 2 5 2]]
总结
到这里,基本上把QLearning的路铺好了,接下来就是开始结合实际的场景开始用熟。