A Step by Step Guide to Building A Distributed, Spot-based Training Platform on AWS Using TorchElastic and Kubernetes

Assaf Pinhasi
Published in PyTorch
10 min read · Jun 29, 2021

This is Part II of a two-part series describing our solution for running distributed training on spot instances using TorchElastic and Kubernetes.

Part I introduced our overall technology selection, design principles and benchmarks.

In Part II, we will walk you through the process of creating a simplified version of the training environment on AWS.

Note: This post assumes you’ve read Part I of this series, and that you have a decent level of experience working with Kubernetes and AWS infrastructure.

Target architecture

Image by the author

Outline

  1. Build a training container
  2. Set up an EKS cluster
  3. Install TorchElastic infra on the cluster
  4. Set up an EFS PersistentVolumeClaim
  5. Create a TorchElastic training job
  6. Test your training job
  7. Make the cluster elastic
  8. Where to go from here

Prerequisites

  • EC2 Linux box. Optional: 2+ GPUs
  • Python 3.8+
  • Docker
  • Git client
  • AWS CLI
  • Sufficient permissions to set up the above AWS infra

Build a training container

Our training container will be based on an official PyTorch Docker image, to which we will add:

  1. TorchElastic v0.2.2
  2. TorchElastic’s Imagenet example training script.
  3. etcd server (Optional component, for testing the container locally)

Note:

  1. To fully understand the training process, see Part I of this series, and read up on the subject in TorchElastic’s documentation.
  2. TorchElastic has been upstreamed and will be available as part of PyTorch from version 1.9 onwards (!).
    However, for the time being, this tutorial will rely on TorchElastic v0.2.2.

Create the workspace

export REGION=<your AWS region>
mkdir torchelastic_test
cd torchelastic_test
git clone https://github.com/pytorch/elastic.git
cd elastic
git checkout tags/v0.2.2
cd ..

Create a Dockerfile

Note:
If you are not interested in running training jobs locally at all, you can drop the part which installs etcd.
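As a rough sketch, a Dockerfile along these lines could be written as follows. The base image tag, the etcd version and the file locations are assumptions made for illustration, not necessarily the author's exact file; the entrypoint is chosen to match the launch.py help output shown in the test further down.

```shell
# Write an illustrative Dockerfile (assumed contents; adjust to your needs).
cat > Dockerfile <<'EOF'
# Assumed base image: an official PyTorch runtime image with CUDA support.
FROM pytorch/pytorch:1.8.1-cuda11.1-cudnn8-runtime

# TorchElastic v0.2.2, as used throughout this tutorial.
RUN pip install torchelastic==0.2.2

# Optional: etcd binary, only needed for local --standalone runs.
# The etcd version is an assumption.
ARG ETCD_VER=v3.4.3
RUN apt-get -q update && apt-get -q install -y curl \
 && curl -sL https://github.com/etcd-io/etcd/releases/download/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz \
    | tar xz -C /tmp \
 && mv /tmp/etcd-${ETCD_VER}-linux-amd64/etcd /usr/local/bin/etcd

# The Imagenet example training script from the cloned repo.
WORKDIR /workspace
COPY elastic/examples/imagenet/main.py main.py

# Running the container without arguments prints the launcher help.
ENTRYPOINT ["python", "-m", "torchelastic.distributed.launch"]
CMD ["--help"]
EOF
```

The heredoc is quoted ('EOF') so that the shell leaves `${ETCD_VER}` untouched and Docker expands it at build time.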

Build and test your training docker

Build the docker, and tag it as torchelastic_test:

docker build . -t torchelastic_test

Test the docker:

docker run torchelastic_test

You should see the TorchElastic help:

(base) ubuntu@torchelastic:~/torchelastic_test$ docker run torchelastic_test
[INFO] 2021-05-31 08:54:00,782 launch: Running torchelastic.distributed.launch with args: ['/opt/conda/lib/python3.8/site-packages/torchelastic/distributed/launch.py', '--help']
usage: launch.py [-h] [--nnodes NNODES] [--nproc_per_node NPROC_PER_NODE]...

Optional: test the docker locally on a machine with GPUs

Download the tiny imagenet dataset

wget -q http://cs231n.stanford.edu/tiny-imagenet-200.zip && unzip -q tiny-imagenet-200.zip -d data && rm tiny-imagenet-200.zip

Run the training job, mounting the data folder into the docker:

docker run -v ${PWD}/data/tiny-imagenet-200:/tiny-imagenet-200 --shm-size=2g torchelastic_test --standalone main.py --arch resnet18 --epochs 20 --batch-size 32 /tiny-imagenet-200

You should see a training loop starting.

Finally, tag and push your image to ECR (e.g. follow this guide).
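For reference, the tag-and-push step can be sketched with the AWS CLI (v2) and Docker as below. The function names are hypothetical, and the sketch assumes the ECR repository is named after the local image and that the `latest` tag is used.

```shell
# Build the ECR registry host name for an account and region.
ecr_registry() {
  echo "$1.dkr.ecr.$2.amazonaws.com"
}

# Log in to ECR, create the repository if needed, then tag and push the image.
# Usage: push_to_ecr <local-image> <region>
push_to_ecr() {
  account_id=$(aws sts get-caller-identity --query Account --output text) || return 1
  registry=$(ecr_registry "$account_id" "$2")
  aws ecr get-login-password --region "$2" \
    | docker login --username AWS --password-stdin "$registry" || return 1
  # Ignore the error if the repository already exists.
  aws ecr create-repository --repository-name "$1" --region "$2" >/dev/null 2>&1 || true
  docker tag "$1:latest" "$registry/$1:latest"
  docker push "$registry/$1:latest"
}
```

With the workspace from above, this would be invoked as `push_to_ecr torchelastic_test "$REGION"`.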

Set up the EKS cluster

We will create a new EKS cluster called ‘torchelastic’.

Inside it, we will create two node groups:

  1. Managers — On-demand CPU instance(s), for cluster-level pods
  2. Workers — Spot GPU instances for training pods

The easiest way to get up and running with EKS is to use eksctl.
Save the following to cluster.yaml and run:

eksctl create cluster -f cluster.yaml

A few points to notice:

  • Lines 9–16: By default, EKS creates a separate VPC and subnets for the cluster. This makes sharing the EFS between your dev box and the cluster more difficult — though supposedly possible. To work around this, we create the cluster in existing subnets (EKS requires at least two AZs)
  • Lines 21, 36: AWS will pick a suitable AMI (e.g. one including GPU support)
  • Lines 26, 44: We specify the AZ for the nodegroups to run on — running in the same zone is important to avoid cross AZ network charges which can be significant. More on that later
  • Lines 30, 38, 53–54: The nodegroups are marked so that an autoscaler can control them
  • Line 55: The GPU nodegroup has a taint that prevents cluster infra services from getting scheduled on them

If you check your running pods, you will see that eksctl kindly installed the NVIDIA device plugin on all nodes. This plugin enables Kubernetes to recognize GPUs in the cluster.
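A quick way to confirm that the GPUs are visible to the scheduler is to list the allocatable `nvidia.com/gpu` resource per node. This is a minimal sketch; the function name is hypothetical.

```shell
# List each node with the number of GPUs Kubernetes can allocate on it.
# Nodes that expose no GPUs show <none> in the GPU column.
show_gpu_nodes() {
  kubectl get nodes \
    "-o=custom-columns=NAME:.metadata.name,GPU:.status.allocatable.nvidia\.com/gpu"
}
```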

Install TorchElastic infra on the cluster

In order to run on Kubernetes, TorchElastic requires a few pieces of infrastructure:

  1. A custom resource definition representing a TorchElastic Job
  2. The TorchElastic controller which monitors the CRD
  3. Some roles
  4. An etcd service which serves as a backend for the Rendezvous

TorchElastic comes with default deployments that work out of the box. These will be created in a namespace called elastic-job:

kubectl apply -k ${PWD}/elastic/kubernetes/config/default
kubectl apply -f ${PWD}/elastic/kubernetes/config/samples/etcd.yaml

You should be able to see the controller and the etcd running
in the elastic-job namespace:

NAME                             READY   STATUS    RESTARTS   AGE
elastic-job-k8s-controller-XXX   1/1     Running   0          126m
etcd                             1/1     Running   0          126m
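A listing like this can be produced with kubectl. The sketch below also checks that the custom resource definition is registered; the CRD name `elasticjobs.elastic.pytorch.org` is assumed from the ElasticJob API group, and the function name is hypothetical.

```shell
# Check that the ElasticJob CRD exists, then list the pods
# in the elastic-job namespace (controller and etcd).
check_torchelastic_infra() {
  kubectl get crd elasticjobs.elastic.pytorch.org &&
  kubectl get pods -n elastic-job
}
```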

Set up an EFS PersistentVolumeClaim

In order to feed training jobs with data, and collect their outputs, it’s useful to have a central file system that is available on the training workers as well as your dev. box.

EFS is AWS’s managed NFS and is a good candidate for such a shared filesystem. There are multiple ways to connect EFS to EKS, but AWS recommends using the EFS CSI driver.

Follow this guide to set up an EFS file system and connect it to your EKS cluster. Continue to the optional section, to the “static” path.

To finish off, save the following into pvc.yaml, and apply it.
This will create a PersistentVolumeClaim for EFS, in the elastic-job namespace:
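A PVC manifest for the static EFS setup could look roughly like the sketch below. The claim name `efs-claim` and the storage class `efs-sc` are assumptions; they need to match the StorageClass and PersistentVolume you created while following the EFS guide.

```shell
# Write an illustrative PVC manifest for the elastic-job namespace.
cat > pvc.yaml <<'EOF'
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: efs-claim            # assumed name; the training job refers to it
  namespace: elastic-job
spec:
  accessModes:
    - ReadWriteMany          # EFS can be mounted by many pods at once
  storageClassName: efs-sc   # assumed to match the StorageClass of the EFS PV
  resources:
    requests:
      storage: 5Gi           # required by the API; EFS does not enforce it
EOF
```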

kubectl apply -f pvc.yaml

Next, on your dev. box, mount the EFS to your local /mnt/efs.
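As a sketch, the mount can be done over NFS with the mount options AWS recommends for EFS. It assumes an NFS client is installed on the dev box (e.g. nfs-common on Ubuntu) and that the box can reach the file system's mount target on port 2049; the function name is hypothetical.

```shell
# Mount an EFS file system over NFSv4.1.
# Usage: mount_efs <file-system-id> <region> [mount-point]
mount_efs() {
  mount_point=${3:-/mnt/efs}
  sudo mkdir -p "$mount_point" &&
  sudo mount -t nfs4 \
    -o nfsvers=4.1,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2,noresvport \
    "$1.efs.$2.amazonaws.com:/" "$mount_point"
}
```

For example: `mount_efs <your file system id> "$REGION"`.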

From your dev. box, create the following folders on the newly mounted EFS:

  1. /mnt/efs/data for storing the training data
  2. /mnt/efs/output for capturing output: checkpoints, debug images, TensorBoard files etc.

Then, place the training data in the /mnt/efs/data folder:

mkdir /mnt/efs/data
mkdir /mnt/efs/output
wget -q http://cs231n.stanford.edu/tiny-imagenet-200.zip && unzip -q tiny-imagenet-200.zip -d /mnt/efs/data && rm tiny-imagenet-200.zip

Create a TorchElastic training job

Now we are ready to create the TorchElastic training job.
Save the following into imagenet.yaml:
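A manifest of this kind might look roughly like the sketch below, modeled on the Imagenet sample that ships with the TorchElastic repo. The toleration key, the PVC name, the image placeholder and the script path are assumptions, and the line numbers in the list that follows refer to the author's original file rather than to this sketch.

```shell
# Write an illustrative ElasticJob manifest (assumed values are marked).
cat > imagenet.yaml <<'EOF'
apiVersion: elastic.pytorch.org/v1alpha1
kind: ElasticJob
metadata:
  name: imagenet
  namespace: elastic-job
spec:
  rdzvEndpoint: "etcd-service:2379"   # etcd service from the sample deployment
  minReplicas: 1
  maxReplicas: 2
  replicaSpecs:
    Worker:
      replicas: 2
      restartPolicy: ExitCode
      template:
        apiVersion: v1
        kind: Pod
        spec:
          tolerations:
            - key: nvidia.com/gpu      # assumed taint key on the GPU nodegroup
              operator: Exists
              effect: NoSchedule
          containers:
            - name: elasticjob-worker
              image: <your ECR image URI>
              imagePullPolicy: Always
              args:
                - "--nproc_per_node=1"
                - "main.py"            # assumed location inside the image
                - "--arch=resnet18"
                - "--epochs=20"
                - "--batch-size=32"
                - "--checkpoint-file=/mnt/efs/output/checkpoint.pth"
                - "/mnt/efs/data/tiny-imagenet-200"
              resources:
                limits:
                  nvidia.com/gpu: 1
              volumeMounts:
                - name: efs
                  mountPath: /mnt/efs
                - name: dshm
                  mountPath: /dev/shm
          volumes:
            - name: efs
              persistentVolumeClaim:
                claimName: efs-claim   # assumed PVC name
            - name: dshm
              emptyDir:
                medium: Memory         # backs /dev/shm with RAM
EOF
```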

This deployment resembles a standard replicaset deployment, with a few subtle differences:

  • Line 4: configures the job name, which is used as the name of the pods and the job resource.
  • Lines 8–9: configure how many workers the job wishes to have (as a min–max range).
  • Lines 18–21: tolerate the taint we put on all GPU nodes, so that the pods can be scheduled on these nodes.
  • Lines 39–40, 44–46: mount EFS into the pods under /mnt/efs, using the PVC we created earlier.
  • Line 34: points the training script at the data in /mnt/efs/data/tiny-imagenet-200.
  • Line 32: stores checkpoints in /mnt/efs/output/checkpoint.pth.
  • Lines 41–42, 47–49: enable /dev/shm to support memory sharing, which is used by the multi-processing data loaders.

Test your training job

Now that we have set up the basic pieces, let’s submit a job to the cluster and watch it run:

kubectl apply -f imagenet.yaml

Check that the pods entered a running state:

kubectl get pods -n elastic-job

Look at the training logs:

(base) ubuntu@torchelastic:~/torchelastic_test$ kubectl logs -f imagenet-worker-0 -n elastic-job
[INFO] 2021-05-30 13:17:27,917 api: [default] Starting worker group
=> set cuda device = 0
=> creating model: resnet18
=> no workers have checkpoints, starting from epoch 0
=> start_epoch: 0, best_acc1: 0
Epoch: [0][ 0/3125] Time 2.255 ( 2.255) Data 0.299 ( 0.299) Loss 6.9839e+00 (6.9839e+00) Acc@1 0.00 ( 0.00) Acc@5 0.00 ( 0.00)
Epoch: [0][ 10/3125] Time 0.222 ( 0.389) Data 0.000 ( 0.028) Loss 7.1638e+00 (7.1783e+00) Acc@1 3.12 ( 1.42) Acc@5 3.12 ( 3.69)

Eventually, you should see a checkpoint file created in your /mnt/efs/output folder:

(base) ubuntu@torchelastic:~/torchelastic_test$ ls /mnt/efs/output/
checkpoint.pth  model_best.pth.tar

Make the cluster elastic

Recall that we’ve configured our workers nodegroup to have two GPU nodes. What if we want to run larger training jobs? Or several training jobs at the same time?

To support this use case, we would like the cluster to automatically scale the number of nodes such that all training pods, across all training jobs, will have the GPU resources they need in order to run.

When the training pods complete or exit, we would like the cluster to scale down automatically to conserve resources.

The way to achieve this is by using a cluster-autoscaler. To set it up, you can follow either the more complex AWS guide, or this much lighter guide.

Notes:

  1. See more about cluster-autoscaler’s interaction with TorchElastic in Part I
  2. Use the auto-discover-asg flavor of the autoscaler tutorial
  3. Make sure to edit the YAML in the guide with your cluster name

Scale-up

Now that we have installed an autoscaler, let’s see it in action. We could scale up the imagenet training job, but it would be more fun to start another training job instead. Let’s create a copy of the training job’s deployment file, and change:

  1. The name of the job to be imagenet2
  2. The checkpoint path to be checkpoint2.pth

apiVersion: elastic.pytorch.org/v1alpha1
kind: ElasticJob
metadata:
  name: imagenet2

"--checkpoint-file=/mnt/efs/output/checkpoint2.pth"

Save this as imagenet2.yaml and apply the job to the cluster:

kubectl apply -f imagenet2.yaml
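The copy does not have to be edited by hand. As a minimal sketch, the two changes can be scripted with sed, assuming imagenet.yaml contains the literal strings `name: imagenet` (at the end of a line) and `checkpoint.pth`; the function name is hypothetical.

```shell
# Derive a second job manifest from the first by renaming the job
# and pointing it at its own checkpoint file.
# Usage: make_second_job <source-yaml> <target-yaml>
make_second_job() {
  sed -e 's/name: imagenet$/name: imagenet2/' \
      -e 's/checkpoint\.pth/checkpoint2.pth/' "$1" > "$2"
}
```

For example: `make_second_job imagenet.yaml imagenet2.yaml`.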

Once the job is applied, you will see new pods in a Pending state.

Let’s take a look at the cluster auto-scaler logs and see what is happening:

kubectl logs -f cluster-autoscaler-XXXX -n kube-system

At some point, you should see the lines:

I0531 07:00:23.692159       1 scale_up.go:435] Estimated 2 nodes needed in eksctl-torchelastic-nodegroup-workers-NodeGroup-xxx
I0531 07:00:23.692177 1 scale_up.go:539] Final scale-up plan: [{eksctl-torchelastic-nodegroup-workers-NodeGroup-xxx 2->4 (max: 10)}]
I0531 07:00:23.692193 1 scale_up.go:700] Scale-up: setting group eksctl-torchelastic-nodegroup-workers-NodeGroup-xxx size to 4
I0531 07:00:23.692241 1 auto_scaling_groups.go:219] Setting asg eksctl-torchelastic-nodegroup-workers-NodeGroup-xxx size to 4
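The autoscaler logs are verbose, so it can help to filter them down to the scaling decisions. The sketch below is a plain grep over standard input; the patterns are taken from the sample log lines in this post, and the function name is hypothetical.

```shell
# Keep only the autoscaler log lines that describe scaling decisions.
# Reads log text on stdin and prints the matching lines.
autoscaler_events() {
  grep -E 'Final scale-up plan|Scale-up: setting group|was unneeded for|Removed node'
}
```

Assuming the autoscaler runs as a deployment named cluster-autoscaler, it could be used as `kubectl logs deployment/cluster-autoscaler -n kube-system | autoscaler_events`.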

Soon, you should be able to see the new nodes have joined the cluster:

(base) ubuntu@torchelastic:~/torchelastic_test$ kubectl get nodes --show-labels | grep "workers" | wc -l
4

Finally, the new training pods should get scheduled on these nodes and enter a running state.

Scale down

To cause the cluster to scale down, let’s kill one of the training jobs, like so:

kubectl delete ElasticJob/imagenet2 -n elastic-job

Looking at the autoscaler logs, we can see that it waits for 10 minutes of “cool down” before scaling down un-utilized nodes:

I0531 07:42:09.468430 1 scale_down.go:716] ip-xxx.us-west-2.compute.internal was unneeded for 3m51.560173298s
I0531 07:42:09.468439 1 scale_down.go:716] ip-yyy.us-west-2.compute.internal was unneeded for 3m41.542111195s
I0531 07:51:13.262373 1 node_tree.go:100] Removed node "ip-xxx.us-west-2.compute.internal" in group "us-west-2:xxx:us-west-2a" from NodeTree
I0531 07:51:18.362296 1 node_tree.go:100] Removed node "ip-yyy.us-west-2.compute.internal" in group "us-west-2:xxx:us-west-2a" from NodeTree

Finally, you should see that the cluster scaled down to its original size:

(base) ubuntu@torchelastic:~/torchelastic_test$ kubectl get nodes --show-labels | grep "workers" | wc -l
2

Scale down to zero

Recall that when we set up the EKS cluster, we specified that the minimal and desired node count for the workers nodegroup is 2.
However, this is wasteful in case there are no training jobs running.

Let's edit the worker nodegroup and set:

eksctl scale nodegroup --cluster torchelastic --nodes=0 --nodes-min=0 --name workers

Of course, you could configure this in the cluster.yaml file ahead of time.

Now, kill the remaining training job.

After 10 minutes or so, the GPU nodes should be scaled down to zero.

Handle node interruptions

One of TorchElastic’s unique benefits is its ability to handle training elasticity — i.e. nodes joining and departing from the training job itself during training. This is particularly useful when running on spot instances.

To see this in action, let’s change our training job to require 4 workers, and kick it off. After a few minutes, you should see that the cluster has scaled up to 4 nodes and that the training job is running.

However, instead of waiting for a spot interruption, let’s manually delete a node — via the EC2 console, for example.
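The same can be done from the command line. As a sketch, the EC2 instance behind a node can be looked up from the node's providerID (on AWS it has the form `aws:///<zone>/<instance-id>`) and then terminated with the AWS CLI. The function names are hypothetical.

```shell
# Extract the EC2 instance ID from a node's providerID,
# e.g. aws:///us-west-2a/i-0123456789abcdef0 -> i-0123456789abcdef0
instance_id_from_provider_id() {
  echo "${1##*/}"
}

# Terminate the EC2 instance behind a Kubernetes node to simulate an interruption.
# Usage: terminate_node <node-name> <region>
terminate_node() {
  provider_id=$(kubectl get node "$1" -o jsonpath='{.spec.providerID}') || return 1
  aws ec2 terminate-instances --region "$2" \
    --instance-ids "$(instance_id_from_provider_id "$provider_id")"
}
```

For example: `terminate_node ip-xxx.us-west-2.compute.internal "$REGION"`.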

Looking at the logs of the training job we can see one of the training pods has stopped:

[E ProcessGroupNCCL.cpp:485] [Rank 1] Watchdog caught collective operation timeout: WorkNCCL(OpType=ALLREDUCE, Timeout(ms)=10000) ran for 13530 milliseconds before timing out.
Traceback (most recent call last):
  File "main.py", line 594, in <module>
    main()
  File "main.py", line 183, in main
    train(train_loader, model, criterion, optimizer, epoch, device_id, print_freq)
  File "main.py", line 462, in train
    loss.backward()
  File "/opt/conda/lib/python3.8/site-packages/torch/tensor.py", line 245, in backward
    torch.autograd.backward(self, gradient, retain_graph, create_graph, inputs=inputs)
  File "/opt/conda/lib/python3.8/site-packages/torch/autograd/__init__.py", line 145, in backward
    Variable._execution_engine.run_backward(
RuntimeError: NCCL communicator was aborted.
[ERROR] 2021-05-31 08:31:55,344 api: failed (exitcode: 1) local_rank: 0 (pid: 11) of binary: /opt/conda/bin/python
[ERROR] 2021-05-31 08:31:55,345 local_elastic_agent: [default] Worker group failed

Here is what happens next:

  1. TorchElastic automatically creates a replacement pod, which will enter a pending state
  2. The cluster-autoscaler will add a replacement node to the cluster to satisfy the unscheduled pod
  3. The pod will get started on the new node
  4. The TorchElastic job will detect the joined pod, and create a new rendezvous
  5. The new pod will join the training job, which will resume with 4 pods

Where to go from here

Hopefully, by now you have a spot-based training cluster, which can scale up and down based on the demand from training jobs. To get here, we took some shortcuts which made sense for a tutorial, but are worth addressing in longer-term solutions:

  1. Ideally, we would like to be able to utilize spot availability in different zones. To do so, the cluster needs to be carefully configured to force all pods of a single training job to be scheduled inside the same AZ; otherwise, the network communication across AZs will be both slow and extremely expensive.
    This is achieved by setting up more nodegroups (one per zone), decorating them with the right labels, and applying pod affinity rules which ensure the right spread.
  2. Another attractive feature is to allow training jobs to specify what type of GPU cards they would like to run on. This can be achieved by adding additional node groups per GPU type, and more taints and tolerations to help the autoscaler and the pods to collaborate on this.
  3. You will want to set up some centralized logging and monitoring for the cluster and the jobs, as well as tighten the network and security setup.
    If you don’t already have a centralized logging and monitoring solution, you can use AWS’s rather unsatisfying Fluent Bit setup that ships logs to CloudWatch.
  4. Working with deployment YAMLs is OK, but not so convenient.
    You may want to consider creating a CLI tool that enables a more intuitive job submission API, like the one below — or even hook up the submission API to a tool like Hydra that enables you to submit multiple jobs based on parameter sweeps.
screencast by the author
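A very small version of such a tool could be a shell function that fills placeholders in a manifest template and pipes the result to kubectl. This is only a sketch: the `{{NAME}}`, `{{MIN}}` and `{{MAX}}` placeholders, the template file and the function names are all hypothetical.

```shell
# Fill the placeholders of a job template and print the resulting manifest.
# Usage: render_job <template> <name> <min-workers> <max-workers>
render_job() {
  sed -e "s/{{NAME}}/$2/g" -e "s/{{MIN}}/$3/g" -e "s/{{MAX}}/$4/g" "$1"
}

# Render the template and submit it to the cluster.
# Usage: submit_job <template> <name> <min-workers> <max-workers>
submit_job() {
  render_job "$@" | kubectl apply -f -
}
```

A template would be a copy of the job manifest with, for example, `name: {{NAME}}`, `minReplicas: {{MIN}}` and `maxReplicas: {{MAX}}`, submitted with `submit_job job.tmpl imagenet3 2 8`.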

Conclusion

In this post, we demonstrated how to build an auto-scaling, distributed training environment over spot instances on AWS, using TorchElastic and Kubernetes.

We have been using a similar setup for the past few months to run multiple experiments, each utilizing 4–32 GPUs, and have overall enjoyed the stability and simplicity of this solution.

We hope you found this guide useful, and wish you Happy Training!
