
Reinforcement Learning - A3C

This should be my last post on reinforcement learning: we use the A3C algorithm to play the CartPole balancing game.

The code is adapted from the open-source TensorFlow 2 book by 龙良曲 (Long Liangqu).

import matplotlib
from matplotlib import pyplot as plt
matplotlib.rcParams['font.size'] = 18
matplotlib.rcParams['figure.titlesize'] = 18
matplotlib.rcParams['figure.figsize'] = [9, 7]
matplotlib.rcParams['font.family'] = ['KaiTi']
matplotlib.rcParams['axes.unicode_minus'] = False
plt.figure()

import os
os.environ["CUDA_VISIBLE_DEVICES"] = ""
import threading
import gym
import multiprocessing
import numpy as np
from queue import Queue
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, optimizers, losses

tf.random.set_seed(1231)
np.random.seed(1231)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')

class ActorCritic(keras.Model):
    # Actor-Critic model
    def __init__(self, state_size, action_size):
        super(ActorCritic, self).__init__()
        self.state_size = state_size    # length of the state vector
        self.action_size = action_size  # number of actions
        # policy network (Actor)
        self.dense1 = layers.Dense(128, activation='relu')
        self.policy_logits = layers.Dense(action_size)
        # value network (Critic)
        self.dense2 = layers.Dense(128, activation='relu')
        self.values = layers.Dense(1)

    def call(self, inputs):
        # policy distribution Pi(a|s), as logits
        x = self.dense1(inputs)
        logits = self.policy_logits(x)
        # state value v(s)
        v = self.dense2(inputs)
        values = self.values(v)
        return logits, values

def record(episode, episode_reward, worker_idx, global_ep_reward, result_queue, total_loss, num_steps):
    # logging helper
    if global_ep_reward == 0:
        global_ep_reward = episode_reward
    else:
        global_ep_reward = global_ep_reward * 0.99 + episode_reward * 0.01
    print(
        f"{episode} | "
        f"Average Reward: {int(global_ep_reward)} | "
        f"Episode Reward: {int(episode_reward)} | "
        f"Loss: {int(total_loss / float(num_steps) * 1000) / 1000} | "
        f"Steps: {num_steps} | "
        f"Worker: {worker_idx}"
    )
    result_queue.put(global_ep_reward)  # hand the smoothed return back to the main thread
    return global_ep_reward

class Memory:
    def __init__(self):
        self.states = []
        self.actions = []
        self.rewards = []

    def store(self, state, action, reward):
        self.states.append(state)
        self.actions.append(action)
        self.rewards.append(reward)

    def clear(self):
        self.states = []
        self.actions = []
        self.rewards = []

class Agent:
    # the agent holds the central parameter-server network
    def __init__(self):
        # optimizer for the server; clients don't need one, they pull parameters from the server directly
        self.opt = optimizers.Adam(1e-3)
        # central model, acting like a parameter server
        self.server = ActorCritic(4, 2)  # state vector length, number of actions
        self.server(tf.random.normal((2, 4)))

    def train(self):
        res_queue = Queue()  # shared queue
        # create one worker (and one environment) per CPU core
        workers = [Worker(self.server, self.opt, res_queue, i) for i in range(multiprocessing.cpu_count())]
        for i, worker in enumerate(workers):
            print("Starting worker {}".format(i))
            worker.start()
        # collect the returns and plot the total-return curve
        returns = []
        while True:
            reward = res_queue.get()
            if reward is not None:
                returns.append(reward)
            else:  # termination flag
                break
        [w.join() for w in workers]  # wait for all worker threads to exit
        print(returns)
        plt.figure()
        plt.plot(np.arange(len(returns)), returns)
        # plt.plot(np.arange(len(moving_average_rewards)), np.array(moving_average_rewards), 's')
        plt.xlabel('回合数')
        plt.ylabel('总回报')
        plt.savefig('a3c-tf-cartpole.svg')

class Worker(threading.Thread):
    def __init__(self, server, opt, result_queue, idx):
        super(Worker, self).__init__()
        self.result_queue = result_queue  # shared queue
        self.server = server  # central model
        self.opt = opt        # central optimizer
        self.client = ActorCritic(4, 2)  # thread-private network
        self.worker_idx = idx  # thread id
        self.env = gym.make('CartPole-v1').unwrapped
        self.ep_loss = 0.0

    def run(self):
        mem = Memory()  # each worker maintains its own memory
        for epi_counter in range(500):  # up to the maximum number of episodes
            current_state = self.env.reset()  # reset this worker's environment
            mem.clear()
            ep_reward = 0.
            ep_steps = 0
            done = False
            while not done:
                # Pi(a|s) as logits, i.e. before softmax
                logits, _ = self.client(tf.constant(current_state[None, :], dtype=tf.float32))
                probs = tf.nn.softmax(logits)
                # sample an action from the policy distribution
                action = np.random.choice(2, p=probs.numpy()[0])
                new_state, reward, done, _ = self.env.step(action)  # interact with the environment
                ep_reward += reward  # accumulate the reward
                mem.store(current_state, action, reward)  # record the transition
                ep_steps += 1  # count the steps in this episode
                current_state = new_state  # move to the next state
                if ep_steps >= 500 or done:  # at most 500 steps per episode
                    # compute the loss on this worker's own copy
                    with tf.GradientTape() as tape:
                        total_loss = self.compute_loss(done, new_state, mem)
                    # compute the gradients w.r.t. the client's weights
                    grads = tape.gradient(total_loss, self.client.trainable_weights)
                    # push the gradients to the server; the update happens on the server
                    self.opt.apply_gradients(zip(grads, self.server.trainable_weights))
                    # pull the latest parameters back from the server
                    self.client.set_weights(self.server.get_weights())
                    mem.clear()  # clear the memory
                    # report this episode's return
                    self.result_queue.put(ep_reward)
                    print(self.worker_idx, ep_reward)
                    break
        self.result_queue.put(None)  # signal that this worker has finished

    def compute_loss(self, done, new_state, memory, gamma=0.99):
        if done:
            reward_sum = 0.  # v(terminal) = 0 for a terminal state
        else:
            # bootstrap from v(s') of the last state (take the scalar value)
            reward_sum = self.client(tf.constant(new_state[None, :], dtype=tf.float32))[-1].numpy()[0, 0]
        # accumulate the discounted returns
        discounted_rewards = []
        for reward in memory.rewards[::-1]:  # reverse buffer r
            reward_sum = reward + gamma * reward_sum
            discounted_rewards.append(reward_sum)
        discounted_rewards.reverse()
        # Pi(a|s) and v(s) for all stored states
        logits, values = self.client(tf.constant(np.vstack(memory.states), dtype=tf.float32))
        # advantage = R - v(s); flatten v(s) from [N, 1] to [N] so every term below has shape [N]
        advantage = tf.constant(discounted_rewards, dtype=tf.float32) - values[:, 0]
        # Critic loss
        value_loss = 0.5 * advantage ** 2
        # policy loss: -log Pi(a|s)
        policy_loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=memory.actions, logits=logits)
        # the advantage is detached so the policy loss does not backpropagate into the V network
        policy_loss = policy_loss * tf.stop_gradient(advantage)
        # entropy bonus
        entropy = tf.nn.softmax_cross_entropy_with_logits(labels=tf.nn.softmax(logits), logits=logits)
        policy_loss = policy_loss - 0.01 * entropy
        # aggregate both loss terms
        total_loss = tf.reduce_mean(value_loss + policy_loss)
        return total_loss

if __name__ == '__main__':
    master = Agent()
    master.train()

The loss has two parts, and it is computed in much the same way as in the second post of this series, which already introduced the Actor-Critic idea: the Actor is the policy network (it estimates the probability of each action in a given state), and the Critic is the value network (it estimates the expected return of a state, i.e. the baseline). The actual return collected while the episode runs can be estimated with Monte Carlo or TD methods. All of this has been covered in earlier posts.
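
To make the two terms concrete, here is a minimal sketch of the per-step loss for a single state. The function name, its arguments and the entropy coefficient are illustrative assumptions for this example, not part of the code above.

import tensorflow as tf

def a2c_loss_sketch(logits, value, action, discounted_return, entropy_beta=0.01):
    # logits: [num_actions] policy logits, value: scalar V(s),
    # action: chosen action index, discounted_return: scalar return R
    advantage = discounted_return - value                          # R - V(s)
    critic_loss = 0.5 * tf.square(advantage)                       # Critic regresses V(s) toward R
    neg_log_prob = tf.nn.sparse_softmax_cross_entropy_with_logits(
        labels=[action], logits=[logits])[0]                       # -log Pi(a|s)
    actor_loss = neg_log_prob * tf.stop_gradient(advantage)        # advantage is treated as a constant
    entropy = -tf.reduce_sum(
        tf.nn.softmax(logits) * tf.nn.log_softmax(logits))         # entropy of Pi(.|s)
    return critic_loss + actor_loss - entropy_beta * entropy       # entropy bonus rewards exploration

Summing this over the steps of a rollout and averaging gives essentially the same total loss that compute_loss builds above in batched form.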

What makes A3C special is its asynchronous update scheme: a single global network plus a number of worker networks (usually one per CPU core), each with its own Actor and Critic. Every worker starts by copying the global network's parameters, collects experience and computes gradients on its own, then sends those gradients back to the global network, which applies them to its parameters; the worker then copies the freshly updated global parameters and the cycle repeats. Note that a worker only computes gradients; its own parameters are never updated by those gradients directly, they are simply overwritten with the global parameters.
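
A minimal, self-contained sketch of this push-gradients / pull-weights cycle is shown below. It uses two toy single-layer models in place of ActorCritic and a dummy loss, purely to show the mechanics; the shapes, the loss and the variable names are assumptions made only for this example.

import tensorflow as tf
from tensorflow.keras import layers, optimizers

server = tf.keras.Sequential([layers.Dense(2)])  # global network (parameter server)
client = tf.keras.Sequential([layers.Dense(2)])  # one worker's private copy
server.build((None, 4))
client.build((None, 4))
opt = optimizers.Adam(1e-3)

client.set_weights(server.get_weights())         # start from the global parameters
x = tf.random.normal((8, 4))                     # stand-in for one rollout batch

with tf.GradientTape() as tape:
    loss = tf.reduce_mean(tf.square(client(x)))  # dummy loss computed on the worker's own copy
grads = tape.gradient(loss, client.trainable_weights)

# the worker never applies the gradients to itself: they go to the global network...
opt.apply_gradients(zip(grads, server.trainable_weights))
# ...and the worker then pulls the updated global parameters before its next rollout
client.set_weights(server.get_weights())

Because each worker runs this loop independently in its own thread, the global parameters receive a stream of slightly stale but decorrelated gradient updates, which is the "asynchronous" part of A3C.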

