Azure and AWS Machine Learning Pipeline

Automate machine learning training with reusable steps

Xin Cheng
6 min readDec 16, 2023

You can create a GPU machine, install dependencies, run Python code train a model. However, if you need to do this again and again, you need to use some workflow orchestration software to automate these steps.

In App/Data world, these repeated stuffs are usually called workflow. In ML, they are usually called pipeline.

https://www.kubeflow.org/docs/components/pipelines/v2/introduction/

A pipeline is a definition of a workflow that composes one or more components together to form a computational directed acyclic graph (DAG). At runtime, each component execution corresponds to a single container execution, which may create ML artifacts.

Common elements in ML pipeline

Generally machine learning pipeline needs to support the following:

Data: tabular data, file (text, image, audio, video, etc.)

compute: CPU, GPU, xPU, are they in distributed fashion?

environment: nowadays containers are used to provide packaged environment, to reproduce the environment needed for ML model training, it needs to support

various base container image supporting ML framework (PyTorch, TensorFlow, SKLearn, XGBoost)

dependencies: model code may require additional dependency not in base container image

model code (data preprocessing, training, hyperparameter tuning, experiment tracking)

Model: machine learning model usually takes long time to train, in order to reproduce, you need to track experiments, save model artifact and supporting artifact (metrics, metric chart, etc.)

Multiple steps communication

Ordering and dependency of steps: if the workflow contains preprocessing (e.g. data transformation, split into train, validation, test dataset), training, evaluation. You can want to preprocessing, training, evaluation, and finally register model into model catalog.

Data pass between steps (input, output): Training step input could be dependent on the preprocessing step output, while evaluation step input is the model output from training step and test dataset output from preprocessing step.

Control logic: conditional execution of certain steps.

Let’s review how these patterns look like in Azure ML and Amazon Sagemaker

Azure ML

credit default prediction

Data asset

Azure ML has concept of data asset, the following code can be used to retrieve data asset (which can be file, folder, tabular; can be in Azure blob storage, Azure data lake store), which can be versioned

# get a handle of the data asset and print the URI
credit_data = ml_client.data.get(name="credit-card", version="initial")
print(f"Data asset URI: {credit_data.path}")

Environment and dependency

Azure has concept of environment which maps to container image and can use conda.yaml to install additional Python package.

A dedicated environment construct can be used to define a versioned execution environment.

Environment definition

from azure.ai.ml.entities import Environment

custom_env_name = "aml-scikit-learn"

pipeline_job_env = Environment(
name=custom_env_name,
description="Custom environment for Credit Card Defaults pipeline",
tags={"scikit-learn": "0.24.2"},
conda_file=os.path.join(dependencies_dir, "conda.yaml"),
image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
version="0.2.0",
)
pipeline_job_env = ml_client.environments.create_or_update(pipeline_job_env)

print(
f"Environment with name {pipeline_job_env.name} is registered to workspace, the environment version is {pipeline_job_env.version}"
)

conda.yaml

%%writefile {dependencies_dir}/conda.yaml
name: model-env
channels:
- conda-forge
dependencies:
- python=3.8
- numpy=1.21.2
- pip=21.2.4
- scikit-learn=0.24.2
- scipy=1.7.1
- pandas>=1.1,<1.2
- pip:
- inference-schema[numpy-support]==1.3.0
- xlrd==2.0.1
- mlflow== 2.4.1
- azureml-mlflow==1.51.0

Script to execute

The most basic way is to use command object and use “code”, “command” to tell Azure ML pipeline which local code needs to be executed

from azure.ai.ml import command
from azure.ai.ml import Input, Output

data_prep_component = command(
name="data_prep_credit_defaults",
display_name="Data preparation for training",
description="reads a .xl input, split the input to train and test",
inputs={
"data": Input(type="uri_folder"),
"test_train_ratio": Input(type="number"),
},
outputs=dict(
train_data=Output(type="uri_folder", mode="rw_mount"),
test_data=Output(type="uri_folder", mode="rw_mount"),
),
# The source folder of the component
code=data_prep_src_dir,
command="""python data_prep.py \
--data ${{inputs.data}} --test_train_ratio ${{inputs.test_train_ratio}} \
--train_data ${{outputs.train_data}} --test_data ${{outputs.test_data}} \
""",
environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
)

Input and Output

The above code defines inputs and outputs type, but code below does the real mapping. You can see credit_data data asset is mapped to train_data component (or step), other parameter values are also passed in

# the dsl decorator tells the sdk that we are defining an Azure Machine Learning pipeline
from azure.ai.ml import dsl, Input, Output


