AWS EMR

Introduction

Easily run and scale Apache Spark, Hive, Presto, and other big data frameworks

Amazon EMR

Amazon EMR is the industry-leading cloud big data platform for processing vast amounts of data using open source tools such as Apache Spark, Apache Hive, Apache HBase, Apache Flink, Apache Hudi, and Presto. Amazon EMR makes it easy to set up, operate, and scale your big data environments by automating time-consuming tasks like provisioning capacity and tuning clusters. With EMR you can run petabyte-scale analysis at less than half of the cost of traditional on-premises solutions and over 3x faster than standard Apache Spark. You can run workloads on Amazon EC2 instances, on Amazon Elastic Kubernetes Service (EKS) clusters, or on-premises using EMR on AWS Outposts.


EMR Components

  • EMR is built upon clusters (or collections) of E2 instances
    • The EC2 instances are called nodes, all of which have roles (or node type) in the cluster
    • EMR installs different software components on each node type, defining the node’s role in the distributed architecture of EMR
    • Three types of nodes in an EMR cluster
      • Master node: manages the cluster, running software components to coordinate the distribution of data and tasks across other nodes for processing
      • Core node: has software components that run tasks and store data in the Hadoop Distributed File System (HDFS) on your cluster
      • Task node: node with software components that only runs tasks and does not store data in HDFS

EMR Cluster


EMR Cluster - Work

  • Options for submitting work to your EMR cluster
    • Script: Script the work to be done as functions that you specify in the steps you define when you create cluster
      • Short-term: This approach is used for clutters that process data and then terminate
    • Console: Build a long-running cluster and submit steps (containing one or more jobs) to it via the console, the EMR API, or the AWS CLI
      • Long-term: This approach is used for clusters that process dat continuously or need to remain available
    • SSH: Create a cluster and connect to the master node and/or other nodes as required using SSH
      • This approach is used to perform tasks and submit queries, either scripted or interactively, via the interfaces of the installed applications

SSH to the Master node than manage other nodes


EMR Cluster - Processing Data

  • At launch time, you choose the frameworks and applications to install to achieve your data processing needs
  • You submit jobs or queries to installed applications or run steps to process data in your EMR cluster
    • Submitting jobs/steps to installed applications
      • Each step is a unit of work that has instructions to process data by software installed on the cluster
      • Example:
        • Submit a request to run a set of steps


EMR Cluster - Lifecycle

  • Provisions EC2 instances of the cluster
  • Runs bootstrap actions
  • Installs applications such as Hive, Hadoop, Sqoop, Spark
  • Connect to the cluster instances; cluster sequentially runs any steps that specified at creation; submit additional steps
  • After steps complete the cluster waits or shuts down, depending on config
  • When all instances are terminated, the cluster moves to COMPLETED state


EMR Architecture

EMR Architecture - Storage

  • Architected in layers
    • Storage: file systems used by the cluster
      • HDFS: distributed the data it stores across instances in the cluster (ephemeral)
      • EMR File System (EMRFS): directly access data stored in S3 as if it were a file system like HDFS
      • Local file system: EC2 locally connected disk

What is HDFS?
HDFS is a distributed file system that handles large data sets running on commodity hardware. It is used to scale a single Apache Hadoop cluster to hundreds (and even thousands) of nodes. HDFS is one of the major components of Apache Hadoop, the others being MapReduce and YARN. HDFS should not be confused with or replaced by Apache HBase, which is a column-oriented non-relational database management system that sits on top of HDFS and can better support real-time data needs with its in-memory processing engine.


EMR Architecture - Cluster Management

  • YARN (Yet Another Resource Manager)
    • Centrally manage cluster resources
    • Agent on each node that keeps the cluster healthy, and communicates with EMR
    • EMR defaults to scheduling YARN jobs so that jobs won’t fail when task nodes running on spot instances are terminated


EMR Architecture - Data Processing Frameworks

  • Framework layer that is used to process and analyze data
    • Different frameworks available
      • Hadoop MapReduce
        • Parallel distributed applications that use Map and Reduce functions (e.g. Hive)
          • Map function maps data to sets of key-value pairs
          • Reduce function combines the key-value pairs and processes the data
      • Spark
        • Cluster framework and programming model for processing big data workloads


