Pytorch 多卡训练

总结Pytorch 多卡训练的原理与实现

Posted by JMX on September 12, 2021

Pytorch 多卡训练

一、多卡训练原理

多卡训练流程一般如下:

  1. 指定主机节点
  2. 主机节点划分数据,一个batch数据平均分到每个机器上
  3. 模型从主机拷贝到各个机器
  4. 每个机器进行前向传播
  5. 每个机器计算loss损失
  6. 主机收集所有loss结果,进行参数更新
  7. 将更新后参数模型拷贝给各个机器

image

image

二、单机多卡训练

使用torch.nn.DataParallel(module, device_ids)模块,module为模型,device_ids为并行的GPU id列表

使用方式:将模型调用该接口执行操作

model = torch.nn.DataParallel(model)

示例:我们假设模型输入为(32, input_dim),这里的 32 表示batch_size,模型输出为(32, output_dim),使用 4 个GPU训练。nn.DataParallel起到的作用是将这 32 个样本拆成 4 份,发送给 4 个GPU 分别做 forward,然后生成 4 个大小为(8, output_dim)的输出,然后再将这 4 个输出都收集到cuda:0上并合并成(32, output_dim)。

可以看出,nn.DataParallel没有改变模型的输入输出,因此其他部分的代码不需要做任何更改,非常方便。但弊端是,后续的loss计算只会在cuda:0上进行,没法并行,因此会导致负载不均衡的问题。

通过在模型内置loss计算可以解决上述负载不均衡的情况,最后所得loss进行取平均。

class Net:
    def __init__(self,...):
        # code
    
    def forward(self, inputs, labels=None)
        # outputs = fct(inputs)
        # loss_fct = ...
        if labels is not None:
            loss = loss_fct(outputs, labels)  # 在训练模型时直接将labels传入模型,在forward过程中计算loss
            return loss
        else:
            return outputs

按照我们上面提到的模型并行逻辑,在每个GPU上会计算出一个loss,这些loss会被收集到cuda:0上并合并成长度为 4 的张量。这个时候在做backward的之前,必须对将这个loss张量合并成一个标量,一般直接取mean就可以。这在Pytorch官方文档nn.DataParallel函数中有提到:

When module returns a scalar (i.e., 0-dimensional tensor) in forward(), this wrapper will return a vector of length equal to number of devices used in data parallelism, containing the result from each device.

三、多机多卡训练

该方式也可以实现单机多卡

使用torch.nn.parallel.DistributedDataParalleltorch.utils.data.distributed.DistributedSampler结合多进程实现。

  1. 从一开始就会启动多个进程(进程数小于等于GPU数),每个进程独享一个GPU,每个进程都会独立地执行代码。这意味着每个进程都独立地初始化模型、训练,当然,在每次迭代过程中会通过进程间通信共享梯度,整合梯度,然后独立地更新参数。

  2. 每个进程都会初始化一份训练数据集,当然它们会使用数据集中的不同记录做训练,这相当于同样的模型喂进去不同的数据做训练,也就是所谓的数据并行。这是通过torch.utils.data.distributed.DistributedSampler函数实现的,不过逻辑上也不难想到,只要做一下数据partition,不同进程拿到不同的parition就可以了,官方有一个简单的demo,感兴趣的可以看一下代码实现:Distributed Training

  3. 进程通过local_rank变量来标识自己,local_rank为0的为master,其他是slave。这个变量是torch.distributed包帮我们创建的,使用方法如下:

    import argparse  # 必须引入 argparse 包
    parser = argparse.ArgumentParser()
    parser.add_argument("--local_rank", type=int, default=-1)
    args = parser.parse_args()
    

    必须以如下方式运行代码:

    python -m torch.distributed.launch --nproc_per_node=2 --nnodes=1 train.py
    

    这样的话,torch.distributed.launch就以命令行参数的方式将args.local_rank变量注入到每个进程中,每个进程得到的变量值都不相同。比如使用 4 个GPU的话,则 4 个进程获得的args.local_rank值分别为0、1、2、3。

