Data and AI on Power


Training on Steroids – Leveraging Distributed Training in Kubeflow

By Sebastian Lehrig posted Wed May 17, 2023 10:44 AM

  

In the previous two blog posts, we introduced Kubeflow on Power and showed how data scientists can burst training to arbitrary private or public clouds. In this article, you will learn how you can put your model training literally on steroids by distributing the training process over multiple workers inside your OpenShift or vanilla Kubernetes cluster! Such a distribution is key when working with large language models or foundation models in general.

Let’s start with some theory on distributed training. If you don’t like theory, just skip ahead to the example – you can also make distribution work without all the theory.

Theory: Distributed Training

Distributed training parallelizes a training process over multiple workers. The benefit is reduced training time. Disadvantages include increased training code complexity and data synchronization overhead between workers. We will see that our integration into Kubeflow keeps the increase in code complexity to a bare minimum. For coping with data synchronization, there are two general approaches to distributed training: parameter-server-based and Allreduce-based.

Distributed Training based on Parameter-Servers with TFJob

Figure 1 depicts how distributed training with parameter-servers works inside a Kubernetes environment. When Kubeflow is installed, a so-called TFJob resource can be deployed, which realizes the concept of distributed training with parameter-servers for TensorFlow workloads. The TFJob will nominate a master pod to orchestrate and checkpoint the distributed training. Further, several worker pods are spawned, each participating in the training process. Each of those workers trains independently per training epoch on a shard of the training data set. However, at the end of each epoch, trained parameters (e.g., the weights of a neural network) need to be synchronized. This is where the parameter server pods become important.

Figure 1 At the end of each epoch, parameter server pods synchronize the parameters received from several workers. The synchronization result serves each worker as basis for the next epoch.



For each worker, a parameter server pod is created. These pods “know” how the parameters coming from several workers can be merged into an overall result. The resulting parameters are then sent back to each worker, so that individual training can start again in the next epoch based on a shared understanding of the parameters.

The required communication between pods uses normal Kubernetes mechanisms. That is, each pod communicates with other pods via the services exposed by those pods.
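
To make this more concrete, here is a minimal, hedged TensorFlow 2 sketch (not the exact code from our examples) of the worker side of such a setup: the TFJob controller injects the cluster layout, including the parameter-server tasks, through the TF_CONFIG environment variable, and a ParameterServerStrategy then places the model variables on those parameter server pods.

import tensorflow as tf

# The TFJob controller sets TF_CONFIG in every pod; the resolver reads the
# cluster layout (chief, worker, and ps tasks) from that environment variable.
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()

# Parameter-server strategy: variables live on the "ps" pods, while the
# worker pods compute gradients and push their updates to them.
strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)

with strategy.scope():
    # Variables created inside this scope are placed on the parameter servers.
    model = tf.keras.Sequential([tf.keras.layers.Dense(10, activation="softmax")])
    model.compile(optimizer="adam", loss="sparse_categorical_crossentropy")

# How the dataset and model.fit() are wired up depends on the TensorFlow
# version; see TensorFlow's parameter-server training guide for details.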

Parameter servers are a simple concept for realizing synchronization. However, they require additional resources for the dedicated parameter server pods and introduce extra communication overhead. Imagine that workers could instead synchronize directly among themselves. This would save some resources, right? This is where the Allreduce-based approach comes into play.

Distributed Training based on Allreduce with MPIJob

Figure 2 depicts how Allreduce-based distributed training works inside a Kubernetes environment. When Kubeflow is installed, an MPIJob resource can also be deployed (instead of the previously discussed TFJob), which realizes the concept of distributed training with Allreduce-style workloads (which can be specified, for example, via TensorFlow or PyTorch). The MPIJob, too, nominates a master pod to orchestrate and checkpoint the distributed training. Again, several worker pods are spawned, each participating in the training process as before. And again, at the end of each epoch, trained parameters are synchronized. But this time, the workers synchronize directly with each other (hence the connections to each worker’s service in Figure 2), using a technique called Allreduce.

Figure 2 At the end of each epoch, worker pods synchronize their parameters directly with each other. The synchronization result serves each worker as basis for the next epoch.



Allreduce starts in a situation where all workers have learned different model parameters. To synchronize them, each worker sends a fraction of its parameters to every other worker (first step in Figure 3). Next, each worker aggregates all parameter fractions available to it, for example by summing them up (second step in Figure 3). Finally, each worker shares its aggregate with the other workers (final step in Figure 3). At the end of this process, all workers are in sync regarding their model parameters and can again train independently on the next shards of data.

Figure 3 Allreduce achieves synchronization by distributing a data portion from each worker to the other workers, calculating an aggregate such as a sum, and sharing the aggregates among the workers.

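The following toy sketch (plain Python with NumPy and made-up numbers, not real MPI or NCCL code) mimics what Allreduce computes: each of three workers contributes one fraction of the parameter vector, the fractions are summed, and the shared result is identical on every worker.

import numpy as np

# Parameters learned independently by three workers after one epoch.
worker_params = [
    np.array([0.1, 0.4, 0.2]),
    np.array([0.3, 0.2, 0.5]),
    np.array([0.2, 0.3, 0.1]),
]

# Steps 1 and 2: each worker reduces one fraction (here: one element)
# of the parameter vectors it receives from the other workers.
partial_sums = [sum(p[i] for p in worker_params) for i in range(3)]

# Step 3: the partial results are shared, so every worker ends up with the
# same aggregate (here: the average of all parameter vectors).
synced_params = np.array(partial_sums) / len(worker_params)
print(synced_params)  # identical on every worker -> basis for the next epoch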

Distributed Training in Kubeflow

Both TFJob and MPIJob require that data scientists slightly modify their training code. TFJob code uses native TensorFlow functions for distribution, while MPIJob code uses Horovod bindings for Keras, PyTorch, etc. The monkey classification and music genre classification examples in our GitHub repository exemplify these modifications, respectively (see the cells entitled “Distributed Training”). You will see that the training code only has to be modified slightly. The most important modifications ensure that a distributed optimizer is applied and that only a “master” worker executes tasks like checkpointing and saving the final model.
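
To give an impression of these modifications for the MPIJob case, here is a hedged Horovod/Keras sketch (the tiny model and synthetic data are placeholders, not the code from the examples):

import numpy as np
import tensorflow as tf
import horovod.tensorflow.keras as hvd

hvd.init()  # one Horovod process per worker pod

# Placeholder model and synthetic data for illustration only.
model = tf.keras.Sequential([
    tf.keras.layers.Dense(16, activation="relu", input_shape=(8,)),
    tf.keras.layers.Dense(3, activation="softmax"),
])
x = np.random.rand(128, 8).astype("float32")
y = np.random.randint(0, 3, size=(128,))

# Scale the learning rate with the number of workers and wrap the optimizer
# so that gradients are averaged across all workers via Allreduce.
opt = hvd.DistributedOptimizer(tf.keras.optimizers.Adam(0.001 * hvd.size()))
model.compile(optimizer=opt, loss="sparse_categorical_crossentropy")

# Make sure all workers start from the same initial weights.
callbacks = [hvd.callbacks.BroadcastGlobalVariablesCallback(0)]

# Only the "master" worker (rank 0) checkpoints and saves the final model.
if hvd.rank() == 0:
    callbacks.append(tf.keras.callbacks.ModelCheckpoint("checkpoint-{epoch}.h5"))

model.fit(x, y, epochs=3, callbacks=callbacks,
          verbose=1 if hvd.rank() == 0 else 0)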

Once the training code is modified, we can simply use the very same training component that we have already used for train bursting – we just have to add a so-called distribution_specification to it:

distribution_specification = {
    "distribution_type": "MPI",
    "number_of_workers": 3,
}

train_comp(
    train_specification=train_specification,
    train_parameters=train_parameters,
    distribution_specification=distribution_specification,
)


The distribution_specification itself configures how training is distributed. In the example, we configure “MPI” as the distribution type (to use an Allreduce-style MPIJob) and three workers for training. Instead of “MPI”, we can also switch to “TF” (to use a TFJob with parameter servers). The training component specification documents the complete list of parameters and options:

Optional dictionary specifying the distribution behavior. By default, no distributed training is executed, which results in an ordinary Kubernetes Job for training. Otherwise, dictionary entries determine the distribution behavior. The "distribution_type" entry determines the distribution type: "Job" (no distribution; ordinary Kubernetes job), "MPI" (all-reduce style distribution via Horovod), or "TF" (parameter-server style distribution via distributed training with TensorFlow). Depending on the distribution type, additional dictionary entries can be processed. For distributed training jobs, the "number_of_workers" (e.g., 2) determines the number of worker replicas for training. Individual resource limits can be controlled via "worker_cpus" (e.g., "1000m") and "worker_memory" (e.g., "1Gi"). MPI additionally provides a fine-grained control of launcher cpu and memory limits via "launcher_cpus" (e.g., "1000m") and "launcher_memory" (e.g., "1Gi"). Full example with MPI: {"distribution_type": "MPI", "number_of_workers": 2, "worker_cpus": "8", "worker_memory": "32Gi", "launcher_cpus": "2", "launcher_memory": "8Gi"}.
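
For example, a parameter-server style run with explicit worker resources could be configured as follows (the values are illustrative):

distribution_specification = {
    "distribution_type": "TF",   # TFJob with parameter servers
    "number_of_workers": 2,
    "worker_cpus": "8",
    "worker_memory": "32Gi",
}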

Try it out!

That’s really it – no complex MPI configuration, no fighting with the way GPUs communicate, and no manual worker setup. Give it a try based on the monkey classification and music genre classification examples and stay tuned for more exciting blogs on Kubeflow – we have more to come!
