AlphaZero加强版

AlphaZero加强版

AlphaZero 五子棋实现

强化学习(下文统一使用RL代替)进入大众视野应该是2016年3月,DeepMind出品AlphaGo以4:1击败世界围棋高手李世石,RL算法理论的形成却可以追溯到1980年前后。

RL有别于常用于NLP和CV领域的监督学习,监督学习中,数据由输入数据和标签(label)组成,创建于训练前,不会随着训练中的状态变化而更改相应策略,例如文本分类,提供数据(x)和对应label(y)喂入模型(f)进行训练即可, 用公式总结:y = f(x)。

RL更接近人类学习的过程,根据当前所处环境(state S代替),结合已有经验(f)制定对应策略(policy P代替),并作出价值更高的行为(action A代替),此后将会在一定时间内得到回馈(reward R代替)。

监督学习解决的是概率问题,强化学习解决的是策略问题。

RL目前已经在具有明显边界,明确规则和强定义成功失败的领域中取得了超越人类的表现,因为明显边界,S是有限的,哪怕大量,因为规则明确,制定策略并输出更有价值行为是明确的,因为强定义成功失败,R是明确的。大部分游戏具有上述特性,例如本篇文章将会使用到的CartPole和五子棋。


CartPole游戏介绍

如下动图所示,游戏目标是保持杆一直竖直朝上而不倒下,此时需要朝左右移动杆使其平衡。我们需要通过游戏返回给我们的状态(S,一个四维向量)指定策略(P)执行动作(A),取值为0或1,代表向左或向右移动,R值恒定为1,当杆倒下时游戏结束。如果杆竖直向上的时间越长,得到的R就越多。

RL的演进

Deep Q Learning

这里使用CartPole举例,首先我们转换为马尔科夫决策过程(Markov decision process, MDP),将游戏转换为 R = Q(S, A)

这里要引入衰减系数 Discount Factor:γ ∈ [0, 1],γ的引入有多种解释,我比较认可经济学角度的理解,“今天的1元钱价值大于明天的1元钱”, 这样我们的目标就转化为让游戏尽量的长,得到的R就会越大:

在游戏CartPole中我们就可以转化为:优化Q使得Q(S, A)评估当前S得到越合理的A -> 杆竖直向上的时间越长 -> 得到的R越多

如果我们穷举所有的S来使得Q得到最合理A显然是可以的,但是不是最合理的,所以这里我们就引入了深度神经网络(Deep Q)通过在部分S中学习得出合理的A去探索未知S中最合理的A

查看完整代码

思考: Deep Q Learning 本质上是一个接近于确定性输出的算法,即选取价值最大的动作。但是在某些特定状态下,很多动作的选取可能都是可以的。比如CartPole中,某一刻杆与地面成90度,这时,往左或者右移动都是可行的。因此,采用输出概率会更合理一些。而Deep Q Learning并不能直接输出动作的概率。

Policy Gradient

现在已经知道Deep Q Learning是一个基于动作价值的方法,换句话说就是通过计算每一个S下所有A的价值,然后选择价值最大的动作执行,这是一种非黑即白的做法。如下图所示,我们能不能摒弃间接得到动作改为直接获取每个动作策略价值的方法呢?

如果我们希望替换为策略网络,那我们的目标则变为下图所示,更新策略网络参数使整个游戏生命周期R的期望最大化

我们可以这样思考问题,我们希望R最大化,而形成R结果的根因是什么呢?是来自于游戏每一时刻的S下策略网络输出的动作价值导致的游戏时间长短,也就是说我们要鼓励那些使游戏时间尽可能长的动作,反过来要抑制让游戏时间变短的的动作,那什么造成我们对动作的选择呢,策略网络中A的价值输出,所以我们的损失函数如下:

伪代码如下图所示

训练流程解释:

  1. 启动游戏CartPole,输出当前S
  2. 将S传入策略模型得到A的概率,采样并输出A到CartPole中
  3. 返回上一轮R,新一轮S,将其保存用作训练数据
  4. 当游戏结束时,根据轮次加入衰减系数并做归一化,对应每一轮S和A一起更新模型权重

查看完整代码

Actor Critic

