Fork me on GitHub

强化学习-DQN

之前两篇文章介绍的内容其实都属于策略网络,即用神经网络去模拟在给定状态s下,每个动作a的执行概率。这篇用到的DQN则属于值函数网络,在这一大类里又可以分为:状态值函数和状态-动作值函数,DQN属于后者,即用神经网络去模拟在给定状态s和动作a的情况下,回报的期望。

代码参考自龙良曲的tensorflow2开源书籍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import collections
import random
import gym,os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers,optimizers,losses
env = gym.make('CartPole-v1') # 创建游戏环境
env.seed(1234)
tf.random.set_seed(1234)
np.random.seed(1234)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')
# Hyperparameters
learning_rate = 0.0002
gamma = 0.99
buffer_limit = 50000
batch_size = 32
class ReplayBuffer():
# 经验回放池
def __init__(self):
# 双向队列
self.buffer = collections.deque(maxlen=buffer_limit)
def put(self, transition):
self.buffer.append(transition)
def sample(self, n):
# 从回放池采样n个5元组
mini_batch = random.sample(self.buffer, n)
s_lst, a_lst, r_lst, s_prime_lst, done_mask_lst = [], [], [], [], []
# 按类别进行整理
for transition in mini_batch:
s, a, r, s_prime, done_mask = transition
s_lst.append(s)
a_lst.append([a])
r_lst.append([r])
s_prime_lst.append(s_prime)
done_mask_lst.append([done_mask])
# 转换成Tensor
return tf.constant(s_lst, dtype=tf.float32),\
tf.constant(a_lst, dtype=tf.int32), \
tf.constant(r_lst, dtype=tf.float32), \
tf.constant(s_prime_lst, dtype=tf.float32), \
tf.constant(done_mask_lst, dtype=tf.float32)
def size(self):
return len(self.buffer)
class Qnet(keras.Model):
def __init__(self):
# 创建Q网络,输入为状态向量,输出为动作的Q值
super(Qnet, self).__init__()
self.fc1 = layers.Dense(256, kernel_initializer='he_normal')
self.fc2 = layers.Dense(256, kernel_initializer='he_normal')
self.fc3 = layers.Dense(2, kernel_initializer='he_normal')
def call(self, x, training=None):
x = tf.nn.relu(self.fc1(x))
x = tf.nn.relu(self.fc2(x))
x = self.fc3(x)
return x
def sample_action(self, s, epsilon):
# 送入状态向量,获取策略: [4]
s = tf.constant(s, dtype=tf.float32)
# s: [4] => [1,4]
s = tf.expand_dims(s, axis=0)
out = self(s)[0]
coin = random.random()
# 策略改进:e-贪心方式
if coin < epsilon:
# epsilon大的概率随机选取
return random.randint(0, 1)
else: # 选择Q值最大的动作
return int(tf.argmax(out))
def train(q, q_target, memory, optimizer):
# 通过Q网络和影子网络来构造贝尔曼方程的误差,
# 并只更新Q网络,影子网络的更新会滞后Q网络
huber = losses.Huber()
for i in range(10): # 训练10次
# 从缓冲池采样
s, a, r, s_prime, done_mask = memory.sample(batch_size)
with tf.GradientTape() as tape:
# s: [b, 4]
q_out = q(s) # 得到Q(s,a)的分布
# 由于TF的gather_nd与pytorch的gather功能不一样,需要构造
# gather_nd需要的坐标参数,indices:[b, 2]
# pi_a = pi.gather(1, a) # pytorch只需要一行即可实现
indices = tf.expand_dims(tf.range(a.shape[0]), axis=1)
indices = tf.concat([indices, a], axis=1)
q_a = tf.gather_nd(q_out, indices) # 动作的概率值, [b]
q_a = tf.expand_dims(q_a, axis=1) # [b]=> [b,1]
# 得到Q(s',a)的最大值,它来自影子网络! [b,4]=>[b,2]=>[b,1]
max_q_prime = tf.reduce_max(q_target(s_prime),axis=1,keepdims=True)
# 构造Q(s,a_t)的目标值,来自贝尔曼方程
target = r + gamma * max_q_prime * done_mask
# 计算Q(s,a_t)与目标值的误差
loss = huber(q_a, target)
# 更新网络,使得Q(s,a_t)估计符合贝尔曼方程
grads = tape.gradient(loss, q.trainable_variables)
optimizer.apply_gradients(zip(grads, q.trainable_variables))
def main():
env = gym.make('CartPole-v1') # 创建环境
q = Qnet() # 创建Q网络
q_target = Qnet() # 创建影子网络
q.build(input_shape=(2,4))
q_target.build(input_shape=(2,4))
for src, dest in zip(q.variables, q_target.variables):
dest.assign(src) # 影子网络权值来自Q
memory = ReplayBuffer() # 创建回放池
print_interval = 20
score = 0.0
optimizer = optimizers.Adam(lr=learning_rate)
for n_epi in range(10000): # 训练次数
# epsilon概率也会8%到1%衰减,越到后面越使用Q值最大的动作
epsilon = max(0.01, 0.08 - 0.01 * (n_epi / 200))
s = env.reset() # 复位环境
for t in range(600): # 一个回合最大时间戳
# 根据当前Q网络提取策略,并改进策略
a = q.sample_action(s, epsilon)
# 使用改进的策略与环境交互
s_prime, r, done, info = env.step(a)
done_mask = 0.0 if done else 1.0 # 结束标志掩码
# 保存5元组
memory.put((s, a, r / 100.0, s_prime, done_mask))
s = s_prime # 刷新状态
score += r # 记录总回报
if done: # 回合结束
break
if memory.size() > 2000: # 缓冲池只有大于2000就可以训练
train(q, q_target, memory, optimizer)
if n_epi % print_interval == 0 and n_epi != 0:
for src, dest in zip(q.variables, q_target.variables):
dest.assign(src) # 影子网络权值来自Q
print("# of episode :{}, avg score : {:.1f}, buffer size : {}, " \
"epsilon : {:.1f}%" \
.format(n_epi, score / print_interval, memory.size(), epsilon * 100))
score = 0.0
env.close()
if __name__ == '__main__':
main()

DQN中也是有几点要注意的:

  • 使用了 经验回放池 来减轻数据之间的强相关性,这个在第二篇文章中我们也使用了。
  • 在选择动作的时候,并不是一定选择期望回报最大的那个动作,而是有一定概率选择别的,即 $\epsilon-greedy$ 算法,这个也是蛮常用的。
  • 用到了影子网络,影子网络的更新会滞后Q网络,用来计算目标值。这是因为如果训练目标值和预测值都来自同一网络,那么数据之间就会存在很强的相关性。
  • 用的是时序差分方法(TD)而不是蒙特卡罗方法(MC)来计算回报值。这样只需要交互一步即可获得值函数的误差,并优化更新值函数网络,因此比蒙特卡罗方法计算效率更高。但是一般来说,蒙特卡罗方法得到的结果更为准确,毕竟是算到头的。
donate the author