EMR Architecture - Application Programs

  • Supports many application programs
  • Create processing workloads
  • Use machine learning algorithms
  • Create stream processing apps
  • Build data warehouses and data lakes
Application Description
Flink Framework and distributed processing engine for stateful computations over unbounded and bounded data streams
Ganglia Distributed system designed to monitor clusters and grids while minimizing the impact on their performance
Hadoop Framework that allows for the distributed processing of of large data sets across clusters of computers
HBase Non-relational distributed database modeled after Google’s Bigtable
HCatalog Table storage management tool for Hadoop that expose the tabular data of Hive metastore to other Hadoop applications
JupyterHub Serve Jupyter notebooks for multiple users
Livy Enables easy interaction with a Spark cluster over a REST interface
Mahout Create scalable ML algorithms
MXNet Deep learning software framework, used to train, and deploy deep neural networks
Phoenix Massively parallel, relational database engine supporting OLTP for Hadoop using Apache HBase as its backing store
Pig High-level platform for creating programs that run on Apache Hadoop. The language for this platform is called Pig Latin
Spark Distributed general-purpose cluster-computing framework
Sqoop Command-line interface application for transferring data between relational database and Hadoop
Tez Extensible framework for building high performance batch and interactive data processing applications, coordinated by YARN in Apache Hadoop
TensorFlow Machine learning library
Zeppelin Web-based notebook for data-driven analytics
ZooKeeper Centralized service for maintain configuration information, naming, providing distributed synchronization, and providing group services
Hive A SQL-like interface to query dat stored in various databases and file systems that integrate with Hadoop
Hue Web interface for analyzing data with Hadoop
Oozie Server-based workflow scheduling system to manage Hadoop jobs
Presto High performance, distributed SQL query engine for big data

EMR Application

AWS EMR Application Lab


EMR Application and Categories

  • EMR supports many hadoop application
    • Spark
    • Hive
    • Presto
    • HBase
  • Data scientists and engineers use EMR to run analytics workflows using these tools along with Hue and EMR Notebooks
  • Several important categories of EMR applications
    • Data Processing
    • SOL
    • NoSQL
    • Interactive Analytics

Data Processing Applications

  • Apache Spark
    • Hadoop ecosystem engine used process large data sets very quickly
    • Runs fault-tolerant resilient distributed datasets (RDDs) in-memory, and defines data transformations
    • Includes Spark SQL, Spark Streaming, MLlib, and GraphX
  • Apache Flink
    • Streaming dataflow engine that allows you to run real-time stream processing on high-throughput data sources
    • Supports APIs optimized for writing both streaming and batch applications

SQL Applications

  • Apache Hive
    • Data warehouse and analytics package that runs on top of Hadoop
    • Allows you to structure, summarize, and query data
    • Supports map/reduce functions and complex extensible user-defined data types like JSON and Thrift
    • Can process complex and unstructured data sources such as text documents and log files
  • Presto
    • SQL query engine optimized for low-latency, ad-hoc analysis of data
    • Can process data from multiple data sources including the Hadoop Distributed File System (HDFS) and Amazon S3

NoSQL Applications

  • Apache HBase
    • Non-relational, distributed database modeled after Google’s BigTable
    • Runs on top of Hadoop Distributed File System (HDFS) to provide BigTable-like capabilities for Hadoop
    • Store large amounts of sparse data using column-based compression and storage
    • Use S3 as a data store for HBase, enabling you to lower costs and reduce operational complexity

Interactive Analytics Applications

  • Hue
    • User interface for Hadoop that allows you to create and run Hive queries, manage files in HDFS, create and run Pig scripts, and manage tables
    • Also integrates with S3, so you can query directly against S3 and easily transfer files between HDFS and Amazon S3
  • EMR Notebooks
    • Notebooks pre-configured for Spark that are based on Jupyter notebooks
    • Interactively run Spark jobs on EMR clusters written in PySpark, Spark SQL, Spark R, and Scala