通过上述讲解,我们已经知道Policy Gradient的核心思想是通过当前S输出A的价值,用以决定动作的选取。而损失函数:loss = -log(P) * R, 细心的朋友应该已经发现问题了,造成一局游戏最后R大与小是每一轮动作一起的结果,但是其中有些动作偏好,有些动作偏坏,不应该一概而论。

所以我们此时需要一个神经网络来评估根据当前S所给出的策略是好还是坏,这就是Critic,对应的Actor就是Policy Network, 伪代码如下:

和Policy Gradient唯一不同的地方在我们使用Temporal Difference通过优化Critic每一轮针对策略的评价值来更新Critic权重

首先安装依赖包:

  • pytorch 0.2.0
  • gym (0.10.4)

引入我们需要的模块

import argparse
from itertools import count

import gym
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
from torch.distributions import Categorical

env = gym.make('CartPole-v0').unwrapped

由于使用pytorch不是最新版本,还在使用Variable包住Tensor才能训练,引入CartPole的模块gym,创建时选择 “CartPole-v0”

模型创建,CartPole输出的S是一个四维向量,所以模型的向量输入长度为4,通过模型网络得到所有动作策略值与策略评估值,最后返回本轮动作

class ActorCritic(nn.Module):
    def __init__(self):
        super().__init__()

        self.affine1 = nn.Linear(4, 128)  # 特征层
        self.action_head = nn.Linear(128, 2) # 策略输出层
        self.value_head = nn.Linear(128, 1)  # 策略评估层

    def forward(self, x):
        x = F.relu(self.affine1(x))
        action_scores = self.action_head(x)
        state_values = self.value_head(x)
        return F.softmax(action_scores, dim=-1), state_values

    def select_action(self, state, values, select_props):
        state = torch.from_numpy(state).float() # 将CartPole输出的特征由np转化为pt.FloatTensor
        props, value = self(Variable(state)) # 得到所有动作策略值与策略评估值
        dist = Categorical(props)
        action = dist.sample()
        log_props = dist.log_prob(action) # 动作采样
        values.append(value) # 保存本轮策略评估值,用以后面更新模型权重
        select_props.append(log_props) # 保存本轮所有动作策略值

        return action.data[0]

优化函数使用Adam,学习率0.03。

optimizer = optim.Adam(model.parameters(), lr=3e-2)

整个训练过程,重置游戏,分别维护每一轮策略评估价值,动作策略概率和对应R,开始本次游戏,通过每轮游戏输出S给出A,并保存策略评估价值,动作策略概率和对应R,接下来是游戏结束后的模型优化,首先加入衰减系数并归一化本次游戏每一轮的R,遍历每一轮,Critic使用Temporal Difference优化,Actor和Policy Gradient优化方式类似,只是R变成了Critic输出的Value,相信有朋友应该看出来了,Actor Critic的优化主要通过Critic来驱动Actor的权重更新。

def main():
    for i_episode in count(1):
        state = env.reset()
        if args.render:
            env.render()
        values, select_props, policy_rewards = [], [], []
        for t in range(10000):
            action = model.select_action(state, values, select_props)
            state, reward, done, _ = env.step(action)
            policy_rewards.append(reward)

            if done:
                break

        R, rewards = 0, []
        for r in policy_rewards[::-1]:
            R = r + args.gamma * R
            rewards.insert(0, R)

        rewards = np.asarray(rewards)
        rewards = (rewards - rewards.mean()) / \
            (rewards.std() + np.finfo(np.float32).eps)

        value_loss, policy_loss = [], []
        for value, prop, r in zip(values, select_props, rewards):
            value_loss.append(F.smooth_l1_loss(
                value, Variable(torch.Tensor([r]))))
            reward = r - value.data[0]
            policy_loss.append(-prop * reward)

        loss = torch.cat(value_loss).sum() + torch.cat(policy_loss).sum()
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if i_episode % 10 == 0:
            print('Episode {}\tLast length: {:5d}\t'.format(
                i_episode, t))

Actor Critic代码

思考:学习了Actor Critic,大家可以思考下它和GAN有什么相同和不同的地方 :)

