分布式计算

2023-06-28

Ray

官方文档:https://docs.ray.io/en/master/

Install

安装手册:https://docs.ray.io/en/master/ray-overview/installation.html

强化学习环境:

pip install -U "ray[all]"

Cluster deployment

Head node (m10):

ray start --head --port=6333 --dashboard-host=0.0.0.0 --node-ip-address=192.168.1.10 --num-cpus=4 --num-gpus=0

Slave node

(m8):

ray start --address=192.168.1.10:6333 --node-ip-address=192.168.1.80 --num-cpus=16 --num-gpus=0

(m9):

ray start --address=192.168.1.10:6333 --node-ip-address=192.168.1.90 --num-cpus=16 --num-gpus=0

Optimization Algorithms

https://docs.ray.io/en/latest/tune/api/schedulers.html

https://optuna.readthedocs.io/en/stable/reference/samplers/index.html

PBT schedulers (PBT调度器)

Ray PBT实现的参数说明

time_attr: 用于定义训练时长的测度,要求单调递增,比如 training_iteration 或 time_total_s。

metric: 训练结果目标值属性。如果不设用模型定义的如:metric=”episode_reward_mean”

mode: (str, one of “min”, “max”)

perturbation_interval: 模型会以time_attr为间隔来进行perturbation

quantile_fraction: (浮点数介于 0 和 0.5 之间)定义用于利用的顶级和性能最低模型的比例。 决定按多大比例将表现好的头部模型克隆到尾部模型。 较高的值有利于开发顶级模型,较低的值有利于探索。

resample_probability: 当对超参进行exploration时从原分布中重新采样的概率,否则会根据现有的值调整。 值越高,引入新超参数值的机会就越大。 较低的值有利于搜索空间中现有值的扰动。

hyperparam_mutations: 需要变异的超参。它是一个dict,对于每个key对应list或者function。如果没设这个,就需要在custom_explore_fn中指定。

custom_explore_fn: 自定义的exploration函数。

Optimization Cluster parameter

.rollouts()

num_rollout_workers: 这个参数决定了要创建多少个rollout worker actors来进行并行采样。如果将此参数设置为0,那么所有的rollouts将在本地worker中执行(即驱动进程或者在使用Tune时的算法actor)。这意味着不会有额外的并行性,所有的采样都将在一个进程中顺序进行。

num_envs_per_worker: 这个参数指定每个worker将同时评估多少个环境。这允许模型推理批处理,可以提高那些受推理性能瓶颈影响的工作负载的性能。简单来说,如果你的模型推理是计算瓶颈,增加每个worker的环境数量可以让你在每次推理时处理更多的数据,从而提高整体性能。

.training()

train_batch_size:

它控制在模型的内部参数更新之前要处理的训练样本数量。这是梯度下降算法的一个超参数。

具体来说,如果你有1050个训练样本,并且你设置了 train_batch_size 等于100,算法将取训练数据集中的前100个样本(从第1个到第100个)来训练网络。然后,它会取下一个100个样本(从第101个到第200个)来再次训练网络。这个过程会一直重复,直到所有样本都通过网络传播。

使用小于所有样本数量的 train_batch_size 有以下优点:

    1.它需要更少的内存。因为你使用更少的样本来训练网络,所以整个训练过程需要的内存更少。这在你无法将整个数据集放入机器内存时尤其重要。
    2.网络训练通常会更快。这是因为我们在每次传播后都会更新权重。在我们的例子中,我们传播了11个批次(10个批次有100个样本,1个批次有50个样本),并且在每个批次后都更新了网络的参数。如果我们在传播过程中使用了所有样本,那么我们只会对网络的参数进行一次更新。

使用小于所有样本数量的 train_batch_size 的缺点是:

    1.批次越小,对梯度的估计就越不准确。小批量梯度的方向比全批量梯度的方向波动得多。


在设置 train_batch_size 时,需要考虑以下因素:

    数据集的大小和样本分布。如果数据集很大且样本分布均匀,则可以设置较大的 train_batch_size。
    算法的复杂性。如果算法复杂,则需要较大的 train_batch_size 才能有效地学习。
    资源的限制。如果计算资源有限,则需要设置较小的 train_batch_size。
    一般来说,建议从较小的 train_batch_size 开始,然后根据训练效果逐步增加。

以下是 train_batch_size 的几个典型值:

    小型数据集(数千个样本):16-64
    中型数据集(数十万个样本):64-256
    大型数据集(数百万个样本):256-1024

model:

算法模型配置
"fcnet_hiddens": [128, 128, 128] 

PPOConfig.training()

lr_schedule: 学习率计划。格式为 [[timestep, lr-value], [timestep, lr-value], …]。中间时间步将被分配插值后的学习率值。计划通常应从时间步 0 开始。

use_critic: 是否使用 critic 作为基线(否则不使用 value baseline;使用 GAE 时必需)。

use_gae: 如果为 True,则使用具有价值函数的广义优势估计器 (GAE),请参阅 https://arxiv.org/pdf/1506.02438.pdf。

lambda_: GAE (lambda) 参数。