EMR Optimization

Optimizing EMR

  • Based on your workload you have several options for optimizing your EMR cluster
    • Design options
      • Instance type
      • Instance configuration
      • HDFS capacity
      • Dynamic scaling
      • Run multiple steps in parallel

EMR Instance Types

  • Ways to add EC2 instances to your cluster
    • Instance Groups or Instance Fleets
      • Uniform instance Groups
        • Specify a single instance type and purchasing option ofr each node type
        • Manually add instances of the same type to existing core and task instance groups
        • Manually add a task in stance group, can use a different instance type
        • Automatic scaling for an instance group based on the value of a CloudWatch metric specified by you
      • Instance Fleets
        • Specify target capacity and how Amazon EMR fulfills it for each node type. Mix instance types and purchasing options.
        • Add a single task instance fleet
        • Change the target capacity for On-Demand and Spot Instances for existing core and task instance fleets


Best Practices
  • Best practices
    • Plan your instance capacity
      • Run a test cluster using a representative sample data set and monitor the node utilization
      • Calculate instance capacity and compare that value against the size of your data
    • Master node doesn’t require high computational capacity
    • Most EMR clusters can run on m5.xlarge or m4.xlarge
    • Data processing load performance depends on the capacity of your core nodes and data size as input, during processing, and as output

EMR Instance Configuration

  • Long-running clusters and data warehouses
    • Persistent EMR cluster (e.g. Data Warehouse)
      • Master and core instance groups as on-demand instances
      • Task instance groups as spot instances
    • Cost-driven workloads: low cost, partial work loss OK
      • Transient clusters
      • Run all groups, master, core, and task instance groups as spot instances
    • Data-critical workloads: low cost, partial work loss NOT OK
      • Master and core instance groups as on-demand, task instance groups as spot
    • Test environment: all instance groups on spot
Cluster Type Master Node Core Node Task Node
Persistent EMR cluster On-demand On-demand or instance fleet mix Spot or instance fleet mix
Cost-driven workloads Spot Spot Spot
Data-critical workloads On-demand On-demand Spot or instance fleet mix
Test Environment Spot Spot Spot

EMR HDFS Capacity

What is HDFS?
HDFS is a distributed file system that handles large data sets running on commodity hardware. It is used to scale a single Apache Hadoop cluster to hundreds (and even thousands) of nodes. HDFS is one of the major components of Apache Hadoop, the others being MapReduce and YARN. HDFS should not be confused with or replaced by Apache HBase, which is a column-oriented non-relational database management system that sits on top of HDFS and can better support real-time data needs with its in-memory processing engine.

  • To calculate the storage allocation for your cluster consider the following
    • Number of EC2 instances used for core nodes
    • Capacity of the EC2 instance store for the instance type used
    • Number and size of EBS volumes attached to core nodes
    • Replication factor: how each data block is stored in HDFS for RAID-like redundancy
      • RAID 3: core nodes ≥ 10
      • RAID 2: 4 ≤ core nodes ≤ 9
      • RAID 1: core nodes ≤ 3
    • HDFS capacity of your cluster
      • For each core node, add instance store volume capacity to EBS storage capacity
      • Multiply by the number of core nodes, then divide the total by the replication factor

EMR Dynamic Scaling

  • Dynamically scales nodes in your cluster based on demand
    • On scale down of task nodes on a running cluster expect a short delay for any running Hadoop to decommission
    • On scale down of core nodes EMR waits for HDFS to decommission to protect your data
    • Changing configuration improperly on a cluster with high load can seriously degrade cluster performance

Run Multiple Steps in Parallel

  • Allows for parallel processing and greater speed
  • Considerations
    • Use EMR automatic scaling to scale up/down based on the YARN resources to prevent resource contention
    • Running multiple steps in paralleled requires more memory and CPU utilization from the master node than running one step at a time
    • Use YARN scheduling features such as FairScheduler or Capacity Scheduler to run multiple steps in parallel
    • If you run out of resources because the cluster is running too many concurrent steps, manually cancel any running steps to free up resources