以上内容是本文需要的知识储备,希望读到这里的朋友能理解上文中所有知识点,由于我能力有限,有些知识大家没有理解极大可能是我没有讲清楚,所以建议大家使用给出代码结合理解,本片文章给出所有代码在安装好三方包的前提下均可训练和预测,方便大家一边调试一边帮助理解。


AlphaGo Zero

AlphaGo Zero放弃AlphaGo Fan和Lee学习人类棋谱的方式,从随机落子完全自由自发的开始,自我对弈学习,到完胜它的前辈AlphaGo。结果是惊人的,而更大的价值在于训练不需要收集任何数据

我们先拆分Alpha Zero的几大部分,逐一解析:

  • 创建五子棋盘
  • 神经网络构建
  • 蒙特卡洛树构建与搜索流程
  • 训练流程

五子棋盘

棋牌是贯穿整个游戏,是蒙特卡洛树搜索(简称mcts)的承载物,也是获取当前最优解落子,到训练数据的产生和收集,所以需要实现棋盘复制(用于mcts),落子,历史特征数据收集,判断当前棋局下是有选手胜利 or 和局 or 继续比赛。

  • 棋盘初始化
    def __init__(self,
                 size=SIZE,
                 hist_num=HISTORY,
                 c_action=-1,
                 player=BLACK):

        self.size = size # 棋盘大小
        self.c_action = c_action # 当下落子
        self.hist_num = hist_num # 当下落子时刻的前几轮双方落子特征数据
        self.valid_moves = list(range(size**2)) # 未落子的位置
        self.invalid_moves = []  # 已落子的位置
        self.board = np.zeros([size, size])  # 棋盘
        self.c_player = player # 当前选手 “黑” or “白”
        self.players = {"black": BLACK, "white": WHITE} # 选手 “黑” or “白”

        # BLACK -> 0 | WHITE -> 1

        # 当下落子时刻的前几轮双方落子特征数据,每一次真正落子(不包括模拟)时更新
        self.history = [np.zeros((hist_num, size, size)),
                        np.zeros((hist_num, size, size))]
  • 复制棋盘,已和未落子,历史特征数据,和盘面数据
    def clone(self):
        c_board = Board(size=self.size,
                        hist_num=self.hist_num,
                        player=self.c_player,
                        c_action=self.c_action)

        c_board.valid_moves = self.valid_moves.copy()
        c_board.invalid_moves = self.invalid_moves.copy()
        c_board.board = self.board.copy()
        c_board.history = [h.copy() for h in self.history]

        return c_board
  • 落子,从一维转换为二维,落子位置从未落子数组中更新到已落子数组,_mask_pieces_by_player 函数实现的是当前选手的历史落子
    def move(self, action):
        x, y = action // self.size, action % self.size
        assert self._is_space(x, y)

        self.valid_moves.remove(action)
        self.invalid_moves.append(action)
        self.c_action = action
        self.board[x, y] = self.c_player

        p_index = int(self.c_player - BLACK)

        # 更新历史特征,将离当前时刻最远的特征替换成此刻的特征,并放到最左边位置
        self.history[p_index] = np.roll(self.history[p_index], 1, axis=0)
        self.history[p_index][0] = self._mask_pieces_by_player(self.c_player)
  • 特征生成,除了两位选手各自历史落子以外,还加入了两维特征:1. 当前落子的特征,为什么加入这一个特征呢?我们可以从五子棋这个游戏角度出发,选手对弈中大部分落子都是针对对手上一次落子周围布局 2.当前选手特征,是“黑” or “白”
    def gen_state(self):
        to_action = np.zeros((1, self.size, self.size))

        # 当前落子的特征
        to_action[0][self.c_action // self.size,
                     self.c_action % self.size] = 1.
        to_play = np.full((1, self.size, self.size), self.c_player - BLACK)
        state = np.concatenate(self.history + [to_play, to_action], axis=0)

        return state

神经网络

我们的网络结构和AlphaZero的结构类似

  • ResBlockNet
