This is a sequel to a recent post on the topic of building custom, cloud-based solutions for machine learning (ML) model development using low-level instance provisioning services. Our focus in this post will be on Amazon EC2.
Cloud service providers (CSPs) typically offer fully managed solutions for training ML models in the cloud. Amazon SageMaker, for example, Amazon’s managed service offering for ML development, simplifies the process of training significantly. Not only does SageMaker automate the end-to-end training execution — from auto-provisioning the requested instance types, to setting up the training environment, to running your training workload, to saving the training artifacts and shutting everything down — but it also offers a number of auxiliary services that support ML development, such as automatic model tuning, platform optimized distributed training libraries, and more. However, as is often the case with high-level solutions, the increased ease-of-use of SageMaker training is coupled with a certain level of loss of control over the underlying flow.
In our previous post we noted some of the limitations sometimes imposed by managed training services such as SageMaker, including reduced user privileges, inaccessibility of some instance types, reduced control over multi-node device placement, and more. Some scenarios require a higher level of autonomy over the environment specification and training flow. In this post, we illustrate one approach to addressing these cases by creating a custom training solution built on top of Amazon EC2.
Many thanks to Max Rabin for his contributions to this post.
In our previous post we listed a minimal set of features that we would require from an automated training solution and proceeded to demonstrate, in a step-by-step manner, one way of implementing these in Google Cloud Platform (GCP). And although the same sequence of steps would apply to any other cloud platform, the details can be quite different due to the unique nuances of each one. Our intention in this post will be to propose an implementation based on Amazon EC2 using the create_instances command of the AWS Python SDK (version 1.34.23). As in our previous post, we will begin with a simple EC2 instance creation command and gradually supplement it with additional components that will incorporate our desired management features. The create_instances command supports many controls. For the purposes of our demonstration, we will focus only on the ones that are relevant to our solution. We will assume the existence of a default VPC and an IAM instance profile with appropriate permissions (including access to Amazon EC2, S3, and CloudWatch services).
Note that there are multiple ways of using Amazon EC2 to fulfill the minimal set of features that we defined. We have chosen to demonstrate one possible implementation. Please do not interpret our choice of AWS, EC2, or any details of the specific implementation we have chosen as an endorsement. The best ML training solution for you will greatly depend on the specific needs and details of your project.
We begin with a minimal example of a single EC2 instance request. We have chosen a GPU accelerated g5.xlarge instance type and a recent Deep Learning AMI (with an Ubuntu 20.4 operating system).
import boto3region = 'us-east-1'
job_id = 'my-experiment' # replace with unique id
num_instances = 1
image_id = 'ami-0240b7264c1c9e6a9' # replace with image of choice
instance_type = 'g5.xlarge' # replace with instance of choice
ec2 = boto3.resource('ec2', region_name=region)
instances = ec2.create_instances(
MaxCount=num_instances,
MinCount=num_instances,
ImageId=image_id,
InstanceType=instance_type,
)
The first enhancement we would like to apply is for our training workload to automatically start as soon as our instance is up and running, without any need for manual intervention. Towards this goal, we will utilize the UserData argument of the create_instances API that enables you to specify what to run at launch. In the code block below, we propose a sequence of commands that sets up the training environment (i.e., updates the PATH environment variable to point to the prebuilt PyTorch environment included in our image), downloads our training code from Amazon S3, installs the project dependencies, runs the training script, and syncs the output artifacts to persistent S3 storage. The demonstration assumes that the training code has already been created and uploaded to the cloud and that it contains two files: a requirements file (requirements.txt) and a stand-alone training script (train.py). In practice, the precise contents of the startup sequence will depend on the project. We include a pointer to our predefined IAM instance profile which is required for accessing S3.
import boto3region = 'us-east-1'
job_id = 'my-experiment' # replace with unique id
num_instances = 1
image_id = 'ami-0240b7264c1c9e6a9' # replace with image of choice
instance_type = 'g5.xlarge' # replace with instance of choice
instance_profile_arn = 'instance-profile-arn' # replace with profile arn
ec2 = boto3.resource('ec2', region_name=region)
script = """#!/bin/bash
# environment setup
export PATH=/opt/conda/envs/pytorch/bin/python:$PATH
# download and unpack code
aws s3 cp s3://my-s3-path/my-code.tar .
tar -xvf my-code.tar
# install dependencies
python3 -m pip install -r requirements.txt
# run training workload
python3 train.py
# sync output artifacts
aws s3 sync artifacts s3://my-s3-path/artifacts
"""
instances = ec2.create_instances(
MaxCount=num_instances,
MinCount=num_instances,
ImageId=image_id,
InstanceType=instance_type,
IamInstanceProfile={'Arn':instance_profile_arn},
UserData=script
)
Note that the script above syncs the training artifacts only at the end of training. A more fault-tolerant solution would sync intermediate model checkpoints throughout the training job.
When you train using a managed service, your instances are automatically shut down as soon as your script completes to ensure that you only pay for what you need. In the code block below, we append a self-destruction command to the end of our UserData script. We do this using the AWS CLI terminate-instances command. The command requires that we know the instance-id and the hosting region of our instance which we extract from the instance metadata. Our updated script assumes that our IAM instance profile has appropriate instance-termination authorization.
script = """#!/bin/bash
# environment setup
TOKEN=$(curl -s -X PUT "http://169.254.169.254/latest/api/token" -H \
"X-aws-ec2-metadata-token-ttl-seconds: 21600")
INST_MD=http://169.254.169.254/latest/meta-data
CURL_FLAGS="-H \"X-aws-ec2-metadata-token: ${TOKEN}\" -s"
INSTANCE_ID=$(curl $CURL_FLAGS $INST_MD/instance-id)
REGION=$(curl $CURL_FLAGS $INST_MD/placement/region)
export PATH=/opt/conda/envs/pytorch/bin/python:$PATH# download and unpack code
aws s3 cp s3://my-s3-path/my-code.tar .
tar -xvf my-code.tar
# install dependencies
python3 -m pip install -r requirements.txt
# run training workload
python3 train.py
# sync output artifacts
aws s3 sync artifacts s3://my-s3-path/artifacts
# self-destruct
aws ec2 terminate-instances --instance-ids $INSTANCE_ID \
--region $REGION
"""
We highly recommend introducing additional mechanisms for verifying appropriate instance deletion to avoid the possibility of having unused (“orphan”) instances in the system racking up unnecessary costs. In a recent post we showed how serverless functions can be used to address this kind of problem.
Amazon EC2 enables you to apply custom metadata to your instance using EC2 instance tags. This enables you to pass information to the instance regarding the training workload and/or the training environment. Here, we use the TagSpecifications setting to pass in an instance name and a unique training job id. We use the unique id to define a dedicated S3 path for our job artifacts. Note that we need to explicitly enable the instance to access the metadata tags via the MetadataOptions setting.
import boto3region = 'us-east-1'
job_id = 'my-experiment' # replace with unique id
num_instances = 1
image_id = 'ami-0240b7264c1c9e6a9' # replace with image of choice
instance_type = 'g5.xlarge' # replace with instance of choice
instance_profile_arn = 'instance-profile-arn' # replace with profile arn
ec2 = boto3.resource('ec2', region_name=region)
script = """#!/bin/bash
# environment setup
TOKEN=$(curl -s -X PUT "http://169.254.169.254/latest/api/token" -H \
"X-aws-ec2-metadata-token-ttl-seconds: 21600")
INST_MD=http://169.254.169.254/latest/meta-data
CURL_FLAGS="-H \"X-aws-ec2-metadata-token: ${TOKEN}\" -s"
INSTANCE_ID=$(curl $CURL_FLAGS $INST_MD/instance-id)
REGION=$(curl $CURL_FLAGS $INST_MD/placement/region)
JOB_ID=$(curl $CURL_FLAGS $INST_MD/tags/instance/JOB_ID)
export PATH=/opt/conda/envs/pytorch/bin/python:$PATH
# download and unpack code
aws s3 cp s3://my-s3-path/$JOB_ID/my-code.tar .
tar -xvf my-code.tar
# install dependencies
python3 -m pip install -r requirements.txt
# run training workload
python3 train.py
# sync output artifacts
aws s3 sync artifacts s3://my-s3-path/$JOB_ID/artifacts
# self-destruct
aws ec2 terminate-instances --instance-ids $INSTANCE_ID \
--region $REGION
"""
instances = ec2.create_instances(
MaxCount=num_instances,
MinCount=num_instances,
ImageId=image_id,
InstanceType=instance_type,
IamInstanceProfile={'Arn':instance_profile_arn},
UserData=script,
MetadataOptions={"InstanceMetadataTags":"enabled"},
TagSpecifications=[{
'ResourceType': 'instance',
'Tags': [
{'Key': 'NAME', 'Value': 'test-vm'},
{'Key': 'JOB_ID', 'Value': f'{job_id}'}
]
}],
)
Using metadata tags to pass information to our instances will be particularly useful in the next sections.
Naturally, we require the ability to analyze our application’s output logs both during and after training. This requires that they be periodically synced to persistent logging. In this post we implement this using Amazon CloudWatch. Below we define a minimum JSON configuration file for enabling CloudWatch log collection which we add to our source code tar-ball as cw_config.json. Please see the official documentation for details on CloudWatch setup and configuration.
{
"logs": {
"logs_collected": {
"files": {
"collect_list": [
{
"file_path": "/output.log",
"log_group_name": "/aws/ec2/training-jobs",
"log_stream_name": "job-id"
}
]
}
}
}
}
In practice, we would like the log_stream_name to uniquely identify the training job. Towards that end, we use the sed command to replace the generic “job-id” string with the job id metadata tag from the previous section. Our enhanced script also includes the CloudWatch start up command and modifications for piping the standard output to the designated output.log defined in the CloudWatch config file.
script = """#!/bin/bash
# environment setup
TOKEN=$(curl -s -X PUT "http://169.254.169.254/latest/api/token" -H \
"X-aws-ec2-metadata-token-ttl-seconds: 21600")
INST_MD=http://169.254.169.254/latest/meta-data
CURL_FLAGS="-H \"X-aws-ec2-metadata-token: ${TOKEN}\" -s"
INSTANCE_ID=$(curl $CURL_FLAGS $INST_MD/instance-id)
REGION=$(curl $CURL_FLAGS $INST_MD/placement/region)
JOB_ID=$(curl $CURL_FLAGS $INST_MD/tags/instance/JOB_ID)
export PATH=/opt/conda/envs/pytorch/bin/python:$PATH# download and unpack code
aws s3 cp s3://my-s3-path/$JOB_ID/my-code.tar .
tar -xvf my-code.tar
# configure cloudwatch
sed -i "s/job-id/${JOB_ID}/g" cw_config.json
/opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent-ctl \
-a fetch-config -m ec2 -c file:cw_config.json -s
# install dependencies
python3 -m pip install -r requirements.txt 2>&1 | tee -a output.log
# run training workload
python3 train.py 2>&1 | tee -a output.log
# sync output artifacts
aws s3 sync artifacts s3://my-s3-path/$JOB_ID/artifacts
# self-destruct
aws ec2 terminate-instances --instance-ids $INSTANCE_ID \
--region $REGION
"""
Nowadays, it is quite common for training jobs to run on multiple nodes in parallel. Modifying our instance request code to support multiple nodes is a simple matter of modifying the num_instances setting. The challenge is how to configure the environment in a manner that supports distributed training, i.e., a manner that enables — and optimizes — the transfer of data between the instances.
To minimize the network latency between the instances and maximize throughput, we add a pointer to a predefined cluster placement group in the Placement field of our ec2 instance request. The following command line demonstrates the creation of a cluster placement group.
aws ec2 create-placement-group --group-name cluster-placement-group \
--strategy cluster
For our instances to communicate with one another, they need to be aware of each other’s presence. In this post we will demonstrate a minimal environment configuration required for running data parallel training in PyTorch. For PyTorch DistributedDataParallel (DDP), each instance needs to know the IP of the master node, the master port, the total number of instances, and its serial rank amongst all of the nodes. The script below demonstrates the configuration of a data parallel training job using the environment variables MASTER_ADDR, MASTER_PORT, NUM_NODES, and NODE_RANK.
import os, ast, socket
import torch
import torch.distributed as dist
import torch.multiprocessing as mpdef mp_fn(local_rank, *args):
# discover topology settings
num_nodes = int(os.environ.get('NUM_NODES',1))
node_rank = int(os.environ.get('NODE_RANK',0))
gpus_per_node = torch.cuda.device_count()
world_size = num_nodes * gpus_per_node
node_rank = nodes.index(socket.gethostname())
global_rank = (node_rank * gpus_per_node) + local_rank
print(f'local rank {local_rank} '
f'global rank {global_rank} '
f'world size {world_size}')
# init_process_group assumes the existence of MASTER_ADDR
# and MASTER_PORT environment variables
dist.init_process_group(backend='nccl',
rank=global_rank,
world_size=world_size)
torch.cuda.set_device(local_rank)
# Add training logic
if __name__ == '__main__':
mp.spawn(mp_fn,
args=(),
nprocs=torch.cuda.device_count())
The node rank can be retrieved from the ami-launch-index. The number of nodes and the master port are known at the time of create_instances invocation and can be passed in as EC2 instance tags. However, the IP address of the master node is only determined once the master instance is created and can only be communicated to the instances following the create_instances call. In the code block below, we chose to pass the master address to each of the instances using a dedicated call to the AWS Python SDK create_tags API. We use the same call to update the name tag of each instance according to its launch-index value.
The full solution for multi-node training appears below:
import boto3region = 'us-east-1'
job_id = 'my-multinode-experiment' # replace with unique id
num_instances = 4
image_id = 'ami-0240b7264c1c9e6a9' # replace with image of choice
instance_type = 'g5.xlarge' # replace with instance of choice
instance_profile_arn = 'instance-profile-arn' # replace with profile arn
placement_group = 'cluster-placement-group' # replace with placement group
ec2 = boto3.resource('ec2', region_name=region)
script = """#!/bin/bash
# environment setup
TOKEN=$(curl -s -X PUT "http://169.254.169.254/latest/api/token" -H \
"X-aws-ec2-metadata-token-ttl-seconds: 21600")
INST_MD=http://169.254.169.254/latest/meta-data
CURL_FLAGS="-H \"X-aws-ec2-metadata-token: ${TOKEN}\" -s"
INSTANCE_ID=$(curl $CURL_FLAGS $INST_MD/instance-id)
REGION=$(curl $CURL_FLAGS $INST_MD/placement/region)
JOB_ID=$(curl $CURL_FLAGS $INST_MD/tags/instance/JOB_ID)
export NODE_RANK=$(curl $CURL_FLAGS $INST_MD/ami-launch-index)
export NUM_NODES=$(curl $CURL_FLAGS $INST_MD/NUM_NODES)
export MASTER_PORT=$(curl $CURL_FLAGS $INST_MD/tags/instance/MASTER_PORT)
export PATH=/opt/conda/envs/pytorch/bin/python:$PATH
# download and unpack code
aws s3 cp s3://my-s3-path/$JOB_ID/my-code.tar .
tar -xvf my-code.tar
# configure cloudwatch
sed -i "s/job-id/${JOB_ID}_${NODE_RANK}/g" cw_config.json
/opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent-ctl \
-a fetch-config -m ec2 -c file:cw_config.json -s
# install dependencies
python3 -m pip install -r requirements.txt 2>&1 | tee -a output.log
# retrieve master address
# should be available but just in case tag application is delayed...
while true; do
export MASTER_ADDR=$(curl $CURL_FLAGS $INST_MD/tags/instance/MASTER_ADDR)
if [[ $MASTER_ADDR == "<?xml"* ]]; then
echo 'tags missing, sleep for 5 seconds' 2>&1 | tee -a output.log
sleep 5
else
break
fi
done
# run training workload
python3 train.py 2>&1 | tee -a output.log
# sync output artifacts
aws s3 sync artifacts s3://my-s3-path/$JOB_ID/artifacts
# self-destruct
aws ec2 terminate-instances --instance-ids $INSTANCE_ID \
--region $REGION
"""
instances = ec2.create_instances(
MaxCount=num_instances,
MinCount=num_instances,
ImageId=image_id,
InstanceType=instance_type,
IamInstanceProfile={'Arn':instance_profile_arn},
UserData=script,
MetadataOptions={"InstanceMetadataTags":"enabled"},
TagSpecifications=[{
'ResourceType': 'instance',
'Tags': [
{'Key': 'NAME', 'Value': 'test-vm'},
{'Key': 'JOB_ID', 'Value': f'{job_id}'},
{'Key': 'MASTER_PORT', 'Value': '7777'},
{'Key': 'NUM_NODES', 'Value': f'{num_instances}'}
]
}],
Placement={'GroupName': placement_group}
)
if num_instances > 1:
# find master_addr
for inst in instances:
if inst.ami_launch_index == 0:
master_addr = inst.network_interfaces_attribute[0]['PrivateIpAddress']
break
# update ec2 tags
for inst in instances:
res = ec2.create_tags(
Resources=[inst.id],
Tags=[
{'Key': 'NAME', 'Value': f'test-vm-{inst.ami_launch_index}'},
{'Key': 'MASTER_ADDR', 'Value': f'{master_addr}'}]
)
A popular way of reducing training costs is to use discounted Amazon EC2 Spot Instances. Utilizing Spot instances effectively requires that you implement a way of detecting interruptions (e.g., by listening for termination notices) and taking the appropriate action (e.g., resuming incomplete workloads). Below, we show how to modify our script to use Spot instances using the InstanceMarketOptions API setting.
import boto3region = 'us-east-1'
job_id = 'my-spot-experiment' # replace with unique id
num_instances = 1
image_id = 'ami-0240b7264c1c9e6a9' # replace with image of choice
instance_type = 'g5.xlarge' # replace with instance of choice
instance_profile_arn = 'instance-profile-arn' # replace with profile arn
placement_group = 'cluster-placement-group' # replace with placement group
instances = ec2.create_instances(
MaxCount=num_instances,
MinCount=num_instances,
ImageId=image_id,
InstanceType=instance_type,
IamInstanceProfile={'Arn':instance_profile_arn},
UserData=script,
MetadataOptions={"InstanceMetadataTags":"enabled"},
TagSpecifications=[{
'ResourceType': 'instance',
'Tags': [
{'Key': 'NAME', 'Value': 'test-vm'},
{'Key': 'JOB_ID', 'Value': f'{job_id}'},
]
}],
InstanceMarketOptions = {
'MarketType': 'spot',
'SpotOptions': {
"SpotInstanceType": "one-time",
"InstanceInterruptionBehavior": "terminate"
}
}
)
Please see our previous posts (e.g., here and here) for some ideas for how to implement a solution for Spot instance life-cycle management.
Managed cloud services for AI development can simplify model training and lower the entry bar for potential incumbents. However, there are some situations where greater control over the training process is required. In this post we have illustrated one approach to building a customized managed training environment on top of Amazon EC2. Of course, the precise details of the solution will greatly depend on the specific needs of the projects at hand.
As always, please feel free to respond to this post with comments, questions, or corrections.