Azure and AWS Machine Learning Pipeline
You can create a GPU machine, install dependencies, and run Python code to train a model. However, if you need to do this again and again, you need workflow orchestration software to automate these steps.
In the application and data engineering world, these repeated sequences of steps are usually called workflows. In ML, they are usually called pipelines.
https://www.kubeflow.org/docs/components/pipelines/v2/introduction/
A pipeline is a definition of a workflow that composes one or more components together to form a computational directed acyclic graph (DAG). At runtime, each component execution corresponds to a single container execution, which may create ML artifacts.
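As a quick illustration of components composed into a DAG, here is a minimal Kubeflow Pipelines v2 sketch (the component and pipeline names are made up for illustration):
from kfp import dsl, compiler

@dsl.component
def add(a: int, b: int) -> int:
    # Each component execution runs in its own container at pipeline runtime.
    return a + b

@dsl.pipeline(name="add-pipeline")
def add_pipeline(x: int = 1, y: int = 2):
    first = add(a=x, b=y)
    # Using first.output as an input creates an edge in the DAG,
    # so this task runs after the first one.
    add(a=first.output, b=3)

# Compile the pipeline definition to a YAML spec that a KFP backend can run.
compiler.Compiler().compile(add_pipeline, "add_pipeline.yaml")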
Common elements in an ML pipeline
Generally, a machine learning pipeline needs to support the following:
Data: tabular data, files (text, image, audio, video, etc.)
Compute: CPU, GPU, other accelerators (xPU); possibly in a distributed setup
Environment: nowadays containers are used to provide a packaged, reproducible environment for ML model training; this needs to support
various base container images for ML frameworks (PyTorch, TensorFlow, scikit-learn, XGBoost)
dependencies: model code may require additional packages not in the base container image
model code (data preprocessing, training, hyperparameter tuning, experiment tracking)
Model: a machine learning model usually takes a long time to train; to make runs reproducible, you need to track experiments and save the model artifact along with supporting artifacts (metrics, metric charts, etc.)
Communication between multiple steps
Ordering and dependency of steps: if the workflow contains preprocessing (e.g. data transformation and splitting into train, validation, and test datasets), training, and evaluation, you want to run preprocessing, then training, then evaluation, and finally register the model into a model catalog.
Data passed between steps (input, output): the training step's input depends on the preprocessing step's output, while the evaluation step's inputs are the model output from the training step and the test dataset output from the preprocessing step.
Control logic: conditional execution of certain steps.
Let’s review what these patterns look like in Azure ML and Amazon SageMaker.
Azure ML
The examples below are based on the Azure ML credit default prediction tutorial.
Data asset
Azure ML has the concept of a data asset, which can be a file, a folder, or tabular data stored in Azure Blob Storage or Azure Data Lake Store, and which can be versioned. The following code retrieves a data asset by name and version:
# get a handle of the data asset and print the URI
credit_data = ml_client.data.get(name="credit-card", version="initial")
print(f"Data asset URI: {credit_data.path}")
Environment and dependency
Azure ML has the concept of an environment, which maps to a container image and can use a conda.yaml file to install additional Python packages.
A dedicated environment construct can be used to define a versioned execution environment.
Environment definition
import os

from azure.ai.ml.entities import Environment

custom_env_name = "aml-scikit-learn"

pipeline_job_env = Environment(
    name=custom_env_name,
    description="Custom environment for Credit Card Defaults pipeline",
    tags={"scikit-learn": "0.24.2"},
    conda_file=os.path.join(dependencies_dir, "conda.yaml"),
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
    version="0.2.0",
)
pipeline_job_env = ml_client.environments.create_or_update(pipeline_job_env)

print(
    f"Environment with name {pipeline_job_env.name} is registered to workspace, the environment version is {pipeline_job_env.version}"
)
conda.yaml
%%writefile {dependencies_dir}/conda.yaml
name: model-env
channels:
  - conda-forge
dependencies:
  - python=3.8
  - numpy=1.21.2
  - pip=21.2.4
  - scikit-learn=0.24.2
  - scipy=1.7.1
  - pandas>=1.1,<1.2
  - pip:
      - inference-schema[numpy-support]==1.3.0
      - xlrd==2.0.1
      - mlflow==2.4.1
      - azureml-mlflow==1.51.0
Script to execute
The most basic way is to use the command object: the code argument points to the local source folder, and the command argument tells the Azure ML pipeline what to execute.
from azure.ai.ml import command
from azure.ai.ml import Input, Output

data_prep_component = command(
    name="data_prep_credit_defaults",
    display_name="Data preparation for training",
    description="reads a .xl input, split the input to train and test",
    inputs={
        "data": Input(type="uri_folder"),
        "test_train_ratio": Input(type="number"),
    },
    outputs=dict(
        train_data=Output(type="uri_folder", mode="rw_mount"),
        test_data=Output(type="uri_folder", mode="rw_mount"),
    ),
    # The source folder of the component
    code=data_prep_src_dir,
    command="""python data_prep.py \
            --data ${{inputs.data}} --test_train_ratio ${{inputs.test_train_ratio}} \
            --train_data ${{outputs.train_data}} --test_data ${{outputs.test_data}} \
            """,
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
)
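The pipeline below also uses a train_component, which is not shown in this excerpt. As a rough sketch, it could be defined the same way; the train.py script, the train_src_dir folder, and the model output below are illustrative, not the tutorial's exact definition:
train_component = command(
    name="train_credit_defaults_model",
    display_name="Train credit defaults model",
    inputs={
        "train_data": Input(type="uri_folder"),
        "test_data": Input(type="uri_folder"),
        "learning_rate": Input(type="number"),
        "registered_model_name": Input(type="string"),
    },
    outputs=dict(
        model=Output(type="uri_folder", mode="rw_mount"),
    ),
    code=train_src_dir,  # illustrative: local folder containing train.py
    command="""python train.py \
            --train_data ${{inputs.train_data}} --test_data ${{inputs.test_data}} \
            --learning_rate ${{inputs.learning_rate}} \
            --registered_model_name ${{inputs.registered_model_name}} --model ${{outputs.model}} \
            """,
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
)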
Input and Output
The code above defines the input and output types, but the code below does the actual wiring. You can see that the credit_data data asset is mapped to the data_prep step's data input, and the other parameter values are also passed in.
# the dsl decorator tells the sdk that we are defining an Azure Machine Learning pipeline
from azure.ai.ml import dsl, Input, Output


@dsl.pipeline(
    compute="serverless",  # "serverless" value runs pipeline on serverless compute
    description="E2E data_prep-train pipeline",
)
def credit_defaults_pipeline(
    pipeline_job_data_input,
    pipeline_job_test_train_ratio,
    pipeline_job_learning_rate,
    pipeline_job_registered_model_name,
):
    # using data_prep_function like a python call with its own inputs
    data_prep_job = data_prep_component(
        data=pipeline_job_data_input,
        test_train_ratio=pipeline_job_test_train_ratio,
    )

    # using train_func like a python call with its own inputs
    train_job = train_component(
        train_data=data_prep_job.outputs.train_data,  # note: using outputs from previous step
        test_data=data_prep_job.outputs.test_data,  # note: using outputs from previous step
        learning_rate=pipeline_job_learning_rate,  # note: using a pipeline input as parameter
        registered_model_name=pipeline_job_registered_model_name,
    )

    # a pipeline returns a dictionary of outputs
    # keys will code for the pipeline output identifier
    return {
        "pipeline_job_train_data": data_prep_job.outputs.train_data,
        "pipeline_job_test_data": data_prep_job.outputs.test_data,
    }


registered_model_name = "credit_defaults_model"

# Let's instantiate the pipeline with the parameters of our choice
pipeline = credit_defaults_pipeline(
    pipeline_job_data_input=Input(type="uri_file", path=credit_data.path),
    pipeline_job_test_train_ratio=0.25,
    pipeline_job_learning_rate=0.05,
    pipeline_job_registered_model_name=registered_model_name,
)
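The pipeline object by itself is only a definition; to actually run it, you submit it as a job, roughly like this (the experiment name is arbitrary):
# Submit the pipeline to the workspace and stream its status.
pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    experiment_name="credit_default_pipeline",  # arbitrary experiment name
)
ml_client.jobs.stream(pipeline_job.name)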
Azure ML uses the @dsl.pipeline Python decorator to create a pipeline containing data_prep_job and train_job. Since the train_job inputs have an explicit dependency on data_prep_job.outputs, Azure ML executes these steps in order and makes sure the outputs are passed to the corresponding inputs.
You can imagine that after the pipeline job is submitted, the code runs in a remote container and executes something like “python data_prep.py --data $<azure_ml_dir>/credits.csv --test_train_ratio 0.25 ...”.
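For reference, here is a minimal sketch of what such a data_prep.py entry script might look like; the argument names come from the component command above, while the Excel parsing details are illustrative (the tutorial script also logs to MLflow):
import argparse
import os

import pandas as pd
from sklearn.model_selection import train_test_split


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", type=str, help="path to input data")
    parser.add_argument("--test_train_ratio", type=float, default=0.25)
    parser.add_argument("--train_data", type=str, help="output folder for train data")
    parser.add_argument("--test_data", type=str, help="output folder for test data")
    args = parser.parse_args()

    # The input is mounted by Azure ML as a local path.
    df = pd.read_excel(args.data, header=1, index_col=0)  # parsing details are an assumption

    train_df, test_df = train_test_split(df, test_size=args.test_train_ratio)

    # Outputs are folders mounted by Azure ML; write one file into each.
    train_df.to_csv(os.path.join(args.train_data, "data.csv"), index=False)
    test_df.to_csv(os.path.join(args.test_data, "data.csv"), index=False)


if __name__ == "__main__":
    main()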
Another article on creating an Azure ML pipeline uses a file on Azure Blob Storage as input:
from azure.ai.ml import Input

fashion_ds = Input(
    path="wasbs://demo@data4mldemo6150520719.blob.core.windows.net/mnist-fashion/"
)
A job has a type. Most jobs are command jobs that run a command, like python main.py. What runs in a job is agnostic to any programming language, so you can run bash scripts, invoke python interpreters, run a bunch of curl commands, or anything else.
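For example, a standalone command job that just runs shell commands could look like the following sketch (the source folder and compute cluster name are placeholders):
from azure.ai.ml import command

# A job is not limited to Python; the command is just a shell command line.
shell_job = command(
    code="./src",  # placeholder local folder uploaded with the job
    command="ls -l && python main.py",
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
    compute="cpu-cluster",  # placeholder compute cluster name
    display_name="shell-command-job",
)
ml_client.create_or_update(shell_job)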
For more details, see the Azure ML get started tutorial.
Amazon SageMaker
Data
SageMaker does not have a data asset concept; you just pass an S3 location to the pipeline step.
Environment
There is no environment concept and no conda.yaml equivalent. To customize the container environment:
- You include requirements.txt with the code that is mounted into the container. In the Python entry script, use subprocess.check_call to run "pip install -r requirements.txt" (see the sketch after this list).
- If the container environment needs further customization, use a separate tool to build a container image and push it to Amazon ECR.
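A minimal sketch of the requirements.txt pattern, assuming the file sits next to the entry script inside the container:
# At the top of the SageMaker entry script (e.g. preprocessing.py or train.py):
import subprocess
import sys

# Install extra dependencies that are not baked into the base container image.
subprocess.check_call(
    [sys.executable, "-m", "pip", "install", "-r", "requirements.txt"]
)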
Generally you use a task-specific construct to select the container image that executes the task, e.g.:
from sagemaker.sklearn.processing import SKLearnProcessor

framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-abalone-process",
    role=role,
)
import sagemaker
from sagemaker.estimator import Estimator

image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)

xgb_train = Estimator(
    image_uri=image_uri,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=model_path,
    role=role,
)
Job/step definition
SageMaker uses separate constructs for the processor or estimator (which selects the container image and compute) and the step (which defines inputs, outputs, and the entry code).
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

step_process = ProcessingStep(
    name="AbaloneProcess",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="abalone/preprocessing.py",
)
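Inside the processing container, the ProcessingInput and ProcessingOutput paths map to local directories, so abalone/preprocessing.py just reads and writes those paths. A simplified sketch (the tutorial's actual script does feature engineering before splitting):
import glob
import os

import numpy as np
import pandas as pd

if __name__ == "__main__":
    # ProcessingInput places the S3 data under /opt/ml/processing/input.
    input_files = glob.glob("/opt/ml/processing/input/*.csv")
    df = pd.concat(pd.read_csv(f) for f in input_files)

    # Simple shuffled 70/15/15 split into train/validation/test.
    train, validation, test = np.split(
        df.sample(frac=1, random_state=42),
        [int(0.7 * len(df)), int(0.85 * len(df))],
    )

    # Anything written to these directories is uploaded to S3 as the
    # corresponding ProcessingOutput ("train", "validation", "test").
    for name, part in [("train", train), ("validation", validation), ("test", test)]:
        out_dir = f"/opt/ml/processing/{name}"
        os.makedirs(out_dir, exist_ok=True)
        part.to_csv(os.path.join(out_dir, f"{name}.csv"), index=False)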
And the training step:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

step_train = TrainingStep(
    name="AbaloneTrain",
    estimator=xgb_train,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)
As in Azure ML, the step dependency is set implicitly: step_train's inputs reference step_process's outputs (via step_process.properties), so SageMaker runs the processing step first.
Pipeline definition
Use the SageMaker Python SDK to create the pipeline:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = "AbalonePipeline"

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
)
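To register and run the pipeline (step_eval and step_cond come from later parts of the same tutorial and are not shown here), you then upsert it and start an execution, roughly:
# Create or update the pipeline definition in SageMaker, then start a run.
pipeline.upsert(role_arn=role)
execution = pipeline.start()
execution.wait()  # block until the pipeline execution finishes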