pytorch使用horovod多gpu训练的实现

pytorch在Horovod上训练步骤分为以下几步:

import torch
import horovod.torch as hvd

# Initialize Horovod 初始化horovod
hvd.init()

# Pin GPU to be used to process local rank (one GPU per process) 分配到每个gpu上
torch.cuda.set_device(hvd.local_rank())

# Define dataset... 定义dataset
train_dataset = ...

# Partition dataset among workers using DistributedSampler 对dataset的采样器进行调整,使用torch.utils.data.distributed.DistributedSampler
train_sampler = torch.utils.data.distributed.DistributedSampler(
  train_dataset, num_replicas=hvd.size(), rank=hvd.rank())

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)

# Build model...
model = ...
model.cuda()

optimizer = optim.SGD(model.parameters())

# Add Horovod Distributed Optimizer 使用Horovod的分布式优化器函数包裹在原先optimizer上
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())

# Broadcast parameters from rank 0 to all other processes. 参数广播到每个gpu上
hvd.broadcast_parameters(model.state_dict(), root_rank=0)

for epoch in range(100):
  for batch_idx, (data, target) in enumerate(train_loader):
    optimizer.zero_grad()
    output = model(data)
    loss = F.nll_loss(output, target)
    loss.backward()
    optimizer.step()
    if batch_idx % args.log_interval == 0:
      print('Train Epoch: {} [{}/{}]\tLoss: {}'.format(
        epoch, batch_idx * len(data), len(train_sampler), loss.item()))