上述命令行参数nproc_per_node表示每个节点需要创建多少个进程(使用几个GPU就创建几个);nnodes表示使用几个节点,做单机多核训练设为1。

  1. 因为每个进程都会初始化一份模型,为保证模型初始化过程中生成的随机权重相同,需要设置随机种子。方法如下:
    def set_seed(seed):
     random.seed(seed)
     np.random.seed(seed)
     torch.manual_seed(seed)
     torch.cuda.manual_seed_all(seed)
    

使用方式如下:

from torch.utils.data.distributed import DistributedSampler  # 负责分布式dataloader创建,也就是实现上面提到的partition。

# 负责创建 args.local_rank 变量,并接受 torch.distributed.launch 注入的值
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", type=int, default=-1)
args = parser.parse_args()

# 每个进程根据自己的local_rank设置应该使用的GPU
torch.cuda.set_device(args.local_rank)
device = torch.device('cuda', args.local_rank)

# 初始化分布式环境,主要用来帮助进程间通信
torch.distributed.init_process_group(backend='nccl')

# 固定随机种子
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)

# 初始化模型
model = Net()
model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.1)

# 只 master 进程做 logging,否则输出会很乱
if args.local_rank == 0:
    tb_writer = SummaryWriter(comment='ddp-training')

# 分布式数据集
train_sampler = DistributedSampler(train_dataset)
train_loader = torch.utils.data.DataLoader(train_dataset, sampler=train_sampler, batch_size=batch_size)  # 注意这里的batch_size是每个GPU上的batch_size

# 分布式模型
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.local_rank], output_device=args.local_rank, find_unused_parameters=True)

torch.distributed.init_process_group()包含四个常用的参数:

  • backend: 后端, 实际上是多个机器之间交换数据的协议
  • init_method: 机器之间交换数据, 需要指定一个主节点, 而这个参数就是指定主节点的
  • world_size: 介绍都是说是进程, 实际就是机器的个数, 例如两台机器一起训练的话, world_size就设置为2
  • rank: 区分主节点和从节点的, 主节点为0, 剩余的为了1-(N-1), N为要使用的机器的数量, 也就是world_size

后端初始化

pytorch提供下列常用后端:

image

初始化init_method

  1. TCP初始化
import torch.distributed as dist

dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456',
                        rank=rank, world_size=world_size)

注意这里使用格式为tcp://ip:端口号, 首先ip地址是你的主节点的ip地址, 也就是rank参数为0的那个主机的ip地址, 然后再选择一个空闲的端口号, 这样就可以初始化init_method了.

  1. 共享文件系统初始化
import torch.distributed as dist

dist.init_process_group(backend, init_method='file:///mnt/nfs/sharedfile',
                        rank=rank, world_size=world_size)

初始化rank和world_size

这里其实没有多难, 你需要确保, 不同机器的rank值不同, 但是主机的rank必须为0, 而且使用init_method的ip一定是rank为0的主机, 其次world_size是你的主机数量, 你不能随便设置这个数值, 你的参与训练的主机数量达不到world_size的设置值时, 代码是不会执行的.

四、模型保存

模型的保存与加载,与单GPU的方式有所不同。这里通通将参数以cpu的方式save进存储, 因为如果是保存的GPU上参数,pth文件中会记录参数属于的GPU号,则加载时会加载到相应的GPU上,这样就会导致如果你GPU数目不够时会在加载模型时报错

或者模型保存时控制进程,只在主进程进行保存。 模型保存都是一致的,不过分布式运行有多个进程在同时跑,所以会保存多个模型到存储上,如果使用共享存储就要注意文件名的问题,当然一般只在rank0进程上保存参数即可,因为所有进程的模型参数是同步的。

torch.save(model.module.cpu().state_dict(), "model.pth")

参数加载:

param=torch.load("model.pth")

以下是huggingface/transformers代码中用到的模型保存代码

if torch.distributed.get_rank() == 0:
    model_to_save = model.module if hasattr(model, "module") else model  # Take care of distributed/parallel training
    model_to_save.save_pretrained(args.output_dir)
    tokenizer.save_pretrained(args.output_dir)

参考链接

pytorch多gpu并行训练

PyTorch 单机多GPU 训练方法与原理整理