Distributed Training with PyTorch

A summary of some practical experience with distributed training in PyTorch.

1-Background

This part covers some basic concepts of distributed training.

Throughout this article, we consider the case of 2 nodes with 4 GPUs each (2 nodes * 4 GPUs/node = 8 GPUs in total).

Some concepts for DDP with PyTorch

  • group/world: process group, i.e., the group of all processes. By default, there is one process group per job.
  • world_size: the number of processes in the group/world;
  • rank: the global index of each process. Within a process group, each process has a unique index used for inter-process communication. The process with rank=0 runs on the master node.
  • local_rank: the local index of a process on the node it runs on. For example, with 4 GPUs per node, the process with rank=7 has local_rank=3, i.e., it drives the 4th GPU on the 2nd node.

Remark: The relationship between rank/process and GPU: one rank (process) can drive either one GPU or multiple GPUs. The recommended practice is "one process per GPU".
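
For the 2-node * 4-GPU setup above, the indices are related by simple arithmetic. The following minimal sketch only illustrates the bookkeeping (the variable names are illustrative, not PyTorch APIs):

# 2 nodes * 4 GPUs/node -> world_size = 8 processes in total
nprocs_per_node = 4
node_rank = 1    # the 2nd node
local_rank = 3   # the 4th GPU on that node

rank = node_rank * nprocs_per_node + local_rank
print(rank)  # 7: the global rank of the process driving GPU 3 on node 1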

Core arguments of Slurm

#SBATCH --nodes=2: number of the nodes (or machines)
#SBATCH --ntasks=8: number of tasks/processes. Usually set to the total number of GPUs (one GPU per task/process).
#SBATCH --ntasks-per-node=4: number of tasks/processes on each node. Usually set to the number of GPUs on each node (one GPU per task/process). Specify only one of `--ntasks` and `--ntasks-per-node`.
#SBATCH --cpus-per-task=2: number of CPU cores for each task/process.

#### How to request GPUs
#SBATCH --gpus-per-task=v100:1 : request one GPU (here of type v100) per task. Usually set to 1 (one GPU per task).
#SBATCH --gpus-per-node=v100:n : request n GPUs per node. Usually set to the number of tasks on each node (one GPU per task).

#### How to request RAM
#SBATCH --mem=32G: allocate 32GB on each node
#SBATCH --mem-per-gpu=8G: allocate memory w.r.t. each GPU
#SBATCH --mem-per-cpu=4G: allocate memory w.r.t. each CPU core

# "srun" will execute "main.py" for "ntask" times, each time will run with a process.
srun main.py

Some useful environment variables after submitting a Slurm job

  • SLURM_JOBID: job ID
  • SLURM_NTASKS/SLURM_NPROCS: number of tasks or processes
  • SLURM_NNODES/SLURM_JOB_NUM_NODES: number of nodes
  • SLURM_NTASKS_PER_NODE: number of tasks/processes on each node
  • SLURM_GPUS_ON_NODE: number of GPUs available on the current node in this step
  • SLURM_PROCID: the (global) rank
  • SLURM_LOCALID: the local rank
  • SLURM_NODEID: the rank of the node (node_rank)
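
As a quick illustration, a process started by srun can read these variables directly. This is a minimal sketch, assuming the job was submitted with sbatch/srun as above:

import os

rank = int(os.environ["SLURM_PROCID"])         # global rank
local_rank = int(os.environ["SLURM_LOCALID"])  # local rank on the current node
world_size = int(os.environ["SLURM_NTASKS"])   # total number of processes
node_rank = int(os.environ["SLURM_NODEID"])    # rank of the current node
print(f"node {node_rank}: rank={rank}, local_rank={local_rank}, world_size={world_size}")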

How to get the IP address of the master node

  • SLURM_NODELIST: the list of all nodes allocated to the job
  • SLURMD_NODENAME: the name of the node on which the current process is running
  • SLURM_LAUNCH_NODE_IPADDR: the IP address of the launch node (on which the task launch was initiated or the srun command was executed).
  • SLURM_SRUN_COMM_HOST: the node for slurm communication
  • SLURM_SRUN_COMM_PORT: the port for slurm communication. (Attention: do not pass this port to dist.init_process_group())

A tricky way to get the IP address of the master node

subprocess.getoutput(f"scontrol show hostname {os.environ['SLURM_NODELIST']} | head -n1")
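
Wrapped as a small helper it looks as follows; note that get_master_addr is only an illustrative name used here, not a Slurm or PyTorch API:

import os
import subprocess

def get_master_addr():
    # resolve the first hostname in the node list allocated to the job
    node_list = os.environ["SLURM_NODELIST"]
    return subprocess.getoutput(f"scontrol show hostname {node_list} | head -n1")

os.environ["MASTER_ADDR"] = get_master_addr()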

The architecture of distributed training

Ring-AllReduce

[To be updated.]

2-torch.nn.DataParallel

Features:

  • a single process drives all GPUs
  • only supports Single-Node Multi-GPU mode

This method is slow (the single driving process and its scatter/gather become the bottleneck) and is not recommended.
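
For completeness, a minimal usage sketch (the small model here is only a placeholder):

import torch
import torch.nn as nn

model = nn.Sequential(nn.Linear(16, 32), nn.ReLU(), nn.Linear(32, 4))
# a single process drives all visible GPUs: the input batch is scattered along
# the batch dimension, replicas run the forward pass, and outputs are gathered
model = nn.DataParallel(model).cuda()

images = torch.randn(64, 16).cuda()
output = model(images)  # shape: (64, 4)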

3-torch.distributed & torch.nn.DistributedDataParallel

Features:

  • Support Multi-Node Multi-GPU
  • Each GPU has its own process; each process holds its own copy of the model and optimizer;
  • Only a limited amount of data (i.e., the gradients) needs to be communicated among processes;
  • After the backward pass in each process, the gradients are averaged across all processes with an all-reduce (ring all-reduce when using the NCCL backend), so every process ends up with the same averaged gradients and takes an identical optimizer step (see the sketch below)
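
Conceptually, the gradient averaging performed by DDP is equivalent to the manual all-reduce below. This is only a sketch for intuition; the real implementation buckets gradients and overlaps communication with the backward pass:

import torch.distributed as dist

def average_gradients(model, world_size):
    # sum each gradient over all processes, then divide by the number of processes
    for param in model.parameters():
        if param.grad is not None:
            dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
            param.grad /= world_size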

Some functions/classes in PyTorch

  • torch.nn.parallel.DistributedDataParallel(): wrap the model
  • torch.nn.SyncBatchNorm.convert_sync_batchnorm(): convert the BN layers in the model (see the sketch after this list)
  • torch.utils.data.distributed.DistributedSampler(): split the data across processes ([Attention] do not forget to call sampler.set_epoch(current_epoch) or loader.sampler.set_epoch(current_epoch) at the beginning of each epoch so that the data split differs between epochs)
  • torch.distributed.init_process_group(): initialize the process group; it tells each process how large the process group is and which rank it has
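
A minimal sketch of the SyncBatchNorm conversion mentioned in the list above; it assumes init_process_group() has already been called, and the conversion must be applied before wrapping the model with DistributedDataParallel:

import torch.nn as nn

local_rank = 0  # illustrative; normally read from the environment
model = nn.Sequential(nn.Conv2d(3, 8, 3), nn.BatchNorm2d(8), nn.ReLU()).cuda(local_rank)
# replace every BatchNorm layer with SyncBatchNorm so that batch statistics
# are synchronized across all processes instead of being computed per GPU
model = nn.SyncBatchNorm.convert_sync_batchnorm(model)
model = nn.parallel.DistributedDataParallel(model, device_ids=[local_rank])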

3.0-[Take-away] unified code

Flexible conversion between Single-Node Multi-GPU and Multi-Node Multi-GPU

import os
import subprocess

import torch
import torch.distributed as dist

def init_distributed(backend="nccl", port=None):
    ### check the launch method
    # if launched with 'torch.distributed.launch/torchrun' on single node
    if "RANK" in os.environ:
        local_rank = int(os.environ["LOCAL_RANK"])
        rank = int(os.environ["RANK"])
        world_size = int(os.environ["WORLD_SIZE"])
    # otherwise, assume the job was launched with Slurm
    elif "SLURM_JOB_ID" in os.environ:
        rank = int(os.environ["SLURM_PROCID"])
        local_rank = int(os.environ["SLURM_LOCALID"])
        world_size = int(os.environ["SLURM_NTASKS"])
        node_list = os.environ["SLURM_NODELIST"]
        addr = subprocess.getoutput(f"scontrol show hostname {node_list} | head -n1")
        # specify master port
        if port is not None:
            os.environ["MASTER_PORT"] = str(port)
        elif "MASTER_PORT" in os.environ:
            pass  # use MASTER_PORT in the environment variable
        else:
            os.environ["MASTER_PORT"] = "29005"
        if "MASTER_ADDR" not in os.environ:
            os.environ["MASTER_ADDR"] = addr
        os.environ["WORLD_SIZE"] = os.environ["SLURM_NTASKS"]
        os.environ["LOCAL_RANK"] = os.environ["SLURM_LOCALID"]
        os.environ["RANK"] = os.environ["SLURM_PROCID"]
    else:
        raise EnvironmentError("Neither Slurm nor torchrun/torch.distributed.launch environment variables were found")

    torch.cuda.set_device(local_rank)

    dist.init_process_group(backend=backend)
    return local_rank, rank, world_size

def main(args):
    local_rank, rank, world_size = init_distributed()
    print(f"launching: local_rank={local_rank}, rank={rank}, world_size={world_size}")
    main_worker(local_rank, rank, world_size, args)

How to launch

### Single-Node Multi-GPU: 'torchrun' / 'torch.distributed.launch'

#!/bin/bash
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=4
#SBATCH --gpus-per-node=4
#SBATCH --cpus-per-task=1
export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK}
torchrun \
    --nnodes=1 \
    --nproc_per_node=${SLURM_NTASKS_PER_NODE} \
    --node_rank=0 \
    main.py


### Multi-Node Multi-GPU: 'slurm'

#!/bin/bash
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=4
#SBATCH --gpus-per-node=4
#SBATCH --cpus-per-task=1
export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK}
srun python main.py

3.1-Launched with torch.distributed.launch (needs to be launched on each node separately; torch<=1.9.0)

Access the environment variables:

local_rank = int(os.environ["LOCAL_RANK"])  # requires launching with "--use_env"

rank = dist.get_rank()
# or
rank = int(os.environ["RANK"])

world_size = dist.get_world_size()
# or
world_size = int(os.environ["WORLD_SIZE"])

An example for main.py

# main.py
import os
import argparse

import torch
import torch.distributed as dist
import torch.optim as optim

parser = argparse.ArgumentParser()
### Pass an argument to "local_rank"
parser.add_argument('--local_rank', default=-1, type=int,
                    help='local rank for distributed training (passed by the launcher when "--use_env" is not set)')

def main_worker(rank, world_size, args):
    local_rank = int(os.environ["LOCAL_RANK"])
    # set device
    torch.cuda.set_device(local_rank)
    # set backend for GPU communication
    dist.init_process_group(backend='nccl')

    ### apply "torch.utils.data.distributed.DistributedSampler" to dataset
    train_dataset = ...
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=...,
        sampler=train_sampler)
    ### wrap the model with "torch.nn.parallel.DistributedDataParallel"
    model = ...
    model = model.cuda(local_rank)  # move the model to the current GPU before wrapping
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank])

    optimizer = optim.SGD(model.parameters(), lr=0.01)

    for epoch in range(100):
        train_sampler.set_epoch(epoch)  # reshuffle the data split every epoch
        for batch_idx, (images, target) in enumerate(train_loader):
            images = images.cuda(non_blocking=True)
            target = target.cuda(non_blocking=True)
            ...

    ### destroy the process group
    dist.destroy_process_group()

def main():
    args = parser.parse_args()
    rank = int(os.environ["RANK"])
    world_size = int(os.environ["WORLD_SIZE"])
    main_worker(rank, world_size, args)

if __name__ == "__main__":
    main()

How to launch

### execute on Node 0:
CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch \
    --nnodes=2 --node_rank=0 --nproc_per_node=4 \
    --master_addr='172.18.39.122' --master_port='29500' \
    --use_env \
        main.py
### execute on Node 1:
CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch \
    --nnodes=2 --node_rank=1 --nproc_per_node=4 \
    --master_addr='172.18.39.122' --master_port='29500' \
    --use_env \
        main.py

$ python -m torch.distributed.launch --help
usage: launch.py [-h] [--nnodes NNODES] [--node_rank NODE_RANK] [--nproc_per_node NPROC_PER_NODE]
                 [--master_addr MASTER_ADDR] [--master_port MASTER_PORT] [--use_env] [-m] [--no_python]
                 [--logdir LOGDIR]
                 training_script ...

core arguments:

  • --init_method: how to initialize the process group. If "init_method" and "store" are not specified, the default value "env://" is used, which means "initialize from environment variables"
    • init_method="env://": initialize by loading environmental variables
    • init_method="tcp://192.168.1.1:1234": initialize by the master IP and port of the node with rank=0
    • init_method="file://xxxx": initialize through a shared filesystem
  • --nnodes: number of nodes
  • --node_rank: the rank of the current node, starting from 0
  • --nproc_per_node: number of processes (i.e., GPUs) on each node
  • --use_env: whether to read the configuration from environment variables instead of the "--local_rank" argument. If used, the script should read the following environment variables:
    • MASTER_ADDR: the IP address of the master node
    • MASTER_PORT: a free port on the master node
    • WORLD_SIZE: size of the process group
    • RANK: the rank of the current process

3.2-Launched with torchrun (torch>=1.10)

New features compared to torch.distributed.launch:

  • Failover: can automatically relaunch all the workers when a worker failure happens;
  • Elastic: can dynamically add or remove nodes.

How to launch

### execute on Node 0:
torchrun --nnodes=2 --node_rank=0 --nproc_per_node=4 \
	--master_addr='172.18.39.122' --master_port='29500' \
		main.py
### execute on Node 1:
torchrun --nnodes=2 --node_rank=1 --nproc_per_node=4 \
	--master_addr='172.18.39.122' --master_port='29500' \
		main.py
$ torchrun -h
usage: torchrun [-h] [--nnodes NNODES] [--nproc_per_node NPROC_PER_NODE] [--rdzv_backend RDZV_BACKEND] [--rdzv_endpoint RDZV_ENDPOINT]
                [--rdzv_id RDZV_ID] [--rdzv_conf RDZV_CONF] [--standalone] [--max_restarts MAX_RESTARTS] [--monitor_interval MONITOR_INTERVAL]
                [--start_method {spawn,fork,forkserver}] [--role ROLE] [-m] [--no_python] [--run_path] [--log_dir LOG_DIR] [-r REDIRECTS]
                [-t TEE] [--node_rank NODE_RANK] [--master_addr MASTER_ADDR] [--master_port MASTER_PORT]
                training_script ...

Torch Distributed Elastic Training Launcher

positional arguments:
  training_script       Full path to the (single-GPU) training program/script to be launched in parallel,
                        followed by all the arguments for the training script.
  training_script_args

Some core arguments (only the new features compared to torch.distributed.launch)

  • --nnodes=1:3 : the range of the number of nodes used for training (e.g., minimum 1 node, maximum 3 nodes), so nodes can be added dynamically if necessary
  • --max_restarts=3 : the maximum number of restarts of the worker group. Note that node failure, node scale-down, and node scale-up all trigger a restart.
  • --rdzv_id=1 : all nodes use the same job id
  • --rdzv_backend : the backend for rendezvous; "c10d" and "etcd" are supported by default. Rendezvous handles the communication and scheduling among the nodes.
  • --rdzv_endpoint : the address of the rendezvous, given as "host:port" of one of the nodes.

3.3-Launched with torch.multiprocessing.spawn

# main.py
import argparse

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.optim as optim

def main_worker(local_rank, world_size, args):
    rank = args.node_rank * args.nproc_per_node + local_rank
    # initialization
    dist.init_process_group(backend='nccl',
			init_method='tcp://{}:{}'.format(args.master_addr, args.master_port),
			world_size=world_size,
			rank=rank)
    # set device
    torch.cuda.set_device(local_rank)

    ### apply "torch.utils.data.distributed.DistributedSampler" to dataset
    train_dataset = ...
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
    train_loader = torch.utils.data.DataLoader(train_dataset,
			batch_size=...,
			sampler=train_sampler)

    ### wrap the model with "torch.nn.parallel.DistributedDataParallel"
    model = ...
    model = model.cuda(local_rank)  # move the model to the current GPU before wrapping
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank])

    optimizer = optim.SGD(model.parameters(), lr=0.01)

    for epoch in range(100):
        train_sampler.set_epoch(epoch)  # reshuffle the data split every epoch
        for batch_idx, (images, target) in enumerate(train_loader):
            images = images.cuda(non_blocking=True)
            target = target.cuda(non_blocking=True)
            ...
            output = model(images)
            loss = criterion(output, target)
            ...
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--nnodes", default=1, type=int, help="number of nodes for distributed training"
    )
    )
    parser.add_argument(
        "--nproc_per_node",
        default=1,
        type=int,
        help="number of processes(GPUs_ per node for distributed training",
    )
    parser.add_argument(
        "--master_addr", default="127.0.0.1", type=str, help="master node IP address"
    )
    parser.add_argument("--master_port", default="12345", type=str, help="master node free port")
    parser.add_argument(
        "--node_rank", default=0, type=int, help="node rank for distributed training"
    )
    args = parser.parse_args()
    args.world_size = args.nnodes * args.nproc_per_node
    ### launch ("nprocs" is the number of processes on each node; "mp.spawn()" passes "local_rank" as the first argument automatically)
    mp.spawn(main_worker, nprocs=args.nproc_per_node, args=(args.world_size, args))

if __name__ == "__main__":
    main()

How to launch

# execute on Node 0:
python main.py \
	--nnodes=2 --node_rank=0 --nproc_per_node=4 \
	--master_addr='172.18.39.122' --master_port='29500'

# execute on Node 1:
python main.py \
	--nnodes=2 --node_rank=1 --nproc_per_node=4 \
	--master_addr='172.18.39.122' --master_port='29500'

3.4-Launched with Slurm

Many clusters are managed by Slurm. With a command like srun, we do not need to launch the script on each node manually.

How to organize main.py

def main_worker(local_rank, world_size, args):
    rank = int(os.environ["RANK"])
    print(f"launching: local_rank={local_rank}, rank={rank}, world_size={world_size}")
    dist.init_process_group(backend="nccl")

    # set device
    torch.cuda.set_device(local_rank)

    ### apply "torch.utils.data.distributed.DistributedSampler: to dataset
    train_dataset = ...
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
    train_loader = torch.utils.data.DataLoader(train_dataset,
			batch_size=...,
			sampler=train_sampler)

    ### wrap the model with "torch.nn.parallel.DistributedDataParallel"
    model = ...
    model = model.cuda(local_rank)  # move the model to the current GPU before wrapping
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank])

    optimizer = optim.SGD(model.parameters(), lr=0.01)

    for epoch in range(100):
        train_sampler.set_epoch(epoch)  # reshuffle the data split every epoch
        for batch_idx, (images, target) in enumerate(train_loader):
            images = images.cuda(non_blocking=True)
            target = target.cuda(non_blocking=True)
            ...
            output = model(images)
            loss = criterion(output, target)
            ...
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

def main(args):
    # args.ntasks_per_node = int(os.environ["SLURM_NTASKS_PER_NODE"])
    # args.world_size = int(os.environ["SLURM_NTASKS"])
    # args.node_rank = int(os.environ["SLURM_NODEID"])
    # args.node_list = os.environ["SLURM_NODELIST"]
    # args.master_addr = subprocess.getoutput(f"scontrol show hostname {args.node_list} | head -n1")

    args.master_addr = os.environ["SLURM_LAUNCH_NODE_IPADDR"]
    # save the following variables into environment for dist.init_process_group()
    os.environ["RANK"] = os.environ["SLURM_PROCID"]
    os.environ["LOCAL_RANK"] = os.environ["SLURM_LOCALID"]
    os.environ["WORLD_SIZE"] = os.environ["SLURM_NTASKS"]
    os.environ["MASTER_ADDR"] = os.environ["SLURM_LAUNCH_NODE_IPADDR"]
    os.environ["MASTER_PORT"] = "22789"

    local_rank = int(os.environ["SLURM_LOCALID"])
    rank = int(os.environ["SLURM_PROCID"])
    world_size = int(os.environ["SLURM_NTASKS"])
    main_worker(local_rank, world_size, args)

slurm script for job submission (submitter.sh)

#!/bin/bash
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=4
#SBATCH --gpus-per-node=4
#SBATCH --cpus-per-task=2
export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK}

srun python main.py

Then run with

sbatch submitter.sh

4-Third-party tools

PyTorch-Lightning

Features:

  • supports multiple distributed strategies (e.g., strategy="ddp" as in the example below)

Example:

# train on 32 GPUs (4 nodes)
trainer = Trainer(accelerator="gpu", devices=8, strategy="ddp", num_nodes=4)

Apex:

Example

Horovod:

Example

5-Distributed evaluation or test

Core methods:

# average a tensor across all processes for distributed evaluation
def reduce_tensor(tensor, world_size):
    rt = tensor.clone()
    dist.all_reduce(rt, op=dist.ReduceOp.SUM)
    rt /= world_size
    return rt

output = model(images)
loss = criterion(output, target)

# make sure all processes have finished the forward pass
torch.distributed.barrier()

reduced_loss = loss.data.clone()
dist.all_reduce(reduced_loss, op=dist.ReduceOp.SUM)
reduced_loss /= world_size
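
Putting the pieces together, here is a hedged sketch of distributed top-1 accuracy evaluation. It assumes the process group, the model, and a DataLoader with a DistributedSampler have already been set up as in the training examples (note that DistributedSampler may pad the dataset, which can slightly bias the counts):

import torch
import torch.distributed as dist

@torch.no_grad()
def evaluate(model, val_loader):
    model.eval()
    correct = torch.zeros(1).cuda()
    total = torch.zeros(1).cuda()
    for images, target in val_loader:
        images = images.cuda(non_blocking=True)
        target = target.cuda(non_blocking=True)
        pred = model(images).argmax(dim=1)
        correct += (pred == target).sum()
        total += target.size(0)
    # sum the per-process counters so that every rank sees the global accuracy
    dist.all_reduce(correct, op=dist.ReduceOp.SUM)
    dist.all_reduce(total, op=dist.ReduceOp.SUM)
    return (correct / total).item()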


