AWS - Spark job on EMR


Motivation

Working with Spark is often essential when you need to run processing jobs over your data. In this guide, we'll walk through creating a Spark job and submitting it to AWS EMR Serverless. We'll explain how to include your Python dependencies and submit the entire package as a unified entity to EMR for execution. Additionally, we'll encapsulate all the steps in a bash script, giving you control over the entire process. This keeps the steps manageable and makes it easy to rerun the job with different datasets.

Input Variables

We assume the following variables throughout this guide. Set them according to your AWS account settings before proceeding.

export EXE_ROLE="YOUR_ROLE"
export S3_BUCKET_NAME="YOUR_BUCKET"
export ACCOUNT_ID="YOUR_ACCOUNT"
export REGION="YOUR_REGION"
export REPO_NAME="YOUR_REPO"
export TASK_NAME="YOUR_TASK"
export INPUT_DATA="PATH_TO_DATA"
export OUTPUT_DATA="PATH_TO_SAVE"

Build PySpark Dependencies

Alright! Let's create a dependencies tar file. Let's assume we have a requirements.txt file like the following:

polyglot
PyICU
pycld2
morfessor
regex
six

In general, a PySpark project needs additional Python packages like those listed in requirements.txt. Depending on those packages, you may also need certain binary (system) packages installed beforehand so that pip can build and install the Python packages. For the list above, we need the following binary packages:

gcc-c++ gcc python3 python3-devel icu libicu libicu-devel

Here's the Dockerfile-create-env file:

# Use the base image suggested for EMR 7.0.0
FROM --platform=linux/amd64 public.ecr.aws/amazonlinux/amazonlinux:2023-minimal AS base

# Install necessary binary packages
RUN dnf install -y gcc-c++ gcc python3 python3-devel icu libicu libicu-devel

# Set up virtual environment
ENV VIRTUAL_ENV=/opt/venv
RUN python3 -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

# Copy requirements file
COPY requirements.txt .

# Install dependencies
RUN python3 -m pip install --upgrade pip && \
    python3 -m pip install venv-pack==0.2.0 && \
    python3 -m pip install -r requirements.txt

# Create tar file with dependencies
RUN mkdir /output && venv-pack -o "/output/pyspark_venv.tar.gz"

# Export the tar file
FROM scratch AS export
COPY --from=base "/output/pyspark_venv.tar.gz" /

This Dockerfile builds a Docker image that packages a Python environment with the required dependencies and produces the dependencies tar file. Let's break down the different sections and commands:

  1. FROM --platform=linux/amd64 public.ecr.aws/amazonlinux/amazonlinux:2023-minimal AS base: This line sets the base image for the Dockerfile. It specifies an Amazon Linux image tagged as 2023-minimal for the linux/amd64 platform. The image is named base for reference in subsequent commands.

  2. RUN dnf install -y gcc-c++ gcc python3 python3-devel icu libicu libicu-devel: This command installs various dependencies using the dnf package manager. These dependencies include C/C++ compilers (gcc-c++ and gcc), Python 3 (python3), Python development headers (python3-devel), and ICU libraries and headers (icu, libicu, libicu-devel).

  3. ENV VIRTUAL_ENV=/opt/venv: This sets an environment variable VIRTUAL_ENV to /opt/venv.

  4. RUN python3 -m venv $VIRTUAL_ENV: This creates a Python virtual environment at the location specified by VIRTUAL_ENV (/opt/venv).

  5. ENV PATH="$VIRTUAL_ENV/bin:$PATH": This adds the virtual environment's binary directory to the PATH environment variable. This ensures that commands installed within the virtual environment can be executed directly.

  6. COPY requirements.txt .: This copies a file named requirements.txt from the Docker build context into the current directory in the Docker image.

  7. RUN python3 -m pip install --upgrade pip && \ ...: This installs Python dependencies specified in requirements.txt. It first upgrades pip, then installs venv-pack version 0.2.0, and finally installs the requirements listed in requirements.txt.

  8. RUN mkdir /output && venv-pack -o "/output/pyspark_venv.tar.gz": This creates a directory named /output within the Docker image and then uses venv-pack to package the Python virtual environment (/opt/venv) along with its dependencies into a TAR archive named pyspark_venv.tar.gz within the /output directory.

  9. FROM scratch AS export: This line specifies a new stage in the Dockerfile named export that starts from a completely empty base image (scratch). This stage will be used to export the packaged virtual environment without any additional filesystem layers from the base image.

  10. COPY --from=base "/output/pyspark_venv.tar.gz" /: This copies the previously created pyspark_venv.tar.gz TAR archive from the base stage to the root directory (/) of the export stage.

In summary, this Dockerfile sets up an environment based on Amazon Linux, installs dependencies, creates a Python virtual environment, installs Python packages specified in requirements.txt, packages the virtual environment with its dependencies, and then exports the packaged environment into a separate stage of the Docker image.

To build the Docker image and obtain pyspark_venv.tar.gz as the output, run the following command:

DOCKER_BUILDKIT=1 docker build --file Dockerfile-create-env --output . -t pyspark_env:latest .

This command instructs Docker to build the image using the specified Dockerfile (Dockerfile-create-env) and to write the build output (the export stage containing pyspark_venv.tar.gz) to the current directory (.). DOCKER_BUILDKIT=1 enables BuildKit, which is required for the --output flag. Additionally, it tags the image as pyspark_env:latest for easy identification.
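
Optionally, you can take a quick look inside the resulting archive before uploading it:

# List a few entries of the packed environment as a quick sanity check
tar -tzf pyspark_venv.tar.gz | head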

Implement PySpark Script

To make your PySpark script accept the input_path and output_path arguments, you can follow these steps:

  1. Add the argument-parsing code at the beginning of your PySpark script.
  2. Parse the command-line arguments using the argparse module.
  3. Access the input_path and output_path variables later in your script as needed.

Here's how you can modify your PySpark script:

import argparse

# Parse command-line arguments
parser = argparse.ArgumentParser()
parser.add_argument('-i', '--input_path', type=str, help='Input path')
parser.add_argument('-o', '--output_path', type=str, help='Output path')
args = parser.parse_args()

# Access input and output paths
input_path = args.input_path
output_path = args.output_path

# Add your existing PySpark script below this section
# Example:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("YourAppName") \
    .getOrCreate()

# Read data from input path
df = spark.read.csv(input_path, header=True)

# Your data processing logic goes here

# Write processed data to output path
df.write.csv(output_path, header=True)

# Stop Spark session
spark.stop()

In this script:

  • argparse.ArgumentParser() is used to create a parser object to handle command-line arguments.
  • -i and -o are defined as command-line arguments for specifying input and output paths, respectively.
  • args.input_path and args.output_path are used to access the values passed for input and output paths, respectively.
  • You can replace the data reading, processing, and writing logic with your existing PySpark script.
  • Ensure that the script is saved with a .py extension (e.g., script.py); you can then run it on EMR, passing the appropriate input and output paths. A quick local smoke test is sketched below.
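
Before uploading, you may also want to smoke-test the argument handling locally. The snippet below is a minimal sketch that assumes a local Spark installation; sample_input.csv and ./local_output are placeholder paths:

# Hypothetical local run; replace the placeholder paths with your own
spark-submit script.py --input_path sample_input.csv --output_path ./local_output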

To copy script.py and pyspark_venv.tar.gz to your S3 bucket, you can use the following bash commands:

# Copy pyspark_venv.tar.gz to S3
aws s3 cp pyspark_venv.tar.gz s3://"$S3_BUCKET_NAME"/"$TASK_NAME"/

# Copy script.py to S3
aws s3 cp script.py s3://"$S3_BUCKET_NAME"/"$TASK_NAME"/

In these commands:

  • S3_BUCKET_NAME and TASK_NAME are variables representing your S3 bucket name and task name, respectively.
  • aws s3 cp is the AWS CLI command for copying files to/from S3.
  • Ensure that both pyspark_venv.tar.gz and script.py are in the current working directory, or provide their full paths if they are located elsewhere. You can verify the upload as shown below.
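
Optionally, confirm that both files landed under the expected prefix:

# List the uploaded artifacts
aws s3 ls s3://"$S3_BUCKET_NAME"/"$TASK_NAME"/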

Create Spark Job on EMR (Serverless)

To create an EMR Serverless application using the AWS CLI and store the application ID for later use, you can use the following bash commands:

# Create the application and retrieve the application ID
output=$(aws emr-serverless create-application \
    --release-label emr-7.0.0 \
    --type "SPARK" \
    --name "$TASK_NAME")
APPLICATION_ID=$(echo "$output" | jq -r '.applicationId')

# Optionally, you can also save the application ID to a file for future reference
echo "$APPLICATION_ID" > application_id.txt

In these commands:

  • TASK_NAME is the variable representing the name of your EMR task.
  • aws emr-serverless create-application is the AWS CLI command to create an application in EMR.
  • --release-label emr-7.0.0 specifies the release label for the EMR version.
  • --type "SPARK" indicates that the application type is Spark.
  • --name "$TASK_NAME" provides the name of the application.
  • jq -r '.applicationId' is used to extract the application ID from the JSON output.
  • Optionally, you can save the application ID to a file (application_id.txt) for future reference using echo "$APPLICATION_ID" > application_id.txt.
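
Before submitting a job, you may want to wait until the application is ready. The loop below is a minimal sketch that polls the application state with aws emr-serverless get-application; a newly created application is typically usable once it reaches the CREATED or STARTED state:

# Poll until the application is ready to accept job runs
while true; do
    APP_STATE=$(aws emr-serverless get-application \
        --application-id "$APPLICATION_ID" \
        --query 'application.state' --output text)
    echo "Application state: $APP_STATE"
    if [ "$APP_STATE" = "CREATED" ] || [ "$APP_STATE" = "STARTED" ]; then
        break
    fi
    sleep 10
done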

Here's the AWS CLI command for submitting the Spark job to EMR Serverless:

aws emr-serverless start-job-run \
    --application-id "$APPLICATION_ID" \
    --execution-role-arn "$EXE_ROLE" \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://'"${S3_BUCKET_NAME}"'/'"${TASK_NAME}"'/script.py",
            "entryPointArguments": [
                "--input_path",
                "s3://'"${S3_BUCKET_NAME}"'/'"${INPUT_PATH}"'",
                "--output_path",
                "s3://'"${S3_BUCKET_NAME}"'/'"${OUTPUT_PATH}"'"
            ],
            "sparkSubmitParameters": "--conf spark.archives=s3://'"${S3_BUCKET_NAME}"'/'"${TASK_NAME}"'/pyspark_venv.tar.gz#environment --conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python --conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python --conf spark.emr-serverless.executorEnv.PYSPARK_PYTHON=./environment/bin/python"
        }
    }' \
    --configuration-overrides '{
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": {
                "logUri": "s3://'"${S3_BUCKET_NAME}"'/'"${TASK_NAME}"'/logs/"
            }
        }
    }'

This command uses the AWS CLI (Command Line Interface) to start a job run on Amazon EMR Serverless. Let's break down the components:

  1. aws emr-serverless start-job-run: This is the command to initiate a job run on EMR serverless using the AWS CLI.

  2. --application-id "$APPLICATION_ID": This specifies the application ID for the job run; it is the ID returned by the create-application command in the previous step.

  3. --execution-role-arn "$EXE_ROLE": This specifies the ARN (Amazon Resource Name) of the IAM (Identity and Access Management) role used for the execution of the job. IAM roles define permissions for the actions that the EMR service can perform on your behalf.

  4. --job-driver: This section defines the job driver configuration, particularly for Spark jobs. It includes the following fields:

  5. sparkSubmit: Specifies that the job is a Spark job and provides details about how to submit it.

  6. entryPoint: Specifies the entry point for the Spark job. It points to the script.py we uploaded to the S3 bucket; ${S3_BUCKET_NAME} and ${TASK_NAME} are the variables holding the S3 bucket and task names, respectively.

  7. entryPointArguments: Specifies the arguments to be passed to the entry point script. In this case, it passes the input and output paths in the S3 bucket.

  8. sparkSubmitParameters: Additional parameters to be passed to the Spark submit command. Here they unpack the packaged virtual environment (pyspark_venv.tar.gz) from the S3 bucket via spark.archives and point the driver and executors at its Python interpreter.

  9. --configuration-overrides: This section overrides default configurations for the job run. In this case, it includes monitoring configuration:

  10. s3MonitoringConfiguration: Specifies that the monitoring logs will be stored in an S3 bucket.

  11. logUri: Specifies the S3 URI where the logs will be stored. ${S3_BUCKET_NAME} and ${TASK_NAME} are variables representing the S3 bucket and task names.

Overall, this command starts an EMR Serverless job run, configures it to run a Spark job, specifies the input and output paths, sets up the Python environment from the packaged virtualenv, and configures monitoring to store logs in an S3 bucket.
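
Finally, you'll usually want to track the run until it finishes. The sketch below assumes you captured the JSON output of the start-job-run command above into a shell variable, for example run_output=$(aws emr-serverless start-job-run ...); it extracts the job run ID and polls the run state until it reaches a terminal status:

# Extract the job run ID from the captured start-job-run output (assumed above)
JOB_RUN_ID=$(echo "$run_output" | jq -r '.jobRunId')

# Poll the job state until it finishes
while true; do
    JOB_STATE=$(aws emr-serverless get-job-run \
        --application-id "$APPLICATION_ID" \
        --job-run-id "$JOB_RUN_ID" \
        --query 'jobRun.state' --output text)
    echo "Job state: $JOB_STATE"
    case "$JOB_STATE" in
        SUCCESS|FAILED|CANCELLED) break ;;
    esac
    sleep 30
done

Once the run reaches SUCCESS, the results should be available under the output path and the driver/executor logs under the logUri configured above.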