class ResBlockNet(nn.Module):
  def __init__(self,
               ind=RES_BLOCK_FILLTERS,
               block_filters=RES_BLOCK_FILLTERS,
               kr_size=3,
               stride=1,
               padding=1,
               bias=False):

    super().__init__()

    self.layers = nn.Sequential(
        nn.Conv2d(ind, block_filters, kr_size,
                  stride=stride,
                  padding=padding,
                  bias=bias),
        nn.BatchNorm2d(block_filters),
        nn.ReLU(),
        nn.Conv2d(block_filters, block_filters, kr_size,
                  stride=stride,
                  padding=padding,
                  bias=bias),
        nn.BatchNorm2d(block_filters),
    )

  def forward(self, x):
    res = x
    out = self.layers(x) + x

    return F.relu(out)
  • 策略网络
class Policy(nn.Module):
  def __init__(self,
               ind=RES_BLOCK_FILLTERS,
               outd=OUTD,
               kernels=2):

    super().__init__()

    self.out = outd * kernels

    self.conv = nn.Sequential(
        nn.Conv2d(ind, kernels, kernel_size=1),
        nn.BatchNorm2d(kernels),
        nn.ReLU(),
    )

    self.linear = nn.Linear(kernels * outd, outd)
    self.linear.weight.data.uniform_(-.1, .1)

  def forward(self, x):
    x = self.conv(x)
    x = x.view(-1, self.out)
    x = self.linear(x)

    return F.log_softmax(x, dim=-1)
  • 价值网络
class Value(nn.Module):
  def __init__(self,
               ind=RES_BLOCK_FILLTERS,
               outd=OUTD,
               hsz=256,
               kernels=1):
    super().__init__()

    self.outd = outd

    self.conv = nn.Sequential(
        nn.Conv2d(ind, kernels, kernel_size=1),
        nn.BatchNorm2d(kernels),
        nn.ReLU(),
    )

    self.linear = nn.Sequential(
        nn.Linear(outd, hsz),
        nn.ReLU(),
        nn.Linear(hsz, 1),
        nn.Tanh(),
    )

    self._reset_parameters()

  def forward(self, x):
    x = self.conv(x)
    x = x.view(-1, self.outd)

    return self.linear(x)

  def _reset_parameters(self):
    for layer in self.modules():
      if type(layer) == nn.Linear:
        layer.weight.data.uniform_(-.1, .1)

思考: 模型中使用到了residual,并且价值模型和策略模型的高维特征抽取共用,AlphaZero论文中也做出了对比,使用residual与否,共用价值模型和策略模型的高维特征抽取与否的效果。

使用residual效果较优应该比较好理解,由于模型比较“深”,并且使用relu作为激活函数,如果不引入residual很有可能造成大量死点,以至于bp时权重不更新

共用价值模型和策略模型的高维特征抽取,可以参考上面Actor Critic中提到“通过Critic来驱动Actor的更新”,虽然在AlphaZero中没有这么强的关联性,不过也会相互推动使收敛加速

蒙特卡洛树搜索

蒙特卡洛树搜索是AlphaZero的核心组件,不难看出即使去掉mcts,仅使用神经网络也可以完成当前局势的评估并输出落子策略,那为什么一定要加入mcts呢?

首先我们来区分下蒙特卡洛算法和蒙特卡洛树搜索的区别,蒙特卡洛算法:采样越多,越近似最优解,举个别人的例子:

筐里有100个苹果,让我每次闭眼拿1个,挑出最大的。于是我随机拿1个,再随机拿1个跟它比,留下大的,再随机拿1个……我每拿一次,留下的苹果都至少不比上次的小。拿的次数越多,挑出的苹果就越大,但我除非拿100次,否则无法肯定挑出了最大的。这个挑苹果的算法,就属于蒙特卡罗算法——尽量找好的,但不保证是最好的。

如上图,假设现在轮到黑棋,黑棋有b1和b2两手可选,白棋对于b1有w1和w2两手可选,白棋对于b2有w3 w4 w5三手可选,然后假设走完w1/w2/w3/w4/w5后,经过局面评估,黑棋的未来胜率分别是 50%/48%/62%/45%/58%。

