AWS Kinesis Lab

Introduction

Prerequisite

Install AWS CLI


Kinesis Data Streams

KPL Setup Lab

Reference

https://www.whizlabs.com/learn/course/aws-certified-data-analytics-specialty/video/5274
https://zacks.one/aws-certified-data-analytics/#2.3.2.4
Amazon Kinesis Data Streams FAQs


Introduction


Kinesis Configuration

Services -> Kinesis


Select Kinesis Data Streams
Click on Create data stream


Data stream name

  • kinesis-kpl-demo


Under Data stream capacity, expand Shard estimator.

Reads drive the shard count only when you have more than 2 consumers; with 2 or fewer, writes dominate (see the Review formulas in the KCL Lab section).

Writing to the stream (Inbound Stream)

  • Average record size (in KiB): 1024
  • Maximum records written per second: 2

Reading from the stream (Outbound Stream)

  • Total number of consumers: 1

Click on Apply this value


Click on Create data stream


Once the Status changes to Active,
click on the Monitoring tab.


Configuration

Click on Configuration tab.


Configure Data retention

Default Data retention is 24 hours.

  • Maximum: 365 days

Under Data retention,
click on Edit to extend the retention period.

See Amazon Kinesis Data Streams pricing

Set the retention period to 30 days.
Click on Save changes
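
If you prefer the SDK, you can make the same retention change with boto3; a minimal sketch (the API takes hours, so 30 days = 720):

import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

# extend retention from the 24-hour default to 30 days (720 hours)
kinesis.increase_stream_retention_period(
    StreamName='kinesis-kpl-demo',
    RetentionPeriodHours=720,
)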


Configure Encryption

Under Encryption
Click on Edit


Check Enable server-side encryption
Click on Cancel


Enhanced (shard-level) metrics

Under Enhanced (shard-level) metrics
Click on Edit


Click on Cancel


Enhanced fan-out (0)

Click on Enhanced fan-out (0) tab


Consumers with enhanced fan-out receive their own 2 MiB/second of read throughput per shard. Using enhanced fan-out is optional and includes additional cost.
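
Registering an enhanced fan-out consumer can also be done from the SDK; a hedged boto3 sketch (the consumer name is a made-up example):

import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

# enhanced fan-out consumers are registered against the stream ARN
stream_arn = kinesis.describe_stream(StreamName='kinesis-kpl-demo')['StreamDescription']['StreamARN']

consumer = kinesis.register_stream_consumer(
    StreamARN=stream_arn,
    ConsumerName='efo-demo-consumer',  # hypothetical consumer name
)
print(consumer['Consumer']['ConsumerStatus'])  # CREATING, then ACTIVE shortly after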


KPL Lab

https://www.whizlabs.com/learn/course/aws-certified-data-analytics-specialty/video/5275


Introduction

How can I put data records into a Kinesis data stream using the KPL?

Short description

To put a record into a Kinesis data stream using the KPL, you must meet the following requirements:

  • You have a running Amazon Elastic Compute Cloud (Amazon EC2) Linux instance.
  • An AWS Identity and Access Management (IAM) role is attached to your instance.
  • The AmazonKinesisFullAccess policy is attached to the instance’s IAM role.

EC2 Configuration

Prerequisite

Launch a new EC2 instance with free tier settings and SSH security group. Then SSH into your EC2 instance.

  • Key: Name
  • Value: Kinesis-Instance
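
If you prefer scripting the launch instead of using the console, a minimal boto3 sketch; the AMI ID and key pair name are placeholders, and the SSH security group from the console steps is omitted:

import boto3

ec2 = boto3.client('ec2', region_name='us-east-1')

ec2.run_instances(
    ImageId='ami-0123456789abcdef0',  # placeholder: use a current Amazon Linux 2 AMI
    InstanceType='t2.micro',          # free-tier eligible
    KeyName='my-key-pair',            # placeholder: your existing key pair
    MinCount=1,
    MaxCount=1,
    TagSpecifications=[{
        'ResourceType': 'instance',
        'Tags': [{'Key': 'Name', 'Value': 'Kinesis-Instance'}],
    }],
)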

Java ENV

Install the latest version of the OpenJDK 8 developer package:

sudo yum install java-1.8.0-openjdk-devel


Confirm that Java is installed:

java -version


Run the following commands to set Java 1.8 as the default java and javac providers:

sudo /usr/sbin/alternatives --config java
sudo /usr/sbin/alternatives --config javac


Apache Maven ENV

Add a repository with an Apache Maven package:

sudo wget http://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo


Set the version number for the Maven packages:

sudo sed -i s/\$releasever/6/g /etc/yum.repos.d/epel-apache-maven.repo


Use yum to install Maven:

sudo yum install -y apache-maven


Confirm that Maven is installed properly:

mvn -version


Git ENV

Install git, and then download the KPL from Amazon Web Services - Labs:

sudo yum install git
git clone https://github.com/awslabs/amazon-kinesis-producer


Check file

Open the amazon-kinesis-producer/java/amazon-kinesis-producer-sample/ directory, and then list the files:

cd amazon-kinesis-producer/java/amazon-kinesis-producer-sample/
ls


IAM Configuration

Services -> EC2 -> Instances

Select the instance you created for Kinesis. Note that its IAM Role field is still empty (-).


Create an IAM Role for the EC2 instance

Services -> IAM -> Roles

Click on Create role


Select EC2.
Click on Next: Permissions


Search for AmazonKinesisFullAccess.
Select it.
Then Click on Next: Tags


  • Key: Name
  • Value: Kinesis-KPL

Click on Next: Review


  • Role name: Kinesis-KPL

Click on Create role
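
The same role can also be created programmatically; a hedged boto3 sketch that mirrors the console steps (EC2 consumes roles through an instance profile):

import json

import boto3

iam = boto3.client('iam')

# trust policy that lets EC2 assume the role
trust_policy = {
    'Version': '2012-10-17',
    'Statement': [{
        'Effect': 'Allow',
        'Principal': {'Service': 'ec2.amazonaws.com'},
        'Action': 'sts:AssumeRole',
    }],
}

iam.create_role(RoleName='Kinesis-KPL', AssumeRolePolicyDocument=json.dumps(trust_policy))
iam.attach_role_policy(
    RoleName='Kinesis-KPL',
    PolicyArn='arn:aws:iam::aws:policy/AmazonKinesisFullAccess',
)

# EC2 attaches roles via an instance profile
iam.create_instance_profile(InstanceProfileName='Kinesis-KPL')
iam.add_role_to_instance_profile(InstanceProfileName='Kinesis-KPL', RoleName='Kinesis-KPL')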



Attach the IAM Role to EC2 instance

Services -> EC2 -> Instances

Select the Kinesis instance and click on Actions -> Security -> Modify IAM role


Select Kinesis-KPL then click on Save.


Refresh the page and see the latest Instance details.
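
A quick way to confirm the instance is now using the role’s credentials is to ask STS who you are; run this small sketch on the instance:

import boto3

# should print an assumed-role ARN containing Kinesis-KPL
print(boto3.client('sts').get_caller_identity()['Arn'])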


KPL Configuration

Setup AWS Region

Run the following command and input us-east-1 for Default region name.

aws configure

Create Kinesis Data Stream

Run a command similar to the following to create a Kinesis data stream:

If you did not do the previous lab (KPL Setup Lab), run the following command:

aws kinesis create-stream --stream-name kinesis-kpl-demo --shard-count 2

If you did the previous lab (KPL Setup Lab), you have already created the Kinesis stream; verify it with the following command:

aws kinesis list-streams

For more information about the number of shards needed, see Resharding, scaling, and parallel processing.
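
Before running the producer, you can optionally block until the stream is ready; a minimal boto3 sketch using the built-in waiter:

import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

# polls DescribeStream until the stream becomes available
kinesis.get_waiter('stream_exists').wait(StreamName='kinesis-kpl-demo')
print('stream is ready')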


Modify demo file

Open the SampleProducerConfig.java file from the Amazon Web Services - Labs GitHub repository, and then modify the following fields:

  • For public static final String STREAM_NAME_DEFAULT, enter the name of the Kinesis data stream that you previously created.
  • For public static final String REGION_DEFAULT, enter the Region that you’re using.

Example:

cd src/com/amazonaws/services/kinesis/producer/sample
vim SampleProducerConfig.java

public static final String STREAM_NAME_DEFAULT = "kinesis-kpl-demo";
public static final String REGION_DEFAULT = "us-east-1";


Run kinesis demo

Run the following command in the amazon-kinesis-producer-sample directory to allow Maven to download all of the directory’s dependencies:

cd /home/ec2-user/amazon-kinesis-producer/java/amazon-kinesis-producer-sample

mvn clean package


Run the following command in the amazon-kinesis-producer-sample directory to run the producer and to send data into the Kinesis data stream:

mvn exec:java -Dexec.mainClass="com.amazonaws.services.kinesis.producer.sample.SampleProducer"


Validation Test

Services -> Kinesis -> Data streams

Check the Incoming Data (Count) graph on the Monitoring tab of the Kinesis console to verify the number of records sent to the stream.

Note: The record count might be lower than the number of records sent to the data stream. This lower record count can occur because the KPL uses aggregation.

Click on the kinesis-kpl-demo


See Incoming data chart

Put Records latency


One more thing

In the default_config.properties file, read the descriptions of these critical properties: AggregationEnabled, AggregationMaxCount, AggregationMaxSize, RecordMaxBufferedTime, RecordTtl, and RequestTimeout.

pwd
ls
vim default_config.properties


KCL Lab

Review

Write

$\text{Inbound Write Bandwidth (KB/s)} = \text{Average record size (KiB)} \times \text{Maximum records written per second}$
$\text{Inbound Write Capacity} = \frac{\text{Inbound Write Bandwidth (KB/s)}}{1000}$

Read

$\text{Outgoing Read Bandwidth (KB/s)} = \text{Inbound Write Bandwidth (KB/s)} \times \text{Number of consumers}$
$\text{Outgoing Read Capacity} = \frac{\text{Outgoing Read Bandwidth (KB/s)}}{2000}$

Number of Shards

$\text{Number of Shards} = \max(\text{Inbound Write Capacity}, \text{Outgoing Read Capacity})$
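
Plugging this lab's estimator inputs (1024 KiB records, 2 records per second, 1 consumer) into these formulas as a sanity check; a minimal sketch, assuming the estimator rounds up to whole shards:

import math

avg_record_kib = 1024
records_per_second = 2
consumers = 1

inbound_kb = avg_record_kib * records_per_second  # 2048 KB/s written
write_capacity = inbound_kb / 1000                # 2.048 shards' worth of writes
outbound_kb = inbound_kb * consumers              # 2048 KB/s read
read_capacity = outbound_kb / 2000                # 1.024 shards' worth of reads

print(math.ceil(max(write_capacity, read_capacity)))  # 3 shards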

Total Data Stream Capacity

  • 1 Shard = 1 MB/s write capacity = 2 MB/s read capacity
  • Total Write Capacity = Number of Shards × 1 MB/s
  • Total Read Capacity = Number of Shards × 2 MB/s

The KPL aggregates our records.


KCL Configuration

Python3 KinesisClient.py
import boto3
import time

# define your stream name
kpl_stream = 'kinesis-kpl-demo'

# create the Kinesis service reference for your region
kinesis_client = boto3.client('kinesis', region_name='us-east-1')

# get the description of your Kinesis Data Stream
response = kinesis_client.describe_stream(StreamName=kpl_stream)

# use the Kinesis Data Stream description to get the shard ID
shard_id = response['StreamDescription']['Shards'][0]['ShardId']

# create your shard iterator
shard_iterator = kinesis_client.get_shard_iterator(StreamName=kpl_stream, ShardId=shard_id, ShardIteratorType='LATEST')

shard_iterator = shard_iterator['ShardIterator']

# retrieve your first Kinesis Data Streams record
record_response = kinesis_client.get_records(ShardIterator=shard_iterator, Limit=2)

# loop until you have received all of the Kinesis Data Streams records
while 'NextShardIterator' in record_response:
    record_response = kinesis_client.get_records(ShardIterator=record_response['NextShardIterator'], Limit=2)

    # add your deaggregation logic here
    # where you will deaggregate the user records from each Kinesis Data Streams record
    # you will then perform actions on your user records, such as storing them on S3, or copying them to a Redshift table
    print(record_response, "\n")

    # wait for 1 second before retrieving the next Kinesis Data Streams record
    time.sleep(1)
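
For the "add your deaggregation logic here" placeholder above, awslabs publishes a helper package (pip install aws_kinesis_agg). A hedged sketch of its documented Lambda-consumer usage; note the helper expects Lambda-style Kinesis event records, so this is written as a Lambda handler rather than a drop-in for the get_records loop:

import base64

# assumption: the aws_kinesis_agg package from awslabs/kinesis-aggregation is installed
from aws_kinesis_agg.deaggregator import deaggregate_records

def lambda_handler(event, context):
    # each KPL-aggregated Kinesis record may expand into many user records
    user_records = deaggregate_records(event['Records'])
    for record in user_records:
        payload = base64.b64decode(record['kinesis']['data'])
        print(payload)
    return 'processed {} user records'.format(len(user_records))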


Validation Test

Run KinesisClient.py while the KPL sample producer is sending data.
You can see the records being read from the stream.

Go to Kinesis Data Streams -> Monitoring to see the data stream.


Kinesis Data Firehose


Firehose basic lab

Kinesis Data Streams Configuration

Services -> Kinesis -> Data Streams
Click on Create data stream


  • Data stream name: kinesis-firehose-demo
  • Number of open shards: 2

Click on Create data stream



Kinesis Data Firehose Configuration

Services -> Kinesis


Click on Create delivery stream


Step 1: Name and source
  • Delivery stream name: firehose-stream
  • Source: Kinesis Data Stream
    • Select kinesis-firehose-demo

Click on Next


Step 2: Process records
  • Data transformation: Enabled

Open Lambda in new tab
Services -> Lambda -> Dashboard
Click on Create function


Click on Use a blueprint
Search for firehose
Select kinesis-firehose-process-record-python
Click on Configure


Function name: firehosetransformation
Execution role: Select Create a new role with basic Lambda permissions

Review the code

Python 2.7 kinesis-firehose-process-record-python.py
from __future__ import print_function

import base64

print('Loading function')


def lambda_handler(event, context):
    output = []

    for record in event['records']:
        print(record['recordId'])
        payload = base64.b64decode(record['data'])

        # Do custom processing on the payload here

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload)
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))

    return {'records': output}
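
To see what the blueprint does before wiring it up, you can invoke the handler locally; a minimal smoke-test sketch, assuming you saved the code above as transform.py (the sample record is made up):

import base64

from transform import lambda_handler  # assumption: blueprint saved locally as transform.py

event = {'records': [{
    'recordId': '1',
    'data': base64.b64encode(b'{"ITEM": "Longboard", "PRICE": 42.0}').decode('ascii'),
}]}

result = lambda_handler(event, None)
print(result['records'][0]['result'])                  # Ok
print(base64.b64decode(result['records'][0]['data']))  # payload passes through unchanged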

Click on Create function



Go back to Kinesis Firehose page.

  • Data transformation: Enabled
    • Lambda function: firehosetransformation (You may need to click the refresh button.)


Go to Lambda page.

Scroll down to Basic settings, then click on Edit


  • Timeout: 1 min 3 sec

Click on Save


Go back to Kinesis Firehose page.

Increase Lambda function timeout
To reduce the risk of the function timing out before data transformation is complete, increase the Timeout to 1 minute or longer in the Advanced settings section of your Lambda configuration.


Click on Next


Step 3: Choose a destination
  • Destination: Amazon S3

S3 bucket: Click on Create new
Give your bucket a unique name


  • Prefix - optional: transformed
  • Error prefix - optional: error


  • Source record S3 backup: Enabled

    • Select your kinesis firehose bucket
  • Backup S3 bucket prefix - optional: source_records/

Click on Next


Step 4: Configure settings
  • Buffer interval: 60

Click on Next


Step 5: Review

Click on Create delivery stream


Monitor Firehose

Click on your firehose name

Click on Monitoring tab


Feed data to Kinesis

The following Python program generates JSON records and sends them to the kinesis-firehose-demo data stream, which feeds Kinesis Data Firehose.

Python3 KinesisProducer.py
import json
import boto3
import random
import datetime

# define our Kinesis service using the us-east-1 region
kinesis = boto3.client('kinesis', region_name='us-east-1')

# function to produce our streaming data
def produceData():
    data = {}
    time_now = datetime.datetime.now()
    time_now_string = time_now.isoformat()
    data['EVENT_TIME'] = time_now_string
    data['ITEM'] = random.choice(['Longboard', 'Onewheel', 'Surfboard', 'Snowboard', 'Paddleboard'])
    price = random.random() * 100
    data['PRICE'] = round(price, 2)
    return data

# define the number of data stream elements we wish to create
number_of_records = 30
record_count = 0

# create the streaming data and send it to our Kinesis Data Stream called kinesis-firehose-demo
while record_count < number_of_records:
    data = json.dumps(produceData()) + 'record # ' + str(record_count)
    print(data)
    kinesis.put_record(
        StreamName="kinesis-firehose-demo",
        Data=data,
        PartitionKey="partitionkey")
    record_count += 1

Run KinesisProducer.py.


Validation Test

Go to Kinesis Firehose Monitoring page.
Click on 1h.
Wait 60s then refresh the page.


Go to S3 and open the kinesis firehose lab bucket.

We have the transformed data and backup data.

After granting the object public access, we can download and open it.


Data Format and Compression Lab

Diagram

  • Kinesis Data Firehose
    • Use the format conversion feature if the data is in JSON
    • Use S3 compression (GZIP, Snappy, or Zip)

Kinesis Data Streams Configuration

Services -> Kinesis -> Data streams


Click on Create data stream


  • Data stream name: kinesis-transform-demo
  • Number of open shards: 1

Click on Create data stream


Kinesis Data Firehose Configuration

Services -> Kinesis -> Delivery streams


Step 1: Name and source

Click on Create delivery stream

  • Delivery stream name: kinesis-transform-firehose-demo
  • Source: Kinesis Data Stream: kinesis-transform-demo

Click on Next


Step 2: Process records

Click on Next


Step 3: Choose a destination
  • Destination: Amazon S3
  • S3 bucket: choose your lab bucket
  • Backup S3 bucket prefix - optional: compress-transform
  • Backup S3 error prefix - optional: error-compress-transform

Click on Next


Step 4: Configure settings

S3 compression and encryption

  • S3 compression: GZIP

Click on Next


Step 5: Review

Click on Create delivery stream


Feed Kinesis Data Stream

Python 3 KinesisProducer-C&F.py
import sys
import json
import boto3
import random
import datetime
import pandas as pd

# define our Kinesis service using the us-east-1 region
kinesis = boto3.client('kinesis', region_name='us-east-1')

def start_process(format_flag):
    begin_streaming(format_flag)

# function to produce our streaming data in JSON format
def produceData():
    data = {}
    time_now = datetime.datetime.now()
    time_now_string = time_now.isoformat()
    data['EVENT_TIME'] = time_now_string
    data['ITEM'] = random.choice(['Longboard', 'Onewheel', 'Surfboard', 'Snowboard', 'Paddleboard'])
    price = random.random() * 100
    data['PRICE'] = round(price, 2)
    return data

# function to produce our streaming data in CSV format
def produceCSVData():
    data = pd.read_csv("Surveillance.csv")
    return data

def begin_streaming(format_flag):
    # define the number of data stream elements we wish to create
    number_of_records = 30
    record_count = 0

    if format_flag == 'transform':
        # create the streaming data and send it to our Kinesis Data Stream called kinesis-transform-demo
        data = produceCSVData()
        for _, row in data.iterrows():
            values = ','.join(str(value) for value in row)  # join the values together by a ','
            encodedValues = bytes(values, 'utf-8')  # encode the string to bytes
            print(encodedValues)
            kinesis.put_record(
                StreamName="kinesis-transform-demo",
                Data=encodedValues,
                PartitionKey="partitionkey")
            record_count += 1
    elif format_flag == 'format':
        while record_count < number_of_records:
            data = json.dumps(produceData())
            print(data)
            kinesis.put_record(
                StreamName="kinesis-transform-demo",
                Data=data,
                PartitionKey="partitionkey")
            record_count += 1

if __name__ == "__main__":
    format_flag = sys.argv[1]
    start_process(format_flag)

Copy the code above and create a Python file named KinesisProducer-C&F.py

Shell
python3 KinesisProducer-C\&F.py format

Notice the terminal window shows we have sent JSON records to kinesis-transform-demo


Validation Test

Services -> Kinesis -> Data streams

Click on kinesis-transform-demo
Click on Monitoring tab

Kinesis Data Stream monitor shows we have received the data.


Services -> Kinesis -> Delivery streams

Click on kinesis-transform-firehose-demo
Click on Monitoring tab

The Kinesis Data Firehose monitor shows that the Kinesis Data Stream has delivered the data to Kinesis Data Firehose.


Services -> S3
Open your lab bucket.

S3 shows that Kinesis Data Firehose has compressed the data and delivered it to the S3 bucket.


Download the file.

Open the file.


Data Transform and Format Lab


Data Source & Prerequisite

COVID-19 Surveillance Data Set


Download the dataset from the above link then upload it to your S3 bucket.


Diagram

  • Kinesis Data Firehose
    • Use a Lambda transform to convert the data to JSON if it is not already in JSON
    • Use the format conversion feature once the data is in JSON format

AWS Glue Configuration

Services -> AWS Glue


Finish the AWS Glue ETL Lab first if you are not yet familiar with AWS Glue.

AWS Glue Lab


We want to crawl the .csv file to build the table in the Data Catalog, so we need to configure security first.

Click on Security configurations navigation


Security configurations

Click on Add security configuration


  • Security configuration name: glueDemoSecurityConfig
  • Encryption mode: SSE-KMS

Three things can be encrypted in AWS Glue:

  • Data (at rest)
  • CloudWatch logs
  • Job bookmarks

Uncheck CloudWatch logs encryption.

Expand Advanced properties

  • Uncheck Job bookmark encryption

Click on Finish


Refresh the page to see your security configuration


Extract Data

Click on Tables navigation


Click on Add tables using a crawler


Crawler info
  • Crawler name: uci-covid

Expand Tags, description, security configuration, and classifiers (optional)

  • Description:
    • Glue crawler for COVID-19 Surveillance Data Set.
    • Data source: https://archive.ics.uci.edu/ml/datasets/COVID-19+Surveillance
  • Security configuration: glueDemoSecurityConfig

Click on Next


Crawler source type

Leave all settings as default

Click on Next


Data store
  • Choose a data store from the following: S3
    • S3
    • JDBC
    • DynamoDB
    • Amazon DocumentDB
    • MongoDB
  • Table Name:

Select Surveillance.csv you uploaded at the prerequisite step.


  • Include path: your bucket with the sample .csv file. If you want to scan all files under the directory, add / at the end of the path.

Exclude patterns: define the prefixes of the data that you want the crawler to skip.

Click on Next


Select No and click on Next


IAM Role
  • The IAM role allows the crawler to run and access your Amazon S3 data stores.

    • Select Create an IAM role
  • IAM Role for Glue: Role assumed by the crawler with permission to access your data store.

    • uci-covid

IAM will grant your new role the following permissions on the S3 bucket:

  • s3:GetObject
  • s3:PutObject

Click on Next


Schedule
  • Frequency: Run on demand

Click on Next


Output

Click on Add database

  • Database: uci-covid

Click on Next


Review all steps

Click on Finish


Validation Test
Run Crawler

Working with Crawlers on the AWS Glue Console

Select your new crawler, then click on Run crawler


CloudWatch logs

Click on Logs


Databases

Click on Databases navigation
Click on uci-covid


Click on tables in uci-covid


Transform Data

Click on Jobs navigation


Click on Add job


Job properties
  • Name: uci-covid-job

Click on Create IAM role
Ensure that this role has permission to your Amazon S3 sources, targets, temporary directory, scripts, and any libraries used by the job.

Temporarily use a role with AWSGlueConsoleFullAccess and AmazonS3FullAccess attached.

  • IAM role: unlimitedGlue


  • Type: Spark
  • Glue version: Spark 2.4, Python 3 (Glue Version 1.0)
  • This job runs: A proposed script generated by AWS Glue


Expand Advanced properties


Expand Monitoring options

  • Job bookmark: Specifies how AWS Glue processes job bookmark when the job runs. It can remember previously processed data (Enable), update state information (Pause), or ignore state information (Disable).
    • Disable

Expand Security configuration, script libraries, and job parameters (optional)

  • Security configuration: glueDemoSecurityConfig

  • Worker type: Standard

    • Standard
    • G.1X (Recommended for memory-intensive jobs)
    • G.2X (Recommended for jobs with ML transforms)
  • Number of workers: The maximum number of workers you can define is 299 for G.1X and 149 for G.2X.

  • Max concurrency: Sets the maximum number of concurrent runs that are allowed for this job. An error is returned when this threshold is reached. The default is 1.

  • Job timeout (minutes): Set the execution time limit in minutes.

  • Delay notification threshold (minutes): Set a delay threshold in minutes. If the job runs longer than the specified time, Glue sends a delay notification via CloudWatch.

  • Catalog options (optional): To use Glue data catalog as the Hive metastore, the IAM role used for the job should have glue:CreateDatabase permissions. A database called “default” is created in the Glue catalog if it does not exist.

Click on Next


Data source

Select the data source you created before.
Click on Next


Transform type
  • Change schema: Change schema of your source data and create a new target dataset
  • Find matching records: Use machine learning to find matching records within your source data
    • Remove duplicate records: When records match, the record with the lowest primary key value survives.
  • Worker type: Set the type of predefined worker that is allocated when a job runs.
  • Maximum capacity: Set the maximum number of AWS Glue data processing units (DPUs) that can be allocated when a job runs.

Select Change schema then click on Next


Data target

Select Create tables in your data target

  • Data store: Amazon S3
  • Format: JSON
  • Compression type: None
  • Target path: choose your sample .csv file bucket

Click on Next


Schema

Review the original .csv file

You can Add column, Delete column, Reorder columns.

Click on Save job and edit script.


Script editor


Click on Database at the top left of the page.

Then click on Schema tab.


On the top navigation pane, you can change the Source, Target, Target Location.

See more of Transforms


Click on Run job
You get a second chance to modify the settings configured earlier.
Click on Run job


The job is running.

Refresh the page to see Logs

Once the refresh icon (at the top left of the page) disappears, the job is finished.


Go to S3 bucket to see the transformed file.

Select the transformed file, then click on Actions -> Download

Compare it to the original .csv file.


Run Crawler Again

Services -> AWS Glue -> Crawlers

Since the S3 bucket now contains the .json file converted from the .csv file, we need to run the crawler again.

Select uci-covid
Click on Run crawler


Click on Tables navigation.
We have a brand new table in .json format.
Click on the .json table.



Click on Edit table
Manually update the Serde parameters: set the Key (path) and Value entries for A01 through A07 and Category.
Click on Apply


Click on Edit Schema
Then update the column names as shown in the screenshot.
Click on Save


Lambda Configuration

Services -> Lambda


Create a Lambda function from scratch

Click on Create function


Select Author from scratch

Basic information

  • Function name: firehosetransformation2
  • Runtime: Python 2.7

Expand Change default execution role

Execution role

  • Select Create a new role with basic Lambda permissions

Click on Create function


Edit function

Under Function code
Copy and paste the following code.

from __future__ import print_function

import base64
import json

print('Loading function')

def lambda_handler(event, context):
    output = []

    for record in event['records']:
        print(record['recordId'])
        payload = base64.b64decode(record['data'])

        # Do custom processing on the payload here, where we convert our CSV data to JSON
        json_data = {}
        fieldnames = ("A01", "A02", "A03", "A04", "A05", "A06", "A07", "Category")
        print(json.dumps(payload))
        separated_data = payload.split(",")
        count = 0
        for element in separated_data:
            json_data[fieldnames[count]] = element
            count += 1

        print(json_data)

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(json_data))
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))

    return {'records': output}
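
The heart of this function is the CSV-to-JSON field mapping; a standalone Python 3 rendering of just that step, with a made-up Surveillance.csv row:

fieldnames = ("A01", "A02", "A03", "A04", "A05", "A06", "A07", "Category")
payload = "4,2,1,1,3,2,1,0"  # hypothetical Surveillance.csv row

json_data = dict(zip(fieldnames, payload.split(",")))
print(json_data)  # {'A01': '4', 'A02': '2', ..., 'Category': '0'}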

Click on Deploy


Scroll down to Basic settings, then click on Edit


  • Timeout: 1 min 3 sec

Click on Save


Kinesis Data Streams Configuration

Services -> Kinesis -> Data Streams


Same as Data Compression Lab

If you have kinesis-transform-demo, skip the above step.


Kinesis Data Firehose Configuration

Services -> Kinesis -> Delivery streams


Almost the same as the Data Compression Lab

If you have kinesis-transform-firehose-demo, skip the above step.


Click on kinesis-transform-firehose-demo


Click on Edit


Transform source records with AWS Lambda

  • Source record transformation: Enabled
  • Lambda function: firehosetransformation2


Convert record format

  • Record format conversion: Enabled

And we need a schema for the conversion.


  • AWS Glue region: Choose your region
  • AWS Glue database: uci-covid
  • AWS Glue table: select the newest table, the one crawled from the .json output (because it is .json instead of .csv)
  • AWS Glue table version: Latest


Source record S3 backup

  • Source record S3 backup: Disabled

Note: this backs up the source records before the conversion.


Amazon S3 destination

  • S3 bucket: Choose your S3 bucket

  • Backup S3 bucket prefix - optional: csv-transform

  • Backup S3 bucket error prefix - optional: csv-error-transform

  • Buffer size: 128

  • S3 compression: Disabled

You don’t need to enable compression. Snappy compression was enabled automatically when you chose Record format conversion. To choose another compression method, or to disable data compression, use the AWS CLI.

  • S3 encryption: Disabled


Settings

  • CloudWatch error logging: Enabled


Scroll up and click on Save.


Feed Kinesis Data Stream

Move Surveillance.csv to the directory storing KinesisProducer-C&F.py

Shell
python3 KinesisProducer-C\&F.py transform


Validation Test

Kinesis Monitoring

Services -> Kinesis -> Data Streams

Click on kinesis-transform-demo
Click on Monitoring tab
You can also click Custom at the top of the charts and select Local timezone
See Get records - sum (Count)


Services -> Kinesis -> Delivery streams

Click on kinesis-transform-firehose-demo
Click on Monitoring tab
You can also click Custom at the top of the charts and select Local timezone
See Records read from Kinesis Data Streams (Sum)


Services -> S3

Go to your bucket.
Then click on the csv-transform2021 folder. (It may take several minutes for the files to appear.)


Download the parquet file.


Install parquet-tools

Shell
brew install parquet-tools


Use parquet-tools to open the parquet file

Shell
cd ~/Downloads
parquet-tools cat --json *.parquet


Conclusion

  1. We used the AWS Glue Crawler to crawl the .csv file and got the table schema.
  2. We used a Glue ETL job to transform the .csv file to a .json file.
  3. We used the AWS Glue Crawler again to crawl the .json file schema.
  4. We uploaded the Python 2.7 Lambda code.
  5. We set up Kinesis.
  6. We invoked the Python Kinesis producer with the transform flag to send the Surveillance.csv data.
  7. Kinesis Data Streams received the Surveillance.csv records and transferred them to Kinesis Data Firehose.
  8. Kinesis Data Firehose invoked the Lambda function to convert the .csv records to JSON, then converted them to Parquet based on the schema we got at step 3.
  9. In the future, we could automatically transform the same type of data, since we already have the schema and the Kinesis workflow.

Kinesis ClickStream Analysis

Create real-time clickstream sessions and run analytics with Amazon Kinesis Data Analytics, AWS Glue, and Amazon Athena

Region: us-east-1


Lab Details

  • Kinesis streaming of web log data through Kinesis Data Streams
  • Kinesis Data Firehose transforms and delivers to S3
  • Athena catalogs and queries in real time
  • QuickSight visualize web log analytics

  • Data ingestion: You can use Kinesis Data Streams to build custom applications that process or analyze streaming data for specialized needs. Kinesis Data Streams can continuously capture and store terabytes of data per hour from hundreds of thousands of sources, such as website clickstreams, financial transactions, social media feeds, IT logs, and location-tracking events.
  • Data sessionization: Kinesis Data Analytics is the easiest way to process streaming data in real time with standard SQL without having to learn new programming languages or processing frameworks. With Kinesis Data Analytics, you can query streaming data or build entire streaming applications using SQL, so that you can gain actionable insights and respond to your business and customer needs promptly.
  • Data processing and storage: The sessionization stream is read from Kinesis Data Analytics using an AWS Lambda function. The function triggers two events: one real-time dashboard in Amazon CloudWatch and a second one to persist data with Kinesis Data Firehose.
  • Data analysis: AWS Glue is used to crawl Amazon S3 and build or update metadata definition for Amazon Athena tables.

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena provides connectivity to any application using JDBC or ODBC drivers.

  • Data visualization: Amazon QuickSight is a visualization tool that is natively used to build dashboards over Amazon Athena data.
  • Monitoring: Amazon CloudWatch is a tool that lets you monitor the streaming activities, such as the number of bytes processed or delivered per second, or the number of failures.

Task Details

  1. Web logs in JSON format generated by Python will be fed to Kinesis Data Streams
  2. Set the Kinesis Data Streams destination as Kinesis Data Firehose
  3. Kinesis Data Firehose transforms the JSON data to Parquet and places it in the S3 bucket
  4. Use Athena to query the Parquet data in the S3 bucket
  5. Glue crawls the data through Athena and saves the schema in the Glue Data Catalog
  6. Run analysis and visualization in real time with QuickSight, which queries data through the Athena and S3 pipeline


Python ClickStream Generator

Create a Python program for sending stream data to Kinesis Data Streams.


To generate the workload, you can use a Python program with random values, simulating a beer-selling application.

The same user ID can have sessions on different devices, such as a tablet, a browser, or a phone application. This information is captured by the device ID. As a result, the data for the Python program payload has these parameters: a user ID, a device ID, a client event, and a client timestamp, as shown in the following example.

KinesisClickStreamGenerator.py
import json
import boto3
import random
import datetime

# Generate random data using the random package
def getReferrer():
    x = random.randint(1, 5)  # randomly generate x from 1 to 5
    x = x * 50
    y = x + 30
    data = {}
    data['user_id'] = random.randint(x, y)  # randomly select a number between x and y as 'user_id'
    data['device_id'] = random.choice(['mobile', 'computer', 'tablet', 'mobile', 'voice', 'AI-ML'])
    data['client_event'] = random.choice(['auto_nav', 'product_checkout', 'electronics_product_detail', 'electronics_products', 'electronics_selection', 'electronics_cart'])
    now = datetime.datetime.now()
    str_now = now.isoformat()
    data['client_timestamp'] = str_now
    return data

# Send data to Kinesis Data Streams
kinesis = boto3.client('kinesis', region_name='us-east-1')
number_of_records = 100
record_count = 0
while record_count < number_of_records:
    data = json.dumps(getReferrer())
    print(data)
    kinesis.put_record(
        StreamName='ClickStream-Kinesis-Data-Streams',
        Data=data,
        PartitionKey='partitionkey'
    )
    record_count += 1

Kinesis Data Streams Configuration

Services -> Kinesis -> Data Streams
Create a Kinesis Data Stream for data ingestion.


Click on Create data stream


Data stream configuration

  • Data stream name: ClickStream-Kinesis-Data-Streams

Data stream capacity

  • Number of open shards: 1

Click on Create data stream


Athena Configuration

Services -> Athena
Use Athena to create a table that defines the data structure in the Glue Data Catalog.


Set up Athena query result location in Amazon S3 (optional)

If you see "Before you run your first query, you need to set up a query result location in Amazon S3.", click on set up a query result location in Amazon S3.

Choose an S3 bucket folder as the query result location, then click on Save


Create the table in the default database, reading the Parquet data from the S3 bucket.
Replace the LOCATION with your S3 bucket URL.

CREATE EXTERNAL TABLE default.clickstream (
  user_id string,
  device_id string,
  client_event string,
  client_timestamp string
)
STORED AS parquet
LOCATION 's3://aws-labs-bucket/Kinesis/ClickStream-S3/'
TBLPROPERTIES ('parquet.compression'='SNAPPY')

Click on Run query

You can see the new table clickstream under database default

You can also see the Glue table is ready.
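
You can also submit the same DDL (or later SELECT queries) programmatically; a hedged boto3 sketch, with the query result bucket as a placeholder:

import boto3

athena = boto3.client('athena', region_name='us-east-1')

athena.start_query_execution(
    QueryString='SELECT * FROM default.clickstream LIMIT 10',
    QueryExecutionContext={'Database': 'default'},
    ResultConfiguration={'OutputLocation': 's3://aws-labs-bucket/athena-results/'},  # placeholder bucket/prefix
)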


S3 Configuration

Create a folder ClickStream-S3 in S3 bucket.


Kinesis Data Firehose Configuration

Services -> Kinesis -> Delivery streams

Kinesis Data Streams receives and ingests the ClickStream data, then sends it to Kinesis Data Firehose.
Kinesis Data Firehose delivers the data to the S3 bucket.


Click on Create delivery stream

Step 1: Name and source
  • Delivery stream name: ClickStream-Kinesis-Data-Firehose
  • Source: Kinesis Data Stream
  • Kinesis data stream: ClickStream-Kinesis-Data-Streams

Click on Next


Step 2: Process records
  • Data transformation: Disabled


  • Record format conversion: Enabled
  • Output format: Apache Parquet
  • AWS Glue region: us-east-1
  • AWS Glue database: default
  • AWS Glue table: clickstream
  • AWS Glue table version: Latest

Click on Next


Step 3: Choose a destination
  • Destination: Amazon S3
  • S3 bucket: choose your bucket


Use the folder path as S3 bucket prefix.

  • Backup S3 bucket prefix - optional: Kinesis/ClickStream-S3/ClickStream-Kinesis-Data-Streams
  • Backup S3 bucket error prefix - optional: Kinesis/ClickStream-S3/ClickStream-Kinesis-Data-Streams-error

Click on Next


Step 4: Configure settings

Leave all settings as default

Click on Next


Step 5: Review

Click on Create delivery stream


QuickSight Configuration

Services -> QuickSight
Use QuickSight to connect to Athena, and run analysis and visualization.


Connect to Athena

Click on Datasets on navigation panel


Click on New dataset at the top right of the page


Click on Athena


  • Data source name: ClickStream-Athena

Click on Validate connection

Click on Create data source


  • Catalog: AwsDataCatalog
  • Database: default
  • Tables: clickstream

Click on Edit/Preview data


Edit/Preview data

You can see there is no data in the dataset yet

Run KinesisClickStreamGenerator.py on your local machine

Kinesis Data Streams (ClickStream-Kinesis-Data-Streams): Received data.

Kinesis Data Firehose (ClickStream-Kinesis-Data-Firehose): Received data, 100 records.
You may have to wait a minute for Kinesis Data Firehose to receive the data from Kinesis Data Streams.

S3 (s3://aws-labs-bucket/Kinesis/ClickStream-S3/):
You may have to wait a minute for Kinesis Data Firehose to save the records in S3.


Back to the QuickSight window.
Refresh the web page; you can see QuickSight has already loaded the data from S3 through Athena.
Click on Save & visualize at the top of the page


Pie Chart
  • Visual types: Pie chart
  • Group/Color: client_event


AutoGraph
  • Visual types: AutoGraph
  • Y axis: client_event

You can see the Histogram of client_event

Try modifying KinesisClickStreamGenerator.py to set number_of_records = 200, then save the program.

Run KinesisClickStreamGenerator.py again.

Wait several minutes and refresh the QuickSight page.
We now have 300 records (100 from the first run plus 200 from the second).


Kinesis Data Analytics

Kinesis Data Analytics Lab

Core Components

  • Kinesis
    • Kinesis Data Streams
    • Kinesis Data Analytics
    • Kinesis Data Firehose
  • S3
  • QuickSight

Lab Details

  • Kinesis
    • Kinesis Data Streams: Collect streaming data
    • Kinesis Data Analytics: Get actionable insights from streaming data with serverless Apache Flink
    • Kinesis Data Firehose: Ingest streaming data
  • S3: Data storage
  • QuickSight: Analyze and visualize data

Task Details

  • Kinesis Data Analytics with Kinesis Data Streams, Kinesis Data Firehose, and QuickSight
    • Create a Kinesis Data Analytics application
    • Create a Kinesis Data Streams stock ticker stream
    • Create a Kinesis Firehose delivery stream to store on S3
    • Create manifest file for our S3 data store
    • Use QuickSight to visualize our stock ticker data

Diagram


Kinesis Data Firehose Configuration

Services -> Kinesis


Click on Create delivery stream


Step 1: Name and source
  • Delivery stream name: kinesis-analytics-demo-firehose
  • Source: Direct PUT or other sources

Click on Next


Step 2: Process records
  • Data transformation: Disabled
  • Record format conversion: Disabled

Click on Next


Step 3: Choose a destination
  • Destination: Amazon S3
  • S3 bucket: choose your bucket

  • Backup S3 bucket prefix - optional: kinesis-analytics-demo
  • Backup S3 bucket error prefix - optional: kinesis-analytics-demo-error

Click on Next


Step 4: Configure settings
  • Buffer interval: 60

Click on Next


Step 5: Review

Click on Create delivery stream


Kinesis Data Analytics Configuration

Services -> Kinesis


Click on Analytics applications


Click on Create application


Create a Data Analytics Application
  • Application: TickerApp
  • Runtime: SQL

Click on Create application


Step 1: Source
Connect streaming data

Click on Connect streaming data for data feeding.


If you select Choose source, you can connect to an existing Kinesis data stream or Kinesis Firehose delivery stream

Select Configure a new stream to create a new one.
We will go with Create a demo stream


Create a demo stream

AWS will do the following steps automatically to create the demo stream:

  1. Create/update IAM role kinesis-analytics-TickerApp-us-east-1
  2. Create Kinesis stream kinesis-analytics-demo-stream (takes on average 30-40 seconds)
  3. Begin populating stream kinesis-analytics-demo-stream with sample stock ticker data
  4. Discover schema: capture a stream sample, identify data format, apply schema
  5. Select stream kinesis-analytics-demo-stream from your streams

You can see kinesis-analytics-demo-stream on Data Streams page


Record pre-processing with AWS Lambda
Kinesis Data Analytics can invoke your Lambda function to pre-process records before they are used in this application. To pre-process records, your Lambda function must be compliant with the required record transformation output model.

  • Record pre-processing: Disabled

Access permissions
Create or choose IAM role with the required permissions.

  • Access permissions: Create / update IAM role kinesis-analytics-TickerApp-us-east-1


Schema
Schema discovery can generate a schema using recent records from the source. Schema column names are the same as in the source, unless they contain special characters, repeated column names, or reserved keywords.

Click on Save and continue


Step 2: Real time analytics

Click on Go to SQL editor

The SQL editor is much more powerful when your application is running.

  • See samples from your source data stream
  • Get feedback on any errors in your configuration or SQL
  • Watch as your data is processed in real-time by your SQL code

Click on Yes, start application


Click on Add SQL from templates


Select Continuous filter

Continuous filter
-- ** Continuous Filter **
-- Performs a continuous filter based on a WHERE condition.
--          .----------.   .----------.   .----------.
--          |  SOURCE  |   |  INSERT  |   |  DESTIN. |
-- Source-->|  STREAM  |-->| & SELECT |-->|  STREAM  |-->Destination
--          |          |   |  (PUMP)  |   |          |
--          '----------'   '----------'   '----------'
-- STREAM (in-application): a continuously updated entity that you can SELECT from and INSERT into like a TABLE
-- PUMP: an entity used to continuously 'SELECT ... FROM' a source STREAM, and INSERT SQL results into an output STREAM
-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ticker_symbol VARCHAR(4), sector VARCHAR(12), change REAL, price REAL);
-- Create pump to insert into output
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
-- Select all columns from source stream
SELECT STREAM ticker_symbol, sector, change, price
FROM "SOURCE_SQL_STREAM_001"
-- LIKE compares a string to a string pattern (_ matches all char, % matches substring)
-- SIMILAR TO compares string to a regex, may use ESCAPE
WHERE sector SIMILAR TO '%TECH%';

Click Add this SQL to the editor


Click Save and run SQL


Click on Destination tab
We are going to send the in-application stream to the Kinesis Firehose delivery stream.
Click on Connect to a destination


Step 3: Destination optional
  • Destination: Kinesis Firehose delivery stream

If you choose a Kinesis Data Firehose that is connected to a Kinesis data stream, you will see the following error:
Delivery stream *** is connected to Kinesis data stream ***. Delivery streams that receive records from a Kinesis data stream cannot connect to an Analytics application. Instead, choose the connected Kinesis data stream.

This means the Kinesis Data Analytics application must connect to Kinesis Data Firehose directly. The data path is: Kinesis Data Streams -> Kinesis Data Analytics -> Kinesis Data Firehose


  • Kinesis Firehose delivery stream: kinesis-analytics-demo-firehose


In-application stream

In-application streams are continuous flows of data records. You create in-application streams in SQL to contain the data you want to persist to the specified destination.

  • Connect in-application stream: Choose an existing in-application stream
  • In-application stream name: DESTINATION_SQL_STREAM
  • Output format: JSON

Click on Save and continue


Check the Real time analytics

Click on Go to SQL results


Application status: RUNNING
The Kinesis Data Analytics application is receiving data, so we can go check the Kinesis Data Firehose (kinesis-analytics-demo-firehose) Monitoring tab


And Kinesis Data Firehose (kinesis-analytics-demo-firehose) is storing data in the S3 bucket


Open the folder to see what is inside.

Download the file.

Open the JSON file.
Remember we used the SQL query to continuously filter with WHERE sector SIMILAR TO '%TECH%'; that’s why we only see the TECHNOLOGY tickers.


Back to the S3 bucket.

Click one of the files to see the details.

Copy the object S3 URI


On your local machine, create a file kinesis-analytics-demo2021.json.
Then copy and paste the following JSON code, replacing the URIs field with your object’s S3 URI.

JSON manifest file
{
    "fileLocations": [
        {
            "URIs": [
                "s3://aws-labs-bucket/kinesis-analytics-demo2021/01/28/04/kinesis-analytics-demo-firehose-1-2021-01-28-04-05-06-1bdaf2a5-819d-472d-b15f-a26987b68e7c"
            ]
        }
    ],
    "globalUploadSettings": {
        "format": "JSON",
        "textqualifier": "'",
        "containsHeader": "true"
    }
}


Go to kinesis-analytics-demo2021 folder and click on Upload.


Click on Add files and select kinesis-analytics-demo2021.json


Select kinesis-analytics-demo2021.json then click on Upload


Click on kinesis-analytics-demo2021.json


Copy S3 URI


QuickSight Configuration

Services -> QuickSight

You may need to create an account and login.


Connect to S3

Click on Datasets on navigation panel


Click on New dataset at the top right of the page


Click on S3


  • Data source name: TickerData

Click on URL and enter the URL of your JSON manifest file (mine is kinesis-analytics-demo2021.json).
Click on Connect


Click on Edit/Preview data


Edit/Preview data

Click on Save & visualize at the top of the page


Cluster bar combo chart
  • Visual types: Cluster bar combo chart
  • X axis: TICKER_SYMBOL
  • Bars: PRICE (Sum)
  • Group/Color for bars: SECTOR
  • Lines: CHANGE (Sum)

You can hover the cursor over any bar to see the details


Stop Feeding Data

Services -> Kinesis -> Data Analytics


Select TickerApp, then click on Actions -> Stop

Click on Stop

Status: Stopping

Wait until the Status changes to Stopped