@dsl.pipeline(
compute="serverless", # "serverless" value runs pipeline on serverless compute
description="E2E data_perp-train pipeline",
)
def credit_defaults_pipeline(
pipeline_job_data_input,
pipeline_job_test_train_ratio,
pipeline_job_learning_rate,
pipeline_job_registered_model_name,
):
# using data_prep_function like a python call with its own inputs
data_prep_job = data_prep_component(
data=pipeline_job_data_input,
test_train_ratio=pipeline_job_test_train_ratio,
)

# using train_func like a python call with its own inputs
train_job = train_component(
train_data=data_prep_job.outputs.train_data, # note: using outputs from previous step
test_data=data_prep_job.outputs.test_data, # note: using outputs from previous step
learning_rate=pipeline_job_learning_rate, # note: using a pipeline input as parameter
registered_model_name=pipeline_job_registered_model_name,
)

# a pipeline returns a dictionary of outputs
# keys will code for the pipeline output identifier
return {
"pipeline_job_train_data": data_prep_job.outputs.train_data,
"pipeline_job_test_data": data_prep_job.outputs.test_data,
}

registered_model_name = "credit_defaults_model"

# Let's instantiate the pipeline with the parameters of our choice
pipeline = credit_defaults_pipeline(
pipeline_job_data_input=Input(type="uri_file", path=credit_data.path),
pipeline_job_test_train_ratio=0.25,
pipeline_job_learning_rate=0.05,
pipeline_job_registered_model_name=registered_model_name,
)

Azure ML uses @dsl.pipeline Python attribute to create pipeline including data_prep_job and train_job. Since train_job inputs has explicit dependency on data_prep_job.outputs, Azure ML will execute these steps sequentially and make sure the outputs are correctly passed to corresponding inputs.

You can imagine, After pipeline execution, the code is executed in remote container, and executes sth like “python data_prep.py — data $<azure_ml_dir>/credits.csv — test_train_ratio 0.25”

This is another article on creating Azure ML pipeline. It is using file on Azure blob storage as input

from azure.ai.ml import Input

fashion_ds = Input(
path="wasbs://demo@data4mldemo6150520719.blob.core.windows.net/mnist-fashion/"
)

A job has a type. Most jobs are command jobs that run a command, like python main.py. What runs in a job is agnostic to any programming language, so you can run bash scripts, invoke python interpreters, run a bunch of curl commands, or anything else.

Azure ML get started

Amazon Sagemaker

Data

Sagemaker does not have data asset concept, you just pass S3 location to the pipeline step

Environment

There is no environment concept, and no conda.yaml. In order to customize container image

  1. You need to mount requirements.txt as code into container. In Python code, use “subprocess.check_call” to execute “pip install -r requirements.txt”
  2. If contain environment needs further customization, use separate tool to build container image and push to ECR

Generally you use task-specific construct to define container image to execute the task, e.g.

from sagemaker.sklearn.processing import SKLearnProcessor


framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
framework_version=framework_version,
instance_type="ml.m5.xlarge",
instance_count=processing_instance_count,
base_job_name="sklearn-abalone-process",
role=role,
)
from sagemaker.estimator import Estimator


image_uri = sagemaker.image_uris.retrieve(
framework="xgboost",
region=region,
version="1.0-1",
py_version="py3",
instance_type="ml.m5.xlarge"
)
xgb_train = Estimator(
image_uri=image_uri,
instance_type="ml.m5.xlarge",
instance_count=1,
output_path=model_path,
role=role,
)

Job/step definition

Sagemaker has separate construct to define task (which selects container image) and step (which defines input/output, entry code)

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

step_process = ProcessingStep(
name="AbaloneProcess",
processor=sklearn_processor,
inputs=[
ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
],
outputs=[
ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
],
code="abalone/preprocessing.py",
)

and

from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


step_train = TrainingStep(
name="AbaloneTrain",
estimator=xgb_train,
inputs={
"train": TrainingInput(
s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
"train"
].S3Output.S3Uri,
content_type="text/csv"
),
"validation": TrainingInput(
s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
"validation"
].S3Output.S3Uri,
content_type="text/csv"
)
},
)

The step dependency is also set when there is relation between step 2 input and step 1 output

Pipeline definition

Use Sagemaker Python SDK to create pipeline

from sagemaker.workflow.pipeline import Pipeline

pipeline_name = f"AbalonePipeline"
pipeline = Pipeline(
name=pipeline_name,
parameters=[
processing_instance_type,
processing_instance_count,
training_instance_type,
model_approval_status,
input_data,
batch_data,
],
steps=[step_process, step_train, step_eval, step_cond],
)

Appendix

--

--

Xin Cheng
Xin Cheng

Written by Xin Cheng

Multi/Hybrid-cloud, Kubernetes, cloud-native, big data, machine learning, IoT developer/architect, 3x Azure-certified, 3x AWS-certified, 2x GCP-certified