完整示例代码如下,在imagenet上采用resnet50进行训练

  from __future__ import print_function

  import torch
  import argparse
  import torch.backends.cudnn as cudnn
  import torch.nn.functional as F
  import torch.optim as optim
  import torch.utils.data.distributed
  from torchvision import datasets, transforms, models
 import horovod.torch as hvd
 import os
 import math
 from tqdm import tqdm
 from distutils.version import LooseVersion

 # Training settings
 parser = argparse.ArgumentParser(description='PyTorch ImageNet Example',
                  formatter_class=argparse.ArgumentDefaultsHelpFormatter)
 parser.add_argument('--train-dir', default=os.path.expanduser('~/imagenet/train'),
           help='path to training data')
 parser.add_argument('--val-dir', default=os.path.expanduser('~/imagenet/validation'),
           help='path to validation data')
 parser.add_argument('--log-dir', default='./logs',
           help='tensorboard log directory')
 parser.add_argument('--checkpoint-format', default='./checkpoint-{epoch}.pth.tar',
           help='checkpoint file format')
 parser.add_argument('--fp-allreduce', action='store_true', default=False,
           help='use fp compression during allreduce')
 parser.add_argument('--batches-per-allreduce', type=int, default=,
           help='number of batches processed locally before '
              'executing allreduce across workers; it multiplies '
              'total batch size.')
 parser.add_argument('--use-adasum', action='store_true', default=False,
           help='use adasum algorithm to do reduction')

 # Default settings from https://arxiv.org/abs/1706.02677.
 parser.add_argument('--batch-size', type=int, default=32,
           help='input batch size for training')
 parser.add_argument('--val-batch-size', type=int, default=32,
           help='input batch size for validation')
 parser.add_argument('--epochs', type=int, default=90,
           help='number of epochs to train')
 parser.add_argument('--base-lr', type=float, default=0.0125,
 44           help='learning rate for a single GPU')
 45 parser.add_argument('--warmup-epochs', type=float, default=5,
           help='number of warmup epochs')
 parser.add_argument('--momentum', type=float, default=0.9,
           help='SGD momentum')
 parser.add_argument('--wd', type=float, default=0.00005,
           help='weight decay')

 parser.add_argument('--no-cuda', action='store_true', default=False,
           help='disables CUDA training')
 parser.add_argument('--seed', type=int, default=42,
           help='random seed')

 args = parser.parse_args()
 args.cuda = not args.no_cuda and torch.cuda.is_available()

 allreduce_batch_size = args.batch_size * args.batches_per_allreduce

 hvd.init()
 torch.manual_seed(args.seed)

 if args.cuda:
   # Horovod: pin GPU to local rank.
   torch.cuda.set_device(hvd.local_rank())
   torch.cuda.manual_seed(args.seed)

 cudnn.benchmark = True

 # If set > 0, will resume training from a given checkpoint.
 resume_from_epoch = 0
 for try_epoch in range(args.epochs, 0, -1):
   if os.path.exists(args.checkpoint_format.format(epoch=try_epoch)):
     resume_from_epoch = try_epoch
     break

 # Horovod: broadcast resume_from_epoch from rank 0 (which will have
 # checkpoints) to other ranks.
 resume_from_epoch = hvd.broadcast(torch.tensor(resume_from_epoch), root_rank=0,
                  name='resume_from_epoch').item()

 # Horovod: print logs on the first worker.
 verbose = 1 if hvd.rank() == 0 else 0

 # Horovod: write TensorBoard logs on first worker.
 try:
   if LooseVersion(torch.__version__) >= LooseVersion('1.2.0'):
     from torch.utils.tensorboard import SummaryWriter
   else:
     from tensorboardX import SummaryWriter
   log_writer = SummaryWriter(args.log_dir) if hvd.rank() == 0 else None
 except ImportError:
   log_writer = None

 # Horovod: limit # of CPU threads to be used per worker.
 torch.set_num_threads(4)

 kwargs = {'num_workers': 4, 'pin_memory': True} if args.cuda else {}
 train_dataset = \
   datasets.ImageFolder(args.train_dir,
             transform=transforms.Compose([
               transforms.RandomResizedCrop(224),
               transforms.RandomHorizontalFlip(),
               transforms.ToTensor(),
               transforms.Normalize(mean=[., ., .],
                          std=[0.229, 0.224, 0.225])
             ]))
 # Horovod: use DistributedSampler to partition data among workers. Manually specify
 # `num_replicas=hvd.size()` and `rank=hvd.rank()`.
 train_sampler = torch.utils.data.distributed.DistributedSampler(
   train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
 train_loader = torch.utils.data.DataLoader(
   train_dataset, batch_size=allreduce_batch_size,
   sampler=train_sampler, **kwargs)

 val_dataset = \
   datasets.ImageFolder(args.val_dir,
             transform=transforms.Compose([
               transforms.Resize(256),
               transforms.CenterCrop(224),
               transforms.ToTensor(),
               transforms.Normalize(mean=[0.485, 0.456, 0.406],
                          std=[0.229, 0.224, 0.225])
             ]))
 val_sampler = torch.utils.data.distributed.DistributedSampler(
   val_dataset, num_replicas=hvd.size(), rank=hvd.rank())
 val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=args.val_batch_size,
                     sampler=val_sampler, **kwargs)

 # Set up standard ResNet-50 model.
 model = models.resnet50()

 # By default, Adasum doesn't need scaling up learning rate.
 # For sum/average with gradient Accumulation: scale learning rate by batches_per_allreduce
 lr_scaler = args.batches_per_allreduce * hvd.size() if not args.use_adasum else 1

 if args.cuda:
   # Move model to GPU.
   model.cuda()
   # If using GPU Adasum allreduce, scale learning rate by local_size.
   if args.use_adasum and hvd.nccl_built():
     lr_scaler = args.batches_per_allreduce * hvd.local_size()

 # Horovod: scale learning rate by the number of GPUs.
 optimizer = optim.SGD(model.parameters(),
            lr=(args.base_lr *
              lr_scaler),
            momentum=args.momentum, weight_decay=args.wd)

 # Horovod: (optional) compression algorithm.
 compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none

 # Horovod: wrap optimizer with DistributedOptimizer.
 optimizer = hvd.DistributedOptimizer(
   optimizer, named_parameters=model.named_parameters(),
   compression=compression,
   backward_passes_per_step=args.batches_per_allreduce,
   op=hvd.Adasum if args.use_adasum else hvd.Average)

 # Restore from a previous checkpoint, if initial_epoch is specified.
 # Horovod: restore on the first worker which will broadcast weights to other workers.
 if resume_from_epoch > 0 and hvd.rank() == 0:
   filepath = args.checkpoint_format.format(epoch=resume_from_epoch)
   checkpoint = torch.load(filepath)
   model.load_state_dict(checkpoint['model'])
   optimizer.load_state_dict(checkpoint['optimizer'])

 # Horovod: broadcast parameters & optimizer state.
 hvd.broadcast_parameters(model.state_dict(), root_rank=)
 hvd.broadcast_optimizer_state(optimizer, root_rank=)

 def train(epoch):
   model.train()
   train_sampler.set_epoch(epoch)
   train_loss = Metric('train_loss')
   train_accuracy = Metric('train_accuracy')

   with tqdm(total=len(train_loader),
        desc='Train Epoch   #{}'.format(epoch + 1),
        disable=not verbose) as t:
     for batch_idx, (data, target) in enumerate(train_loader):
       adjust_learning_rate(epoch, batch_idx)

       if args.cuda:
         data, target = data.cuda(), target.cuda()
       optimizer.zero_grad()
       # Split data into sub-batches of size batch_size
       for i in range(0, len(data), args.batch_size):
         data_batch = data[i:i + args.batch_size]
         target_batch = target[i:i + args.batch_size]
         output = model(data_batch)
         train_accuracy.update(accuracy(output, target_batch))
         loss = F.cross_entropy(output, target_batch)
         train_loss.update(loss)
         # Average gradients among sub-batches
         loss.div_(math.ceil(float(len(data)) / args.batch_size))
         loss.backward()
       # Gradient is applied across all ranks
       optimizer.step()
       t.set_postfix({'loss': train_loss.avg.item(),
              'accuracy': 100. * train_accuracy.avg.item()})
       t.update(1)

   if log_writer:
     log_writer.add_scalar('train/loss', train_loss.avg, epoch)
     log_writer.add_scalar('train/accuracy', train_accuracy.avg, epoch)

 def validate(epoch):
   model.eval()
   val_loss = Metric('val_loss')
   val_accuracy = Metric('val_accuracy')

   with tqdm(total=len(val_loader),
        desc='Validate Epoch #{}'.format(epoch + ),
        disable=not verbose) as t:
     with torch.no_grad():
       for data, target in val_loader:
         if args.cuda:
           data, target = data.cuda(), target.cuda()
         output = model(data)

         val_loss.update(F.cross_entropy(output, target))
         val_accuracy.update(accuracy(output, target))
         t.set_postfix({'loss': val_loss.avg.item(),
                'accuracy': 100. * val_accuracy.avg.item()})
        t.update(1)

   if log_writer:
     log_writer.add_scalar('val/loss', val_loss.avg, epoch)
     log_writer.add_scalar('val/accuracy', val_accuracy.avg, epoch)

 # Horovod: using `lr = base_lr * hvd.size()` from the very beginning leads to worse final
 # accuracy. Scale the learning rate `lr = base_lr` ---> `lr = base_lr * hvd.size()` during
 # the first five epochs. See https://arxiv.org/abs/1706.02677 for details.
 # After the warmup reduce learning rate by 10 on the 30th, 60th and 80th epochs.
 def adjust_learning_rate(epoch, batch_idx):
   if epoch < args.warmup_epochs:
     epoch += float(batch_idx + 1) / len(train_loader)
     lr_adj = 1. / hvd.size() * (epoch * (hvd.size() - 1) / args.warmup_epochs + 1)
   elif epoch < 30:
     lr_adj = 1.
   elif epoch < 60:
     lr_adj = 1e-1
   elif epoch < 80:
     lr_adj = 1e-2
   else:
     lr_adj = 1e-3
   for param_group in optimizer.param_groups:
     param_group['lr'] = args.base_lr * hvd.size() * args.batches_per_allreduce * lr_adj

 def accuracy(output, target):
   # get the index of the max log-probability
   pred = output.max(1, keepdim=True)[1]
   return pred.eq(target.view_as(pred)).cpu().float().mean()

 def save_checkpoint(epoch):
   if hvd.rank() == 0:
     filepath = args.checkpoint_format.format(epoch=epoch + 1)
     state = {
       'model': model.state_dict(),
       'optimizer': optimizer.state_dict(),
     }
     torch.save(state, filepath)

 # Horovod: average metrics from distributed training.
 class Metric(object):
   def __init__(self, name):
     self.name = name
     self.sum = torch.tensor(0.)
     self.n = torch.tensor(0.)

   def update(self, val):
     self.sum += hvd.allreduce(val.detach().cpu(), name=self.name)
     self.n += 1

   @property
   def avg(self):
     return self.sum / self.n

 for epoch in range(resume_from_epoch, args.epochs):
   train(epoch)
   validate(epoch)
   save_checkpoint(epoch)

