Realtime Streaming with AWS Services

22 December 2018


In today’s data-driven world, most organizations generate vast amounts of data (Big Data) and need to instrument this data to gain value from it. Data is generated from various sources within an organization, such as mobile applications, user activity, user purchases, e-commerce applications, social media, and so on. Streamed data often arrives as a continuous trickle feed, such as log files or small payloads at rapid intervals, and it needs to be instrumented and processed on the fly so that analytical decisions can be made quickly and we can identify what is happening right now.

A few examples of streaming data include:

    1. A company wants to instrument search and clickstream data into an analytical layer quickly, to understand search and click patterns.
    2. Social media trends such as Twitter mentions, Facebook likes, and shares need to be instrumented and analyzed to understand product trends.
    3. Analyzing user activity in mobile applications, to understand how users interact with the application and to customize and offer different experiences in the mobile app.
    4. Analyzing event data generated from different hardware such as sensors and IoT devices, to make timely decisions such as replacing a device on hardware failure, forecasting weather, etc.
    5. Various use cases of real-time alerting and response during a specific event, e.g., taking necessary action when inventory runs low during a Black Friday sale.

 

 

Real-Time Streaming with AWS Services


AWS offers several managed real-time streaming services that allow us to stream, analyze, and load data into analytics platforms. The streaming services that AWS offers include:

  1. Amazon Kinesis Data Streams
  2. Amazon Kinesis Data Firehose
  3. Amazon Kinesis Data Analytics
  4. Amazon Kinesis Video Streams

Kinesis Data Streams enables us to capture large amounts of data from different data producers and stream it into custom applications for data processing and analytics. By default, data is retained for 24 hours, but retention can be extended up to 7 days (168 hours).

Kinesis Data Firehose is a data ingestion product used for capturing and streaming data into storage services such as S3, Redshift, Elasticsearch, and Splunk. Data can be ingested into Firehose directly using the Firehose APIs, or Firehose can be configured to read from a Kinesis data stream. We discuss Kinesis Data Firehose in more detail later in this article.

Kinesis Data Analytics enables us to analyze streaming data by building SQL queries, using built-in templates and functions that allow us to transform and analyze streaming data. The data source for Kinesis Data Analytics can be either Kinesis Data Streams or Kinesis Data Firehose.

Kinesis Video Streams is one of the recent offerings that enables us to securely stream video from different media devices for analytics and processing.

As all of these Kinesis services are managed services, they elastically provision, deploy and scale the infrastructure needed to process the data.

Deep Dive into Kinesis Data Firehose

 

With Kinesis Data Firehose, it is very easy to configure data streaming and start ingesting data in a couple of minutes. Let’s take a look at the Firehose UI and start configuring our first streaming application.

Step 1: Define the stream and configure the source

We define a delivery stream name and the source from which the stream gets data.

Kinesis Data Firehose can be configured to read data from a Kinesis data stream, or the Firehose APIs can be used to write to the delivery stream directly.


Using the Firehose APIs to write to the stream is called Direct PUT. When this option is chosen, we can write to the Firehose stream directly, or configure it to stream data from other AWS services such as AWS IoT, CloudWatch Logs, and CloudWatch Events.

When a Kinesis data stream is chosen as the source, the Kinesis stream must be created before the Firehose delivery stream. It is also important to know that the source of a Firehose delivery stream cannot be changed once it is created.

Firehose can handle 5,000 records per second when configured with the Direct PUT method; records can also be batched together using the PutRecordBatch API. With PutRecordBatch, up to 500 records can be batched together and sent to Firehose in a single call.
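As a rough illustration, here is a minimal Node.js sketch of batching records with PutRecordBatch using the AWS SDK for JavaScript (v2); the delivery stream name and payloads are assumptions for illustration.

// Minimal sketch: batching records with PutRecordBatch (AWS SDK for JavaScript v2).
const AWS = require('aws-sdk');
const firehose = new AWS.Firehose({ region: 'us-east-1' });

// Up to 500 records can be sent in a single batch.
const records = [
  { Data: JSON.stringify({ event: 'click', page: 'home' }) + '\n' },
  { Data: JSON.stringify({ event: 'search', term: 'shoes' }) + '\n' },
];

firehose.putRecordBatch(
  { DeliveryStreamName: 'myfirst-firehose-stream', Records: records },
  (err, data) => {
    if (err) console.error(err);
    // FailedPutCount reports how many records in the batch were not ingested.
    else console.log('Failed records:', data.FailedPutCount);
  }
);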

Step 2a: Record transformation

With Kinesis Data Firehose, we can transform records using custom AWS Lambda functions and can also convert records to open source formats that are very efficient to query.


While transforming Kinesis records with Lambda, we can write custom logic to process the records. Each record is identified by a unique recordId; we apply the custom logic to every record and return whether the record is valid to process further or should be rejected.

This is defined in the result field of the payload returned by the Lambda function; the record is valid if the result is ‘Ok’, and rejected otherwise (for example, ‘Dropped’ or ‘ProcessingFailed’).

Here is a minimal Node.js AWS Lambda sketch of such a validation function; the field being checked is just an illustrative assumption:
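// Kinesis Data Firehose transformation Lambda (Node.js) - validation sketch.
// Each incoming record carries a recordId and base64-encoded data; we decode
// the payload, validate it, and return a result of 'Ok' or 'ProcessingFailed'.
exports.handler = async (event) => {
  const output = event.records.map((record) => {
    const payload = Buffer.from(record.data, 'base64').toString('utf8');
    let result = 'Ok';
    try {
      // Illustrative check: the record must be valid JSON containing a "userId"
      // field (the field name is just an assumption for this example).
      const parsed = JSON.parse(payload);
      if (!parsed.userId) {
        result = 'ProcessingFailed';
      }
    } catch (err) {
      result = 'ProcessingFailed';
    }
    // Pass the original (still base64-encoded) payload through unchanged.
    return { recordId: record.recordId, result, data: record.data };
  });
  return { records: output };
};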

 

Step 2b: Convert record formats

With record format conversion enabled, it is possible to convert incoming JSON records into open source columnar formats such as Apache Parquet or Apache ORC, which are much more efficient to query.

In order to enable record format conversion, a Glue database and Glue table defining the table structure (schema) must be created beforehand.

It is important to note that once record format conversion is enabled, we can only deliver records to S3. Other destinations such as Redshift, Elasticsearch, and Splunk are not currently supported.
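For reference, here is a sketch of what the format conversion settings could look like when a delivery stream is created programmatically (as part of an ExtendedS3DestinationConfiguration); the Glue database, table, and role ARN are placeholders.

// Hypothetical data format conversion settings for a Firehose S3 destination.
const dataFormatConversionConfiguration = {
  Enabled: true,
  InputFormatConfiguration: {
    Deserializer: { OpenXJsonSerDe: {} },   // incoming records are JSON
  },
  OutputFormatConfiguration: {
    Serializer: { ParquetSerDe: {} },       // deliver Apache Parquet to S3
  },
  SchemaConfiguration: {
    DatabaseName: 'my_glue_database',       // predefined Glue database
    TableName: 'my_glue_table',             // predefined Glue table holding the schema
    RoleARN: 'arn:aws:iam::123456789012:role/firehose-delivery-role',
    Region: 'us-east-1',
  },
};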

Step 3a – Delivering data to Amazon S3:

Once record transformation and record processing are defined, we can deliver records to a destination. When choosing S3 as the target, we need to select an S3 bucket and, optionally, a prefix to which the data will be delivered.


It is important to note that records will be delivered into time-partitioned folders (prefixes) in S3.

In our example, records will be delivered under the prefix myfirststreambucket/test/year/mon/day/hour/myfirst-firehose-stream-xxxx (for example, myfirststreambucket/test/2018/12/10/10/myfirst-firehose-stream-1234).

The partitions generated by Firehose are not compatible with Hive partitions, so if Firehose-generated records are to be queried with Athena, Hive, or Glue crawlers, we need to convert the Firehose partition layout into one that is compatible with Hive. This can be done by triggering an AWS Lambda function that converts Firehose partitions to Hive partitions.
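One possible approach is sketched below: a hypothetical S3-triggered Node.js Lambda that copies each object delivered by Firehose to a Hive-compatible prefix (year=/month=/day=/hour=). The key layout matches the example above; error handling and deletion of the original objects are omitted for brevity.

// Hypothetical S3-triggered Lambda: copy Firehose-delivered objects from
// prefix/YYYY/MM/DD/HH/file to prefix/year=YYYY/month=MM/day=DD/hour=HH/file.
const AWS = require('aws-sdk');
const s3 = new AWS.S3();

exports.handler = async (event) => {
  for (const record of event.Records) {
    const bucket = record.s3.bucket.name;
    const key = decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '));

    // Expecting keys like: test/2018/12/10/10/myfirst-firehose-stream-1234
    const match = key.match(/^(.*)\/(\d{4})\/(\d{2})\/(\d{2})\/(\d{2})\/(.+)$/);
    if (!match) continue;

    const [, prefix, year, month, day, hour, file] = match;
    const hiveKey = `${prefix}/year=${year}/month=${month}/day=${day}/hour=${hour}/${file}`;

    // Copy the object to the Hive-style partition path.
    await s3.copyObject({
      Bucket: bucket,
      CopySource: `${bucket}/${key}`,
      Key: hiveKey,
    }).promise();
  }
};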

Step 3b – Delivering data to Amazon Redshift

In order for Firehose to deliver data to Amazon Redshift, an intermediate S3 bucket is required to stage the data, in addition to the Redshift cluster details.

 

Firehose generates manifest files in a folder within the configured S3 bucket for every file that is written to S3. The manifest file is referenced in the COPY command used for loading data into Redshift.

Additional COPY parameters can be specified in the COPY options field; these are appended to the COPY command when it is executed.

When a file fails to load into Redshift, Firehose retries loading it for the specified retry duration, after which it skips the record. Error details for the failure can be found in the STL_LOAD_ERRORS table in Redshift.

Step 3c – Delivering data to Amazon Elasticsearch service and Splunk

Similar to delivering data to Amazon Redshift, data can be loaded into Elasticsearch and Splunk by configuring the Elasticsearch or Splunk endpoints.

It is also required to specify the S3 bucket details used for intermediate backup.

 

 

Step 4 – Buffering, Logging and Permissions

Once the source, processing, and destination configurations are defined, it is necessary to configure the buffer conditions and logging, and to grant the permissions Firehose needs to load data into the target destination.

We can specify the buffer size and the buffer interval; Firehose will buffer data and send it to the target when either of the conditions is met.

 

In this example, Firehose buffers data until either 5 MB has accumulated or 300 seconds have elapsed; once either condition is satisfied, the data is delivered to the destination.

Firehose offers compression in GZIP, Snappy, or ZIP formats, and data can also be encrypted using a KMS master key.
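For illustration, here is a minimal sketch of creating a delivery stream with these buffering hints and GZIP compression via the AWS SDK for JavaScript (v2); the bucket, prefix, and role ARN are placeholders.

// Hypothetical sketch: create a Direct PUT delivery stream with an S3 destination,
// 5 MB / 300 s buffering hints, and GZIP compression.
const AWS = require('aws-sdk');
const firehose = new AWS.Firehose({ region: 'us-east-1' });

firehose.createDeliveryStream({
  DeliveryStreamName: 'myfirst-firehose-stream',
  DeliveryStreamType: 'DirectPut',
  ExtendedS3DestinationConfiguration: {
    RoleARN: 'arn:aws:iam::123456789012:role/firehose-delivery-role',
    BucketARN: 'arn:aws:s3:::myfirststreambucket',
    Prefix: 'test/',
    BufferingHints: { SizeInMBs: 5, IntervalInSeconds: 300 }, // 5 MB or 300 seconds
    CompressionFormat: 'GZIP',
  },
}, (err, data) => {
  if (err) console.error(err);
  else console.log('Stream ARN:', data.DeliveryStreamARN);
});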

When error logging is enabled, Firehose creates a separate CloudWatch log group and logs the state of the delivery stream for each execution.

For delivering data from a source to a target, the necessary permissions need to be granted to Firehose. This is handled through IAM roles.

Step 5 – Review and send data.

Once the above configuration is defined correctly, we can start sending data to Firehose using the Firehose APIs.

Let us see how the Firehose API commands can be used to ingest data.

Kinesis Put Record Example:
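A minimal sketch using the AWS SDK for JavaScript (v2); the delivery stream name and payload are placeholders.

// Write a single record to a Firehose delivery stream with PutRecord.
const AWS = require('aws-sdk');
const firehose = new AWS.Firehose({ region: 'us-east-1' });

firehose.putRecord({
  DeliveryStreamName: 'myfirst-firehose-stream',
  Record: {
    // Firehose does not add delimiters, so a newline is appended to the record.
    Data: JSON.stringify({ userId: 123, action: 'click' }) + '\n',
  },
}, (err, data) => {
  if (err) console.error(err);
  else console.log('RecordId:', data.RecordId); // returned on successful ingestion
});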

On successful execution, the API returns a RecordId for the ingested record.

Firehose by default will not add a record delimiter when multiple put record calls are made; records are simply appended to one another in the object it delivers. It is therefore necessary to embed any required delimiters (for example, a newline) within each record, as in the example above.

With the delimiter embedded, the output delivered to S3 contains one record per line.

When a Kinesis data stream is chosen as the source, Firehose scales elastically based on the number of shards defined in the Kinesis stream.

We saw how Kinesis Data Firehose can be used to ingest data effectively. By combining the three Kinesis services – Kinesis Data Streams, Kinesis Data Analytics, and Kinesis Data Firehose – it is possible to create an end-to-end ETL pipeline for streaming data.

About the Author:
Srivignesh KN (@srivigneshkn) is a Senior Data Engineer at Chegg, where he builds and architects data platforms in the cloud. Sri is an avid AWS user; he recently presented at the AWS Community Day on real-time analytics, and the presentation can be found here.