
Single-Agent Reinforcement Learning Code Study Notes

Jianeng

Created 2021-03-16 | Updated 2021-03-23 | Reinforcement Learning

Every algorithm implementation carries out the agent–environment interaction and the training inside main(). We therefore abstract this function to obtain a common template.

python
def main():
    '''
    Create the game environment with gym
    Create the replay buffer
    Create and initialize the networks
    Initialize the update interval
    Build an optimizer for the networks
    Main loop (over episodes):
        Reset the env
        While done is False:
            Sample an action, and post-process the action as needed
            env.step(action)  # the agent takes the action and interacts with the env
            Store the sample in the buffer

        If the buffer size exceeds a threshold:
            Train the networks for several rounds, backpropagating on batch_size samples to update the parameters
    Close the env
    '''
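
To make the template concrete, here is a minimal sketch of main() filled in for CartPole with the DQN pieces shown in the next section (Qnet, a ReplayBuffer-style memory, train). The hyperparameter values are illustrative placeholders, not the original author's settings, and the classic gym API is assumed (env.step returns s_prime, r, done, info).

python
import gym
import torch
import torch.optim as optim

def main():
    env = gym.make('CartPole-v1')                  # create the game environment with gym
    memory = ReplayBuffer()                        # create the buffer
    q, q_target = Qnet(), Qnet()                   # create and initialize the networks
    q_target.load_state_dict(q.state_dict())
    update_interval = 20                           # initialize the update interval (placeholder value)
    optimizer = optim.Adam(q.parameters(), lr=0.0005)

    for n_epi in range(10000):                     # main loop over episodes
        epsilon = max(0.01, 0.08 - 0.01 * (n_epi / 200))
        s = env.reset()
        done = False
        while not done:
            a = q.sample_action(torch.from_numpy(s).float(), epsilon)       # sample an action
            s_prime, r, done, info = env.step(a)                            # the agent acts and interacts with the env
            memory.put((s, a, r / 100.0, s_prime, 0.0 if done else 1.0))    # store the sample in the buffer
            s = s_prime

        if memory.size() > 2000:                   # buffer size exceeds a threshold
            train(q, q_target, memory, optimizer)  # several rounds of minibatch backprop inside train()

        if n_epi % update_interval == 0:
            q_target.load_state_dict(q.state_dict())   # periodically sync the target network

    env.close()                                    # close the env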

In this section we go through each algorithm's code alongside its pseudocode.

2.1 DQN

The NIPS 2013 version of DQN:

For the CartPole problem we simplify further:

* No preprocessing is needed; the observation is used directly as the state input.
* The four elements of a CartPole observation are: cart position, cart velocity, pole angle, and pole angular velocity.
* Only a basic MLP is used instead of a convolutional neural network.
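
As a quick check of that setup (a sketch, not from the original post; it assumes the classic gym API where reset() returns only the observation):

python
import gym

env = gym.make('CartPole-v1')
obs = env.reset()                     # 4 numbers: cart position, cart velocity, pole angle, pole angular velocity
print(obs)
print(env.observation_space.shape)    # (4,)
print(env.action_space.n)             # 2 discrete actions: push cart left / push cart right
env.close()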

The training code for the algorithm:

python
def train(q, q_target, memory, optimizer):
    for i in range(10):
        # Each variable is a tensor holding batch_size samples
        s, a, r, s_prime, done_mask = memory.sample(batch_size)

        q_out = q(s)
        q_a = q_out.gather(1, a)                                  # Q estimates of the actions actually taken in state s
        max_q_prime = q_target(s_prime).max(1)[0].unsqueeze(1)    # max Q estimate over actions in the next state
        target = r + gamma * max_q_prime * done_mask              # done_mask handles non-terminal vs. terminal states
        loss = F.smooth_l1_loss(q_a, target)

        optimizer.zero_grad()   # zero the gradients, i.e. reset d(loss)/d(weight) to 0
        loss.backward()
        optimizer.step()        # performs a single optimization step
python
import random
import torch
import torch.nn as nn
import torch.nn.functional as F

class Qnet(nn.Module):
    def __init__(self):
        super(Qnet, self).__init__()
        self.fc1 = nn.Linear(4, 128)    # input: the 4-d CartPole observation
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 2)    # output: one Q value per discrete action

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

    def sample_action(self, obs, epsilon):
        # epsilon-greedy action selection
        out = self.forward(obs)
        coin = random.random()
        if coin < epsilon:
            return random.randint(0, 1)   # explore: random action
        else:
            return out.argmax().item()    # exploit: greedy action
python
epsilon = max(0.01, 0.08 - 0.01 * (n_epi / 200))
# Linear annealing from 8% to 1%: more exploration is needed at the start, so actions are mostly random;
# gradually the actions need to become effective, so the randomness is reduced
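
As a quick check of this schedule: epsilon starts at 0.08 when n_epi = 0 and hits the 0.01 floor once n_epi reaches 1400, so later episodes act almost entirely greedily.
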
python
done_mask = 0.0 if done else 1.0
# Setting done_mask to 1 or 0 lets the sampled batches handle the different forms of the
# target y for terminal vs. non-terminal states
memory.put((s, a, r/100.0, s_prime, done_mask))
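
The snippets above call memory.put() and memory.sample(), but the buffer itself is not shown in the post. A minimal replay buffer consistent with that interface might look like this (a sketch; the buffer_limit value is a placeholder):

python
import collections
import random
import torch

class ReplayBuffer:
    def __init__(self, buffer_limit=50000):
        self.buffer = collections.deque(maxlen=buffer_limit)   # old samples are dropped automatically

    def put(self, transition):
        self.buffer.append(transition)   # transition = (s, a, r, s_prime, done_mask)

    def sample(self, n):
        mini_batch = random.sample(self.buffer, n)
        s_lst, a_lst, r_lst, s_prime_lst, done_mask_lst = [], [], [], [], []
        for s, a, r, s_prime, done_mask in mini_batch:
            s_lst.append(s)
            a_lst.append([a])
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            done_mask_lst.append([done_mask])
        # Return tensors, each holding n samples, matching how train() unpacks them
        return (torch.tensor(s_lst, dtype=torch.float),
                torch.tensor(a_lst),
                torch.tensor(r_lst, dtype=torch.float),
                torch.tensor(s_prime_lst, dtype=torch.float),
                torch.tensor(done_mask_lst, dtype=torch.float))

    def size(self):
        return len(self.buffer)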

2.2 Actor critic

2.3 Advantage actor critic [a2c]

The corresponding code:

python
import torch
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

envs = ParallelEnv(n_train_processes)

model = ActorCritic()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

step_idx = 0
s = envs.reset()
while step_idx < max_train_steps:
    s_lst, a_lst, r_lst, mask_lst = list(), list(), list(), list()
    for _ in range(update_interval):
        prob = model.pi(torch.from_numpy(s).float())
        a = Categorical(prob).sample().numpy()
        s_prime, r, done, info = envs.step(a)
        s_lst.append(s)
        a_lst.append(a)
        r_lst.append(r/100.0)
        mask_lst.append(1 - done)

        s = s_prime
        step_idx += 1

    # Compute the TD targets over the update_interval steps (a tensor);
    # e.g. update_interval = 5 gives torch.Size([5, 3]), the second dimension being the number of parallel workers
    s_final = torch.from_numpy(s_prime).float()
    v_final = model.v(s_final).detach().clone().numpy()
    td_target = compute_target(v_final, r_lst, mask_lst)

    # Compute the advantages over the update_interval steps
    td_target_vec = td_target.reshape(-1)
    s_vec = torch.tensor(s_lst).float().reshape(-1, 4)   # 4 == dimension of the state
    a_vec = torch.tensor(a_lst).reshape(-1).unsqueeze(1)
    advantage = td_target_vec - model.v(s_vec).reshape(-1)

    # Actor and critic losses are computed together
    pi = model.pi(s_vec, softmax_dim=1)
    pi_a = pi.gather(1, a_vec).reshape(-1)
    loss = -(torch.log(pi_a) * advantage.detach()).mean() + \
        F.smooth_l1_loss(model.v(s_vec).reshape(-1), td_target_vec)

    # Gradient update
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # After training, evaluate in a separate test environment
    if step_idx % PRINT_INTERVAL == 0:
        test(step_idx, model)

envs.close()
python
class ActorCritic(nn.Module):
    def __init__(self):
        super(ActorCritic, self).__init__()
        self.fc1 = nn.Linear(4, 256)     # shared feature layer over the 4-d state
        self.fc_pi = nn.Linear(256, 2)   # policy head: one logit per discrete action
        self.fc_v = nn.Linear(256, 1)    # value head

    def pi(self, x, softmax_dim=1):
        x = F.relu(self.fc1(x))
        x = self.fc_pi(x)
        prob = F.softmax(x, dim=softmax_dim)
        return prob

    def v(self, x):
        x = F.relu(self.fc1(x))
        v = self.fc_v(x)
        return v
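
The training loop above relies on compute_target() to turn the bootstrap value of the final state, the collected rewards, and the done masks into n-step TD targets, but the function is not shown in the post. A minimal sketch consistent with how it is called (gamma is the discount factor defined elsewhere as a hyperparameter):

python
def compute_target(v_final, r_lst, mask_lst):
    # Work backwards from the bootstrap value of the last state:
    # G_t = r_t + gamma * G_{t+1} * mask_t, where mask_t = 0 cuts the return at episode ends.
    G = v_final.reshape(-1)
    td_target = list()
    for r, mask in zip(r_lst[::-1], mask_lst[::-1]):
        G = r + gamma * G * mask
        td_target.append(G)
    return torch.tensor(td_target[::-1]).float()   # shape: (update_interval, n_workers)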

2.4 a3c

2.5 DDPG

The training code for the algorithm:

python
# Training has two parts: gradient updates for the actor and critic, and updating the target networks' parameters
train(mu, mu_target, q, q_target, memory, q_optimizer, mu_optimizer)
soft_update(mu, mu_target)
soft_update(q, q_target)

def train(mu, mu_target, q, q_target, memory, q_optimizer, mu_optimizer):
    s, a, r, s_prime, done_mask = memory.sample(batch_size)

    # The target policy network chooses the next action, so the target policy network and the
    # target value network together determine the critic's target and hence its gradient
    target = r + gamma * q_target(s_prime, mu_target(s_prime)) * done_mask
    q_loss = F.smooth_l1_loss(q(s, a), target.detach())
    q_optimizer.zero_grad()
    q_loss.backward()
    q_optimizer.step()

    mu_loss = -q(s, mu(s)).mean()  # that's all for the policy loss: the objective is the Q value, the larger the better
    mu_optimizer.zero_grad()
    mu_loss.backward()
    mu_optimizer.step()

def soft_update(net, net_target):
    for param_target, param in zip(net_target.parameters(), net.parameters()):
        param_target.data.copy_(param_target.data * (1.0 - tau) + param.data * tau)
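
For a sense of scale (the value of tau is not given in this excerpt): with tau = 0.005, each call moves the target parameters only 0.5% of the way toward the online network, so the TD target changes slowly and training stays stable.
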
python
class MuNet(nn.Module):
    def __init__(self):
        super(MuNet, self).__init__()
        self.fc1 = nn.Linear(3, 128)    # input: the 3-d Pendulum observation
        self.fc2 = nn.Linear(128, 64)
        self.fc_mu = nn.Linear(64, 1)   # output: a single continuous action

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        mu = torch.tanh(self.fc_mu(x)) * 2   # multiplied by 2 because the action space of Pendulum-v0 is [-2, 2]
        return mu

class QNet(nn.Module):
    def __init__(self):
        super(QNet, self).__init__()
        self.fc_s = nn.Linear(3, 64)    # state branch
        self.fc_a = nn.Linear(1, 64)    # action branch
        self.fc_q = nn.Linear(128, 32)  # combined state-action features
        self.fc_out = nn.Linear(32, 1)

    def forward(self, x, a):
        h1 = F.relu(self.fc_s(x))
        h2 = F.relu(self.fc_a(a))
        cat = torch.cat([h1, h2], dim=1)
        q = F.relu(self.fc_q(cat))
        q = self.fc_out(q)
        return q
python
import numpy as np

class OrnsteinUhlenbeckNoise:
    def __init__(self, mu):
        self.theta, self.dt, self.sigma = 0.1, 0.01, 0.1
        self.mu = mu
        self.x_prev = np.zeros_like(self.mu)

    def __call__(self):
        x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + \
            self.sigma * np.sqrt(self.dt) * np.random.normal(size=self.mu.shape)
        self.x_prev = x
        return x

ou_noise = OrnsteinUhlenbeckNoise(mu=np.zeros(1))

while not done:
    a = mu(torch.from_numpy(s).float())
    a = a.item() + ou_noise()[0]   # select the action from the current policy plus exploration noise
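
Putting the pieces together, the surrounding episode loop typically looks something like the sketch below. It is an illustrative outline, not copied from the original post: the hyperparameters and the buffer-warmup threshold are placeholders, the classic gym API is assumed, and ReplayBuffer is assumed to have the same interface as in the DQN section.

python
import gym
import numpy as np
import torch
import torch.optim as optim

env = gym.make('Pendulum-v0')
memory = ReplayBuffer()
q, q_target = QNet(), QNet()
mu, mu_target = MuNet(), MuNet()
q_target.load_state_dict(q.state_dict())      # target networks start as exact copies
mu_target.load_state_dict(mu.state_dict())
q_optimizer = optim.Adam(q.parameters(), lr=0.001)
mu_optimizer = optim.Adam(mu.parameters(), lr=0.0005)
ou_noise = OrnsteinUhlenbeckNoise(mu=np.zeros(1))

for n_epi in range(10000):
    s = env.reset()
    done = False
    while not done:
        a = mu(torch.from_numpy(s).float())
        a = a.item() + ou_noise()[0]                    # current policy plus exploration noise
        s_prime, r, done, info = env.step([a])          # Pendulum expects a 1-d action array
        memory.put((s, a, r / 100.0, s_prime, 0.0 if done else 1.0))
        s = s_prime

    if memory.size() > 2000:                            # train only once the buffer is warm
        for _ in range(10):
            train(mu, mu_target, q, q_target, memory, q_optimizer, mu_optimizer)
            soft_update(mu, mu_target)
            soft_update(q, q_target)

env.close()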

2.6 PPO

Understanding: the critic is updated the same way as in actor-critic; the loss compares the estimated value against the target value, takes a mean squared error, and backpropagates the gradient.

The actor is updated with the advantage, i.e. how much better each step turned out to be. To understand the loss: the new policy is compared to the old policy as a ratio. If the advantage is large and the new-to-old ratio is large, the update is correspondingly larger. The second term subtracts a KL penalty, a measure of how much the new and old policies differ; if they differ too much, this is treated as a penalty on the loss, which limits how far the new policy can move in a single update. A sketch of this loss is given below.
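
A minimal sketch of the per-update loss described above, written in the KL-penalty form (many well-known PPO implementations use the clipped surrogate objective instead; the function, tensor names, and the beta coefficient here are illustrative, not taken from the original post):

python
import torch
import torch.nn.functional as F

def ppo_loss(pi_a, prob_a, advantage, v_s, td_target, beta=1.0):
    """Hypothetical helper: pi_a / prob_a are the probabilities of the taken actions under the
    new and old policies; advantage and td_target come from the rollout; v_s is the critic output."""
    # Ratio of new to old policy for the actions actually taken
    ratio = torch.exp(torch.log(pi_a) - torch.log(prob_a))

    # Actor: large advantage and large ratio -> larger update; the KL term penalizes the loss
    # when the new policy drifts too far from the old one, limiting the update size
    kl_estimate = (torch.log(prob_a) - torch.log(pi_a)).mean()   # Monte-Carlo estimate of KL(old || new)
    actor_loss = -(ratio * advantage.detach()).mean() + beta * kl_estimate

    # Critic: compare the value estimate with the TD target, as in actor-critic
    critic_loss = F.smooth_l1_loss(v_s, td_target.detach())

    return actor_loss + critic_loss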

load_state_dict

python
q_target.load_state_dict(q.state_dict())
# Copies the parameters and buffers from q's state_dict into the q_target network
# (both networks inherit from nn.Module)
