Scheduled upgrade from November 26, 07:00 UTC to November 26, 17:00 UTC
Kindly note that during the maintenance window, app.hopsworks.ai will not be accessible.
5
View the Changes
arrow back
Back to Blog
Moritz Meister
link to linkedin
Software Engineer
Riccardo Grigoletto
link to linkedin
Software Engineer
Article updated on

From 100 to ZeRO: PyTorch and DeepSpeed ZeRO on any Spark Cluster with Maggy

April 27, 2021
7 min
Read
Moritz Meister
Moritz Meisterlink to linkedin
Software Engineer
Hopsworks
Riccardo Grigoletto
Riccardo Grigolettolink to linkedin
Software Engineer

TL;DR

Maggy is an open source framework that lets you write generic PyTorch training code (as if it is written to run on a single machine) and execute that training distributed across a GPU cluster. Maggy enables you to write and debug PyTorch code on your local machine, and then run the same code at scale without having to change a single line in your program. Going even further, Maggy provides the distribution transparent use of ZeRO, a sharded optimizer recently proposed by Microsoft. You can use ZeRO to improve your memory efficiency with a single change in your Maggy configuration. You can try Maggy in the Hopsworks managed platform for free.

Distributed learning - An introduction

Deep learning has seen a surge in activity with the availability of high level frameworks such as PyTorch to build and train models. A few lines of code in a notebook are sufficient to create powerful classifiers from scratch. However, both the data and model sizes to achieve state-of-the-art performance are ever increasing, so that training on your local GPU becomes a hopeless endeavour.

Enter distributed training. Distributed training allows you to train the same model on multiple GPUs on different shards of your data to speed up training times. In the ideal case, training on 4 GPUs simultaneously should reduce your training time by 75%. In distributed training, each GPU computes a forward and backward pass over its own batch of the data. For the model update, the computed gradients are shared and averaged between the nodes. This way, all models update their parameters with the same combined gradient and stay in sync. This additional communication step introduces additional overhead of course, which is why ideal scaling is never truly achieved.

So if distributed training is such a great tool to accelerate training, why is its use still uncommon among normal PyTorch users? Because it is too tedious to use! A dummy example for starting distributed training might look something like this.

def train(args):
    args.world_size = args.gpus * args.nodes
    os.environ['MASTER_ADDR'] = '10.51.45.25'
    os.environ['MASTER_PORT'] = '8888'
    mp.spawn(train, nprocs=args.gpus, args=(args,))
    rank = args.nr * args.gpus + gpus
    dist.init_process_group(backend='nccl',
                            init_method='env://',
                            world_size=args.world_size,
                            rank=rank)
    torch.manual_seed(0)
    model = args.Module()
    torch.cuda.set_device(gpu)
    model = nn.parallel.DistributedDataParallel(
        model.cuda(gpu),
        device_ids=[gpu])
    ...

Going even further, you would need to launch your code on all of your nodes and take care of graceful shutdowns and collecting the results. This is where Maggy comes in. Maggy allows you to launch your PyTorch training script without any changes on Spark clusters. It takes care of the training processes for each node, the resource isolation and node connections.

Next we will explore what is needed to run distribution transparent training on Maggy as well as the restrictions that still exist with the framework.

Building blocks for distribution transparent training

Configuring Maggy

First of all, Maggy requires its experiment to be configured for distributed training. In the most common use case this means passing your model, hyperparameters and your training/test set. Configuring is as easy as creating a config object. Hyperparameters, train and test set are optional and can also be directly loaded in the training loop. If your training loop consists of more than one module such as in training GANs with a Generator and Discriminator or Policy gradient methods in RL, you can also pass a list of modules.

from maggy.experiment_config import TorchDistributedConfig

config = TorchDistributedConfig(module=models.resnet50,train_set=train_set, test_set=test_set)

Writing the training function

Maggy’s API requires the training function to follow a unified signature. Users have to pass their model class, its hyperparameters and the train and test set to the training function.