use_kl_loss: 是否在损失函数中使用 KL 项。

kl_coeff: KL 散度的初始系数。

kl_target: KL 散度的目标值。

sgd_minibatch_size: SGD 在所有设备上用于 SGD 的总 SGD 批处理大小。这定义了每个 epoch 内的 minibatch 大小。

num_sgd_iter: 每个外部循环中的 SGD 迭代次数(即,每个训练批处理执行的 epoch 数)。

shuffle_sequences: 是否在训练时对批次中的序列进行洗牌(推荐)。

vf_loss_coeff: 价值函数损失的系数。重要提示:如果您在模型的配置中设置了 vf_share_layers=True,则必须调整此参数。

entropy_coeff: 熵正则化器的系数。

entropy_coeff_schedule: 熵正则化器的衰减计划。

clip_param: PPO clip 参数。

vf_clip_param: 价值函数的 clip 参数。请注意,这对奖励的规模很敏感。如果您期望的 V 很高,请增加它。

grad_clip: 如果指定,则按此量裁剪梯度的全局范数。

.resources()

num_gpus: 分配给算法进程的 GPU 数量。注意,并非所有算法都能利用 GPU。目前,仅支持 tf-[PPO/IMPALA/DQN/PG] 多 GPU 功能。可以是小数(例如,0.3 个 GPU)。

_fake_gpus: 设置为 True 以在 CPU 机器上调试 (多)GPU 功能。在这种情况下,GPU 塔将由位于 CPU 上的图模拟。使用 num_gpus 测试不同数量的假 GPU。

num_cpus_per_worker: 分配给每个工作者的 CPU 数量。

num_gpus_per_worker: 分配给每个工作者的 GPU 数量。可以是小数。通常只有当环境本身需要 GPU(例如,它是 GPU 密集型视频游戏)或模型推理异常昂贵时才需要它。

num_learner_workers: 用于训练的工作者数量。值为 0 意味着将在头节点 CPU 或 1 个 GPU 上的本地工作者上进行训练(由 num_gpus_per_learner_worker 决定)。对于多 GPU 训练,将工作者数量设置为大于 1 并相应设置 num_gpus_per_learner_worker(例如,总共 4 个 GPU,模型需要 2 个 GPU:num_learner_workers = 2 和 num_gpus_per_learner_worker = 2)

num_cpus_per_learner_worker: 分配给每个 Learner 工作者的 CPU 数量。仅在每个 Learner 内部自定义处理管道需要多个 CPU 内核时才需要。如果 num_learner_workers = 0,则忽略。

num_gpus_per_learner_worker: 分配给每个工作者的 GPU 数量。如果 num_learner_workers = 0,任何大于 0 的值都将在头节点上的单个 GPU 上运行训练,而 0 值将在头节点 CPU 内核上运行训练。如果设置了 num_gpus_per_learner_worker,则不能设置 num_cpus_per_learner_worker。

local_gpu_idx: 如果 num_gpus_per_worker>0 且 num_workers<2,则该 gpu 索引将用于训练。这是可用 cuda 设备的索引。例如,如果 os.environ[“CUDA_VISIBLE_DEVICES”] = “1”,则 local_gpu_idx 为 0 将使用节点上 id 为 1 的 gpu。

custom_resources_per_worker: 任何要分配给每个工作者的自定义 Ray 资源。

num_cpus_for_local_worker: 为算法分配的 CPU 数量。注意:这仅在 Tune 中运行时有效。否则,算法在主程序(驱动程序)中运行。

placement_strategy: Algorithm.default_resource_request() 返回的放置组工厂的策略。PlacementGroup 定义了哪些设备(资源)应该始终共存于同一个节点上。

例如,一个有 2 个 rollout worker 的算法,运行时 num_gpus=1 将请求具有以下包的放置组:

    [{"gpu": 1, "cpu": 1}, {"cpu": 1}, {"cpu": 1}],其中第一个包用于驱动程序,其他 2 个包用于两个 worker。

现在可以根据 placement_strategy 的值将这些包“放置”在相同或不同的节点上:

    "PACK": 将包尽可能打包到尽可能少的节点中。

    "SPREAD": 将包尽可能均匀地放置在不同的节点上。

    "STRICT_PACK": 将包打包到一个节点中。不允许组跨越多个节点。

    "STRICT_SPREAD": 将包打包到不同的节点上。

nfs共享

分布式checkpoints需要共享存储,搭建过程:

要在Ubuntu 22.04上搭建nfs,你需要在服务器和客户端上安装不同的软件包,并配置共享目录和挂载选项。具体的步骤如下:

在服务器上,安装nfs-kernel-server软件包,它将允许你共享你的目录。你可以使用以下命令来安装:

sudo apt update
sudo apt install nfs-kernel-server

在客户端上,安装nfs-common软件包,它提供了nfs功能,但不包含任何服务器组件。你可以使用以下命令来安装:

sudo apt update
sudo apt install nfs-common

在服务器上,创建你想要共享的目录,并设置相应的权限。例如,你可以创建一个名为/nfsroot的目录,并赋予所有人读写权限:

