Data Flow: Leveraging AWS Kinesis and Redshift


Motivation

Many use cases call for streaming pipelines that support near real-time data analysis with low latency. One such scenario is gathering user log data to run analytics or train machine learning models. In this solution, we leverage AWS services such as Amazon Kinesis Data Streams and Amazon Redshift. Here, we focus on creating an API to store user records, but the pipeline can easily connect to other data sources. We'll delve into the details of each AWS module and emphasize the importance of connecting AWS services effectively. The solution is also cost-effective compared to alternatives and requires minimal configuration, so it can be implemented quickly.

Step 1: Create an Amazon Kinesis Data Stream

Amazon Kinesis Data Streams serves as a conduit for seamlessly passing your data, whether for storage or real-time processing. Its primary job is to move data; to do anything more with the stream, you typically pair it with additional services. Here are several options:

  1. Amazon Data Firehose: This service delivers stream data to destinations such as Amazon S3, Amazon Redshift, or Amazon OpenSearch Service.
  2. Amazon Managed Service for Apache Flink (formerly Amazon Kinesis Data Analytics): It lets you build applications that consume and process the stream data.
  3. Custom consumers: Applications built with the Kinesis Client Library or the AWS SDK give you the flexibility to tailor data processing to specific requirements.

In this setup, we store data in Amazon Redshift directly using Redshift streaming ingestion, eliminating the need for Firehose to deliver the data to the Redshift warehouse.

Steps to create an Amazon Kinesis Data Stream:

  1. Open the Amazon Kinesis Console and navigate to Data streams. Then, click on Create data stream.
  2. Provide a name for the Data stream, such as "mltimes-logger".
  3. Specify the Number of open shards as "1". (For production scenarios, use the shard calculator to determine the appropriate number of shards.)
  4. Click Create data stream.

(Note: The chosen name is significant for later reference when specifying which records to pass through. For now, we've used "mltimes-logger" as an example.)
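If you prefer to create the stream programmatically instead of through the console, here is a minimal sketch using boto3; the region is an assumption you should adjust, and the stream name matches the example above.

import boto3

# Region is an example value; assumes AWS credentials are already configured.
kinesis = boto3.client("kinesis", region_name="us-east-1")

# Create the stream with a single shard, mirroring the console steps above.
kinesis.create_stream(StreamName="mltimes-logger", ShardCount=1)

# Wait until the stream is ACTIVE before sending records to it.
kinesis.get_waiter("stream_exists").wait(StreamName="mltimes-logger")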

Step 2: Set up Amazon Redshift

We use Amazon Redshift Serverless here, although the configuration process is the same whether you choose a serverless workgroup or a provisioned cluster. Begin by creating the Redshift resource from the console with the default settings. Next, in the left panel, navigate to Query editor v2 to interact with the database via queries.

To map data from Kinesis Data Streams to an Amazon Redshift object, execute the following SQL command:

CREATE EXTERNAL SCHEMA evdata FROM KINESIS
IAM_ROLE 'arn:aws:iam::????:????';

Replace 'arn:aws:iam::????:????' with the ARN of an IAM role that can read from Kinesis and run Redshift commands (for example, with the AmazonKinesisReadOnlyAccess and AmazonRedshiftAllCommandsFullAccess managed policies attached).

At this point, we've merely defined a mapping of the stream onto a Redshift schema. We still need to persist the data from the stream, and this is where a MATERIALIZED VIEW comes in. A materialized view stores the result of a SQL query so it can be reused later to improve performance, akin to saving intermediate computations to speed up subsequent calculations. Redshift can even use materialized views automatically to accelerate complex queries running in the background. In our scenario, we leverage this feature to consume the data stream.

Consider the following SQL query:

CREATE MATERIALIZED VIEW mltimes_endpoint_extracted DISTKEY(6) SORTKEY(1) AUTO REFRESH YES AS
    SELECT 
    refresh_time,
    approximate_arrival_timestamp,
    partition_key,
    shard_id,
    sequence_number,
    json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'),'mltimes_endpoint_name',true)::varchar(100) AS mltimesEndpointName
    FROM evdata."mltimes-logger" 
    WHERE LENGTH(kinesis_data) < 65355;

In the FROM clause (FROM evdata."mltimes-logger"), we specify the streaming source to read from. The expression json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'),'mltimes_endpoint_name',true)::varchar(100) AS mltimesEndpointName parses the streaming payload and extracts mltimes_endpoint_name from the JSON. Additionally, the WHERE clause filters out records whose kinesis_data payload is too large to be parsed into the view.
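Once the materialized view exists, you can check that streamed records are landing by querying it. The sketch below uses the boto3 Redshift Data API; the workgroup name and database are placeholders to replace with your own, and with AUTO REFRESH there can be a short delay before new records appear.

import time
import boto3

client = boto3.client("redshift-data")

# Placeholder workgroup and database names for a Redshift Serverless setup.
resp = client.execute_statement(
    WorkgroupName="<WORKGROUP_NAME>",
    Database="dev",
    Sql="SELECT mltimesEndpointName, approximate_arrival_timestamp "
        "FROM mltimes_endpoint_extracted "
        "ORDER BY approximate_arrival_timestamp DESC LIMIT 10;",
)

# Poll until the statement finishes, then print the returned rows.
while client.describe_statement(Id=resp["Id"])["Status"] not in ("FINISHED", "FAILED", "ABORTED"):
    time.sleep(1)
print(client.get_statement_result(Id=resp["Id"])["Records"])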

Step 3: Create an API with API Gateway

Head over to the API Gateway console and create an API along with a resource. Follow these steps to create a method:

  1. For Method type, select PUT.
  2. Choose AWS service for Integration type.
  3. Select the AWS Region where your Kinesis stream resides.
  4. For AWS service, choose Kinesis.
  5. Leave AWS subdomain blank.
  6. Pick POST for HTTP method.
  7. Choose Use action name for Action type.
  8. Enter PutRecords for Action name.
  9. Enter your execution role's ARN for Execution role.
  10. Keep the default of Passthrough for Content Handling.
  11. Click Create method.

  12. In the Integration request section, add the following URL request header parameter:

Content-Type: 'x-amz-json-1.1'

  13. Now, add the following mapping template to map data from the PUT method request to the corresponding integration request:

{
    "StreamName": "mltimes-logger",
    "Records": [
       #foreach($elem in $input.path('$.records'))
          {
            "Data": "$util.base64Encode($elem.data)",
            "PartitionKey": "$elem.partition-key"
          }#if($foreach.hasNext),#end
        #end
    ]
}

Ensure that the StreamName matches your Amazon Kinesis Data Stream name and that each entry in Records contains a PartitionKey. You can find more details about the request parameters in the PutRecords API reference.
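To make the template's behavior concrete, the sketch below reproduces in plain Python what API Gateway does with this mapping template: each record's data field is base64-encoded and paired with its partition key before being forwarded to the Kinesis PutRecords action. The sample payload is only an illustration.

import base64
import json

# Sample client payload in the shape the PUT method expects.
client_payload = {
    "records": [
        {"data": json.dumps({"mltimes_endpoint_name": "Blog"}), "partition-key": "p1"}
    ]
}

# Equivalent of the mapping template: base64-encode each record's data.
kinesis_request = {
    "StreamName": "mltimes-logger",
    "Records": [
        {
            "Data": base64.b64encode(rec["data"].encode("utf-8")).decode("ascii"),
            "PartitionKey": rec["partition-key"],
        }
        for rec in client_payload["records"]
    ],
}
print(json.dumps(kinesis_request, indent=2))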

  14. Before deploying, you can test your API, check the input and output, and verify that your data appears in the mltimes_endpoint_extracted materialized view. For testing, you can use this payload in your PUT request:

{
    "records": [
        {
            "data": "some data",
            "partition-key": "some key"
        },
        {
            "data": "some other data",
            "partition-key": "some key"
        }
    ]
}

Once testing is complete, you can deploy your API with a chosen stage.
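If you would rather script the deployment than click through the console, here is a small sketch using boto3; the REST API ID and stage name are placeholders to replace with your own values.

import boto3

# Placeholders: use your API's ID and the stage you want to deploy to.
apigateway = boto3.client("apigateway")
apigateway.create_deployment(restApiId="<REST_API_ID>", stageName="<STAGE_NAME>")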

Step 4: Generate data using the API

In your API panel, navigate to Stages and select your stage to find your Invoke URL (append the path of the resource where you created the PUT method). Keep this URL handy and open a notebook to send a PUT request using the Python requests library.

import requests
import json

info_dict = {"mltimes_endpoint_name": "Blog"}
payload = {
    "records": [
        {
            "data": json.dumps(info_dict),
            "partition-key": "p1"
        }
    ]
}

invoke_url = "<INVOKE_URL>"
r = requests.put(invoke_url, json=payload)
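
# A successful call returns HTTP 200; the response body is the Kinesis PutRecords
# result, whose FailedRecordCount should be 0 if every record was accepted.
print(r.status_code, r.text)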

It's crucial to maintain the structure of the payload. Since we've defined json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'),'mltimes_endpoint_name',true)::varchar(100) AS mltimesEndpointName, we must include mltimes_endpoint_name somewhere in our JSON payload. Depending on your requirements, you can add any additional fields you need. However, the value of the data key must be a string, which is why we use json.dumps. The partition-key is also required, so we include it in the payload; later on, you can use the partition_key column in SQL queries to separate data from different sources cheaply. Finally, pass the payload through the json parameter of the PUT request (not data=payload), so it is serialized as JSON.
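For debugging, it can also help to write to the stream directly with boto3 and confirm that the record shows up in the materialized view; this bypasses API Gateway and isolates the Kinesis/Redshift side of the pipeline. A minimal sketch, assuming the same stream name and payload shape as above:

import json
import boto3

kinesis = boto3.client("kinesis")

# Same record content as the API example, but sent straight to the stream.
kinesis.put_records(
    StreamName="mltimes-logger",
    Records=[
        {
            "Data": json.dumps({"mltimes_endpoint_name": "Blog"}).encode("utf-8"),
            "PartitionKey": "p1",
        }
    ],
)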

References

Track and visualize streaming beacon data on AWS

Real-time analytics with Amazon Redshift streaming ingestion

Streaming Data Solution for Amazon Kinesis

Tutorial: Create a REST API as an Amazon Kinesis proxy in API Gateway

PutRecords