AWS CDK: Deploying Python Jobs to SageMaker


Motivation

Often, when exploring practical use cases in machine learning, we find ourselves needing to deploy Python jobs to SageMaker CPU instances. In this guide, we'll take a more structured approach using AWS CDK. Unlike other tutorials, we'll address the challenge of deploying custom libraries, such as Stanza, which can require more than a simple pip install stanza due to potential dependency issues. We'll walk through the process of submitting our job on SageMaker and monitoring its status while it runs. Leveraging AWS CDK, we'll establish a clean method for resource cleanup. Each step in this blog will be accompanied by shared code snippets. The entire process will cost approximately $1 and can be easily undone with a single command.

AWS CDK - Setup

Before proceeding, ensure you have an AWS account. Follow these steps to set up the AWS CLI (Command Line Interface):

1. Install AWS CLI: Depending on your operating system, follow the installation instructions outlined here.

2. Configure AWS CLI: After installation, run aws configure and input your AWS account details when prompted. This will create two files (config and credentials) in the ~/.aws folder (on Linux and macOS). Your config file should resemble the following:

[default]
region = eu-north-1
output = json

For the credentials file, add your AWS access key ID and secret access key:

[default]
aws_access_key_id = **********
aws_secret_access_key = ******************************

Retrieve these keys from the IAM console in your AWS account. Configuring them up front ensures S3 operations run smoothly throughout the tutorial and helps avoid issues early on.

3. Install Node.js: Ensure Node.js is installed on your system. Run node --version to confirm you have version >= 10.13.0, which is required for AWS CDK.

4. Install AWS CDK: Use npm (the Node.js package manager) to install AWS CDK globally:

npm install -g aws-cdk

Verify the installation by running:

cdk --version

5. Initialize CDK Application: Let's create our CDK application using Python. Run the following commands:

mkdir lemma-app && cd lemma-app
cdk init app --language python

These commands create an empty lemma-app directory and initialize a new CDK application in it using Python as the language; the project and stack names are derived from the directory name.
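The generated layout looks roughly like the sketch below (names derive from the directory; later in this post the stack class is referred to as LemmaStack and the stack file as app_stack.py, so assume they were renamed accordingly). Note that the app.py at the project root is the CDK entry point and is unrelated to the container/app.py Flask service introduced later:

lemma-app
    | app.py                   (CDK entry point that instantiates the stack)
    | cdk.json
    | requirements.txt         (Python dependencies for the CDK app)
    | lemma_app
        |--- __init__.py
        |--- lemma_app_stack.py    (the stack class)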

With these preparations completed, you're ready for the next steps in deploying Python jobs to SageMaker using AWS CDK.

AWS CDK Concept

Now that AWS CDK is installed, a quick word on what it offers: CDK supports several programming languages for developing applications that interact with AWS services. It's particularly valuable for managing infrastructure as code, making it easier to add, update, or remove AWS resources.

In our case, we'll use AWS CDK to set up the infrastructure required for our Python job. Executing the job itself, however, isn't a resource that CDK provisions; CDK only orchestrates the resources around it. We therefore need a trigger to run the job, which could be one of several events: an upload to S3, a schedule in AWS, or a step in a CI/CD pipeline that fires when a branch is merged.

In this blog, we'll manually trigger our job using AWS CLI. We'll distinguish between two phases:

1. Compile Time: Using AWS CDK to compile or create all necessary resources.
2. Run Time: Manually triggering our job using the AWS CLI.

It's worth noting that there are numerous ways to trigger job execution, each requiring specific definitions and configurations.

Build Custom Container

Let's structure the container folder within your CDK project. Create the following directory structure and files:

container
    | code
        |--- __init__.py
        |--- inference.py
    | app.py
    | Dockerfile
    | requirements.txt

Here's a breakdown of each file's content:

1. code folder:
   - __init__.py: This is an empty file. It signifies that the code directory is a Python module.
   - inference.py: Contains functions for model loading (model_fn) and prediction (predict_fn) using the Swedish stanza model.

# inference.py

import json
import logging
import os
import sys

import stanza

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))

def model_fn(model_dir):
    # model_dir is where SageMaker extracts model.tar.gz (SM_MODEL_DIR in the Dockerfile)
    pipe = stanza.Pipeline(lang='sv',
                           lemma_model_path=os.path.join(model_dir, 'sv_suc_lemmatizer.pt'),
                           pos_pretrain_path=os.path.join(model_dir, 'sv_talbanken.pretrain.pt'),
                           pos_model_path=os.path.join(model_dir, 'sv_talbanken_tagger.pt'),
                           depparse_pretrain_path=os.path.join(model_dir, 'sv_talbanken.pretrain.pt'),
                           depparse_model_path=os.path.join(model_dir, 'sv_talbanken_parser.pt'),
                           depparse_max_sentence_size=200,
                           tokenize_pretokenized=True,
                           depparse_batch_size=3000,
                           pos_batch_size=3000,
                           lemma_batch_size=3000,
                           verbose=False,
                           processors='tokenize,pos,lemma,depparse')
    return pipe

def predict_fn(data, pipe):
    # `data` is either a parsed JSON dict (single request), a list of
    # newline-separated records (batch transform), or a raw JSON string
    if isinstance(data, dict):
        data_dict = data
    elif isinstance(data, list):
        words = []
        for el in data:
            if isinstance(el, str):
                # Record is a JSON string with a nested "ner_sentences" object
                words.append(json.loads(el)["ner_sentences"]["word"])
            else:
                # Record is a dict whose "ner_sentences" field holds serialized sentences
                for each_sentence in eval(json.loads(el["ner_sentences"])):
                    words.append(each_sentence["word"])
        data_dict = {
            "words": words
        }
    else:
        data_dict = json.loads(data)

    # Run the stanza pipeline and collect one lemma per word
    doc = pipe(' '.join(data_dict["words"]))
    lemmas = [word.lemma for t in doc.iter_tokens() for word in t.words]

    # Mirror the input shape: list in -> one record per lemma, dict in -> single record
    if isinstance(data, list):
        return [{"lemma_words": json.dumps(str(lem))} for lem in lemmas]

    return {"lemma_words": json.dumps(str(lemmas))}

2. app.py: This file sets up a Flask web service with two endpoints (/ping and /invocations) for SageMaker.

# app.py

import os
from flask import Flask, jsonify, request
from werkzeug.middleware.proxy_fix import ProxyFix
from code.inference import model_fn, predict_fn

app = Flask(__name__)

# Load the model by reading the `SM_MODEL_DIR` environment variable
model = model_fn(os.environ["SM_MODEL_DIR"])

# Fix for running behind a proxy
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)

@app.route("/ping", methods=["GET"])
def ping():
    return "pong"

@app.route("/invocations", methods=["POST"])
def invocations():
    if request.headers.get('Content-Type') == 'application/json':
        body = request.json
    else:
        body = request.get_data().decode('utf-8').strip().split('\n')
    return predict_fn(body, model)

3. Dockerfile: This Dockerfile defines the environment and sets up the application.

# Dockerfile

FROM python:3.10.4

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

ENV SM_MODEL_DIR /opt/ml/model
ENV PYTHONUNBUFFERED 1

# Serve the Flask app on the port SageMaker expects (8080)
ENTRYPOINT ["gunicorn", "-b", "0.0.0.0:8080", "app:app"]

4. requirements.txt: List of Python packages required for your application.

stanza==1.3.1
Flask==2.0.2
gunicorn==20.1.0

Ensure these files are properly structured within the container folder of your CDK project. This setup prepares your Docker container to serve the Flask application with the specified endpoints (/ping and /invocations) required by SageMaker for model hosting.
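Before moving on to the CDK stack, you can sanity-check the service locally: run the Flask app (or build and run the container) with SM_MODEL_DIR pointing at a directory containing the model files, then exercise both endpoints. The snippet below is a minimal sketch; the port mirrors the Dockerfile, and the request payload is an assumption based on the words key that predict_fn handles:

import requests

base = "http://localhost:8080"

# Health check endpoint polled by SageMaker
print(requests.get(f"{base}/ping").text)  # expected: "pong"

# Single JSON request, handled by the request.json branch of invocations()
payload = {"words": ["Hunden", "springer", "i", "parken"]}  # illustrative input
resp = requests.post(f"{base}/invocations", json=payload)
print(resp.json())  # expected shape: {"lemma_words": "..."}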

CDK Stack

After running cdk init, you'll notice the LemmaStack class which represents your CDK stack. To prepare for the rest of the tutorial, import the necessary AWS modules at the beginning of your app_stack.py file:

import datetime

from aws_cdk import (
    aws_lambda,
    aws_ecr_assets,
    aws_s3,
    aws_iam,
    aws_ec2,
    aws_s3_deployment,
    aws_sagemaker,
    aws_stepfunctions_tasks as tasks,
    aws_stepfunctions as sfn,
    Stack,
    RemovalPolicy,
    Duration,
    CfnOutput
)
from constructs import Construct

To automate the Docker build and push process using AWS CDK, add the following code snippet inside the __init__ method of your LemmaStack class. This snippet defines an ECR asset that builds the Docker image from the container directory and pushes it to the Amazon ECR repository:

# ECR asset for our Docker image
asset = aws_ecr_assets.DockerImageAsset(
    self, "LemmaImage", directory="./container"
)

After adding this snippet to your stack class, running cdk deploy will automatically build the Docker image defined in the container directory and push it to the CDK-managed ECR assets repository (LemmaImage is the construct ID, not a repository name). This ensures that your custom Docker image is ready to be used by the SageMaker model defined later.

Working With S3

In the code below, we assume there is an existing S3 bucket named bucket_name in your AWS region, and we use it to upload the stanza models under the prefix s3_task_prefix. Because the model files are large, the bucket deployment stages them on EFS, which requires a dedicated VPC (vpc), and runs with an increased memory limit.

Here's a breakdown of the code and its purpose within your CDK stack:

1. Import Existing S3 Bucket: We import the existing S3 bucket named bucket_name into our CDK stack using aws_s3.Bucket.from_bucket_name.

2. Configure a VPC for Large Asset Uploads: We create a new VPC (vpc) because use_efs=True requires one; the deployment Lambda mounts EFS to stage files that would not fit in its local storage.

3. Bucket Deployment: The BucketDeployment resource (models_deployment) uploads the contents of the stanza_models directory to the S3 bucket (models_bucket) under the prefix s3_task_prefix/models.

4. Memory Limit and Removal Policy: We increase the memory limit (memory_limit=1024) to accommodate large file transfers. Setting retain_on_delete=False ensures the uploaded objects are not retained when the stack is deleted.

Here's the adjusted code snippet for your CDK stack:

# Importing existing S3 bucket
models_bucket = aws_s3.Bucket.from_bucket_name(self, "existingBucket", bucket_name)

# Create a VPC for large asset uploading
vpc = aws_ec2.Vpc(
    self,
    "Vpc",
    ip_addresses=aws_ec2.IpAddresses.cidr("10.0.0.0/16")
)
vpc.apply_removal_policy(RemovalPolicy.DESTROY)

# Deployment that uploads contents of stanza_models directory to the S3 bucket
models_deployment = aws_s3_deployment.BucketDeployment(
    self,
    "Models",
    sources=[aws_s3_deployment.Source.asset("./stanza_models")],
    destination_bucket=models_bucket,
    destination_key_prefix=f"{s3_task_prefix}/models",
    use_efs=True,
    vpc=vpc,
    memory_limit=1024,
    retain_on_delete=False
)

In this setup, the BucketDeployment ensures that the contents of the stanza_models directory are efficiently uploaded to the specified S3 bucket (models_bucket) under the specified prefix (s3_task_prefix/models). The use of a separate VPC and increased memory limit (memory_limit=1024) optimizes the process for handling large assets during deployment.

Additionally, setting retain_on_delete=False ensures that the deployed contents are removed when the stack is deleted, preventing orphaned resources and potential costs associated with them. This approach enables robust management of your deployment lifecycle within AWS CDK.
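One thing the stack does not do is produce the model archive itself: the SageMaker model defined in the next section expects model data at {s3_task_prefix}/models/model.tar.gz, so the stanza_models directory must contain a model.tar.gz holding the .pt files referenced in inference.py (stanza.download('sv') can fetch them locally). Below is a minimal packaging sketch; the local_stanza_models directory name is an assumption, so point it at wherever you keep the model files:

import tarfile
from pathlib import Path

# Directory holding the .pt files referenced in inference.py (assumed name)
model_files = Path("local_stanza_models")

# The BucketDeployment uploads everything under stanza_models/ to S3
output = Path("stanza_models")
output.mkdir(exist_ok=True)

# SageMaker extracts this archive into /opt/ml/model, so keep the paths flat
with tarfile.open(output / "model.tar.gz", "w:gz") as tar:
    for f in sorted(model_files.glob("*.pt")):
        tar.add(f, arcname=f.name)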

SageMaker Model

To integrate your Python job with Amazon SageMaker, you'll need to create a SageMaker model and define an IAM role that grants the necessary permissions for model deployment and accessing S3 resources. Let's use AWS CDK to set up these components in your stack.

1. Create IAM Role for SageMaker: Define an IAM role (sagemaker_role) that SageMaker will use to deploy models and access S3 resources:

# Role used by SageMaker to deploy models from S3
sagemaker_role = aws_iam.Role(
    self,
    "automodeldeploy-sagemaker-role",
    assumed_by=aws_iam.ServicePrincipal("sagemaker.amazonaws.com"),
)
sagemaker_role.add_managed_policy(
    aws_iam.ManagedPolicy.from_aws_managed_policy_name("AmazonSageMakerFullAccess")
)
sagemaker_role.add_to_policy(
    aws_iam.PolicyStatement(
        resources=["arn:aws:s3:::*"],
        actions=[
            "s3:GetObject",
            "s3:PutObject",
            "s3:DeleteObject",
            "s3:ListBucket",
        ],
    )
)

This IAM role (sagemaker_role) grants SageMaker permissions to perform necessary actions on S3 buckets.

2. Define SageMaker Model: Use the CfnModel class to define the SageMaker model, specifying the Docker image URI and the location of the model data in S3:

# SageMaker Model definition
cfn_model = aws_sagemaker.CfnModel(
    self,
    "LemmaModel",
    execution_role_arn=sagemaker_role.role_arn,
    containers=[
        aws_sagemaker.CfnModel.ContainerDefinitionProperty(
            image=asset.image_uri,
            mode="SingleModel",
            model_data_url=f"s3://{bucket_name}/{s3_task_prefix}/models/model.tar.gz",
        )
    ],
    enable_network_isolation=False,
)

In this code:

  • asset.image_uri points to the Docker image URI created earlier using aws_ecr_assets.DockerImageAsset.
  • model_data_url specifies the S3 location (s3://{bucket_name}/{s3_task_prefix}/models/model.tar.gz) where the model data is stored.

3. Define Dependencies: Ensure that the creation of the SageMaker model (CfnModel) depends on the IAM role and the S3 bucket deployment:

# Add dependencies to ensure proper order of resource creation
cfn_model.node.add_dependency(sagemaker_role)
cfn_model.node.add_dependency(models_deployment)

By setting these dependencies, AWS CDK will ensure that the IAM role and S3 bucket deployment are created before attempting to create the SageMaker model, preventing issues like "Not Found Error" due to resource creation order.

4. Cleanup Policy: Apply a removal policy (RemovalPolicy.DESTROY) to the SageMaker model resource for proper cleanup on stack deletion:

# Apply removal policy to clean up the model on stack deletion
cfn_model.apply_removal_policy(RemovalPolicy.DESTROY)

This removal policy ensures that the SageMaker model resource is destroyed when the stack is deleted, avoiding orphaned resources and potential costs.

By implementing these steps within your CDK stack, you'll have a SageMaker model set up and integrated with the necessary IAM role and S3 resources, ready for deployment and execution of your Python job within the AWS ecosystem.

Batch Transform Job

To create a batch transform job in AWS Step Functions using AWS CDK, you can use the SageMakerCreateTransformJob class from aws_stepfunctions_tasks. Because SageMaker requires transform job names to be unique, we'll generate a unique name at synthesis time using a timestamp. Additionally, we'll apply a removal policy (RemovalPolicy.DESTROY) to ensure proper cleanup of old resources when updating the stack.

Here's how you can define and configure the batch transform job within your CDK stack:

import datetime
from aws_cdk import aws_ec2, aws_stepfunctions_tasks as tasks, Duration, RemovalPolicy

# Generate a timestamp for unique job name
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
transform_job_name = f"LemmaTransformJob-{timestamp}"

# Define the SageMaker batch transform task
lemma_task = tasks.SageMakerCreateTransformJob(
    self,
    "BatchInference",
    transform_job_name=transform_job_name,
    model_name=cfn_model.attr_model_name,
    model_client_options=tasks.ModelClientOptions(
        invocations_max_retries=3,
        invocations_timeout=Duration.minutes(5)
    ),
    transform_input=tasks.TransformInput(
        transform_data_source=tasks.TransformDataSource(
            s3_data_source=tasks.TransformS3DataSource(
                s3_uri=f"s3://{bucket_name}/{s3_input_prefix}",
                s3_data_type=tasks.S3DataType.S3_PREFIX
            )
        ),
        split_type=tasks.SplitType.LINE,
        content_type="application/json",
    ),
    transform_output=tasks.TransformOutput(
        s3_output_path=f"s3://{bucket_name}/{s3_output_prefix}",
        assemble_with=tasks.AssembleWith.LINE
    ),
    transform_resources=tasks.TransformResources(
        instance_count=1,
        instance_type=aws_ec2.InstanceType.of(
            aws_ec2.InstanceClass.C5,
            aws_ec2.InstanceSize.XLARGE
        )
    )
)

# Apply removal policy to clean up the transform job on stack deletion
lemma_task.apply_removal_policy(RemovalPolicy.DESTROY)

In this code snippet:

  • timestamp is generated using datetime.datetime.now().strftime("%Y%m%d%H%M%S") to create a unique transform job name (transform_job_name) based on the current time.

  • The SageMakerCreateTransformJob task (lemma_task) is defined with the following configuration:
    - transform_job_name: Unique name for the transform job.
    - model_name: The name of the SageMaker model to be used for inference (cfn_model.attr_model_name).
    - model_client_options: Options for invoking the model, specifying retries and timeout.
    - transform_input: Configuration for the input data source, specifying the S3 input location and content type.
    - transform_output: Configuration for the output data destination, specifying the S3 output path.
    - transform_resources: Resource configuration for the batch transform job, specifying instance count and type.

  • apply_removal_policy(RemovalPolicy.DESTROY) applies a removal policy to the transform job task, ensuring that the job is properly cleaned up (update: delete -> create) when the stack is updated or deleted.

By using this approach, you can dynamically create and manage SageMaker batch transform jobs within your AWS CDK stack, ensuring efficient and reliable deployment and cleanup of resources. Adjust the configurations as needed based on your specific use case and requirements.
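Before running the job, the input prefix (s3_input_prefix) needs data in the format the container expects: with split_type=SplitType.LINE and content_type="application/json", each line of the input file is one JSON record. Based on the predict_fn shown earlier, a record with a words list is a reasonable shape. The snippet below is an illustrative sketch; the bucket name, key, and exact record schema are assumptions you should adapt to your data:

import json

import boto3

# Illustrative records; predict_fn expects a "words" list per JSON object
records = [
    {"words": ["Hunden", "springer", "i", "parken"]},
    {"words": ["Katten", "sover", "hela", "dagen"]},
]

# Newline-delimited JSON, matching SplitType.LINE in the transform input
body = "\n".join(json.dumps(r, ensure_ascii=False) for r in records)

s3 = boto3.client("s3")
s3.put_object(
    Bucket="my-bucket",                 # replace with bucket_name
    Key="input/sentences.jsonl",        # replace with f"{s3_input_prefix}/sentences.jsonl"
    Body=body.encode("utf-8"),
)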

Job Lifecycle - AWS State Machine

To manage different states of a batch transform job (lemma_task) and transition between tasks based on job status, you can define a state machine using AWS Step Functions (sfn.StateMachine) within your CDK stack. This state machine will orchestrate the workflow, including waiting for job completion, checking status, and performing subsequent actions based on the job status.

Here's how you can define and configure the state machine to manage the batch transform job state:

from aws_cdk import aws_stepfunctions as sfn, Duration

# Define a wait task to wait for 30 seconds
wait_task = sfn.Wait(
    self, "Wait30Seconds",
    time=sfn.WaitTime.duration(Duration.seconds(30))
)

# Define the state machine chain of tasks
chain_definition = lemma_task \
    .next(wait_task) \
    .next(status_task) \
    .next(
        sfn.Choice(self, 'JobComplete?')
            .when(sfn.Condition.string_equals('$.status', 'Failed'), fail_task)
            .when(sfn.Condition.string_equals('$.status', 'Stopping'), fail_task)
            .when(sfn.Condition.string_equals('$.status', 'Stopped'), fail_task)
            .when(sfn.Condition.string_equals('$.status', 'Completed'), delete_stack_task)
            .otherwise(wait_task)
    )

# Create the state machine
lemma_state_machine = sfn.StateMachine(
    self, "LemmaStateMachine",
    definition=chain_definition,
    timeout=Duration.minutes(60)
)

In this code snippet:

  • We define wait_task using sfn.Wait to wait for 30 seconds (Duration.seconds(30)).

  • The state machine (lemma_state_machine) is defined with a chain of tasks (chain_definition):
    - lemma_task: Starts the batch transform job.
    - wait_task: Waits for 30 seconds before checking on the job.
    - status_task: Checks the status of the batch transform job (a sketch of this state, along with fail_task and delete_stack_task, follows below).
    - sfn.Choice: Branches based on the job status using conditions (sfn.Condition.string_equals):
      - If the job status is 'Failed', 'Stopping', or 'Stopped', transition to fail_task.
      - If the job status is 'Completed', transition to delete_stack_task.
      - Otherwise, go back to wait_task to check again.

  • The lemma_state_machine is created using sfn.StateMachine, specifying chain_definition as the state machine definition and setting a timeout of Duration.minutes(60) (1 hour) for the state machine execution.

By defining this state machine in AWS CDK, you can orchestrate and automate the workflow of managing batch transform job states, including handling different job outcomes based on status checks. Adjust the tasks and conditions as needed to suit your specific use case and desired workflow logic. This state machine approach helps in managing and automating complex workflows within AWS Step Functions.
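The chain above references three states that aren't shown in the snippet: status_task, fail_task, and delete_stack_task. Below is a minimal sketch of how they might be defined inside the same stack; the construct IDs, the use of CallAwsService for the SageMaker and CloudFormation calls, and the choice to delete the whole stack on completion are illustrative assumptions rather than the exact implementation:

from aws_cdk import aws_stepfunctions as sfn, aws_stepfunctions_tasks as tasks

# Ask SageMaker for the job status and expose it as $.status for the Choice state
status_task = tasks.CallAwsService(
    self, "GetTransformJobStatus",
    service="sagemaker",
    action="describeTransformJob",
    parameters={"TransformJobName": transform_job_name},
    iam_resources=["*"],
    result_selector={"status.$": "$.TransformJobStatus"},
)

# Terminal failure state
fail_task = sfn.Fail(self, "TransformJobFailed")

# On success, delete the CloudFormation stack to clean up all resources
delete_stack_task = tasks.CallAwsService(
    self, "DeleteStack",
    service="cloudformation",
    action="deleteStack",
    parameters={"StackName": self.stack_name},
    iam_resources=["*"],
)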

To trigger the execution of your state machine using AWS CLI after deploying your infrastructure with AWS CDK, you can save the State Machine ARN as an output and then use it to start the execution via AWS CLI commands. Here's how you can achieve this:

1. Save State Machine ARN as Output: Use CfnOutput to save the State Machine ARN as an output during CDK deployment:

from aws_cdk import CfnOutput

# Save State Machine ARN as output
CfnOutput(
    self,
    "LemmaStateMachineARN",
    value=lemma_state_machine.state_machine_arn,
    description="ARN of the Lemma State Machine"
)

2. Deploy and Retrieve ARN: Deploy your CDK stack and retrieve the State Machine ARN from the deployment output.

cdk deploy --outputs-file "output.json"

In the generated output.json, you will find the ARN of your State Machine:

{
  "LemmaOnSkillsStack": {
    "LemmaStateMachineARN": "arn:aws:states:eu-north-1:*****************"
  }
}

3. Trigger State Machine Execution with AWS CLI: Use the AWS CLI to start the execution of your State Machine by providing its ARN:

aws stepfunctions start-execution --state-machine-arn arn:aws:states:eu-north-1:*****************

Replace arn:aws:states:eu-north-1:***************** with the actual State Machine ARN retrieved from the output file (output.json).

By following these steps, you can trigger the execution of your AWS Step Functions State Machine programmatically using AWS CLI after deploying your infrastructure with AWS CDK. This allows you to automate and orchestrate the workflow defined in your State Machine within your AWS environment. Adjust the commands and ARN values as needed based on your specific deployment and execution requirements.
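If you'd rather do this from Python, a small boto3 script can read the ARN from output.json, start the execution, and poll its status until it finishes. This is a hedged sketch; the stack key matches the output.json example above, and the 30-second polling interval is an arbitrary choice:

import json
import time

import boto3

# Read the State Machine ARN written by `cdk deploy --outputs-file "output.json"`
with open("output.json") as f:
    outputs = json.load(f)
state_machine_arn = outputs["LemmaOnSkillsStack"]["LemmaStateMachineARN"]

sfn_client = boto3.client("stepfunctions")

# Start the execution and poll until it leaves the RUNNING state
execution_arn = sfn_client.start_execution(stateMachineArn=state_machine_arn)["executionArn"]
status = "RUNNING"
while status == "RUNNING":
    time.sleep(30)
    status = sfn_client.describe_execution(executionArn=execution_arn)["status"]
    print(f"Execution status: {status}")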

Conclusion

If you're accustomed to managing your AWS resources through the AWS Management Console, transitioning to AWS CDK provides a powerful alternative for defining and managing your infrastructure as code. With AWS CDK, you can describe complex architectures, set dependencies between resources, specify removal policies, and orchestrate deployments programmatically. Let's explore why AWS CDK is a compelling choice for managing and running resources:

  1. Infrastructure as Code (IaC): AWS CDK enables you to define your infrastructure using familiar programming languages like Python, TypeScript, or Java. This approach allows you to leverage software development best practices, including version control, testing, and collaboration.

  2. Complex Infrastructure Definitions: You can define intricate architectures using AWS CDK's constructs. This includes defining dependencies between resources, ensuring resources are created and destroyed in the correct order, and managing resource configurations programmatically.

  3. Resource Management: With AWS CDK, you have granular control over resource lifecycle management. You can specify removal policies (RemovalPolicy) to define how resources behave when your stack is deleted, ensuring proper cleanup and minimizing orphaned resources.

  4. Access to AWS Services: AWS CDK provides access to the entire AWS ecosystem of services. You can define and configure resources ranging from basic compute instances and storage to advanced machine learning services and serverless architectures.

  5. Custom Tasks and Containers: You can define custom tasks by leveraging Docker containers within your CDK stack. This allows you to encapsulate specific functionality or workloads in custom containers and integrate them seamlessly into your infrastructure definitions.

  6. Automation and CLI Integration: Once your infrastructure is defined using AWS CDK, you can use the AWS CLI to interact with and manage your resources. For example, you can trigger jobs, start executions, or perform other operational tasks programmatically using the CLI, leveraging the resource IDs saved during deployment.

By embracing AWS CDK, you move towards a more scalable, repeatable, and manageable approach to infrastructure management. This shift empowers you to treat your infrastructure as code, leading to enhanced automation, reproducibility, and collaboration across your development and operations teams. If you're ready to explore the benefits of AWS CDK further, consider diving into the comprehensive documentation and resources provided by AWS to get started with infrastructure as code using CDK.