EMR Monitoring


Use CloudWatch Metrics to Manage EMR Cluster

  • EMR metrics updated every 5 minutes, collected and pushed to CloudWatch
    • Non-configurable metric limiting
    • Metrics archived for two weeks then discard
  • EMR metrics uses
Use Case Metrics
Track progress of cluster RunningMapTasks, RemainingMapTasks, RunningReduceTasks, and RemainingReduceTasks metrics
Detect idle clusters IsIdele metric tracks if a cluster is live, but not currently running tasks. Set an alarm to fire when the cluster has been idle for a given period of time
Detect when a node runs out of storage HDFSUtilization metric gives the percentage of disk space currently used. If it rises above an acceptable level for your application, such as 80% of capacity used, you take action to resize your cluster and add more core nodes

View and Monitor EMR Cluster

  • EMR has several tools for retrieving information about your cluster
    • Console, CLI, API
    • Hadoop web interfaces and logs on Master node
    • Use monitoring services like CloudWatch and Ganglia to track the performance of your cluster
    • Application history available through persistent application UIs for Spark History Server, persistent YARN timeline server, and Tez user interfaces


Leverage EMR API Calls in CloudTrail

  • CloudTrail holds a record of actions taken by users, roles, or an AWS service in EMR
    • Captures all API calls for EMR as events
    • Enable continuous delivery of CloudTrail events to an S3 bucket
    • Determine the EMR request, the IP address from which the request was made, who made the request, when it was made, and additional details


EMR Automation

Step Functions Directly with EMR


Orchestration or Workflows

  • Coordinating your ETL jobs across Glue, EMR, and Redshift
    • With orchestration you can automate each step in your workflow
      • Retry on errors
      • Find and recover jobs that fail
      • Track the steps in your workflow
      • Build repeatable workflows
    • Respond to state change on EMR cluster
    • Use CloudWatch metrics to manage your EMR cluster
    • View and monitor your EMR cluster
    • Leverage EMR API calls in CloudTrail
    • Orchestrate Spark, Redshift, and EMR workloads using Step Functions


Respond to State Changes on EMR Cluster

  • Trigger create, terminate, scale cluster, run Spark, Hive, or Pig workloads based on Cluster state changes
    • EMR CloudWatch events support notify you of state changes in your cluster
    • Respond to state changes programmatically
    • EMR CloudWatch events
      • Cluster State Change
      • Instance Group and Instance Fleet State Change
      • Step State Change
      • Auto Scaling State Change
      • Create filters and rules to match events and rout them to SNS topics, Lambda functions, SQS queues, Kinesis Streams


Orchestrate Spark and EMR Workloads using Step Functions

  • Ways to manage EMR Steps
    • Use Apache Oozie or Apache Airflow scheduler tools for EMR Spark applications
    • Use Step Functions and interact with Spark applications on EMR using Apache Livy
    • Directly connect Step Functions to EMR
      • Create data processing and analysis workflows with minimal code and optimize cluster utilization


Operationalize data processing with Glue and EMR Workflows

Orchestration of Glue and EMR Workflows

  • Several ways to operationalize Glue and EMR
    • Glue workflows
    • Automate workflow using Lambda
    • Step Functions with Glue
    • Step Functions with EMR and Apache Livy
    • Step Functions directly with EMR


Step Functions with EMR and Apache Livy

AWS Step Functions

  • Step Functions state machine starts executing, and the input file path of the data stored in S3 is passed to the state machine
  • First step in the state machine triggers a Lambda function
  • Lambda function interacts with Apache Spark sunning on EMR using Apache Livy, and submits a Spark job
  • State Machine checks the Spark job status
  • Based on the job status, the state machine moves to the success of failure state
  • Output data from the Spark job is stored in S3

AWS Step Functions
Call Amazon EMR with Step Functions
Supported AWS Service Integrations for Step Functions
Request Response
Run a Job
Wait for a Callback with the Task Token

  • Step Functions now interacts directly with EMR
  • Use Step Functions to control all steps in your EMR cluster operation
  • Also integrate other AWS services into your workflow

