Python Reinforcement Learning in Practice: From Q-Learning to Deep Reinforcement Learning

张开发
2026/4/11 22:07:46 · 15 min read

Preface

Hi everyone, I'm "第一程序员" (First Programmer) — grand name, but honestly I'm still pretty green. As a non-CS-major who switched into programming and is currently learning Rust and Python, I recently started studying reinforcement learning. Today I'd like to share some hands-on experience with reinforcement learning in Python, from Q-learning to deep reinforcement learning.

I. Reinforcement Learning Basics

1.1 Basic Concepts

Reinforcement learning is a machine learning approach in which an agent learns an optimal policy by interacting with an environment:

- Agent: the entity that learns and executes actions
- Environment: the external world the agent interacts with
- State: the current situation of the environment
- Action: an operation the agent can perform
- Reward: the feedback the agent receives after taking an action
- Policy: a mapping from states to actions
- Value function: an estimate of how good a state or state-action pair is

1.2 Application Scenarios

- Game AI: training agents that can beat human players
- Robot control: getting robots to complete a variety of tasks
- Autonomous driving: training self-driving systems
- Recommender systems: optimizing recommendation policies
- Financial trading: optimizing trading strategies

II. Environment Setup

2.1 Installing the Required Libraries

```bash
# Install NumPy
pip install numpy

# Install OpenAI Gym
pip install gym

# Install PyTorch
pip install torch torchvision

# Install plotting libraries
pip install matplotlib seaborn
```

All code below assumes gym >= 0.26, where env.reset() returns (observation, info) and env.step() returns five values: (observation, reward, terminated, truncated, info).

III. Q-Learning

3.1 How Q-Learning Works

Q-learning is a value-based reinforcement learning algorithm: it learns the value of each state-action pair (the Q-value) and derives the optimal policy from those values. Each step applies the update Q(s, a) ← (1 − α)·Q(s, a) + α·(r + γ·max Q(s′, ·)), where α is the learning rate and γ the discount factor.

3.2 Implementing Q-Learning

Implementing Q-learning in Python:

```python
import numpy as np
import gym

# Create the environment
env = gym.make("FrozenLake-v1")

# Initialize the Q-table
state_size = env.observation_space.n
action_size = env.action_space.n
q_table = np.zeros((state_size, action_size))

# Hyperparameters
epsilon = 1.0          # exploration rate
epsilon_min = 0.01
epsilon_decay = 0.995
gamma = 0.95           # discount factor
learning_rate = 0.1
n_episodes = 10000

# Training
for episode in range(n_episodes):
    state = env.reset()[0]
    done = False
    total_reward = 0

    while not done:
        # Choose an action (epsilon-greedy)
        if np.random.uniform(0, 1) < epsilon:
            action = env.action_space.sample()     # explore
        else:
            action = np.argmax(q_table[state, :])  # exploit

        # Take the action
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        # Update the Q-value
        old_value = q_table[state, action]
        next_max = np.max(q_table[next_state, :])
        new_value = (1 - learning_rate) * old_value + learning_rate * (reward + gamma * next_max)
        q_table[state, action] = new_value

        state = next_state
        total_reward += reward

    # Decay the exploration rate
    if epsilon > epsilon_min:
        epsilon *= epsilon_decay

    if (episode + 1) % 1000 == 0:
        print(f"Episode: {episode + 1}, Epsilon: {epsilon:.4f}, Total Reward: {total_reward}")

# Testing
state = env.reset()[0]
done = False
total_reward = 0

while not done:
    action = np.argmax(q_table[state, :])
    next_state, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated
    state = next_state
    total_reward += reward
    env.render()

print(f"Test Total Reward: {total_reward}")
env.close()
```
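One caveat: FrozenLake's default map is slippery, so a single test episode is a noisy measure of what the agent has learned. Here is a minimal sketch (not part of the original code above) that evaluates the greedy policy over many episodes and reports the average reward, which on FrozenLake equals the success rate; `evaluate_q_table` and its parameters are names I'm introducing for illustration:

```python
import numpy as np
import gym

def evaluate_q_table(q_table, n_eval_episodes=1000):
    """Hypothetical helper: run the greedy policy (no exploration)
    for many episodes and return the mean episode reward."""
    env = gym.make("FrozenLake-v1")
    total = 0.0
    for _ in range(n_eval_episodes):
        state = env.reset()[0]
        done = False
        while not done:
            action = np.argmax(q_table[state, :])  # always exploit
            state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            total += reward
    env.close()
    return total / n_eval_episodes

# Usage after training: print(f"Success rate: {evaluate_q_table(q_table):.2%}")
```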
IV. Deep Q-Network (DQN)

4.1 How DQN Works

DQN combines deep learning with Q-learning: instead of a Q-table, a neural network approximates the Q-value function, which scales to large state spaces. Two techniques keep training stable: an experience replay buffer and a periodically synced target network.

4.2 Implementing DQN

Implementing DQN with PyTorch:

```python
import random
from collections import deque

import numpy as np
import gym
import torch
import torch.nn as nn
import torch.optim as optim

# Create the environment
env = gym.make("CartPole-v1")

# Define the DQN model
class DQN(nn.Module):
    def __init__(self, state_size, action_size):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_size)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

# Hyperparameters
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
learning_rate = 0.001
gamma = 0.99
epsilon = 1.0
epsilon_min = 0.01
epsilon_decay = 0.995
batch_size = 64
memory_size = 10000
n_episodes = 1000

# Initialize the networks and optimizer
policy_net = DQN(state_size, action_size)
target_net = DQN(state_size, action_size)
target_net.load_state_dict(policy_net.state_dict())
optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)

# Experience replay buffer
memory = deque(maxlen=memory_size)

def select_action(state, epsilon):
    """Epsilon-greedy action selection."""
    if np.random.uniform(0, 1) < epsilon:
        return env.action_space.sample()
    with torch.no_grad():
        state = torch.FloatTensor(state).unsqueeze(0)
        q_values = policy_net(state)
        return torch.argmax(q_values).item()

def replay():
    """Sample a minibatch from the buffer and do one gradient step."""
    if len(memory) < batch_size:
        return
    batch = random.sample(memory, batch_size)
    states, actions, rewards, next_states, dones = zip(*batch)

    states = torch.FloatTensor(np.array(states))
    actions = torch.LongTensor(actions).unsqueeze(1)
    rewards = torch.FloatTensor(rewards)
    next_states = torch.FloatTensor(np.array(next_states))
    dones = torch.FloatTensor(dones)

    # Current Q-values for the actions actually taken
    current_q = policy_net(states).gather(1, actions)

    # Target Q-values from the (frozen) target network
    with torch.no_grad():
        next_q = target_net(next_states).max(1)[0]
        target_q = rewards + (1 - dones) * gamma * next_q

    # Compute the loss and optimize the policy network
    loss = nn.MSELoss()(current_q.squeeze(), target_q)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

# Training
for episode in range(n_episodes):
    state = env.reset()[0]
    done = False
    total_reward = 0

    while not done:
        action = select_action(state, epsilon)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        total_reward += reward

        # Store the transition
        memory.append((state, action, reward, next_state, done))
        state = next_state

        # Learn from replayed experience
        replay()

    # Decay the exploration rate
    if epsilon > epsilon_min:
        epsilon *= epsilon_decay

    # Periodically sync the target network
    if (episode + 1) % 10 == 0:
        target_net.load_state_dict(policy_net.state_dict())

    if (episode + 1) % 100 == 0:
        print(f"Episode: {episode + 1}, Epsilon: {epsilon:.4f}, Total Reward: {total_reward}")

# Testing
state = env.reset()[0]
done = False
total_reward = 0

while not done:
    action = select_action(state, 0)  # pure exploitation
    next_state, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated
    state = next_state
    total_reward += reward
    env.render()

print(f"Test Total Reward: {total_reward}")
env.close()
```
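Print statements every 100 episodes make it hard to judge whether learning is actually progressing. As a small optional sketch (not in the original code), you could append each episode's `total_reward` to a list during training and plot a moving average with the matplotlib installed in section 2.1 — the list name `episode_rewards` and the window size are my own choices:

```python
import numpy as np
import matplotlib.pyplot as plt

# Assumes you ran episode_rewards.append(total_reward)
# at the end of each training episode.
def plot_rewards(episode_rewards, window=50):
    """Plot raw episode rewards plus a moving average to show the trend."""
    rewards = np.asarray(episode_rewards, dtype=float)
    plt.plot(rewards, alpha=0.3, label="episode reward")
    if len(rewards) >= window:
        # Moving average over `window` episodes
        smoothed = np.convolve(rewards, np.ones(window) / window, mode="valid")
        plt.plot(range(window - 1, len(rewards)), smoothed, label=f"{window}-episode average")
    plt.xlabel("Episode")
    plt.ylabel("Total reward")
    plt.legend()
    plt.show()
```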
V. Deep Deterministic Policy Gradient (DDPG)

5.1 How DDPG Works

DDPG is a reinforcement learning algorithm for continuous action spaces. It combines deterministic policy gradients with the machinery of DQN: an actor network outputs an action directly, while a critic network estimates the Q-value of that state-action pair.

5.2 Implementing DDPG

Implementing DDPG with PyTorch:

```python
import random
from collections import deque

import numpy as np
import gym
import torch
import torch.nn as nn
import torch.optim as optim

# Create the environment
env = gym.make("Pendulum-v1")

# Actor: maps a state to a deterministic action
class Actor(nn.Module):
    def __init__(self, state_size, action_size, action_low, action_high):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_size)
        self.action_low = action_low
        self.action_high = action_high

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.tanh(self.fc3(x))
        # Rescale the tanh output from [-1, 1] to [action_low, action_high]
        return x * (self.action_high - self.action_low) / 2 + (self.action_high + self.action_low) / 2

# Critic: maps a (state, action) pair to a Q-value
class Critic(nn.Module):
    def __init__(self, state_size, action_size):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(state_size + action_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, 1)

    def forward(self, state, action):
        x = torch.cat([state, action], dim=1)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

# Hyperparameters
state_size = env.observation_space.shape[0]
action_size = env.action_space.shape[0]
action_low = env.action_space.low[0]
action_high = env.action_space.high[0]
learning_rate_actor = 0.0001
learning_rate_critic = 0.001
gamma = 0.99
tau = 0.001              # soft-update coefficient
memory_size = 100000
batch_size = 64
n_episodes = 1000

# Initialize the networks and optimizers
actor = Actor(state_size, action_size, action_low, action_high)
target_actor = Actor(state_size, action_size, action_low, action_high)
target_actor.load_state_dict(actor.state_dict())

critic = Critic(state_size, action_size)
target_critic = Critic(state_size, action_size)
target_critic.load_state_dict(critic.state_dict())

optimizer_actor = optim.Adam(actor.parameters(), lr=learning_rate_actor)
optimizer_critic = optim.Adam(critic.parameters(), lr=learning_rate_critic)

# Experience replay buffer
memory = deque(maxlen=memory_size)

def select_action(state, noise_scale=0.1):
    """Deterministic action from the actor plus Gaussian exploration noise."""
    with torch.no_grad():
        state = torch.FloatTensor(state).unsqueeze(0)
        action = actor(state).item()
    action += np.random.normal(0, noise_scale)
    return np.clip(action, action_low, action_high)

def replay():
    """Sample a minibatch and update critic, actor, and target networks."""
    if len(memory) < batch_size:
        return
    batch = random.sample(memory, batch_size)
    states, actions, rewards, next_states, dones = zip(*batch)

    states = torch.FloatTensor(np.array(states))
    actions = torch.FloatTensor(actions).unsqueeze(1)
    rewards = torch.FloatTensor(rewards).unsqueeze(1)
    next_states = torch.FloatTensor(np.array(next_states))
    dones = torch.FloatTensor(dones).unsqueeze(1)

    # Target Q-values from the target actor and target critic
    with torch.no_grad():
        next_actions = target_actor(next_states)
        target_q = target_critic(next_states, next_actions)
        target_q = rewards + (1 - dones) * gamma * target_q

    # Current Q-values
    current_q = critic(states, actions)

    # Update the critic
    critic_loss = nn.MSELoss()(current_q, target_q)
    optimizer_critic.zero_grad()
    critic_loss.backward()
    optimizer_critic.step()

    # Update the actor (maximize the critic's estimate)
    actor_loss = -critic(states, actor(states)).mean()
    optimizer_actor.zero_grad()
    actor_loss.backward()
    optimizer_actor.step()

    # Soft-update the target networks
    for target_param, param in zip(target_actor.parameters(), actor.parameters()):
        target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
    for target_param, param in zip(target_critic.parameters(), critic.parameters()):
        target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)

# Training
for episode in range(n_episodes):
    state = env.reset()[0]
    done = False
    total_reward = 0

    while not done:
        action = select_action(state)
        next_state, reward, terminated, truncated, _ = env.step([action])
        done = terminated or truncated
        total_reward += reward

        # Store the transition
        memory.append((state, action, reward, next_state, done))
        state = next_state

        # Learn from replayed experience
        replay()

    if (episode + 1) % 100 == 0:
        print(f"Episode: {episode + 1}, Total Reward: {total_reward}")

# Testing
state = env.reset()[0]
done = False
total_reward = 0

while not done:
    action = select_action(state, noise_scale=0)  # no exploration noise
    next_state, reward, terminated, truncated, _ = env.step([action])
    done = terminated or truncated
    state = next_state
    total_reward += reward
    env.render()

print(f"Test Total Reward: {total_reward}")
env.close()
```

VI. Challenges in Reinforcement Learning and How to Address Them

6.1 Common Challenges

- Exploration vs. exploitation: balancing trying new actions against using the best known ones
- Credit assignment: attributing a reward to the actions that caused it
- Environment modeling: building a usable model of the environment
- Sample efficiency: learning from a limited number of samples
- Stability: keeping the learning process stable

6.2 Approaches

- Exploration vs. exploitation: ε-greedy policies, Boltzmann exploration, and similar methods (see the short sketch at the end of this post)
- Credit assignment: temporal-difference learning, Monte Carlo methods
- Environment modeling: model-based reinforcement learning
- Sample efficiency: experience replay, batch learning
- Stability: target networks, soft updates

VII. Thoughts from a Rust Developer's Perspective

7.1 Performance Optimization

- Environment simulation: implement high-performance environment simulation in Rust
- Model inference: speed up model inference with Rust
- Memory management: Rust's ownership model helps prevent memory leaks

7.2 Cross-Language Integration

- PyO3: embed Rust code in Python
- WebAssembly: compile Rust reinforcement learning components to WebAssembly
- gRPC: communicate between Rust and Python services

VIII. Summary

Hands-on reinforcement learning in Python is a journey from fundamentals to applications, and it requires mastering algorithms such as Q-learning, DQN, and DDPG. As someone who switched into programming from a non-CS background, I believe that with systematic study and practice you can absolutely master reinforcement learning. The learning curve is steep, but through project work and steadily accumulated experience you will gradually grasp its essence. Combining it with Rust's performance advantages can further speed up reinforcement learning workloads.

Keep learning, keep shipping. I'm still a rookie for now, but I believe that with persistence I'll one day live up to the name "第一程序员" (First Programmer).
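Appendix: A Boltzmann Exploration Sketch

As promised in section 6.2, here is a minimal sketch of Boltzmann (softmax) exploration as an alternative to ε-greedy. It is not part of the tutorials above, and the function name and temperature value are my own choices: instead of picking either the best action or a uniformly random one, it samples each action with probability proportional to exp(Q/τ).

```python
import numpy as np

def boltzmann_action(q_values, temperature=1.0):
    """Hypothetical helper: sample an action with probability
    proportional to exp(Q / temperature).

    High temperature -> nearly uniform (more exploration);
    low temperature  -> nearly greedy (more exploitation).
    """
    q = np.asarray(q_values, dtype=float)
    # Subtract the max before exponentiating for numerical stability
    prefs = np.exp((q - q.max()) / temperature)
    probs = prefs / prefs.sum()
    return np.random.choice(len(q), p=probs)

# Usage with the Q-table from section 3:
#   action = boltzmann_action(q_table[state, :], temperature=0.5)
```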
