
Distributed Data-Parallel training of PyTorch

source link: https://donghao.org/2024/01/17/distributed-data-parallel-training-of-pytorch/


Let’s get straight to the point. Here is the full training script:

import os
import time

import torch
import torch.nn as nn
import torch.distributed as dist

from model import resnet152
from dataset import get_data_loaders
from torch.nn.parallel import DistributedDataParallel as DDP

learning_rate = 0.001
num_epochs = 40
momentum = 0.9
weight_decay = 1e-5


def setup():
    # initialize the process group
    dist.init_process_group("nccl")


def cleanup():
    dist.destroy_process_group()


def train(rank, world_size):
    # bind this process to its own GPU before initializing NCCL
    torch.cuda.set_device(rank)
    setup()

    model = resnet152().to(rank)
    model = DDP(model, device_ids=[rank])

    # every rank must resume from the same checkpoint; if only rank 0 loaded it,
    # the ranks would hold different weights and run different numbers of epochs
    if os.path.exists("last.pth"):
        obj = torch.load("last.pth", map_location=f"cuda:{rank}")
        print(f"Rank{rank} load 'last.pth' with epoch: {obj['epoch']}")
        model.load_state_dict(obj["model"])
        begin = obj["epoch"] + 1
    else:
        begin = 0
    print(f"Rank{rank} begin at {begin}")

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    start = time.time()
    running_loss = 0
    trainloader, testloader = get_data_loaders(rank, world_size)

    for epoch in range(begin, num_epochs):
        trainloader.sampler.set_epoch(epoch)
        for index, (images, labels) in enumerate(trainloader):
            # gpu
            images, labels = images.to(rank), labels.to(rank)

            outputs = model(images)

            loss = criterion(outputs, labels)

            # backward and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        # accuracy on this rank's shard of the training set
        model.eval()  # use inference behaviour for BatchNorm layers
        correct = 0
        total = 0
        with torch.no_grad():
            for data in trainloader:
                images, labels = data

                # gpu
                images, labels = images.to(rank), labels.to(rank)

                outputs = model(images)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        trainset_accu = 100 * correct / total

        # accuracy on this rank's shard of the test set
        correct = 0
        total = 0
        with torch.no_grad():
            for data in testloader:
                images, labels = data
                # gpu
                images, labels = images.to(rank), labels.to(rank)
                outputs = model(images)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        testset_accu = 100 * correct / total
        model.train()  # back to training mode for the next epoch
        if rank == 0:
            print(
                f"[{epoch}] Accu: {trainset_accu:.2f}%, {testset_accu:.2f}% \
                    | {(time.time() - start)/60.0:.1f} mins, loss: {running_loss}"
            )
            torch.save(model.state_dict(), f"cifar100_{epoch}.pth")
            torch.save({"model": model.state_dict(), "epoch": epoch}, "last.pth")
        running_loss = 0.0

    end = time.time()
    stop_watch = end - start
    print("Training is done")
    print("Total training time (seconds):", stop_watch)
    cleanup()


if __name__ == "__main__":
    local_rank = int(os.environ["LOCAL_RANK"])
    world_size = int(os.environ["WORLD_SIZE"])
    train(local_rank, world_size)
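
The script reads LOCAL_RANK and WORLD_SIZE from the environment, and dist.init_process_group() with no arguments falls back to the env:// rendezvous, so it is meant to be started with torchrun, which sets those variables for every worker it spawns. On a single machine with two GPUs the launch would look like this (train.py is just an assumed file name):

torchrun --nproc_per_node=2 train.py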
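The dataset module imported at the top is not shown in the post. Since the checkpoints are named cifar100_*.pth and the training loop calls trainloader.sampler.set_epoch(epoch), get_data_loaders presumably wraps CIFAR-100 in a DistributedSampler so each rank only sees its own slice of the data. A minimal sketch of what it could look like (the transforms, batch size, and data root are assumptions):

import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler


def get_data_loaders(rank, world_size, batch_size=128):
    transform = transforms.Compose(
        [
            transforms.ToTensor(),
            transforms.Normalize((0.5071, 0.4865, 0.4409), (0.2673, 0.2564, 0.2762)),
        ]
    )
    trainset = torchvision.datasets.CIFAR100(
        root="./data", train=True, download=True, transform=transform
    )
    testset = torchvision.datasets.CIFAR100(
        root="./data", train=False, download=True, transform=transform
    )

    # each rank gets a disjoint 1/world_size slice of the data;
    # set_epoch() in the training loop reshuffles the split every epoch
    train_sampler = DistributedSampler(trainset, num_replicas=world_size, rank=rank)
    test_sampler = DistributedSampler(testset, num_replicas=world_size, rank=rank, shuffle=False)

    trainloader = DataLoader(
        trainset, batch_size=batch_size, sampler=train_sampler, num_workers=2, pin_memory=True
    )
    testloader = DataLoader(
        testset, batch_size=batch_size, sampler=test_sampler, num_workers=2, pin_memory=True
    )
    return trainloader, testloader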

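The model module is not shown either. One plausible definition, assuming it simply reuses torchvision's ResNet-152 with the classification head sized for CIFAR-100's 100 classes:

import torchvision


def resnet152(num_classes=100):
    # torchvision's ResNet-152, trained from scratch, with a 100-way head
    return torchvision.models.resnet152(weights=None, num_classes=num_classes)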
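One caveat about the numbers printed above: because the loaders are sharded by DistributedSampler, the accuracies computed on rank 0 only cover rank 0's slice of the training and test sets. If exact global accuracy is needed, the per-rank counters can be summed with an all_reduce before dividing. A sketch, assuming a helper like this replaces the two accuracy computations:

import torch
import torch.distributed as dist


def global_accuracy(correct, total, device):
    # sum the per-rank counters across all processes (NCCL needs CUDA tensors),
    # then every rank computes the same global percentage
    counts = torch.tensor([correct, total], dtype=torch.float64, device=device)
    dist.all_reduce(counts, op=dist.ReduceOp.SUM)
    return 100.0 * counts[0].item() / counts[1].item()

Called as global_accuracy(correct, total, torch.device(f"cuda:{rank}")), it returns the same value on every rank, so the rank == 0 guard around the print still works unchanged.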