请问,黑棋此时最佳的着法是b1还是b2?如果是蒙特卡洛算法,会走b2,因为右边平均值高,但是mcts在不断模拟后会收敛到左边胜率48%,右边胜率45%而选择最优解b1。所以mcts更接近人在棋类游戏中用到的方式,将精力用在更值得搜索的地方, 先评估当前落子可能性,再根据每种落子推演后续的对弈情况,最后等到最优解,可以说它结合了广度优先搜索(BFS)和深度优先搜索(DFS),类似于启发式搜索,至于更侧重BFS还是DFS,我们后面会讲到。

我们在来深入理解mcts四步,如上图,每个节点表示当前经过多次模拟后的情况。节点上的数字表示 黑棋胜利次数/模拟总次数。现在开始新一轮的模拟:

  1. Selection:从根节点开始,每次都选更值得搜索,价值更高的节点,直到叶节点,如图中的 3/3 节点。
  2. Expansion:到了Selection中的叶节点,我们把除了根节点到当前叶节点外其他可能的节点扩展到该叶节点下方,并选择价值最高的。
  3. Simluation:从Expansion中选择的节点开始以此类推,选择可能节点中价值最高的,直到游戏结束。
  4. Backpropagation: 从游戏结束的叶节点开始,把本次模拟结果一路往回更新父节点,直到根节点。

为方便说明,我们使用一个简化公式 policy = x_value + C * N_parent / N_x, 我们从黑棋视角出发,x_value是当前节点黑棋胜率,那轮到白棋子时需要 1-x_value 作为白棋胜率,N_parent是父节点访问次数,N_x是当前节点访问次数,从根节点出发,首先轮到黑棋落子,使用公式评估:

  • 7/10:0.7 + 2.1C
  • 5/8:0.625 + 2.625C
  • 0/3:7C

可以看出,C的大小决定我们的搜索侧重BFS还是DFS,从图上看这里我们选择了7/10,侧重DFS,这时我们切换到白棋:

  • 1-2/4:0.5 + 2.5C
  • 1-5/6:0.17 + 1.7C

从图上看选择了更小的 0.17 + 1.7C ,得到 0/0,最后从0/0叶节点一路更新节点数据并返回到根节点。

现在回到我们五子棋实现上,首先树节点上存储的信息更多,如下图所示, N是当前节点访问次数,W是历史模拟中策略价值总等分,Q是平均策略价值,类似上面例子中的x_value,P是神经网络给出的策略分数

而我们的节点选取公式也替换成policy = Q + PUCT,其中PUCT如下图所示,这里说明一下,PUCT公式右边分子根号下为当前节点和兄弟节点访问次数总和,其实就是父节点访问次数(有疑问的朋友可以自行思考一下)

mcts模拟和上面例子基本一致,选取节点时使用了更合理的policy = Q + PUCT公式,Backpropagation时除了更新节点的访问次数外,还需要更新策略价值

以上就是整个mcts的讲解,接下来我们再结合代码来加深理解,构建节点,需要实现的方法:当前节点是否叶节点,选择最后价值节点,expand_node(Expansion)和backup(Backpropagation)

class TreeNode(object):
    def __init__(self,
                 action=None,
                 props=None,
                 parent=None):

        self.parent = parent
        self.action = action
        self.children = []

        self.N = 0  # visit count
        self.Q = .0  # mean action value
        self.W = .0  # total action value
        self.P = props  # prior probability

    def is_leaf(self):
        return len(self.children) == 0

    def select_child(self):
        index = np.argmax(np.asarray([c.uct() for c in self.children]))
        return self.children[index]

    def uct(self):
        return self.Q + self.P * CPUCT * (np.sqrt(self.parent.N) / (1 + self.N))

    def expand_node(self, props):
        self.children = [TreeNode(action=action, props=p, parent=self)
                         for action, p in enumerate(props) if p > 0.]

    def backup(self, v):
        self.N += 1
        self.W += v
        self.Q = self.W / self.N

最后是mcts的整个流程,我们会不停重复多次模拟过程,这里注意,第一次模拟时我们要在神经网络给出的策略向量上加入噪音,目的是为了让模拟时关注更多的选择,可以理解为BFS的一种方式,每次模拟都需要重置棋盘,然后从根节点一路搜索更有价值节点直到叶节点,然后使用神经网络输出当前数据的策略,使用policy = Q + PUCT得到价值最高的节点,最后Backpropagation,直到模拟结束。根据节点访问次数作为直接依据来选择落子,为什么用访问次数呢?其实很好理解,访问次数越多就意味着这个节点作为根节点下面的某条路径胜率是最高的。