EMR Network Security

EMR VPC Isolation Lab

  • Need to secure the physical boundary of your analytics services using network isolation
  • Use VPC to achieve network isolation
  • Example: EMR cluster where the master, core, and task nodes are in a private subnet using NAT to perform outbound only internet access
  • Also, use security groups to control inbound and outbound access from your individual EC2 instances
  • With EMR use both EMR-managed security groups and additional security groups to control network access to your instance


EMR - Managed Security Groups

  • Every EMR cluster has managed security groups associated with it, ether default or custom
  • EMR automatically adds rules to managed security groups used by the cluster to communicate between cluster instances and AWS services
  • Rules that EMR creates in managed security groups allow the cluster to communicate among internal components
  • To allow users and applications to access a cluster from outside the cluster, edit rules in managed security groups
    • Editing rules in managed security groups may have unintended consequences; may inadvertently block the traffic required for clusters to function properly
  • Specify security groups only on cluster create, can’t add to a cluster or cluster instances while a cluster is running
  • Can edit, add, and remove rules from existing security groups, the rules take effect as soon as you save them

EMR - Additional Security Groups

  • Additional security groups are optional
  • Specify in addition to managed security groups to tailor access to cluster instances
  • Contain only rules that you define, EMR does not modify them
  • To allow users and applications to access a cluster from outside the cluster, create additional security groups with additional rules
  • Separate the rules that you defined from EMR - Managed Security Groups, which is easier for managing.

EMR - VPC Options - Security Groups

  • Security groups
    • At EMR cluster launch, must specify one EMR-managed security group for the master instance, one EMR-managed security group for the core/task instances, and optionally, one EMR-managed security group for the EMR resources used to manage clusters in private subnets.
  • The core/task instances share the same security group.


EMR - VPC Options - S3 Endpoints & NAT Gateway

  • S3 Endpoints
    • All instances in a cluster connect to S3 through either a VPC endpoint or internet gateway.
    • Create a private endpoint for S3 in your subnet to give your EMR cluster direct access to data in Amazon S3.
  • NAT Gateway
    • Other AWS services which do not currently support VPC endpoints use only an internet gateway.


EMR Encryption and Tokenization


Encryption at Rest and In Transit EMR


Encryption and Tokenization

  • Protection data against data exfiltration (unauthorized copying and/or transferring data) and unauthorized access
  • Different ways to protect data
    • Use a hardware security module (HSM) to manage the top-level encryption keys
    • Encryption at rest
    • Encryption in transit
    • Alternative to encryption to secure highly sensitive subnets of data from compliance purpose
    • Secrets management


HSM

  • Use HSM if you require your keys stored in dedicated, third-party validate hardware security modules under your exclusive control
  • If you need to comply to FIPS 140-2, which is a security approval program run by the U.S government
  • If you need high-performance in -VPC cryptographic acceleration (bulk crypto)
  • If you need a multi-tenant, managed service that allows you to use and manage encryption keys


Encryption at Rest and In Transit - EMR

  • Prevent unauthorized users from reading data on a cluster and associated data storage systems
  • Encryption at rest: data saved to persistent media
  • Encryption in transit: data that may be intercepted as it travels the network
  • Use EMR security configurations to configure data encryption settings for clusters
    • Enable security for data in-transit and data at-rest in EBS volumes and EMRFS on S3
  • EMR release version 4.1.0 and later, you can configure transparent encryption in HDFS, which is not configured using security configurations


Tokenization

  • Protect certain elements in the data that contains high sensitivity or a specific regulatory compliance requirement, such as PCI
  • Separates sensitive data into its own dedicated, secured data store
  • Use tokens to represent sensitive data instead of end-to-end encryption
  • Reduce risk with temporary, one-time-use tokens


Secrets Managements

  • Secrets Manager
    • Use secrets in application code to connect to databases, APIs, and other resources
    • Provides rotation, audit, and access control
  • Systems Manager Parameter Store
    • Centralized store to manage configuration data
    • Plain-text data such as database strings or secrets such as passwords
    • Does not rotate parameter stores automatically