sudo mkdir /nfsroot
sudo chmod 777 /nfsroot

在服务器上,编辑/etc/exports文件,指定你的共享目录和它们的属性。你可以使用以下格式来添加一行:

/nfsroot *(rw,sync,no_root_squash,no_subtree_check)

这表示/nfsroot目录可以被任何客户端以读写模式挂载,同步写入内存和硬盘,允许root用户访问,不检查父目录的权限。

在服务器上,重启nfs-kernel-server服务,使配置生效。你可以使用以下命令来重启:

sudo service nfs-kernel-server restart

在客户端上,创建一个用于挂载服务器共享目录的本地目录。例如,你可以创建一个名为/nfsroot的目录:

sudo mkdir /nfsroot

在客户端上,使用mount命令来挂载服务器共享目录。你需要将host_ip替换为服务器的IP地址。你可以使用以下命令来挂载:

sudo mount -t nfs -o nolock host_ip:/nfsroot /nfsroot

这表示使用nfs类型和nolock选项来挂载服务器的/nfsroot目录到客户端的/nfsroot目录。

在客户端上,你可以使用df -h命令来查看挂载的目录的信息。你可以使用以下命令来查看:

df -h /nfsroot

这表示显示/nfsroot目录的磁盘使用情况。

在客户端上,你可以使用umount命令来卸载服务器共享目录。你可以使用以下命令来卸载:

sudo umount /nfsroot

这表示卸载/nfsroot目录。

查看共享:

showmount -e [NFS服务器IP地址]

网卡速度测试

在两台 Linux 机器上安装 iperf。
sudo apt install iperf
在其中一台机器上启动 iperf 服务器。
iperf -s
在另一台机器上启动 iperf 客户端,连接到服务器。
iperf -c <服务器 IP 地址>

issues

1.ray 2.x版本与3.x版本不兼容,启动会报错,而且不同的python版本也不兼容!!!:

RuntimeError: Version mismatch: The cluster was started with:
    Ray: 2.4.0
    Python: 3.8.10
This process on node 10.147.19.20 was started with:
    Ray: 3.0.0.dev0
    Python: 3.8.10

2.遇到过一次WSL2下启动节点,但注册失败,检查环境日志无果,重启系统后成功。

可能是端口占用,或特定文件未被关闭。

optuna

文档:https://optuna.readthedocs.io/

性能优化

optuna在分布式超参优化场景下,对Running状态的trial参数会用竞争策略,这将导致集群环境重复评估,会浪费算力。

改造下采样函数,修复该问题:

# 求最优超参数
def f(trial):
    # 定义要找的超参数,并设置上下限
    params = {
        'minima_size': trial.suggest_int('minima_size', args.minima_size_a, args.minima_size_b,step=minima_size_STEP),
    }

    print()
    t_list = trial.study.get_trials(states=[TrialState.COMPLETE,TrialState.RUNNING])
    n = trial._cached_frozen_trial
    for t in t_list:
        if n.number == t.number:
            continue
        if n.params == t.params:
            if 'HOST' in t.user_attrs and 'host' in t.user_attrs['HOST']:
                host_s = t.user_attrs['HOST']['host']
            else:
                host_s = ""
            HOST_dic = {"host": socket.gethostname(),"msg":None,"number": t.number, "HOST_s": host_s}
            if t.state == TrialState.RUNNING:
                HOST_dic["msg"] = "RUNNING"
                print("超参重复将跳过",t.params,HOST_dic)
                trial.set_user_attr("HOST", HOST_dic)
                raise optuna.TrialPruned
            else:
                HOST_dic["msg"] = "COMPLETE"
                print("超参重复将跳过",t.params,HOST_dic)
                trial.set_user_attr("HOST", HOST_dic)
                raise optuna.TrialPruned
    trial.set_user_attr("HOST", {"host": socket.gethostname()})

    args.opt_times += 1

    # 评估超参数
    return _c.params_test(args,params,MC)

网格搜索使用步进函数

网格搜索需要枚举,用步进函数替代它:

def generate_float_list(start, end, step):
    """生成一个从 start 到 end 的列表,步长为 step。

    Args:
        start: 列表的起始值。
        end: 列表的结束值。
        step: 列表的步长。

    Returns:
        一个从 start 到 end 的列表,步长为 step。
    """

    list = []
    current = start
    while current <= end:
        list.append(current)
        current += step
    return list

search_space = {"minima_size":generate_float_list(args.minima_size_a, args.minima_size_b,step=minima_size_STEP)}

print("优化器:",study_name)

study = optuna.create_study(
    # sampler=TPESampler(seed=args.seedopt),
    sampler=GridSampler(search_space),
    study_name=study_name,
    direction='maximize',
    load_if_exists=True,
    storage=args.sqluri
    )

clusterssh

$ cat ~/.clusterssh/config
terminal_args=-fa 'DejaVu Sans Mono:style=Bold' -fs 8

$ cat ~/.bashrc
alias f1='cssh --fillscreen m8 m9 m10'

$ cat ~/.ssh/config
Host m8
    HostName m4
    User root
    Port 1822