class MonteCarloTreeSearch(object):
    def __init__(self, net,
                 ms_num=MCTSSIMNUM):

        self.net = net # 神经网络
        self.ms_num = ms_num # 模拟次数

    def search(self, borad, node, temperature=.001):
        self.borad = borad
        self.root = node

        for _ in range(self.ms_num):
            node = self.root
            borad = self.borad.clone()

            # Selection
            while not node.is_leaf():
                node = node.select_child()
                borad.move(node.action)
                borad.trigger()

            # be carefull - opponent state
            value, props = self.net(
                to_tensor(borad.gen_state(), unsqueeze=True))
            value = to_numpy(value, USECUDA)[0]
            props = np.exp(to_numpy(props, USECUDA))

            # add dirichlet noise for root node
            if node.parent is None:
                props = self.dirichlet_noise(props)

            # normalize
            props[borad.invalid_moves] = 0.
            total_p = np.sum(props)
            if total_p > 0:
                props /= total_p

            # winner, draw or continue
            if borad.is_draw():
                value = 0.
            else:
                done = borad.is_game_over(player=borad.last_player)
                if done:
                    value = -1.
                else:
                    node.expand_node(props)

            while node is not None:
                value = -value
                node.backup(value)
                node = node.parent

        action_times = np.zeros(borad.size**2)
        for child in self.root.children:
            action_times[child.action] = child.N

        action, pi = self.decision(action_times, temperature)
        for child in self.root.children:
            if child.action == action:
                return pi, child

    @staticmethod
    def dirichlet_noise(props, eps=DLEPS, alpha=DLALPHA):
        return (1 - eps) * props + eps * np.random.dirichlet(np.full(len(props), alpha))

    @staticmethod
    def decision(pi, temperature):
        pi = (1.0 / temperature) * np.log(pi + 1e-10)
        pi = np.exp(pi - np.max(pi))
        pi /= np.sum(pi)
        action = np.random.choice(len(pi), p=pi)
        return action, pi

训练流程

训练就比较简单了,mcts结果作为动作输出,一步步落子直到游戏结束,每一步落子时会收集当前局势特征,落子概率分布和R

class Game(object):
    def __init__(self, net, evl_net):
        self.net = net
        self.evl_net = evl_net
        self.board = Board()

    def play(self):
        datas, node = [], TreeNode()
        mc = MonteCarloTreeSearch(self.net)
        move_count = 0

        while True:
            if move_count < TEMPTRIG:
                pi, next_node = mc.search(self.board, node, temperature=1)
            else:
                pi, next_node = mc.search(self.board, node)

            datas.append([self.board.gen_state(), pi, self.board.c_player])

            self.board.move(next_node.action)
            next_node.parent = None
            node = next_node

            if self.board.is_draw():
                reward = 0.
                break

            if self.board.is_game_over():
                reward = 1.
                break

            self.board.trigger()
            move_count += 1

        datas = np.asarray(datas)
        datas[:, 2][datas[:, 2] == self.board.c_player] = reward
        datas[:, 2][datas[:, 2] != self.board.c_player] = -reward

        return datas

训练时通过棋盘旋转和反转做数据增强

    def sample(self, datas):
        for state, pi, reward in datas:
            c_state = state.copy()
            c_pi = pi.copy()
            for i in range(4):
                c_state = np.array([np.rot90(s, i) for s in c_state])
                c_pi = np.rot90(c_pi.reshape(SIZE, SIZE), i)
                self.sample_data.append([c_state, c_pi.flatten(), reward])

                c_state = np.array([np.fliplr(s) for s in c_state])
                c_pi = np.fliplr(c_pi)
                self.sample_data.append([c_state, c_pi.flatten(), reward])

        return len(datas)

最后来看看效果,相互对弈2400局后感觉和我技术差不多:

最后的最后,完整代码查看

如果本篇文章让你有所收获,请star项目或者follow我,因为我会经常更新代码,谢谢

Nevermore Written by:

步步生姿,空锁满庭花雨。胜将娇花比。