def train(module, hparams, train_set, test_set):
...

If you want to load your datasets on each node by yourself, you can also omit passing the datasets in the config. In fact, this is highly recommended when working with larger dataset objects. Additionally, every module used in the training function should be imported within that function. Think of your training function as completely self contained. Last but not least, users should use the PyTorch DataLoader (as is best practice anyways). Alternatively, you can also use Maggy’s custom PetastormDataLoader to load large datasets from Petastorm parquet files. When using the latter, users need to ensure that datasets are even, that is they should have the same number of batches per epoch on all nodes. When using PyTorch’s DataLoader, you do not have to care about this. So to summarize, your training function needs to

  • Implement the correct signature
  • Import all used modules inside the function
  • Use the PyTorch DataLoader (or Maggy’s PetastormDataLoader with even Datasets)

Distributed training on Maggy - A complete example

It’s time to combine all the elements we introduced so far in a complete example of distributed training with Maggy. In this example, we are going to create some arbitrary training data, define a function approximator for scalar fields,write our training loop and launch the distributed training.

Generate some training data

In order to not rely on specific datasets, we are going to create our own dataset. For this example, a scalar field should suffice. So first of all we randomly sample x and y and compute some function we want our neural network to approximate. PyTorch’s TensorDataset can then be used to form a proper dataset from this data.

import torch
import torch.nn as nn
import torch.nn.functional as F


coord = torch.rand((10000,2)) * 10 - 5  # Create random x/y coordinates in [-5,5]
z = torch.sin(coord[:,1]) + torch.cos(coord[:,0])  # Calculate scalar field for all points to get a dataset
train_set = torch.utils.data.TensorDataset(coord[:8000,:], z[:8000])
test_set = torch.utils.data.TensorDataset(coord[8000:,:], z[8000:])

Define the approximator

Next up we define our function approximator. For our example a standard neural network with 3 layers suffices, although in real applications you would of course train much larger networks.

class Approximator(torch.nn.Module):

    def __init__(self):
        super().__init__()
        self.l1 = torch.nn.Linear(2,100)
        self.l2 = torch.nn.Linear(100,100)
        self.l3 = torch.nn.Linear(100,1)
        
    def forward(self, x):

Writing the training loop

At the heart of every PyTorch program lies the training loop. Following the APIs introduced earlier, we define our training function as follows.

def train(module, hparams, train_set, test_set):
    import torch
    model = module()

    n_epochs = 100
    batch_size = 64
    lr = 1e-5    
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    loss_criterion = torch.nn.MSELoss()
    train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size)
    test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size)    
    def eval_model():
        loss = 0
        model.eval()
        for coord, z in test_loader:
            prediction = model(coord).detach()
            loss += loss_criterion(prediction, z)
        return loss

    for epoch in range(n_epochs):
        model.train()
        for coord, z in train_loader:
            optimizer.zero_grad()
            prediction = model(coord)
            loss = loss_criterion(prediction, z)
            loss.backward()
            optimizer.step()
    return eval_model()

As you can see, there is no additional code for distributed training. Maggy takes care of all the necessary things.

Starting the training

All that remains now is to configure Maggy and run our training. For this, we have to create the config object and run the lagom function.

from maggy import experiment
from maggy.experiment_config import TorchDistributedConfig

config = TorchDistributedConfig(module=Approximator, train_set = train_set, test_set=test_set)
experiment.lagom(train, config)  # Starts the training loop

Evaluating the training

After running the training on 4 nodes, we can see that our approximator has converged to a good estimate of our scalar field. Of course, this would also be possible on a local node. But with more complex models and larger training sets such as the ImageNet dataset, distributed learning becomes necessary to leverage your workloads.

Try it for yourself

Maggy is open-source and documentation is available at maggy.ai. Give us a star or get in touch if you have more questions. Maggy is also available for all Hopsworks users in the managed platform on AWS or Azure. You can get started for free (no credit card required).

References