
# Reinforcement Learning

Train intelligent agents that learn optimal behavior through interaction with environments.

## Quick Start

### OpenAI Gymnasium Setup

```python
import gymnasium as gym
import numpy as np

# Create environment
env = gym.make('CartPole-v1')

# Environment info
print(f"Observation space: {env.observation_space}")
print(f"Action space: {env.action_space}")

# Basic interaction loop
observation, info = env.reset()
for _ in range(1000):
    action = env.action_space.sample()  # Random action
    observation, reward, terminated, truncated, info = env.step(action)

    if terminated or truncated:
        observation, info = env.reset()

env.close()
```

## Q-Learning (Tabular)

```python
import numpy as np

class QLearning:
    """Tabular Q-Learning for discrete state/action spaces"""

    def __init__(self, n_states, n_actions, lr=0.1, gamma=0.99, epsilon=1.0):
        self.q_table = np.zeros((n_states, n_actions))
        self.lr = lr
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995

    def get_action(self, state):
        """Epsilon-greedy action selection"""
        if np.random.random() < self.epsilon:
            return np.random.randint(self.q_table.shape[1])
        return np.argmax(self.q_table[state])

    def update(self, state, action, reward, next_state, done):
        """Update Q-value using Bellman equation"""
        if done:
            target = reward
        else:
            target = reward + self.gamma * np.max(self.q_table[next_state])

        self.q_table[state, action] += self.lr * (target - self.q_table[state, action])

        # Decay epsilon
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
```

Training loop

```python
env = gym.make('FrozenLake-v1')
agent = QLearning(n_states=16, n_actions=4)

for episode in range(10000):
    state, _ = env.reset()
    total_reward = 0

    while True:
        action = agent.get_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        agent.update(state, action, reward, next_state, terminated)

        total_reward += reward
        state = next_state

        if terminated or truncated:
            break
```
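
The table above is indexed by integer state, which works directly for FrozenLake's 16 discrete states. For a continuous observation space you would first discretize; below is a minimal uniform-binning helper (a sketch; `make_discretizer` is a hypothetical utility, and unbounded dimensions such as CartPole's velocities must be clipped to a finite range first):

```python
import numpy as np

def make_discretizer(low, high, bins):
    """Map a continuous observation to a single Q-table row index."""
    low, high = np.asarray(low, dtype=float), np.asarray(high, dtype=float)

    def discretize(obs):
        # Clip to bounds, scale to [0, 1], and bin each dimension
        ratios = (np.clip(obs, low, high) - low) / (high - low)
        idx = np.minimum((ratios * bins).astype(int), bins - 1)
        # Flatten per-dimension bin indices into one integer state id
        return int(np.ravel_multi_index(idx, [bins] * len(idx)))

    return discretize
```

A QLearning table for such an environment then needs `n_states=bins ** obs_dim` rows.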

## Deep Q-Network (DQN)

```python
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random

class DQN(nn.Module):
    """Deep Q-Network"""

    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(DQN, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim)
        )

    def forward(self, x):
        return self.network(x)


class ReplayBuffer:
    """Experience replay buffer"""

    def __init__(self, capacity=100000):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        # Stack numpy arrays first; building a tensor from a list of arrays is slow
        return (
            torch.FloatTensor(np.array(states)),
            torch.LongTensor(actions),
            torch.FloatTensor(rewards),
            torch.FloatTensor(np.array(next_states)),
            torch.FloatTensor(dones)
        )

    def __len__(self):
        return len(self.buffer)


class DQNAgent:
    """DQN Agent with target network and experience replay"""

    def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99,
                 epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.995):
        self.action_dim = action_dim
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay

        # Networks
        self.policy_net = DQN(state_dim, action_dim)
        self.target_net = DQN(state_dim, action_dim)
        self.target_net.load_state_dict(self.policy_net.state_dict())

        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=lr)
        self.buffer = ReplayBuffer()

    def get_action(self, state):
        if np.random.random() < self.epsilon:
            return np.random.randint(self.action_dim)

        with torch.no_grad():
            state = torch.FloatTensor(state).unsqueeze(0)
            q_values = self.policy_net(state)
            return q_values.argmax().item()

    def train(self, batch_size=64):
        if len(self.buffer) < batch_size:
            return

        states, actions, rewards, next_states, dones = self.buffer.sample(batch_size)

        # Current Q values
        current_q = self.policy_net(states).gather(1, actions.unsqueeze(1))

        # Target Q values
        with torch.no_grad():
            next_q = self.target_net(next_states).max(1)[0]
            target_q = rewards + self.gamma * next_q * (1 - dones)

        # Loss
        loss = nn.MSELoss()(current_q.squeeze(), target_q)

        # Optimize
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Decay epsilon
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def update_target(self):
        """Update target network"""
        self.target_net.load_state_dict(self.policy_net.state_dict())
```
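
The DQNAgent above leaves the environment loop to the caller. A minimal training loop for CartPole (a sketch; the 500-episode budget and the 10-episode target-sync interval are illustrative choices, not tuned values):

```python
env = gym.make('CartPole-v1')
agent = DQNAgent(state_dim=4, action_dim=2)

for episode in range(500):
    state, _ = env.reset()
    total_reward = 0

    while True:
        action = agent.get_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)

        # Store the transition, then take one gradient step
        agent.buffer.push(state, action, reward, next_state, float(terminated))
        agent.train()

        total_reward += reward
        state = next_state
        if terminated or truncated:
            break

    # Periodically sync the target network with the policy network
    if episode % 10 == 0:
        agent.update_target()
```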

## Policy Gradient Methods

### REINFORCE

```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical

class PolicyNetwork(nn.Module):
    """Policy network for REINFORCE"""

    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super().__init__()
        self.network = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
            nn.Softmax(dim=-1)
        )

    def forward(self, x):
        return self.network(x)

    def get_action(self, state):
        probs = self.forward(torch.FloatTensor(state))
        dist = Categorical(probs)
        action = dist.sample()
        return action.item(), dist.log_prob(action)


class REINFORCE:
    """REINFORCE with normalized returns for variance reduction"""

    def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99):
        self.policy = PolicyNetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
        self.gamma = gamma

    def compute_returns(self, rewards):
        """Compute discounted returns"""
        returns = []
        G = 0
        for r in reversed(rewards):
            G = r + self.gamma * G
            returns.insert(0, G)

        returns = torch.tensor(returns)
        # Normalize for stable training
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)
        return returns

    def update(self, log_probs, rewards):
        returns = self.compute_returns(rewards)
        log_probs = torch.stack(log_probs)

        # Policy gradient loss
        loss = -(log_probs * returns).mean()

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
```

Training

```python
env = gym.make('CartPole-v1')  # CartPole matches state_dim=4, action_dim=2
agent = REINFORCE(state_dim=4, action_dim=2)

for episode in range(1000):
    state, _ = env.reset()
    log_probs = []
    rewards = []

    while True:
        action, log_prob = agent.policy.get_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)

        log_probs.append(log_prob)
        rewards.append(reward)

        state = next_state
        if terminated or truncated:
            break

    agent.update(log_probs, rewards)
```

## Proximal Policy Optimization (PPO)

```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical

class ActorCritic(nn.Module):
    """Actor-Critic network for PPO"""

    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super().__init__()

        # Shared feature extractor
        self.features = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU()
        )

        # Actor (policy)
        self.actor = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
            nn.Softmax(dim=-1)
        )

        # Critic (value function)
        self.critic = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )

    def forward(self, x):
        features = self.features(x)
        return self.actor(features), self.critic(features)


class PPO:
    """Proximal Policy Optimization"""

    def __init__(self, state_dim, action_dim, lr=3e-4, gamma=0.99,
                 clip_ratio=0.2, epochs=10, batch_size=64):
        self.model = ActorCritic(state_dim, action_dim)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.gamma = gamma
        self.clip_ratio = clip_ratio
        self.epochs = epochs
        self.batch_size = batch_size

    def compute_gae(self, rewards, values, dones, gamma=0.99, lam=0.95):
        """Generalized Advantage Estimation"""
        advantages = []
        gae = 0

        for t in reversed(range(len(rewards))):
            if t == len(rewards) - 1:
                next_value = 0
            else:
                next_value = values[t + 1]

            delta = rewards[t] + gamma * next_value * (1 - dones[t]) - values[t]
            gae = delta + gamma * lam * (1 - dones[t]) * gae
            advantages.insert(0, gae)

        return torch.tensor(advantages)

    def update(self, states, actions, old_log_probs, returns, advantages):
        """PPO update with clipping"""

        for _ in range(self.epochs):
            # Get current policy outputs
            probs, values = self.model(states)
            dist = Categorical(probs)
            log_probs = dist.log_prob(actions)
            entropy = dist.entropy().mean()

            # Ratio for PPO clipping
            ratio = torch.exp(log_probs - old_log_probs)

            # Clipped surrogate loss
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1 - self.clip_ratio,
                               1 + self.clip_ratio) * advantages
            actor_loss = -torch.min(surr1, surr2).mean()

            # Critic loss
            critic_loss = nn.MSELoss()(values.squeeze(), returns)

            # Total loss with entropy bonus
            loss = actor_loss + 0.5 * critic_loss - 0.01 * entropy

            self.optimizer.zero_grad()
            loss.backward()
            nn.utils.clip_grad_norm_(self.model.parameters(), 0.5)
            self.optimizer.step()
```
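
`PPO.update` expects batched rollout data collected under the previous policy. A minimal single-episode collection sketch (assumes the CartPole dimensions used earlier; a production PPO would gather multi-step rollouts across several parallel environments):

```python
env = gym.make('CartPole-v1')
ppo = PPO(state_dim=4, action_dim=2)

states, actions, log_probs, rewards, dones, values = [], [], [], [], [], []
state, _ = env.reset()

while True:
    state_t = torch.FloatTensor(state)
    with torch.no_grad():
        probs, value = ppo.model(state_t)
    dist = Categorical(probs)
    action = dist.sample()

    next_state, reward, terminated, truncated, _ = env.step(action.item())

    states.append(state_t)
    actions.append(action)
    log_probs.append(dist.log_prob(action))
    rewards.append(reward)
    dones.append(float(terminated))
    values.append(value.item())

    state = next_state
    if terminated or truncated:
        break

advantages = ppo.compute_gae(rewards, values, dones)
returns = advantages + torch.tensor(values)  # value targets from GAE advantages
ppo.update(torch.stack(states), torch.stack(actions),
           torch.stack(log_probs), returns.float(), advantages.float())
```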

## Multi-Agent RL

```python
class MultiAgentEnv:
    """Simple multi-agent environment wrapper"""

    def __init__(self, n_agents, env_fn):
        self.n_agents = n_agents
        self.envs = [env_fn() for _ in range(n_agents)]

    def reset(self):
        return [env.reset()[0] for env in self.envs]

    def step(self, actions):
        results = [env.step(a) for env, a in zip(self.envs, actions)]
        observations = [r[0] for r in results]
        rewards = [r[1] for r in results]
        dones = [r[2] or r[3] for r in results]
        return observations, rewards, dones


class IndependentLearners:
    """Independent Q-learning agents"""

    def __init__(self, n_agents, state_dim, action_dim):
        self.agents = [
            DQNAgent(state_dim, action_dim)
            for _ in range(n_agents)
        ]

    def get_actions(self, observations):
        return [agent.get_action(obs)
                for agent, obs in zip(self.agents, observations)]

    def train(self):
        for agent in self.agents:
            agent.train()
```
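
A hypothetical usage sketch pairing the two classes (assumes the DQNAgent from the DQN section; since the wrapper has no per-env auto-reset, all copies are reset together whenever any copy finishes):

```python
import gymnasium as gym

env = MultiAgentEnv(n_agents=4, env_fn=lambda: gym.make('CartPole-v1'))
learners = IndependentLearners(n_agents=4, state_dim=4, action_dim=2)

observations = env.reset()
for step in range(10000):
    actions = learners.get_actions(observations)
    next_observations, rewards, dones = env.step(actions)

    # Each agent stores and learns from only its own transitions
    for agent, obs, a, r, next_obs, d in zip(
            learners.agents, observations, actions,
            next_observations, rewards, dones):
        agent.buffer.push(obs, a, r, next_obs, float(d))

    learners.train()
    observations = env.reset() if any(dones) else next_observations
```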

## Reward Shaping

```python
def shape_reward(reward, state, next_state, done, info):
    """Design better reward signals"""

    shaped_reward = reward

    # Progress reward (encourage forward movement)
    if 'x_position' in info:
        progress = info['x_position'] - info.get('prev_x', 0)
        shaped_reward += 0.1 * progress

    # Survival bonus
    if not done:
        shaped_reward += 0.01

    # Penalty for dangerous states
    if 'danger_zone' in info and info['danger_zone']:
        shaped_reward -= 0.5

    # Goal proximity reward
    if 'goal_distance' in info:
        shaped_reward += 0.1 * (1.0 / (info['goal_distance'] + 1))

    return shaped_reward
```
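
One way to hook `shape_reward` into training is a Gymnasium wrapper (a sketch; `ShapedRewardWrapper` is illustrative, not a library class):

```python
import gymnasium as gym

class ShapedRewardWrapper(gym.Wrapper):
    """Apply shape_reward to every transition."""

    def reset(self, **kwargs):
        obs, info = self.env.reset(**kwargs)
        self._last_obs = obs
        return obs, info

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        reward = shape_reward(reward, self._last_obs, obs, terminated, info)
        self._last_obs = obs
        return obs, reward, terminated, truncated, info
```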

### Curriculum learning

```python
class CurriculumEnv:
    """Environment with difficulty progression"""

    def __init__(self, base_env, difficulty_schedule):
        self.env = base_env
        self.schedule = difficulty_schedule
        self.current_level = 0
        self.episode_count = 0

    def reset(self):
        self.episode_count += 1

        # Increase difficulty based on schedule
        if self.episode_count in self.schedule:
            self.current_level += 1
            self._update_difficulty()

        return self.env.reset()

    def _update_difficulty(self):
        # Modify environment parameters
        pass
```

## Stable Baselines3 (Production Ready)

```python
import gymnasium as gym
from stable_baselines3 import PPO, DQN, A2C
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv
from stable_baselines3.common.callbacks import EvalCallback

# Vectorized environments for parallel training
def make_env():
    return gym.make('CartPole-v1')

env = DummyVecEnv([make_env for _ in range(4)])

# Train PPO agent
model = PPO(
    'MlpPolicy',
    env,
    learning_rate=3e-4,
    n_steps=2048,
    batch_size=64,
    n_epochs=10,
    gamma=0.99,
    gae_lambda=0.95,
    clip_range=0.2,
    verbose=1,
    tensorboard_log="./ppo_logs/"
)

# Evaluation callback
eval_env = gym.make('CartPole-v1')
eval_callback = EvalCallback(
    eval_env,
    best_model_save_path='./best_model/',
    log_path='./logs/',
    eval_freq=1000,
    n_eval_episodes=10
)

# Train
model.learn(total_timesteps=100000, callback=eval_callback)

# Save and load
model.save("ppo_cartpole")
model = PPO.load("ppo_cartpole")

# Inference
obs = env.reset()
for _ in range(1000):
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, done, info = env.step(action)
```
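
SubprocVecEnv is imported above but unused: it is a drop-in replacement for DummyVecEnv that steps each environment in its own process, which pays off when a single env step is expensive:

```python
# Process-based parallelism; on spawn-start platforms this needs the
# `if __name__ == "__main__":` guard around the training script
env = SubprocVecEnv([make_env for _ in range(4)])
```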

## Hyperparameter Tuning

```python
# Common hyperparameter ranges
rl_hyperparameters = {
    "learning_rate": [1e-4, 3e-4, 1e-3],
    "gamma": [0.95, 0.99, 0.999],
    "batch_size": [32, 64, 128, 256],
    "n_steps": [128, 256, 512, 2048],
    "clip_range": [0.1, 0.2, 0.3],
    "entropy_coef": [0.0, 0.01, 0.05],
    "hidden_sizes": [(64, 64), (128, 128), (256, 256)]
}

# Optuna tuning (reuses env and eval_env from the section above)
import optuna
from stable_baselines3.common.evaluation import evaluate_policy

def objective(trial):
    lr = trial.suggest_float('lr', 1e-5, 1e-2, log=True)
    gamma = trial.suggest_float('gamma', 0.9, 0.9999)
    n_steps = trial.suggest_int('n_steps', 128, 2048, step=128)

    model = PPO('MlpPolicy', env, learning_rate=lr,
                gamma=gamma, n_steps=n_steps)
    model.learn(total_timesteps=50000)

    # Evaluate: evaluate_policy returns (mean_reward, std_reward)
    mean_reward, _ = evaluate_policy(model, eval_env, n_eval_episodes=10)
    return mean_reward

study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=50)
```

## Common Issues & Solutions

Issue: Training instability
Solutions:
- Reduce learning rate
- Increase batch size
- Use gradient clipping
- Normalize observations and rewards (see the sketch after this list)
- Use proper random seeds

Issue: Poor exploration
Solutions:
- Increase epsilon/entropy
- Use curiosity-driven exploration
- Add noise to actions (Gaussian, OU)
- Use count-based exploration bonuses

Issue: Reward hacking
Solutions:
- Careful reward design
- Use sparse rewards when possible
- Test with adversarial evaluation
- Monitor for unexpected behaviors
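For the normalization point above, a minimal running-statistics wrapper (a sketch using Welford-style updates; Gymnasium also ships NormalizeObservation and NormalizeReward wrappers, which are the production route):

```python
import gymnasium as gym
import numpy as np

class RunningNormalize(gym.ObservationWrapper):
    """Normalize observations with an online mean/variance estimate."""

    def __init__(self, env, eps=1e-8):
        super().__init__(env)
        shape = env.observation_space.shape
        self.mean = np.zeros(shape)
        self.var = np.ones(shape)
        self.count = 0
        self.eps = eps

    def observation(self, obs):
        # Incremental mean/variance update (Welford)
        self.count += 1
        delta = obs - self.mean
        self.mean += delta / self.count
        self.var += (delta * (obs - self.mean) - self.var) / self.count
        return (obs - self.mean) / np.sqrt(self.var + self.eps)
```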

## Best Practices

  1. Environment: Verify env correctness before training
  2. Normalization: Normalize states and rewards
  3. Logging: Track episode rewards, lengths, losses
  4. Reproducibility: Set seeds for all random sources (see the seeding sketch below)
  5. Evaluation: Separate eval environment, many episodes
  6. Hyperparameters: Start with known good defaults
  7. Baseline: Compare against random policy
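
A minimal seeding helper covering the random sources used in this guide (a sketch; full GPU determinism additionally needs `torch.use_deterministic_algorithms(True)`, and Gymnasium envs take the seed through `reset`):

```python
import random
import numpy as np
import torch

def set_seed(seed: int, env=None):
    """Seed Python, NumPy, PyTorch, and optionally a Gymnasium env."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    if env is not None:
        env.reset(seed=seed)
        env.action_space.seed(seed)
```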