
[Reinforcement Learning] Applying DQN to Operations Research

source link: https://mp.weixin.qq.com/s?__biz=MzIwMDIzNDI2Ng%3D%3D&%3Bmid=2247488451&%3Bidx=1&%3Bsn=5d8e96f9c37addc6bac714ac2d527d7f

A while back I showed how Q-Learning can be applied to shift scheduling; this time I give a DQN implementation.

1. Background

(Recapping the background from the earlier post.)

Suppose we have a customer-service scheduling task: we need to build a one-week schedule for 100 people, subject to the following constraints:

  • A day is divided into 24 time slots, i.e. each slot is one hour;

  • Each agent works seven days a week, one eight-hour shift per day;

  • Two consecutive shifts of the same agent must be at least 12 hours apart;

  • Shift hours within a week are numbered from 0 (earliest) to 24*7 - 1 (latest).

Evaluation criterion:

  • We are given the required number of agents for each time slot, and we want the scheduled headcount in every slot to be as close as possible to that requirement (a tiny worked example follows below).
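
To make the criterion concrete, here is a tiny hand-worked sketch of the weighted-deviation score over three imaginary slots; the variable names are mine, and the actual implementation is the reward() method of the Env class further below. The hard constraints (an 8-hour shift plus a 12-hour break) will show up there as the requirement that consecutive start times of the same person be at least 20 hours apart.

demand = [6, 8, 5]                              # required agents in three example slots
scheduled = [5, 8, 7]                           # agents actually on duty in those slots
weights = [d / sum(demand) for d in demand]     # busier slots get a larger weight
# weighted relative deviation; smaller is better, so the reward is its negative
deviation = sum(abs(s - d) / d * w for s, d, w in zip(scheduled, demand, weights))
print(-deviation)                               # about -0.158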

This kind of optimization problem can also be solved with heuristic algorithms; last time we used reinforcement learning, this time we use deep reinforcement learning.

2. Code

If you are not familiar with DQN, have a look at the earlier articles (we use the 2013 version of DQN, without the separate target network).
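
For readers who want the update rule at a glance: the 2013-style DQN regresses the Q-value of the taken action toward a bootstrapped target computed with the same online network. A minimal sketch, assuming a Keras model that outputs one Q-value per action and NumPy arrays state / next_state of shape [1, n_state] (this mirrors the replay() method further below):

q_next = model.predict(next_state)[0]                 # Q(s', a') for every action a'
target = reward + discount_factor * np.max(q_next)    # r + gamma * max_a' Q(s', a')
q_pred = model.predict(state)                         # current predictions for state s
q_pred[0][action] = target                            # only the taken action's Q-value is changed
model.fit(state, q_pred, epochs=1, verbose=0)         # MSE pulls Q(s, a) toward the target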

Compared with the Q-Learning version, not only the Agent has changed; the Env has also been improved in a few places. The concrete changes can be seen in the code below.

import random
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
from copy import deepcopy
from collections import defaultdict, deque

gpus = tf.config.experimental.list_physical_devices(device_type='GPU')
# let GPU memory grow with usage
tf.config.experimental.set_memory_growth(gpus[0], True)
# cap the amount of GPU memory that can be allocated
tf.config.experimental.set_virtual_device_configuration(
    gpus[0],
    [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024 * 12)]
)

The main change on the environment side is list_2_str. Previously all of the per-person lists were concatenated into a string and used as the state; now the 2-D list has to be expanded into a flat 1-D list so it can be fed into the neural network.
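
For intuition, here is what that expansion does for a single person, using made-up start hours; list_2_str below does the same for all person_n people and concatenates the results.

starts = [0, 24, 48, 72, 96, 120, 144]   # one person's shift-start hours for the week
occupancy = [0] * (24 * 7)
for s in starts:
    for j in range(8):                    # each shift covers 8 consecutive hours
        occupancy[s + j] = 1
print(sum(occupancy))                     # 56 = 7 shifts * 8 hours
# concatenating the occupancy vectors of all people gives a state of length person_n * 7 * 24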

person_n = 10  # schedule 10 people

class Env():
    def __init__(self):
        # 10 people, 7 days; each shift bar can be moved left ('L') or right ('R'),
        # or nothing is moved at all ('-1')
        self.actions_space = ['{}{}L'.format(i, j) for i in range(person_n) for j in range(7)] + \
                             ['{}{}R'.format(i, j) for i in range(person_n) for j in range(7)] + ['-1']
        self.n_actions = len(self.actions_space)
        # required number of agents for each of the 24*7 hourly slots
        self.act_list = [random.randint(person_n // 2, person_n) for i in range(24 * 7)]
        # slot weights: busier slots count more in the score
        self.w_list = [i / sum(self.act_list) for i in self.act_list]
        # state: for every person, the start hour of each day's shift
        self.state = [[i * 24 for i in range(7)] for i in range(person_n)]
        self.n_state = person_n * 7 * 24
        self.punish = -1000
        print(self.act_list)

    def list_2_str(self, l):
        # expand the start-hour lists into 0/1 occupancy vectors and flatten them
        state_list = [[0] * (24 * 7) for _ in range(person_n)]
        for person in range(person_n):
            for i in l[person]:
                for j in range(8):
                    state_list[person][i + j] = 1
        return [i for state in state_list for i in state]

    def reset(self):
        self.state = [[i * 24 for i in range(7)] for i in range(person_n)]
        return self.list_2_str(self.state)

    # score the current schedule, weighted by demand
    def reward(self, tmp_state):
        # each person's shifts must be 8 + 12 hours apart, otherwise score = -1000
        for i in range(person_n):
            # gap between the Sunday shift and the following Monday shift
            if (tmp_state[i][0] + (24 * 7 - 1) - tmp_state[i][6]) < 20:
                return self.punish
            for j in range(6):
                if (tmp_state[i][j + 1] - tmp_state[i][j]) < 20:
                    return self.punish
        # build the full occupancy matrix
        state_list = [[0] * (24 * 7) for _ in range(person_n)]
        for person in range(person_n):
            for i in tmp_state[person]:
                for j in range(8):
                    state_list[person][i + j] = 1
        plan_list = np.sum(state_list, axis=0).tolist()
        s_list = [abs(plan_list[i] - self.act_list[i]) / self.act_list[i] for i in range(len(plan_list))]
        # larger reward is better, hence the minus sign
        score = -np.sum([s_list[i] * self.w_list[i] for i in range(len(s_list))])
        return score

    def step(self, action):
        actions_str = self.actions_space[action]
        if actions_str == '-1':
            return self.list_2_str(self.state), self.reward(self.state)
        else:
            num = int(actions_str[0])
            day = int(actions_str[1])
            move = actions_str[2]
            # improvement over the Q-Learning version: a bad state is never committed,
            # so the move is tried on a temporary copy first
            tmp_state = deepcopy(self.state)
            if move == 'R':
                if tmp_state[num][day] == (24 * 7 - 8 - 1):
                    tmp_state[num][day] = tmp_state[num][day] + 1
                    return self.list_2_str(tmp_state), self.punish
                tmp_state[num][day] = tmp_state[num][day] + 1
            if move == 'L':
                if tmp_state[num][day] == 0:
                    tmp_state[num][day] = tmp_state[num][day] - 1
                    return self.list_2_str(tmp_state), self.punish
                tmp_state[num][day] = tmp_state[num][day] - 1
            reward = self.reward(tmp_state)
            if reward == self.punish:
                return self.list_2_str(tmp_state), self.punish
            self.state = tmp_state
            return self.list_2_str(self.state), self.reward(self.state)
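
Not part of the original post, but a quick smoke test of the environment interface makes the shapes involved clear (assuming person_n = 10 as above):

env = Env()
obs = env.reset()
print(len(obs), env.n_state, env.n_actions)   # 1680 1680 141
obs, r = env.step(env.n_actions - 1)          # the last action is '-1', i.e. leave the schedule unchanged
print(r)                                      # weighted-deviation score of the untouched initial schedule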

Create the DQNAgent

class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)   # replay buffer
        self.discount_factor = 0.9
        self.epsilon = 1.0                 # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.999
        self.learning_rate = 0.01
        self.model = self._build_model()

    def _build_model(self):
        model = tf.keras.Sequential()
        model.add(layers.Dense(512, input_dim=self.state_size, activation='relu'))
        model.add(layers.BatchNormalization())
        model.add(layers.Dropout(0.5))
        model.add(layers.Dense(512, activation='relu'))
        model.add(layers.BatchNormalization())
        model.add(layers.Dropout(0.5))
        model.add(layers.Dense(256, activation='relu'))
        model.add(layers.BatchNormalization())
        model.add(layers.Dropout(0.5))
        model.add(layers.Dense(self.action_size, activation='softmax'))   # one output per action
        model.compile(loss='mse', optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate))
        return model

    def memorize(self, state, action, reward, next_state):
        # store a transition in the replay buffer
        self.memory.append((state, action, reward, next_state))

    def get_action(self, state):
        # epsilon-greedy action selection
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        act_values = self.model.predict(state)
        act_values = act_values[0]
        # break ties between equally good actions at random
        max_action = np.random.choice(np.where(act_values == np.max(act_values))[0])
        return max_action  # returns action

    def replay(self, batch_size):
        # train on a random minibatch from the replay buffer
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state in minibatch:
            target = reward + self.discount_factor * np.amax(self.model.predict(next_state)[0])
            target_f = self.model.predict(state)
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def load(self, name):
        self.model.load_weights(name)

    def save(self, name):
        self.model.save_weights(name)
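
A side note on the hyperparameters above (my own back-of-the-envelope check, not from the original post): epsilon is multiplied by 0.999 once per replay() call, so getting from 1.0 down to epsilon_min = 0.01 takes roughly 4,600 replay calls. Over a 10,000-step run the agent therefore keeps exploring for quite a while, which is broadly consistent with the epsilon values in the training log further down.

import math

def decay_steps(target, decay=0.999):
    # number of replay() calls needed for epsilon to fall from 1.0 to `target`
    return math.ceil(math.log(target) / math.log(decay))

print(decay_steps(0.1))    # 2302
print(decay_steps(0.01))   # 4603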

Training

env = Env()
agent = DQNAgent(env.n_state, env.n_actions)

episodes = 1
batch_size = 32
bst_reward = -500
bst_state = env.state

for e in range(episodes):
    state = env.reset()
    print('---------- ', e, ' ------------')
    # 10,000 iterations are enough; training is fairly slow
    for i in range(10000):
        state = np.reshape(state, [1, env.n_state])
        action = agent.get_action(state)
        next_state, reward = env.step(action)
        next_state = np.reshape(next_state, [1, env.n_state])
        # bad (punished) transitions are not stored
        if reward != -1000:
            agent.memorize(state, action, reward, next_state)
            state = deepcopy(next_state)
            if len(agent.memory) > batch_size:
                agent.replay(batch_size)
        if bst_reward < reward:
            bst_reward = reward
            bst_state = deepcopy(env.state)
            print('episode: {}/{}, i:{}, reward: {}, e: {:.2}'.format(e, episodes, i, bst_reward, agent.epsilon))

episode: 0/1, i:0, reward: -0.7850318471337578, e: 1.0
episode: 0/1, i:1, reward: -0.7770700636942675, e: 1.0
episode: 0/1, i:2, reward: -0.7722929936305731, e: 1.0
episode: 0/1, i:3, reward: -0.7675159235668789, e: 1.0
episode: 0/1, i:4, reward: -0.7627388535031847, e: 1.0
... ...
episode: 0/1, i:2333, reward: -0.36305732484076436, e: 0.25
episode: 0/1, i:2424, reward: -0.3598726114649682, e: 0.24
episode: 0/1, i:2796, reward: -0.3535031847133758, e: 0.21
episode: 0/1, i:3179, reward: -0.3487261146496815, e: 0.19
episode: 0/1, i:7086, reward: -0.34076433121019106, e: 0.086

The final fit is 0.34, i.e. the best reward of -0.34 corresponds to a weighted relative deviation of about 34% between the scheduled and the required headcount.

I made the same kind of modifications to the Q-Learning version so that, apart from the Agent, everything else is identical (except for the number of iterations: 10k for DQN versus 100k for Q-Learning). The result, which is much better than the earlier Q-Learning version, is:

episode: 0/1, i:0, reward: -0.7874015748031497, e: 0.1
episode: 0/1, i:1, reward: -0.7826771653543307, e: 0.1
episode: 0/1, i:3, reward: -0.7795275590551182, e: 0.1
episode: 0/1, i:4, reward: -0.7748031496062993, e: 0.1
episode: 0/1, i:6, reward: -0.7716535433070866, e: 0.1
episode: 0/1, i:7, reward: -0.7685039370078741, e: 0.1
... ...
episode: 0/1, i:7898, reward: -0.352755905511811, e: 0.1
episode: 0/1, i:7906, reward: -0.35118110236220473, e: 0.1
episode: 0/1, i:7916, reward: -0.34488188976377954, e: 0.1
episode: 0/1, i:8077, reward: -0.3401574803149606, e: 0.1
episode: 0/1, i:8130, reward: -0.33700787401574805, e: 0.1
episode: 0/1, i:16242, reward: -0.3291338582677166, e: 0.1

As you can see, in this setting Q-Learning is faster than DQN (about 100k iterations per minute) and ends up performing slightly better.

Of course, this only holds for the current scenario and the simple model used here; feel free to experiment further.

I also experimented with replacing the fully connected network with a CNN, but the result was not great (-0.44).
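
The post does not show the CNN variant, so the following is only a guess at how one might be wired up, not the author's actual architecture: reshape the flat state back into a person_n x (24*7) grid and convolve over 8-hour windows, keeping the same softmax output and MSE loss as the dense model above.

def build_cnn_model(action_size, learning_rate=0.01):
    # purely illustrative CNN Q-network; the layer sizes are guesses, not from the post
    model = tf.keras.Sequential([
        layers.Reshape((person_n, 24 * 7, 1), input_shape=(person_n * 24 * 7,)),
        layers.Conv2D(32, (1, 8), activation='relu'),          # scan 8-hour windows per person
        layers.Conv2D(32, (person_n, 1), activation='relu'),   # mix information across people
        layers.Flatten(),
        layers.Dense(256, activation='relu'),
        layers.Dense(action_size, activation='softmax'),
    ])
    model.compile(loss='mse', optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate))
    return model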

Later I may try some of the many improved variants of DQN.