到此这篇关于pytorch使用horovod多gpu训练的实现的文章就介绍到这了,更多相关pytorch horovod多gpu训练内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 在pytorch中为Module和Tensor指定GPU的例子

    pytorch指定GPU 在用pytorch写CNN的时候,发现一运行程序就卡住,然后cpu占用率100%,nvidia-smi 查看显卡发现并没有使用GPU.所以考虑将模型和输入数据及标签指定到gpu上. pytorch中的Tensor和Module可以指定gpu运行,并且可以指定在哪一块gpu上运行,方法非常简单,就是直接调用Tensor类和Module类中的 .cuda() 方法. import torch from PIL import Image import torch.nn as

  • Win10+GPU版Pytorch1.1安装的安装步骤

    安装cuda 更新nvidia驱动 打开GeForce Game Ready Driver或在GeForce Experience中下载符合自己gpu的程序. 选择cuda 打开nvidia控制面板 点击帮助.点击系统信息.在点击组件在3D设置中可以看到cuda信息 在我升级过nvidia驱动后,cuda的版本更新到了10.1.接下来下载cuda . cuda10.1安装完毕. 安装cuDNN 在安装了cuda10.1后选择对应的cuDNN版本v7.6.1 解压文件,然后添加bin目录到环境变量

  • Pytorch 多块GPU的使用详解

    注:本文针对单个服务器上多块GPU的使用,不是多服务器多GPU的使用. 在一些实验中,由于Batch_size的限制或者希望提高训练速度等原因,我们需要使用多块GPU.本文针对Pytorch中多块GPU的使用进行说明. 1. 设置需要使用的GPU编号 import os os.environ["CUDA_VISIBLE_DEVICES"] = "0,4" ids = [0,1] 比如我们需要使用第0和第4块GPU,只用上述三行代码即可. 其中第二行指程序只能看到第1

  • pytorch使用指定GPU训练的实例

    本文适合多GPU的机器,并且每个用户需要单独使用GPU训练. 虽然pytorch提供了指定gpu的几种方式,但是使用不当的话会遇到out of memory的问题,主要是因为pytorch会在第0块gpu上初始化,并且会占用一定空间的显存.这种情况下,经常会出现指定的gpu明明是空闲的,但是因为第0块gpu被占满而无法运行,一直报out of memory错误. 解决方案如下: 指定环境变量,屏蔽第0块gpu CUDA_VISIBLE_DEVICES = 1 main.py 这句话表示只有第1块

  • 用Pytorch训练CNN(数据集MNIST,使用GPU的方法)

    听说pytorch使用比TensorFlow简单,加之pytorch现已支持windows,所以今天装了pytorch玩玩,第一件事还是写了个简单的CNN在MNIST上实验,初步体验的确比TensorFlow方便. 参考代码(在莫烦python的教程代码基础上修改)如下: import torch import torch.nn as nn from torch.autograd import Variable import torch.utils.data as Data import tor

  • 解决pytorch GPU 计算过程中出现内存耗尽的问题

    Pytorch GPU运算过程中会出现:"cuda runtime error(2): out of memory"这样的错误.通常,这种错误是由于在循环中使用全局变量当做累加器,且累加梯度信息的缘故,用官方的说法就是:"accumulate history across your training loop".在默认情况下,开启梯度计算的Tensor变量是会在GPU保持他的历史数据的,所以在编程或者调试过程中应该尽力避免在循环中累加梯度信息. 下面举个栗子: 上代

  • 关于pytorch多GPU训练实例与性能对比分析

    以下实验是我在百度公司实习的时候做的,记录下来留个小经验. 多GPU训练 cifar10_97.23 使用 run.sh 文件开始训练 cifar10_97.50 使用 run.4GPU.sh 开始训练 在集群中改变GPU调用个数修改 run.sh 文件 nohup srun --job-name=cf23 $pt --gres=gpu:2 -n1 bash cluster_run.sh $cmd 2>&1 1>>log.cf50_2GPU & 修改 –gres=gpu:

  • pytorch 使用单个GPU与多个GPU进行训练与测试的方法

    如下所示: device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")#第一行代码 model.to(device)#第二行代码 首先是上面两行代码放在读取数据之前. mytensor = my_tensor.to(device)#第三行代码 然后是第三行代码.这句代码的意思是将所有最开始读取数据时的tersor变量copy一份到device所指定的GPU上去,之后的运算都在GPU上

  • pytorch 限制GPU使用效率详解(计算效率)

    问题 用过 tensorflow 的人都知道, tf 可以限制程序在 GPU 中的使用效率,但 pytorch 中没有这个操作. 思路 于是我想到了一个代替方法,玩过单片机点灯的同学都知道,灯的亮度是靠占空比实现的,这实际上也是计算机的运行原理. 那我们是不是也可以通过增加 GPU 不工作的时间,进而降低 GPU 的使用效率 ? 主要代码 import time ... rest_time = 0.15 ... for _ in range( XXX ): ... outputs = all_G

  • win10使用清华源快速安装pytorch-GPU版(推荐)

    检查自己的cuda是否安装好 在anaconda prompt中输入 nvcc -V 显示如上面表示安装好了. 配置清华园下载环境 同样在在anaconda prompt中输入 conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/msys2/ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/

  • 将Pytorch模型从CPU转换成GPU的实现方法

    最近将Pytorch程序迁移到GPU上去的一些工作和思考 环境:Ubuntu 16.04.3 Python版本:3.5.2 Pytorch版本:0.4.0 0. 序言 大家知道,在深度学习中使用GPU来对模型进行训练是可以通过并行化其计算来提高运行效率,这里就不多谈了. 最近申请到了实验室的服务器来跑程序,成功将我简陋的程序改成了"高大上"GPU版本. 看到网上总体来说少了很多介绍,这里决定将我的一些思考和工作记录下来. 1. 如何进行迁移 由于我使用的是Pytorch写的模型,网上给

  • pytorch 指定gpu训练与多gpu并行训练示例

    一. 指定一个gpu训练的两种方法: 1.代码中指定 import torch torch.cuda.set_device(id) 2.终端中指定 CUDA_VISIBLE_DEVICES=1 python 你的程序 其中id就是你的gpu编号 二. 多gpu并行训练: torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0) 该函数实现了在module级别上的数据并行使用,注意batch size要大于G

随机推荐