EMR Security configuration

Encryption

Data encryption helps prevent unauthorized users from reading data on a cluster and associated data storage systems. This includes data saved to persistent media, known as data at rest, and data that may be intercepted as it travels the network, known as data in transit.


S3 encryption

Amazon S3 encryption works with EMR File System (EMRFS) objects read from and written to Amazon S3. Specify server-side encryption (SSE) or client-side encryption (CSE).

You can click on Per bucket encryption overrides -> Add bucket override for customize each S3 bucket encryption mode
But in this lab, we don’t need Per bucket encryption overrides, so click on X to close it.


Local disk encryption

Amazon EC2 instance store volumes and the attached Amazon Elastic Block Store (EBS) storage volumes are encrypted using Linux Unified Key Setup (LUKS). Alternatively, when using AWS KMS as your key provider, you can choose to turn on EBS encryption to encrypt EBS root device and storage volumes. AWS KMS customer master keys (CMKs) require additional permissions for EBS encryption


Data in transit encryption

Transport Layer Security (TLS) is essential for encrypting information that is exchanged on the internet. Turn on open-source TLS encryption features for in-transit data and choose a certificate provider type.


Authentication

Use Kerberos Authentication

Amazon EMR release version 5.10.0 and later supports Kerberos, which is a network authentication protocol created by the Massachusetts Institute of Technology (MIT). Kerberos uses secret-key cryptography to provide strong authentication so that passwords or other credentials aren’t sent over the network in an unencrypted format.

In Kerberos, services and users that need to authenticate are known as principals. Principals exist within a Kerberos realm. Within the realm, a Kerberos server known as the key distribution center (KDC) provides the means for principals to authenticate. The KDC does this by issuing tickets for authentication. The KDC maintains a database of the principals within its realm, their passwords, and other administrative information about each principal. A KDC can also accept authentication credentials from principals in other realms, which is known as a cross-realm trust. In addition, an EMR cluster can use an external KDC to authenticate principals.

A common scenario for establishing a cross-realm trust or using an external KDC is to authenticate users from an Active Directory domain. This allows users to access an EMR cluster with their domain user account when they use SSH to connect to a cluster or work with big data applications.

When you use Kerberos authentication, Amazon EMR configures Kerberos for the applications, components, and subsystems that it installs on the cluster so that they are authenticated with each other.

Enable Kerberos authentication for interactions between certain application components on your cluster using Kerberos principals. You can choose between having EMR install a KDC server on the master node of the cluster or you can share your own KDC details that EMR cluster can use.

KDC: A Kerberos Key Distribution Center (KDC) is installed on the master node of your cluster.


IAM roles for EMRFS

When an Amazon S3 request is made through EMRFS, each Basis for access is evaluated in order. EMRFS assumes the corresponding IAM role for the first match. Specify the cluster Users or Groups, or S3 prefixes as the Basis for access. If no Basis for access matches the request, EMRFS uses the cluster’s EMR role for EC2.


Authorization

  • Enable integration of AWS Lake Formation for fine-grained access control: Use corporate credentials together with Lake Formation permissions to control access to the Data Catalog and underlying data store. You must create a data lake and configure associated permissions in Lake Formation. You must also opt in at Lake Formation console to allow external data filtering on Amazon EMR.
    • IAM role for AWS Lake Formation: Enter the role that federated users assume when they access a Lake Formation data lake using EMR.
    • IAM role for other AWS services: Enter the role that federated users assume when they use EMR to access AWS resources through other AWS services. Do not attach permissions that Lake Formation manages to this role.
    • Identity provider (IdP) metadata: Enter the location of the file that contains your Identity provider (IdP) metadata.
  • Enable integration with Apache Ranger for fine-grained access control: Use the self-managed two IAM roles to separate the permission control for accessing EMR data and other AWS services. Specify roles below to define user access to protected resources
    • IAM role for Apache RangerProvide an IAM role for access governed by Apache Ranger access control policies.
    • IAM role for other AWS Services: Provide an IAM role for access governed by IAM permission only.

Labs

AWS EMR Lab