Distributed Training with PyTorch
A summary of some experiences with distributed training in PyTorch.
1-Background
This part covers some basic concepts of distributed training.
In the rest of this article, we consider the case of 2 nodes with 4 GPUs each (2 nodes * 4 GPUs/node = 8 GPUs in total).
Some concepts for DDP with PyTorch
- `group`/`world`: the process group, i.e., the group of all processes. By default, there is one process group per job.
- `world_size`: the number of processes in the group/world.
- `rank`: the global index of each process. Within a process group, each process has a unique index used for inter-process communication. The process with `rank=0` is the master.
- `local_rank`: the index of a process on the node where it runs. For example, with 4 GPUs per node, `rank=7, local_rank=3` indicates the 4-th GPU on the 2-nd node.
Remark: on the relationship between ranks/processes and GPUs: one rank/process can drive either one GPU or multiple GPUs. The recommended manner is "one process per GPU", as in the sketch below.
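For the 2-node, 4-GPUs-per-node setup used throughout this article, the mapping between `rank`, `node_rank`, and `local_rank` can be sketched as follows (a toy illustration, not part of any training script):

nnodes, gpus_per_node = 2, 4

for rank in range(nnodes * gpus_per_node):   # world_size = 8
    node_rank = rank // gpus_per_node        # which node the process runs on
    local_rank = rank % gpus_per_node        # which GPU on that node
    print(f"rank={rank} -> node_rank={node_rank}, local_rank={local_rank}")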
Core arguments of Slurm
#SBATCH --nodes=2: number of nodes (machines)
#SBATCH --ntasks=8: total number of tasks/processes. Usually set to the total number of GPUs, one GPU per task/process.
#SBATCH --ntasks-per-node=4: number of tasks/processes on each node. Usually set to the number of GPUs on each node, one GPU per task/process. (Specify only one of `--ntasks` and `--ntasks-per-node`.)
#SBATCH --cpus-per-task=2: number of CPU cores for each task/process.
#### How to request GPUs
#SBATCH --gpus-per-task=v100:1 : request one GPU (of type v100 here) for each task. Usually set to 1 (one GPU per task).
#SBATCH --gpus-per-node=v100:n : request n GPUs for each node. Usually set to the number of tasks on each node (one GPU per task).
#### How to request RAM
#SBATCH --mem=32G: allocate 32GB of RAM on each node
#SBATCH --mem-per-gpu=8G: allocate 8GB of RAM per allocated GPU
#SBATCH --mem-per-cpu=4G: allocate 4GB of RAM per allocated CPU core
# "srun" will execute "main.py" once per task, i.e., "ntasks" processes in total.
srun python main.py
Some useful environment variables after submitting a Slurm job
- `SLURM_JOBID`: the job ID
- `SLURM_NTASKS` / `SLURM_NPROCS`: the number of tasks/processes
- `SLURM_NNODES` / `SLURM_JOB_NUM_NODES`: the number of nodes
- `SLURM_NTASKS_PER_NODE`: the number of tasks/processes on each node
- `SLURM_GPUS_ON_NODE`: the number of GPUs available on the current node within the current job step
- `SLURM_PROCID`: the (global) rank
- `SLURM_LOCALID`: the local rank
- `SLURM_NODEID`: the rank of the node (`node_rank`)

(See the sketch after this list for reading these variables in Python.)
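As a minimal sketch (assuming the code runs inside a task started by `srun`), these variables can be read directly from the environment:

import os

# each of the 8 tasks in the 2x4 example sees its own values
rank = int(os.environ["SLURM_PROCID"])         # global rank, 0..7
local_rank = int(os.environ["SLURM_LOCALID"])  # GPU index on this node, 0..3
world_size = int(os.environ["SLURM_NTASKS"])   # 8
node_rank = int(os.environ["SLURM_NODEID"])    # 0 or 1
print(f"rank={rank}, local_rank={local_rank}, node_rank={node_rank}, world_size={world_size}")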
How to get the IP address of the master node
- `SLURM_NODELIST`: all nodes allocated to the job
- `SLURMD_NODENAME`: the name of the node on which the current process is running
- `SLURM_LAUNCH_NODE_IPADDR`: the IP address of the launch node (on which the task launch was initiated, i.e., where the `srun` command was executed)
- `SLURM_SRUN_COMM_HOST`: the node used for srun communication
- `SLURM_SRUN_COMM_PORT`: the port used for srun communication. (Attention: do not pass this port to `dist.init_process_group()`.)
A tricky way to get the IP address of the master node
subprocess.getoutput(f"scontrol show hostname {os.environ['SLURM_NODELIST']} | head -n1")
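Note that `scontrol show hostname` returns a hostname, which NCCL/Gloo can usually resolve. If an actual IP address is needed, one option (a sketch, assuming the hostname is resolvable from the Python process) is:

import os
import socket
import subprocess

# the first hostname in the allocation is taken as the master node
master_host = subprocess.getoutput(
    f"scontrol show hostname {os.environ['SLURM_NODELIST']} | head -n1"
)
master_ip = socket.gethostbyname(master_host)  # resolve hostname -> IP address
print(master_host, master_ip)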
The architecture of distributed training
Ring-AllReduce
[To be updated.]
2-torch.nn.DataParallel
Features:
- a single process drives all GPUs
- only supports the Single-Node Multi-GPU mode
This method is slow and is not recommended; a minimal sketch is shown below for completeness.
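A minimal sketch of `torch.nn.DataParallel` (assuming at least one CUDA device is available; the model and data here are placeholders):

import torch
import torch.nn as nn

model = nn.Linear(128, 10)
if torch.cuda.device_count() > 1:
    # a single process: each batch is scattered across GPUs, outputs are gathered on GPU 0
    model = nn.DataParallel(model)
model = model.cuda()

x = torch.randn(32, 128).cuda()
out = model(x)  # forward pass is parallelized over the visible GPUs
print(out.shape)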
3-torch.distributed & torch.nn.DistributedDataParallel
Features:
- Supports Multi-Node Multi-GPU training;
- Each GPU has its own process, and each process has its own model replica and optimizer;
- Only limited data (i.e., the gradients) needs to be communicated among processes;
- After the backward pass in each process, Ring-AllReduce is used to average the gradients across all processes (the model parameters are broadcast from the process with rank=0 when DDP is constructed), as sketched after this list.
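To illustrate what happens conceptually, a manual version of the gradient averaging could look like the sketch below (this is not the real DDP implementation, which buckets gradients and overlaps communication with the backward pass; it assumes an initialized process group):

import torch.distributed as dist

def average_gradients(model, world_size):
    # conceptually what DDP does after loss.backward():
    # sum every gradient over all processes, then divide by the world size
    for param in model.parameters():
        if param.grad is not None:
            dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
            param.grad /= world_size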
Some functions/classes in PyTorch
- `torch.nn.parallel.DistributedDataParallel()`: wrap the model
- `torch.nn.SyncBatchNorm.convert_sync_batchnorm()`: convert the BN layers in the model to synchronized BN
- `torch.utils.data.distributed.DistributedSampler()`: split the data among the processes ([Attention] do not forget to call `sampler.set_epoch(current_epoch)` or `loader.sampler.set_epoch(current_epoch)` at the beginning of each epoch so that the data split differs from epoch to epoch)
- `torch.distributed.init_process_group()`: initialize the process group; it tells each process how large the process group is and which rank it has

A minimal sketch that combines these calls is given after this list.
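The sketch below combines the four calls (it assumes `model`, `train_dataset`, `local_rank`, and `current_epoch` are already defined and that the script is started by one of the launchers described below):

import torch
import torch.distributed as dist
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

dist.init_process_group(backend="nccl")                  # 1) initialize the process group
model = nn.SyncBatchNorm.convert_sync_batchnorm(model)   # 2) convert BN layers to synced BN
model = nn.parallel.DistributedDataParallel(             # 3) wrap the model
    model.cuda(local_rank), device_ids=[local_rank])
sampler = DistributedSampler(train_dataset)              # 4) shard the data across processes
loader = DataLoader(train_dataset, batch_size=32, sampler=sampler)
sampler.set_epoch(current_epoch)                         # call at the start of every epoch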
3.0-[Take-away] unified code
Flexible conversion between Single-Node Multi-GPU and Multi-Node Multi-GPU
import os
import subprocess

import torch
import torch.distributed as dist

def init_distributed(backend="nccl", port=None):
    ### check the launch method
    # if launched with 'torch.distributed.launch/torchrun' on a single node
    if "RANK" in os.environ:
        local_rank = int(os.environ["LOCAL_RANK"])
        rank = int(os.environ["RANK"])
        world_size = int(os.environ["WORLD_SIZE"])
    # otherwise, use 'slurm' as the default
    elif "SLURM_JOB_ID" in os.environ:
        rank = int(os.environ["SLURM_PROCID"])
        local_rank = int(os.environ["SLURM_LOCALID"])
        world_size = int(os.environ["SLURM_NTASKS"])
        node_list = os.environ["SLURM_NODELIST"]
        addr = subprocess.getoutput(f"scontrol show hostname {node_list} | head -n1")
        # specify the master port
        if port is not None:
            os.environ["MASTER_PORT"] = str(port)
        elif "MASTER_PORT" in os.environ:
            pass  # use the MASTER_PORT already set in the environment
        else:
            os.environ["MASTER_PORT"] = "29005"
        if "MASTER_ADDR" not in os.environ:
            os.environ["MASTER_ADDR"] = addr
        os.environ["WORLD_SIZE"] = os.environ["SLURM_NTASKS"]
        os.environ["LOCAL_RANK"] = os.environ["SLURM_LOCALID"]
        os.environ["RANK"] = os.environ["SLURM_PROCID"]
    else:
        raise EnvironmentError("Launched with neither Slurm nor torchrun/torch.distributed.launch")
    torch.cuda.set_device(local_rank)
    dist.init_process_group(backend=backend)
    return local_rank, rank, world_size

def main(args):
    local_rank, rank, world_size = init_distributed()
    print(f"launching: local_rank={local_rank}, rank={rank}, world_size={world_size}")
    main_worker(local_rank, rank, world_size, args)
How to launch
### Single-Node Multi-GPU: 'torchrun' / 'torch.distributed.launch'
#!/bin/bash
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=4
#SBATCH --gpus-per-node=4
#SBATCH --cpus-per-task=1
export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK}
torchrun \
    --nnodes=1 \
    --nproc_per_node=${SLURM_NTASKS_PER_NODE} \
    --node_rank=0 \
    main.py
### Multi-Node Multi-GPU: 'slurm'
#!/bin/bash
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=4
#SBATCH --gpus-per-node=4
#SBATCH --cpus-per-task=1
export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK}
srun python main.py
3.1-Launched with torch.distributed.launch
(needs to be launched separately on each node; for torch<=1.9.0)
Access the environment variables:
local_rank = int(os.environ["LOCAL_RANK"])  # available when launched with "--use_env"
rank = dist.get_rank()
# or
rank = int(os.environ["RANK"])
world_size = dist.get_world_size()
# or
world_size = int(os.environ["WORLD_SIZE"])
An example for main.py
# main.py
import argparse
import os

import torch
import torch.distributed as dist
import torch.optim as optim

parser = argparse.ArgumentParser()
### "torch.distributed.launch" passes "--local_rank" when "--use_env" is not set
parser.add_argument('--local_rank', default=-1, type=int,
                    help='local rank for distributed training')

def main_worker(rank, world_size, args):
    local_rank = int(os.environ["LOCAL_RANK"])
    # set device
    torch.cuda.set_device(local_rank)
    # set the backend for GPU communication
    dist.init_process_group(backend='nccl')
    ### apply "torch.utils.data.distributed.DistributedSampler" to the dataset
    train_dataset = ...
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=...,
        sampler=train_sampler)
    ### wrap the model with "torch.nn.parallel.DistributedDataParallel"
    model = ...
    model = model.cuda(local_rank)  # move the model to the current GPU before wrapping
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank])
    optimizer = optim.SGD(model.parameters(), lr=...)
    for epoch in range(100):
        train_sampler.set_epoch(epoch)  # reshuffle the data split every epoch
        for batch_idx, (images, target) in enumerate(train_loader):
            images = images.cuda(non_blocking=True)
            target = target.cuda(non_blocking=True)
            ...
    ### destroy the process group
    dist.destroy_process_group()

def main():
    args = parser.parse_args()
    rank = int(os.environ["RANK"])
    world_size = int(os.environ["WORLD_SIZE"])
    main_worker(rank, world_size, args)

if __name__ == "__main__":
    main()
How to launch
### execute on Node 0:
CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch \
--nnodes=2 --node_rank=0 --nproc_per_node=4 \
--master_addr='172.18.39.122' --master_port='29500' \
--use_env \
main.py
### execute on Node 1:
CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch \
--nnodes=2 --node_rank=1 --nproc_per_node=4 \
--master_addr='172.18.39.122' --master_port='29500' \
--use_env \
main.py
python -m torch.distributed.launch --help
usage: launch.py [-h] [--nnodes NNODES] [--node_rank NODE_RANK] [--nproc_per_node NPROC_PER_NODE] [--master_addr MASTER_ADDR]
[--master_port MASTER_PORT] [--use_env] [-m] [--no_python]
[--logdir LOGDIR]
training_script ...
core arguments:
- `init_method` (an argument of `dist.init_process_group()`, not of the launcher): how to initialize the process group. If neither `init_method` nor `store` is specified, the default value `"env://"` is used, which means "initialize from environment variables". The variants are sketched after this list.
  - `init_method="env://"`: initialize from environment variables
  - `init_method="tcp://192.168.1.1:1234"`: initialize from the IP address and a free port of the node with `rank=0`
  - `init_method="file://xxxx"`: initialize through a shared filesystem
- `--nnodes`: number of nodes
- `--node_rank`: the rank of the current node, starting from 0
- `--nproc_per_node`: number of processes (or GPUs) on each node
- `--use_env`: whether to read the launcher-provided arguments from environment variables instead of the `--local_rank` command-line argument. The launcher exports the following environment variables:
  - `MASTER_ADDR`: the IP address of the master node
  - `MASTER_PORT`: a free port on the master node
  - `WORLD_SIZE`: the size of the process group
  - `RANK`: the global rank of the current process
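For reference, the three `init_method` variants map to calls like the following (a sketch; the IP address, port, and file path are placeholders, and `rank`/`world_size` must be provided by the caller):

import torch.distributed as dist

def init_group(method: str, rank: int, world_size: int):
    """Illustration of the three init_method variants; only one is used per job."""
    if method == "env":
        # reads MASTER_ADDR / MASTER_PORT / RANK / WORLD_SIZE from the environment
        dist.init_process_group(backend="nccl", init_method="env://")
    elif method == "tcp":
        # connect to the IP address and a free port of the rank-0 node
        dist.init_process_group(backend="nccl", init_method="tcp://192.168.1.1:1234",
                                rank=rank, world_size=world_size)
    else:
        # rendezvous through a file on a shared filesystem
        dist.init_process_group(backend="nccl", init_method="file:///shared/ddp_init",
                                rank=rank, world_size=world_size)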
3.2-Launched with torchrun
(needs to be launched separately on each node; for torch>1.9.0; recommended)
New features compared to torch.distributed.launch:
- Failover: automatically relaunches all workers when a worker failure happens;
- Elastic: nodes can be added or removed dynamically.
### execute on Node 0:
torchrun --nnodes=2 --node_rank=0 --nproc_per_node=4 \
--master_addr='172.18.39.122' --master_port='29500' \
main.py
### execute on Node 1:
torchrun --nnodes=2 --node_rank=1 --nproc_per_node=4 \
--master_addr='172.18.39.122' --master_port='29500' \
main.py
$ torchrun -h
usage: torchrun [-h] [--nnodes NNODES] [--nproc_per_node NPROC_PER_NODE] [--rdzv_backend RDZV_BACKEND] [--rdzv_endpoint RDZV_ENDPOINT]
[--rdzv_id RDZV_ID] [--rdzv_conf RDZV_CONF] [--standalone] [--max_restarts MAX_RESTARTS] [--monitor_interval MONITOR_INTERVAL]
[--start_method {spawn,fork,forkserver}] [--role ROLE] [-m] [--no_python] [--run_path] [--log_dir LOG_DIR] [-r REDIRECTS]
[-t TEE] [--node_rank NODE_RANK] [--master_addr MASTER_ADDR] [--master_port MASTER_PORT]
training_script ...
Torch Distributed Elastic Training Launcher
positional arguments:
- training_script: full path to the (single-GPU) training program/script to be launched in parallel
- training_script_args: the arguments for the training script
Some core arguments (only the new features compared to torch.distributed.launch):
- `--nnodes=1:3`: the range of nodes used for training (e.g., minimum 1 node, maximum 3 nodes); nodes can be added dynamically if necessary
- `--max_restarts=3`: the maximum number of restarts of the process group. Note that a node failure, node scale-down, or node scale-up will cause a restart.
- `--rdzv_id=1`: all nodes use the same job id
- `--rdzv_backend`: the rendezvous backend; "c10d" and "etcd" are supported by default. Rendezvous handles the communication and scheduling among the nodes.
- `--rdzv_endpoint`: the address of the rendezvous, given as the host IP and port of one of the nodes.
3.3-Launched with torch.multiprocessing.spawn
# main.py
import argparse

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.optim as optim

def main_worker(local_rank, world_size, args):
    rank = args.node_rank * args.nproc_per_node + local_rank
    # initialization
    dist.init_process_group(backend='nccl',
                            init_method='tcp://{}:{}'.format(args.master_addr, args.master_port),
                            world_size=world_size,
                            rank=rank)
    # set device
    torch.cuda.set_device(local_rank)
    ### apply "torch.utils.data.distributed.DistributedSampler" to the dataset
    train_dataset = ...
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
    train_loader = torch.utils.data.DataLoader(train_dataset,
                                               batch_size=...,
                                               sampler=train_sampler)
    ### wrap the model with "torch.nn.parallel.DistributedDataParallel"
    model = ...
    model = model.cuda(local_rank)  # move the model to the current GPU before wrapping
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank])
    criterion = torch.nn.CrossEntropyLoss().cuda(local_rank)
    optimizer = optim.SGD(model.parameters(), lr=...)
    for epoch in range(100):
        train_sampler.set_epoch(epoch)  # reshuffle the data split every epoch
        for batch_idx, (images, target) in enumerate(train_loader):
            images = images.cuda(non_blocking=True)
            target = target.cuda(non_blocking=True)
            ...
            output = model(images)
            loss = criterion(output, target)
            ...
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--nnodes", default=1, type=int, help="number of nodes for distributed training"
    )
    parser.add_argument(
        "--nproc_per_node",
        default=1,
        type=int,
        help="number of processes (GPUs) per node for distributed training",
    )
    parser.add_argument(
        "--master_addr", default="127.0.0.1", type=str, help="master node IP address"
    )
    parser.add_argument("--master_port", default="12345", type=str, help="master node free port")
    parser.add_argument(
        "--node_rank", default=0, type=int, help="node rank for distributed training"
    )
    args = parser.parse_args()
    args.world_size = args.nnodes * args.nproc_per_node
    ### launch (note that "nprocs" is the number of processes on each node, since "mp.spawn()" assigns "local_rank" automatically)
    mp.spawn(main_worker, nprocs=args.nproc_per_node, args=(args.world_size, args))

if __name__ == "__main__":
    main()
How to launch
# execute on Node 0:
python main.py \
--nnodes=2 --node_rank=0 --nproc_per_node=4 \
--master_addr='172.18.39.122' --master_port='29500'
# execute on Node 1:
python main.py \
--nnodes=2 --node_rank=1 --nproc_per_node=4 \
--master_addr='172.18.39.122' --master_port='29500'
3.4-Launched with Slurm
Many clusters are managed by Slurm. With commands like `srun`, we do not need to launch the script on each node manually.
How to organize main.py
# main.py
import os
import subprocess

import torch
import torch.distributed as dist
import torch.optim as optim

def main_worker(local_rank, world_size, args):
    rank = int(os.environ["RANK"])
    print(f"launching: local_rank={local_rank}, rank={rank}, world_size={world_size}")
    dist.init_process_group(backend="nccl")
    # set device
    torch.cuda.set_device(local_rank)
    ### apply "torch.utils.data.distributed.DistributedSampler" to the dataset
    train_dataset = ...
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
    train_loader = torch.utils.data.DataLoader(train_dataset,
                                               batch_size=...,
                                               sampler=train_sampler)
    ### wrap the model with "torch.nn.parallel.DistributedDataParallel"
    model = ...
    model = model.cuda(local_rank)  # move the model to the current GPU before wrapping
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank])
    criterion = torch.nn.CrossEntropyLoss().cuda(local_rank)
    optimizer = optim.SGD(model.parameters(), lr=...)
    for epoch in range(100):
        train_sampler.set_epoch(epoch)  # reshuffle the data split every epoch
        for batch_idx, (images, target) in enumerate(train_loader):
            images = images.cuda(non_blocking=True)
            target = target.cuda(non_blocking=True)
            ...
            output = model(images)
            loss = criterion(output, target)
            ...
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

def main(args):
    # args.ntasks_per_node = int(os.environ["SLURM_NTASKS_PER_NODE"])
    # args.world_size = int(os.environ["SLURM_NTASKS"])
    # args.node_rank = int(os.environ["SLURM_NODEID"])
    # args.node_list = os.environ["SLURM_NODELIST"]
    # args.master_addr = subprocess.getoutput(f"scontrol show hostname {args.node_list} | head -n1")
    args.master_addr = os.environ["SLURM_LAUNCH_NODE_IPADDR"]
    # save the following variables into the environment for dist.init_process_group()
    os.environ["RANK"] = os.environ["SLURM_PROCID"]
    os.environ["LOCAL_RANK"] = os.environ["SLURM_LOCALID"]
    os.environ["WORLD_SIZE"] = os.environ["SLURM_NTASKS"]
    os.environ["MASTER_ADDR"] = os.environ["SLURM_LAUNCH_NODE_IPADDR"]
    os.environ["MASTER_PORT"] = "22789"
    local_rank = int(os.environ["SLURM_LOCALID"])
    rank = int(os.environ["SLURM_PROCID"])
    world_size = int(os.environ["SLURM_NTASKS"])
    main_worker(local_rank, world_size, args)
Slurm script for job submission (submitter.sh)
#!/bin/bash
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=4
#SBATCH --gpus-per-node=4
#SBATCH --cpus-per-task=2
export OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK}
srun python main.py
Then run with
sbatch submitter.sh
4-Third-party tools
PyTorch-Lightning
Features:
- supports multiple distributed training strategies
Example:
# train on 32 GPUs (4 nodes x 8 GPUs)
from pytorch_lightning import Trainer
trainer = Trainer(accelerator="gpu", devices=8, strategy="ddp", num_nodes=4)
Apex:
Horovod:
5-Distributed evaluation or test
Core methods:
- `torch.distributed.all_reduce()`: reduces (e.g., sums) a tensor across all processes; see the official documentation.
output = model(images)
loss = criterion(output, target)

# average a tensor (e.g., the loss) over all processes for distributed evaluation
def reduce_tensor(tensor):
    rt = tensor.clone()
    dist.all_reduce(rt, op=dist.ReduceOp.SUM)  # sum over all processes
    rt /= args.world_size                      # then divide by the world size
    return rt

# or, inlined:
output = model(images)
loss = criterion(output, target)
torch.distributed.barrier()                    # make sure all processes reach this point
reduced_loss = loss.data.clone()
dist.all_reduce(reduced_loss, op=dist.ReduceOp.SUM)
reduced_loss /= world_size
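Putting it together, a minimal sketch of a distributed evaluation loop (it assumes the process group is initialized and `model`, `val_loader`, and `device` are defined; note that `DistributedSampler` may pad the dataset, so the counts can be slightly approximate):

import torch
import torch.distributed as dist

@torch.no_grad()
def evaluate(model, val_loader, device):
    model.eval()
    correct = torch.zeros(1, device=device)
    total = torch.zeros(1, device=device)
    for images, target in val_loader:
        images, target = images.to(device), target.to(device)
        pred = model(images).argmax(dim=1)
        correct += (pred == target).sum()
        total += target.numel()
    # sum the per-process counters so that every rank sees the global accuracy
    dist.all_reduce(correct, op=dist.ReduceOp.SUM)
    dist.all_reduce(total, op=dist.ReduceOp.SUM)
    return (correct / total).item()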