Udacity Spark

Reference

Udacity Spark by INSIGHT

GitHub


Lesson 1: Introduction to the Course


Introduction


According to the video, how much data is generated every day?
millions of bytes
1 billion bytes
2.5 quintillion bytes


Course Overview

  • Lesson 1: Big Data Ecosystem
  • Lesson 2: Data Wrangling
  • Lesson 3: Debugging & Optimization
  • Lesson 4: Machine Learning

Project Overview


Lesson 2: The Power of Spark


Introduction

Why Learn Spark?

Spark is currently one of the most popular tools for big data analytics. You might have heard of other tools such as Hadoop. Hadoop is a slightly older technology although still in use by some companies. Spark is generally faster than Hadoop, which is why Spark has become more popular over the last few years.

There are many other big data tools and systems, each with its own use case. For example, there are database systems like Apache Cassandra and SQL query engines like Presto. But Spark is still one of the most popular tools for analyzing large data sets.

Here is an outline of the topics we are covering in this lesson:

  • What is big data?
  • Review of the hardware behind big data
  • Introduction to distributed systems
  • Brief history of Spark and big data
  • Common Spark use cases
  • Other technologies in the big data ecosystem

What is Big Data?


To test your current hardware knowledge, match each computer hardware component with the best corresponding description. Don’t worry if you’re not sure. You’ll get more information about this in the next few videos.

HARDWARE COMPONENT DESCRIPTION
Memory Short-term, quick data storage
Solid State Drive Long-term, safe data storage
Network Connection between computers
CPU Brain of the computer

Numbers Everyone Should Know

In the next few videos, you’ll learn about four key hardware components. Understanding these components helps determine whether you are working on a “big data” problem or if it’s easier to analyze the data locally on your own computer.

CPU (Central Processing Unit)
The CPU is the “brain” of the computer. Every process on your computer is eventually handled by your CPU. This includes calculations and also instructions for the other components of the computer.

Memory (RAM)
When your program runs, data gets temporarily stored in memory before getting sent to the CPU. Memory is ephemeral storage - when your computer shuts down, the data in the memory is lost.

Storage (SSD or Magnetic Disk)
Storage is used for keeping data over long periods of time. When a program runs, the CPU will direct the memory to temporarily load data from long-term storage.

Network (LAN or the Internet)
Network is the gateway for anything that you need that isn’t stored on your computer. The network could connect to other computers in the same room (a Local Area Network) or to a computer on the other side of the world, connected over the internet.

Other Numbers to Know?
You may have noticed a few other numbers involving the L1 and L2 Cache, mutex locking, and branch mispredicts. While these concepts are important for a detailed understanding of what’s going on inside your computer, you don’t need to worry about them for this course. If you’re curious to learn more, check out Peter Norvig’s original blog post from a few years ago, and an interactive version for today’s current hardware.


Rank the hardware components in order from fastest to slowest

SPEED HARDWARE COMPONENT
Fastest CPU
2nd Fastest Memory (RAM)
3rd Fastest Disk Storage (SSD)
Slowest Network

Hardware: CPU

The CPU is the brains of a computer. The CPU has a few different functions including directing other components of a computer as well as running mathematical calculations. The CPU can also store small amounts of data inside itself in what are called registers. These registers hold data that the CPU is working with at the moment.

For example, say you write a program that reads in a 40 MB data file and then analyzes the file. When you execute the code, the instructions are loaded into the CPU. The CPU then instructs the computer to take the 40 MB from disk and store the data in memory (RAM). If you want to sum a column of data, then the CPU will essentially take two numbers at a time and sum them together. The accumulation of the sum needs to be stored somewhere while the CPU grabs the next number.

This cumulative sum will be stored in a register. The registers make computations more efficient: the registers avoid having to send data unnecessarily back and forth between memory (RAM) and the CPU.


A 2.5 Gigahertz CPU means that the CPU processes 2.5 billion operations per second. Let’s say that for each operation, the CPU processes 8 bytes of data. How many bytes could this CPU process per second?
312.5 million bytes per second
3.2 billion bytes per second
20 billion bytes per second


Twitter generates about 6,000 tweets per second, and each tweet contains 200 bytes. So in one day, Twitter generates data on the order of:

(6000 tweets / second) x (86400 seconds / day) x (200 bytes / tweet) = 104 billion bytes / day

Knowing that tweets create approximately 104 billion bytes of data per day, how long would it take the 2.5 GigaHertz CPU to analyze a full day of tweets?
0.19 s
3.5 s
5.2 s
47 s
136 s


Hardware: Memory


With CPU and Memory, what else do we need?

It seems like the right combination of CPU and memory can help you quickly load and process data. We could build a single computer with lots of CPUs and a ton of memory. The computer would be incredibly fast.

What are the potential trade-offs of creating one computer with lots of CPUs and memory?

It’s true that you could build a computer with a ton of CPU and memory, which is effectively a supercomputer. However, as you’ll see in the next video, this approach has its downsides.

Watch the video below to learn about an alternative to making one computer with a lot of CPUs and memory.


Beyond the fact that memory is expensive and ephemeral, we’ll learn that for most use cases in the industry, memory and CPU aren’t the bottleneck. Instead, the storage and network, which you’ll learn about in the next videos, slow down many tasks you’ll work on in the industry.

What are the limitations of memory (RAM)?
RAM is relatively expensive
You can only fit 8 GB of RAM onto a single computer
Data stored in RAM gets erased when the computer shuts down
Operations in RAM are relatively inefficient compared to disk storage and network operations.


Storage


Network


Which component is the biggest bottleneck when working with big data?
Transferring data across a network
Reading to and writing from disk storage (SSD)
Reading to and writing from memory (RAM)
CPU operations


Key Ratios Review


Small Data Numbers


Big Data Numbers


What happened to your computer when you started to run the program with the larger data set? Keep in mind that in this current configuration, the data set is stored on your local computer and does not need to travel across a network (Select all that apply)

It took too long to move data across the network
The CPU couldn’t load data quickly from memory
The memory couldn’t load data quickly from storage
The CPU simply couldn’t handle the larger data set
The storage couldn’t load data quickly from the network
The CPU couldn’t load data quickly from the network



Medium Data Numbers

If a dataset is larger than the size of your RAM, you might still be able to analyze the data on a single computer. By default, the Python pandas library will read in an entire dataset from disk into memory. If the dataset is larger than your computer’s memory, the program won’t work.

However, the Python pandas library can read in a file in smaller chunks. Thus, if you were going to calculate summary statistics about the dataset such as a sum or count, you could read in a part of the dataset at a time and accumulate the sum or count.

Here is an example of how this works.
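
Below is a minimal sketch of that chunked approach using pandas, assuming a local file named large_dataset.csv with a numeric column named amount (both names are made up for illustration):

import pandas as pd

# Accumulate a sum and a row count without ever loading the whole file into memory.
total = 0
row_count = 0

# chunksize tells pandas to return an iterator of smaller DataFrames.
for chunk in pd.read_csv("large_dataset.csv", chunksize=100_000):
    total += chunk["amount"].sum()
    row_count += len(chunk)

print("sum:", total)
print("count:", row_count)
print("mean:", total / row_count)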


History of Distributed and Parallel Computing


Mark the statements that are true.

Generally speaking, distributed computing assumes that multiple CPUs are sharing a single source of memory.
Parallel computing is another way of saying distributed computing
In general parallel computing implies multiple CPUs share the same memory.
With distributed computing, each CPU has its own memory.
In distributed computing, each computer/machine is connected to the other machines across a network.


The Hadoop Ecosystem

Hadoop Framework

  • HDFS: Data Storage
  • MapReduce: Data Processing
  • Yarn: Resource Manager
  • Hadoop Common: Utilities

Other Big Data Projects:

  • Apache Pig: SQL for MapReduce
  • Apache Hive: SQL for MapReduce

Newer Distributed Data Technologies

  • Apache Spark
  • Apache Storm: Streaming Data
  • Apache Flink: Streaming Data

Hadoop Vocabulary

Here is a list of some terms associated with Hadoop. You’ll learn more about these terms and how they relate to Spark in the rest of the lesson.

  • Hadoop - an ecosystem of tools for big data storage and data analysis. Hadoop is an older system than Spark but is still used by many companies. The major difference between Spark and Hadoop is how they use memory. Hadoop writes intermediate results to disk whereas Spark tries to keep data in memory whenever possible. This makes Spark faster for many use cases.
  • Hadoop MapReduce - a system for processing and analyzing large data sets in parallel.
  • Hadoop YARN - a resource manager that schedules jobs across a cluster. The manager keeps track of what computer resources are available and then assigns those resources to specific tasks.
  • Hadoop Distributed File System (HDFS) - a big data storage system that splits data into chunks and stores the chunks across a cluster of computers.

As Hadoop matured, other tools were developed to make Hadoop easier to work with. These tools included:

  • Apache Pig - a SQL-like language that runs on top of Hadoop MapReduce
  • Apache Hive - another SQL-like interface that runs on top of Hadoop MapReduce

Oftentimes when someone is talking about Hadoop in general terms, they are actually talking about Hadoop MapReduce. However, Hadoop is more than just MapReduce. In the next part of the lesson, you’ll learn more about how MapReduce works.


Spark, which is the main focus of this course, is another big data framework. Spark contains libraries for data analysis, machine learning, graph analysis, and streaming live data. Spark is generally faster than Hadoop. This is because Hadoop writes intermediate results to disk whereas Spark tries to keep intermediate results in memory whenever possible.

The Hadoop ecosystem includes a distributed file storage system called HDFS (Hadoop Distributed File System). Spark, on the other hand, does not include a file storage system. You can use Spark on top of HDFS but you do not have to. Spark can read in data from other sources as well such as Amazon S3.


Streaming Data

Data streaming is a specialized topic in big data. The use case is when you want to store and analyze data in real-time such as Facebook posts or Twitter tweets.

Spark has a streaming library called Spark Streaming although it is not as popular and fast as some other streaming libraries. Other popular streaming libraries include Storm and Flink. Streaming won’t be covered in this course, but you can follow these links to learn more about these technologies.


MapReduce


Hadoop - MapReduce

MapReduce is a programming technique for manipulating large data sets. “Hadoop MapReduce” is a specific implementation of this programming technique.

3 Stages of MapReduce:

  1. The map or mapper’s job is to process the input data. It works by first dividing up a large dataset from HDFS and distributing the data across a cluster. In the map step, each line is analyzed and converted into a (key, value) pair. The mapper processes the data and creates several small chunks of data.
  2. Then these key-value pairs are shuffled across the cluster so that all keys are on the same machine.
  3. In the reduce step, the values with the same keys are combined together.
  • Generally, the MapReduce paradigm is based on sending the computation to where the data resides!

  • During a MapReduce job, Hadoop sends the Map and Reduce tasks to the appropriate servers in the cluster.

  • The framework manages all the details of data-passing such as issuing tasks, verifying task completion, and copying data around the cluster between the nodes.

  • Most of the computing takes place on nodes with data on local disks, which reduces network traffic.

  • After completion of the given tasks, the cluster collects and reduces the data to form an appropriate result, and sends it back to the Hadoop server.

While Spark doesn’t implement MapReduce, you can write Spark programs that behave in a similar way to the map-reduce paradigm. In the next section, you will run through a code example.
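
As a preview, here is a minimal sketch of the same map, shuffle, and reduce steps expressed with PySpark's RDD API; the input lines are made up, and this is not the Hadoop MapReduce implementation itself:

import pyspark

sc = pyspark.SparkContext(appName="map_reduce_style_word_count")

# A tiny, made-up input; in practice this would be a large file read from HDFS or S3.
lines = sc.parallelize([
    "spark makes big data simple",
    "big data big insights",
])

word_counts = (
    lines.flatMap(lambda line: line.split())   # map: split each line into words
         .map(lambda word: (word, 1))          # map: emit (key, value) pairs
         .reduceByKey(lambda a, b: a + b)      # shuffle + reduce: combine values per key
)

print(word_counts.collect())                   # e.g. [('big', 3), ('data', 2), ...]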


In the map-reduce paradigm, what happens in the shuffle step?
The data gets randomly shuffled for cross validation purposes.
Calculations are done to find the sum of all values with the same key.
Each value in the data set goes through some mathematical operation.
Data points with the same key get moved to the same cluster node.


MapReduce Demo

MapReduce Demo


The Spark Cluster


Spark Modes

  • Local Mode: for learning syntax and prototyping your projects (not distributed computing)
  • Cluster Mode: Spark’s own Standalone Cluster Manager
  • YARN: from the Hadoop project
  • Mesos: another open source manager, from the UC Berkeley AMPLab

In which cases would you use Local Mode?
When you are working with a computer cluster that only has two machines
For distributed computing across a cluster
When you are working with Spark installed on your own laptop
There is never a reason to use local mode since you’re not taking advantage of Spark’s ability to work on a cluster: you should be using Standalone, YARN, or Mesos


Spark Use Cases


Spark Use Cases and Resources

Here are a few resources about different Spark use cases:

You Don’t Always Need Spark

Spark is meant for big data sets that cannot fit on one computer. But you don’t need Spark if you are working on smaller data sets. In the cases of data sets that can fit on your local computer, there are many other options out there you can use to manipulate data such as:

  • AWK - a command line tool for manipulating text files
  • R - a programming language and software environment for statistical computing
  • Python PyData Stack, which includes pandas, Matplotlib, NumPy, and scikit-learn among other libraries

Sometimes, you can still use pandas on a single, local machine even if your data set is only a little bit larger than memory. Pandas can read data in chunks. Depending on your use case, you can filter the data and write out the relevant parts to disk.

If the data is already stored in a relational database such as MySQL or Postgres, you can leverage SQL to extract, filter and aggregate the data. If you would like to leverage pandas and SQL simultaneously, you can use libraries such as SQLAlchemy, which provides an abstraction layer to manipulate SQL tables with generative Python expressions.

The most commonly used Python Machine Learning library is scikit-learn. It has a wide range of algorithms for classification, regression, and clustering, as well as utilities for preprocessing data, fine tuning model parameters and testing their results. However, if you want to use more complex algorithms - like deep learning - you’ll need to look further. TensorFlow and PyTorch are currently popular packages.

Spark’s Limitations

Spark has some limitations.

Spark Streaming’s latency is at least 500 milliseconds since it operates on micro-batches of records, instead of processing one record at a time. Native streaming tools such as Storm, Apex, or Flink can push down this latency value and might be more suitable for low-latency applications. Flink and Apex can be used for batch computation as well, so if you’re already using them for stream processing, there’s no need to add Spark to your stack of technologies.

Another limitation of Spark is its selection of machine learning algorithms. Currently, Spark only supports algorithms that scale linearly with the input data size. In general, deep learning is not available either, though there are many projects that integrate Spark with TensorFlow and other deep learning tools.

Hadoop versus Spark

The Hadoop ecosystem is a slightly older technology than the Spark ecosystem. In general, Hadoop MapReduce is slower than Spark because Hadoop writes data out to disk during intermediate steps. However, many big companies, such as Facebook and LinkedIn, started using Big Data early and built their infrastructure around the Hadoop ecosystem.

While Spark is great for iterative algorithms, there is not much of a performance boost over Hadoop MapReduce when doing simple counting. Migrating legacy code to Spark, especially on hundreds of nodes that are already in production, might not be worth the cost for the small performance boost.

Beyond Spark for Storing and Processing Big Data

Keep in mind that Spark is not a data storage system, and there are a number of tools besides Spark that can be used to process and analyze large datasets.

Sometimes it makes sense to use the power and simplicity of SQL on big data. For these cases, a new class of databases, known as NoSQL and NewSQL, has been developed.

For example, you might hear about newer database storage systems like HBase or Cassandra. There are also distributed SQL engines like Impala and Presto. Many of these technologies use query syntax that you are likely already familiar with based on your experiences with Python and SQL.

In the lessons ahead, you will learn about Spark specifically, but know that many of the skills you already have with SQL, Python, and soon enough, Spark, will also be useful if you end up needing to learn any of these additional Big Data tools.


Lesson 3: Data Wrangling with Spark


Introduction

In this lesson, you’ll practice wrangling data with Spark. If you are familiar with both SQL and Python’s pandas library, you’ll notice quite a few similarities with the Spark SQL module and Spark Dataset API.


Lesson Overview

  • Wrangling data with Spark
  • Functional programming
  • Read in and write out data
  • Spark environment and Spark APIs
  • RDD API

Manipulating Data Using Functional Programming

Spark

  • Written in Scala
  • Application Programming Interfaces (APIs) in Java, R, and Python


Why Use Functional Programming


In the video, David explains why Spark uses functional programming. He also implies that Python is not a functional programming language. How is it possible that Spark programs can be written in Python if Python is not a functional programming language? Mark all answers that are true.
You cannot write Spark programs in Python.
The PySpark API is written with functional programming principles in mind.
You can write Spark programs in Python using an API, but you need to be extra careful when writing Spark programs with the PySpark API because Python is an imperative language.
Spark is written in Scala. So the PySpark API first translates your code to Scala, which is a functional programming language.


Procedural Example


Procedural [Example Code]

Procedural [Example Code]


Pure Functions Analogy

Pure functions: functions that preserve their inputs and avoid side effects.
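
As a plain-Python sketch of the distinction (the function and variable names here are just illustrative):

# Impure: mutates its input list, which is a side effect.
def double_in_place(values):
    for i in range(len(values)):
        values[i] = values[i] * 2
    return values

# Pure: leaves the input untouched and returns a new list.
def doubled(values):
    return [v * 2 for v in values]

original = [1, 2, 3]
print(doubled(original))   # [2, 4, 6]
print(original)            # [1, 2, 3] -- the input is preserved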


The Spark DAGs: Recipe for Data

Once Spark builds the DAG from your code, it checks if it can procrastinate, waiting until the last possible moment to get the data.

Stages


Maps and Lambda Functions

For more about the theory and origins of lambda functions, take a look at this blog post. Why are lambda functions called “lambda” functions?

According to legend, the inventor of Lambda Calculus, Alonzo Church, originally used the wedge symbol $\wedge$ as part of his notation. But the typesetter transcribing his manuscript used $\lambda$ instead. You can read more about it in the blog post.

sc: SparkContext


Maps and Lambda Functions [Example Code]

Maps and Lambda Functions [Example Code]

  • RDD: resilient distributed dataset
  • SparkContext: Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs and broadcast variables on that cluster.
    • appName: A name for your job, to display on the cluster web UI.
    • sc.parallelize: Distribute a local Python collection to form an RDD. (In Python 2, using xrange was recommended for performance when the input represents a range.)
import pyspark
sc = pyspark.SparkContext(appName="maps_and_lazy_evaluation_example")

# Example data: a log of songs played
log_of_songs = [
    "Despacito",
    "Nice for what",
    "No tears left to cry",
    "Despacito",
    "Havana",
    "In my feelings",
    "Nice for what",
    "despacito",
    "All the stars"
]

# parallelize the log_of_songs to use with Spark
distributed_song_log = sc.parallelize(log_of_songs)

# Apply a lambda function using a map step.
# Due to the DAG and lazy evaluation, the function will not run yet.
distributed_song_log.map(lambda song: song.lower())

# Apply a lambda function using a map step and run it.
# The collect() method takes the results from all of the nodes and "collects" them
# into a single list on the master node, so you can check the results while developing.
distributed_song_log.map(lambda song: song.lower()).collect()

Data Formats


Distributed Data Stores

You can find more information about Amazon S3 and HDFS by following these links.


SparkSession

The SparkContext is the main entry point for Spark functionality and connects the cluster with the application.

Initialization

from pyspark import SparkContext, SparkConf

# Cluster Mode
configure = SparkConf().setAppName('MyApp').setMaster('IP Address')
# Local Mode
configure = SparkConf().setAppName('MyApp').setMaster('local')

sc = SparkContext(conf = configure)

To read data frames, create a SparkSession:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MyApp").config("<config option>", "<config value>").getOrCreate()

Reading and Writing Data into Spark Data Frames

Tip
If Spark is used in cluster mode, all the worker nodes need to have access to the input data source. If you’re trying to import a file saved only on the local disk of the driver node, you’ll receive an error message similar to this:

AnalysisException: u'Path does not exist: file:/home/ubuntu/test.csv;'

Loading the file should work if all the nodes have it saved under the same path.


Read and Write Data into Spark Data Frames [example code]

Read and Write Data into Spark Data Frames [Example Code]

import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
path = "<data path>"

# read json
df = spark.read.json(path)

# schema
df.printSchema()

# describe
df.describe()

# show
df.show(n=1)

# take
df.take(5)

# write
out_path = "<data path>"
df.write.save(out_path, format='csv', header=True)

Methods available on spark.read (a DataFrameReader) include:

  • csv
  • format
  • jdbc
  • json
  • orc
  • parquet
  • text
  • table
  • schema

Imperative vs Declarative programming


Data Wrangling with Data Frames

Correction

You might notice in the screencast code that the SparkSession object wasn’t instantiated explicitly. This is because Judit was using the same environment from a previous exercise. In general, you would have to instantiate an object using code like this:

spark = SparkSession \
.builder \
.appName("Wrangling Data") \
.getOrCreate()

Functions

In the previous video, we’ve used a number of functions to manipulate our dataframe. Let’s take a look at the different types of functions and their potential pitfalls.

General functions

We have used the following general functions that are quite similar to methods of pandas dataframes (a short illustrative sketch follows the list below):

  • select(): returns a new DataFrame with the selected columns
  • filter(): filters rows using the given condition
  • where(): is just an alias for filter()
  • groupBy(): groups the DataFrame using the specified columns, so we can run aggregation on them
  • sort(): returns a new DataFrame sorted by the specified column(s). By default the second parameter ‘ascending’ is True.
  • dropDuplicates(): returns a new DataFrame with unique rows based on all or just a subset of columns
  • withColumn(): returns a new DataFrame by adding a column or replacing the existing column that has the same name. The first parameter is the name of the new column, the second is an expression of how to compute it.
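
Here is a minimal sketch that chains several of these general functions together; the SparkSession, the column names, and the sample rows are all made up for illustration:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc

spark = SparkSession.builder.appName("general_functions_sketch").getOrCreate()

# A tiny, hypothetical user log similar to the one used in this lesson.
df = spark.createDataFrame(
    [("user1", "NextSong", 200), ("user2", "Home", 150), ("user1", "NextSong", 240)],
    ["userId", "page", "length"],
)

result = (
    df.select("userId", "page", "length")            # keep only the columns we need
      .filter(df.page == "NextSong")                 # where() would work the same way
      .withColumn("length_min", col("length") / 60)  # add a derived column
      .dropDuplicates()                              # drop identical rows
      .groupBy("userId")                             # group before aggregating
      .count()
      .sort(desc("count"))                           # most active users first
)

result.show()
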
Aggregate functions

Spark SQL provides built-in methods for the most common aggregations such as count(), countDistinct(), avg(), max(), min(), etc. in the pyspark.sql.functions module. These are not the same as the built-in functions in the Python Standard Library (which also has min(), for example), so be careful not to use them interchangeably.

In many cases, there are multiple ways to express the same aggregations. For example, if we would like to compute one type of aggregate for one or more columns of the DataFrame, we can simply chain the aggregate method after a groupBy(). If we would like to use different functions on different columns, agg() comes in handy. For example, agg({"salary": "avg", "age": "max"}) computes the average salary and maximum age.
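
For example, here is a hedged sketch of both styles, using a made-up employees DataFrame:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("agg_sketch").getOrCreate()

# Hypothetical table with dept, salary, and age columns.
employees = spark.createDataFrame(
    [("eng", 100, 35), ("eng", 120, 41), ("sales", 90, 29)],
    ["dept", "salary", "age"],
)

# One aggregate on one column: chain it directly after groupBy().
employees.groupBy("dept").avg("salary").show()

# Different aggregates on different columns: pass a dict to agg().
employees.groupBy("dept").agg({"salary": "avg", "age": "max"}).show()

# The same thing with column expressions, which also lets you alias the results.
employees.groupBy("dept").agg(
    F.avg("salary").alias("avg_salary"),
    F.max("age").alias("max_age"),
).show()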

User defined functions (UDF)

In Spark SQL we can define our own functions with the udf method from the pyspark.sql.functions module. The default type of the returned variable for UDFs is string. If we would like to return another type, we need to do so explicitly by using the different types from the pyspark.sql.types module.
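
A minimal UDF sketch; the timestamp strings and the column name are hypothetical:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.appName("udf_sketch").getOrCreate()

df = spark.createDataFrame([("00:01:00",), ("01:30:00",)], ["ts"])

# Without an explicit return type the UDF would return strings;
# declare IntegerType so the new column is numeric.
get_hour = udf(lambda ts: int(ts.split(":")[0]), IntegerType())

df.withColumn("hour", get_hour(df.ts)).show()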

Window functions

Window functions are a way of combining the values of ranges of rows in a DataFrame. When defining the window we can choose how to sort and group (with the partitionBy method) the rows and how wide of a window we’d like to use (described by rangeBetween or rowsBetween).
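
A small sketch of a window definition and a running aggregate over it; the event log below is made up:

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("window_sketch").getOrCreate()

# Hypothetical event log: one row per page view, with a timestamp in seconds.
events = spark.createDataFrame(
    [("user1", 1, "Home"), ("user1", 2, "NextSong"), ("user1", 3, "NextSong"),
     ("user2", 1, "Home"), ("user2", 2, "NextSong")],
    ["userId", "ts", "page"],
)

# Partition by user, order by time, and include every earlier row in the window.
user_window = (
    Window.partitionBy("userId")
          .orderBy("ts")
          .rangeBetween(Window.unboundedPreceding, 0)
)

# Running count of songs each user has played so far.
is_song = F.when(F.col("page") == "NextSong", 1).otherwise(0)
events.withColumn("songs_so_far", F.sum(is_song).over(user_window)).show()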

For further information see the Spark SQL, DataFrames and Datasets Guide and the Spark Python API Docs.


Data Wrangling with Spark [Example Code]

Data Wrangling with Spark [Example Code]


Quiz - Data Wrangling with DataFrames Jupyter Notebook

5_dataframe_quiz.ipynb

Which page did user id “” (empty string) NOT visit?
About
Home
Login
NextSong


What type of user does the empty string user id most likely refer to?

Users who are only visiting the About, Home and Login pages are probably unregistered visitors.


How many female users do we have in the data set?
462
501
3820
5844


How many songs were played from the most played artist?
3
53
83
113


How many songs do users listen to on average between visiting our home page? Please round your answer to the closest integer.
5
7
9
11


Quiz Solution

6_dataframe_quiz_solution.ipynb


Spark SQL

Spark SQL resources

Here are a few resources that you might find helpful when working with Spark SQL


Example Spark SQL


Example Spark SQL [Example Code]

7_data_wrangling-sql.ipynb


Quiz - Data Wrangling with SparkSQL

8_spark_sql_quiz.ipynb


Quiz Solution

9_spark_sql_quiz_solution.ipynb


RDDs


RDDs are a low-level abstraction of the data. In the first version of Spark, you worked directly with RDDs. You can think of RDDs as long lists distributed across various machines. You can still use RDDs as part of your Spark code although data frames and SQL are easier. This course won’t go into the details of RDD syntax, but you can find some further explanation of the difference between RDDs and DataFrames in Databricks’ A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets blog post.

Here is a link to the Spark documentation’s RDD programming guide.


Summary


Lesson 4: Setting up Spark Clusters with AWS

Introduction

Lesson Overview

By the end of the lesson, you will be able to:

  • Distinguish between setting up a Spark Cluster using both Local and Standalone Mode
  • Set up Spark Cluster in AWS
  • Use Spark UI
  • Use AWS CLI
  • Create EMR using AWS CLI
  • Create EMR Cluster
  • Test Port Forwarding
  • Use Notebooks on your Spark Cluster
  • Write Spark Scripts
  • Store and Retrieve data on the Cloud
  • Read and Write to Amazon S3
  • Understand the distinction between HDFS and S3
  • Reading and Writing Data to HDFS

From Local to Standalone Mode

Overview of the Set up of a Spark Cluster

  1. Amazon S3 will store the dataset.
  2. We rent a cluster of machines, i.e., our Spark Cluster, and it is located in AWS data centers. We rent these using AWS service called Elastic Compute Cloud (EC2).
  3. We log in from our local computer to this Spark cluster.
  4. Upon running our Spark code, the cluster will load the dataset from Amazon S3 into the cluster’s memory distributed across each machine in the cluster.
New Terms:
  • Local mode: You are running a Spark program on your laptop, as a single machine.
  • Standalone mode: You are defining Spark Primary and Secondary nodes to work on your (virtual) machine. You can do this on EMR or on your own machine. Standalone mode uses Spark’s own built-in resource manager rather than YARN or Mesos.

Setup Instructions AWS

Comparison of AWS EMR and AWS EC2:

  • Distributed computing: EMR - Yes; EC2 - Yes
  • Node categorization: EMR categorizes secondary nodes into core and task nodes, so data can be lost if a core (data) node is removed; EC2 does not use node categorization
  • Can support HDFS? EMR - Yes; EC2 - only if you configure HDFS on EC2 yourself using a multi-step process
  • What protocol can be used? EMR uses the S3 protocol over AWS S3, which is faster than the s3a protocol; EC2 uses s3a
  • Comparison of cost: EMR is a bit higher; EC2 is lower

Circling back about HDFS

Previously we have looked over the Hadoop ecosystem. To refresh those concepts, we have provided reference material here. HDFS (Hadoop Distributed File System) is the file system; MapReduce processes the data stored in it, with YARN acting as the resource manager.

Spark can replace the MapReduce algorithm. Since Spark does not have its own distributed storage system, it leverages HDFS, AWS S3, or any other distributed storage. Primarily in this course we will be using AWS S3, but let’s review the advantages of using HDFS over AWS S3.


What is HDFS?

HDFS (Hadoop Distributed File System) is the file system in the Hadoop ecosystem. Hadoop and Spark are two frameworks providing tools for carrying out big-data related tasks. While Spark is faster than Hadoop, Spark has one drawback. It lacks a distributed storage system. In other words, Spark lacks a system to organize, store and process data files.


MapReduce System

In the Hadoop ecosystem, MapReduce processes the data that HDFS distributes across the hard drives within the cluster, and it writes its results back to those hard drives after completing all the tasks.

Spark, on the other hand, runs its operations and holds the data in RAM rather than on the hard drives used by HDFS. Since Spark lacks a file distribution system to organize, store and process data files, Spark tools are often installed on Hadoop so that Spark can use the Hadoop Distributed File System (HDFS).

Instance types
Amazon EC2 Instance Types

Naming conventions

Instance type names combine the instance family, generation, and size. They can also indicate additional capabilities, such as:

  • a – AMD processors
  • g – AWS Graviton processors
  • i – Intel processors
  • d – Instance store volumes
  • n – Network optimization
  • b – Block storage optimization
  • e – Extra storage or memory
  • z – High frequency

Errors in video:

  1. AWS does not allow standalone mode: The video describes AWS as offering to run Spark in Standalone Mode at timestamp 2:50. This is inaccurate. AWS does not allow standalone mode - only YARN mode. More context is available at this AWS discussion forum post.
  2. Cost for using AWS EMR clusters: Please note that for all nodes on the cluster, AWS charges an EMR fee on top of the EC2 cost. The true cost of a cluster with 1 master + 2 core nodes on m5.xlarge (which is more than sufficient for the exercises) would be around $0.75/hour. Note the cost for the EMR clusters on top of AWS EC2 (https://aws.amazon.com/emr/pricing/). Remember: You have limited credits that should be adequate for your exercises. If you forget to shut off any connection with an AWS instance when you are not actively using it for the exercise, you may exhaust your credits.

Why do you need EMR Cluster?

Since a Spark cluster includes multiple machines, in order to use Spark code on each machine, we would need to download and install Spark and its dependencies. This is a manual process. Elastic Map Reduce is a service offered by AWS that negates the need for you, the user, to go through the manual process of installing Spark and its dependencies for each machine.


Setting up AWS

Please refer to the latest AWS documentation to set up an EMR Cluster.

Let’s pause to do a quick check for understanding from a previous page.


What are some characteristics of the AWS EMR standalone mode? (may be more than one answer)

It runs on your local machine
It is distributed
Spark is taking care of the resource management


AWS - Install and Configure AWS CLI

AWS CLI Documentation
Installation and configuration guide


AWS CLI - Create EMR Cluster

Let’s learn how to create an EMR cluster from the CLI, and configure the related settings.

aws emr help

aws emr create-cluster help

aws emr create-cluster command

Creating an EMR cluster through the AWS console was shown earlier, but if you know your instances’ specifics, such as which applications you need or what kind of cluster you’ll need, you can reuse the aws emr create-cluster command below multiple times.

aws emr create-cluster --name <cluster_name> \
--use-default-roles --release-label emr-5.28.0 \
--instance-count 3 --applications Name=Spark Name=Zeppelin \
--bootstrap-actions Path="s3://bootstrap.sh" \
--ec2-attributes KeyName=<Key-pair-file-name>,SubnetId=<subnet-Id> \
--instance-type m5.xlarge --log-uri s3://<your-bucket>/emrlogs/
  1. Options: Let’s break down the command above and go over each option.

    • --name : You can give any name of your choice. This will show up on your AWS EMR UI.

    • --release-label: This is the version of EMR you’d like to use.

    • --instance-count: Annotates the instance count. One instance is reserved for the primary and the rest for the secondary instances. For example, if --instance-count is given 4, then 1 instance will be reserved for the primary and 3 will be reserved for secondary instances.

    • --applications: List of applications you want to pre-install on your EMR at the launch time

    • --bootstrap-actions: The Path attribute provides the path to a file (residing in S3 or locally) that contains a script that runs during a bootstrap action. The script may set environmental variables in all the instances of the cluster. This file must be accessible to each instance in the cluster.

    • --ec2-attributes: The KeyName field specifies your key-pair file name, for example, if it is MyKey.pem, just specify MyKey for this field. There is one more field that you should specify, SubnetId.
      The aws documentation says that the cluster must be launched within an EC2-VPC. Therefore, you need to provide the VPC subnet Id in which to create the cluster. If you do not specify this value, the cluster is launched in the normal AWS cloud, outside of any VPC. Go to the VPC service in the web console to copy any of the subnet IDs within the default VPC. If you do not see a default VPC in your account, use a simple command to create a default VPC:

      aws ec2 create-default-vpc --profile <profile-name>

      See the snapshot below to copy the subnet Id.

    • --instance-type: Specify the type of instances you want to use. Detailed list can be accessed here, but find the one that can fit your data and your budget.

    • --log-uri: S3 location to store your EMR logs in. This log can store EMR metrics and also the metrics/logs for submission of your code.

      Use the subnet ID in the --ec2-attributes KeyName=<Key-pair-file-name>,SubnetId=<subnet-Id> option.

  2. Reference - You can refer to an even more detailed explanation about all possible options of the aws emr create-cluster command at CLI command reference.


Exercise:

Lab: Create an EMR cluster using AWS CLI


Using Notebooks on Your Cluster


Spark Scripts


Submitting Spark Scripts


Lab: Submitting Spark Scripts

Here is the link to the GitHub repo where a copy of the exercise instructions are located along with cities.csv file.

  • Download the cities.csv dataset to your local machine.
  • Upload a file into an S3 location using the AWS S3 console, or you can use the AWS CLI command, like aws s3 cp <your current file location>/<filename> s3://<bucket_name>.
  • Create an EMR instance.
  • Copy the file to your EMR instance, preferably into the home directory of the EMR instance.
  • Execute the file using spark-submit <filename>.py.

A note about SSH

SSH is a specific protocol for secure remote login and file transfer.

The instructor is showing you one way to save your files. He is using the SSH protocol to save the files on the EMR instance. When you see hadoop@ip-###-###-####, this indicates that the instructor accessed the EMR instance using the SSH protocol. However, once he terminates the EMR instance, everything he saved on the EMR instance will be lost. This is because the EMR instance is not kept active all the time, since it is expensive.

In the Reflection Exercise you can experiment with an alternative, good industry practice. Data engineers always save the initial, final, and intermediate data of the data pipeline in S3 for future retrieval. It is best practice to move your files from your local machine to AWS S3, then have the program read the data from AWS S3.

Reflection exercise:

Use your proxy to view the Spark UI to understand how your code and workers are working, i.e. which operations are transformations vs. actions (and whether they are correctly showing up in the Spark UI), and to get familiar with reading the Spark UI. This will give you a better understanding of how your Spark program runs.

Reminder link to Amazon documentation on FoxyProxy

Supporting Materials


Storing and Retrieving Data on the Cloud


What are the characteristics of AWS S3? (may be more than one answer)
It’s a type of relational database
It’s a file storage system that you use to store primarily text files
It’s a file storage system that you can access with a bucket, object, and keys.
You can access S3 from other AWS services, like EMR or EC2 if you have the same access credentials.


Read and Writing to Amazon S3

Lab: Reading and Writing Data to Amazon S3


S3 Buckets

With the convenient AWS UI, it is easy to mistake AWS S3 (Simple Storage Service) for an equivalent of Dropbox or even Google Drive. This is not the case for S3. S3 stores objects, and to identify an object you need to specify a bucket and a key. For example,

df = spark.read.load("s3://my_bucket/path/to/file/file.csv")

In this code, s3://my_bucket is the bucket, and path/to/file/file.csv is the key for the object. Thankfully, if we’re using Spark and all the objects underneath the bucket have the same schema, you can do something like below.

df = spark.read.load("s3://my_bucket/")

This will generate a dataframe of all the objects underneath my_bucket that share the same schema. Suppose the structure in S3 looks like this:

my_bucket
|-- test.csv
|-- path/to/
    |-- test2.csv
    |-- file/
        |-- test3.csv
        |-- file.csv

If all the csv files underneath my_bucket, which are test.csv, test2.csv, test3.csv, and file.csv, have the same schema, the dataframe will be generated without error, but if there are schema conflicts between files, then the dataframe will not be generated. As an engineer, you need to be careful about how you organize your data lake.

Link to Github Repo on Demo code referred to in video: HERE

Supporting Materials


Introduction to HDFS


Differences between HDFS and AWS S3

Since Spark does not have its own distributed storage system, it leverages HDFS, AWS S3, or any other distributed storage. Primarily in this course we will be using AWS S3, but let’s review the advantages of using HDFS over AWS S3.

Although it would make the most sense to use AWS S3 while using other AWS services, it’s important to note the differences between AWS S3 and HDFS.

  • AWS S3 is an object storage system that stores data using key-value pairs, namely bucket and key, whereas HDFS is an actual distributed file system that guarantees fault tolerance. HDFS achieves fault tolerance through a replication factor: by default it duplicates the same file on 3 different nodes across the cluster (the number of replicas can be configured).
  • HDFS has usually been installed in on-premise systems, which traditionally required engineers on-site to maintain and troubleshoot the Hadoop ecosystem, costing more than storing data in the cloud. Due to the flexibility of location and the reduced cost of maintenance, cloud solutions have become more popular. With the extensive services you can use within AWS, S3 has become a more popular choice than HDFS.
  • Since AWS S3 is a binary object store, it can store all kinds of formats, even images and videos. HDFS strictly requires a certain file format - the popular choices are Avro and Parquet, which have relatively high compression rates, making them useful for storing large datasets.

Reading and Writing Data to HDFS


Lab: Reading and Writing Data to HDFS


What are the file types that are supported in HDFS? (may be more than one answer)

Image files
Avro files
Parquet files
Zip files


Which of these are valid HDFS commands that you can use? (may be more than one answer)
hdfs ls: shows the files
hadoop fs mkdir: makes a directory
hadoop fs -text : views raw files
hdfs dfs -copyToLocal : copies files to local (like your laptop)


Recap Local Mode to Cluster Mode


Lesson 5: Debugging and Optimization

Debugging is Hard

Debugging Spark is harder on Standalone mode

  • Previously, we ran Spark code in local mode, where you can easily fix the code on your laptop because you can view the errors on your local machine.
  • In Standalone mode, the cluster (the group of manager and executors) loads the data and distributes the tasks, and the executors execute the code. The result is either a successful output or a log of the errors. The logs are captured on a separate machine from the executors, which makes it important to be able to interpret the syntax of the logs - this can get tricky.
  • One other thing that makes standalone mode difficult is that your laptop environment will be completely different from AWS EMR or other cloud systems. As a result, you will always have to test your code rigorously on different environment settings to make sure the code works.

Intro: Syntax Errors

Remember: lazy evaluation may make it look like your code has no bugs, because errors only surface once an action forces the code to run.


Code Errors


Data Errors


Demo: Data Errors


Debugging Your Code


How to Use Accumulators

What are Accumulators?

As the name hints, accumulators are variables that accumulate. Because Spark runs in distributed mode, the workers run in parallel but asynchronously. For example, worker 1 will not be able to know how far along worker 2 and worker 3 are with their tasks. By the same token, variables that are local to a worker are not shared with the other workers unless you accumulate them. Accumulators are used mostly for sum operations, as in Hadoop MapReduce, but you can implement them to do otherwise.

For additional deep-dive, here is the Spark documentation on accumulators if you want to learn more about these.
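
Here is a minimal sketch of an accumulator counting malformed records; the input strings are made up:

import pyspark

sc = pyspark.SparkContext(appName="accumulator_sketch")

# Hypothetical records: we want to count how many are malformed
# while still processing the well-formed ones.
records = sc.parallelize(["42", "17", "not-a-number", "8"])

bad_records = sc.accumulator(0)

def parse(record):
    try:
        return int(record)
    except ValueError:
        bad_records.add(1)   # each worker adds to the shared accumulator
        return 0

total = records.map(parse).sum()

# Accumulator values are only filled in once an action (here, sum) has run.
# Note: updates made inside transformations may be re-applied if a task is retried,
# so Spark only guarantees exactly-once updates for accumulators used inside actions.
print("total:", total)
print("malformed records:", bad_records.value)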


What would be the best scenario for using Spark Accumulators?

When you’re using transformation functions across your code
When you know you will have different values across your executors


Spark Broadcast

What is Spark Broadcast?

Spark Broadcast variables are read-only variables that get distributed to and cached on the worker nodes. This is helpful because when the driver sends packets of information to worker nodes, it normally sends the data and the tasks attached together, which can be heavy on the network. Broadcast variables seek to reduce that network overhead and communication cost. Spark Broadcast variables are used only with the SparkContext.
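
A minimal sketch of a broadcast variable used as a small lookup table (the mapping and values are made up):

import pyspark

sc = pyspark.SparkContext(appName="broadcast_sketch")

# Broadcast the small lookup table once to every worker
# instead of shipping it with every task.
state_names = sc.broadcast({"CA": "California", "NY": "New York"})

codes = sc.parallelize(["CA", "NY", "CA", "TX"])

# Workers read the cached copy through .value.
full_names = codes.map(lambda code: state_names.value.get(code, "Unknown"))
print(full_names.collect())   # ['California', 'New York', 'California', 'Unknown']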


When is broadcast usually used in Spark?

Broadcast join is a way of joining a large table and small table in Spark.
Broadcast variable is a cached variable in the driver.
Broadcast variable is shipped to each machine with tasks.
Broadcast join is like map-side join in MapReduce.


Exercise: Broadcast Example

Run the starter code in Jupyter Notebook to practice Broadcast Joins.


Spark Web UI


Connecting to the Spark Web UI


Getting Familiar with the Spark UI

For Further Optional Reading on the Spark UI

You may be interested in the Monitoring and Instrumentation section of the Spark documentation.


Review of the Log Data

What are some efficient ways to keep logs? (may be more than one answer)

Put print statements wherever needed
Use logging systems, like Logger
Save log files to local system

Further Optional Study on Log Data

For further information please see the Configuring Logging section of the Spark documentation.


Diagnosing Errors


  • Key Metrics for Sparkify
    • Monthly Active Users
    • Daily Active Users in Past Month
    • Total Paid and Unpaid Users
    • Total Ads Served in Past Month

  • Cohort analysis

Optimization Introduction


Understanding Data Skew

Data Skew

  • Change workload division
  • Partition the data

Other Issues and How to Address Them

Troubleshooting Other Spark Issues

In this lesson, we walked through various examples of Spark issues you can debug based on error messages, loglines and stack traces.

We have also touched on another very common issue with Spark jobs that can be harder to address: everything working fine but just taking a very long time. So what do you do when your Spark job is (too) slow?


Insufficient resources

Often, while there are some possible ways of improvement, processing large data sets just takes a lot longer than processing smaller ones, even without any big problem in the code or the job tuning. Using more resources, either by increasing the number of executors or using more powerful machines, might just not be possible. When you have a slow job it’s useful to understand:

  • How much data you’re actually processing (compressed file formats can be tricky to interpret)
  • If you can decrease the amount of data to be processed by filtering or aggregating to lower cardinality
  • And if resource utilization is reasonable

There are many cases where different stages of a Spark job differ greatly in their resource needs: loading data is typically I/O heavy, some stages might require a lot of memory, others might need a lot of CPU. Understanding these differences might help to optimize the overall performance. Use the Spark UI and logs to collect information on these metrics.

If you run into out of memory errors you might consider increasing the number of partitions. If the memory errors occur over time you can look into why the size of certain objects is increasing too much during the run and if the size can be contained. Also, look for ways of freeing up resources if garbage collection metrics are high.

Certain algorithms (especially ML ones) use the driver to store data the workers share and update during the run. If you see memory issues on the driver check if the algorithm you’re using is pushing too much data there.


Data skew

If you drill down in the Spark UI to the task level you can see if certain partitions process significantly more data than others and if they are lagging behind. Such symptoms usually indicate a skewed data set. Consider implementing the techniques mentioned in this lesson:

  • Add an intermediate data processing step with an alternative key
  • Adjust the spark.sql.shuffle.partitions parameter if necessary
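
A hedged sketch of both techniques, using a made-up events table in which a few customer_ids dominate:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("skew_sketch").getOrCreate()

# The default number of shuffle partitions is 200; raising it can help spread a skewed shuffle.
spark.conf.set("spark.sql.shuffle.partitions", "400")

# Hypothetical data where one hot key would normally overload a single partition.
events = spark.createDataFrame(
    [("big_customer", 10), ("big_customer", 20), ("small_customer", 5)],
    ["customer_id", "amount"],
)

# Intermediate step with an alternative (salted) key: spread each hot key over
# several partitions, aggregate, then aggregate again on the real key.
salted = events.withColumn("salt", (F.rand() * 8).cast("int"))

partial = salted.groupBy("customer_id", "salt").agg(F.sum("amount").alias("partial_sum"))
totals = partial.groupBy("customer_id").agg(F.sum("partial_sum").alias("total_amount"))

totals.show()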

The problem with data skew is that it’s very specific to a dataset. You might know ahead of time that certain customers or accounts are expected to generate a lot more activity, but the solution for dealing with the skew might strongly depend on what the data looks like. If you need to implement a more general solution (for example for an automated pipeline), it’s recommended to take a more conservative approach (so assume that your data will be skewed) and then monitor how bad the skew really is.


Inefficient queries

Once your Spark application works it’s worth spending some time to analyze the query it runs. You can use the Spark UI to check the DAG and the jobs and stages it’s built of.

Spark’s query optimizer is called Catalyst. While Catalyst is a powerful tool for turning Python code into an optimized query plan that can run on the JVM, it has some limitations when optimizing your code. It will, for example, push filters in a particular stage as early as possible in the plan, but it won’t move a filter across stages. It’s your job to make sure that if early filtering is possible without compromising the business logic, then you perform this filtering where it’s most appropriate.

It also can’t decide for you how much data you’re shuffling across the cluster. Remember from the first lesson how expensive sending data through the network is. As much as possible try to avoid shuffling unnecessary data. In practice, this means that you need to perform joins and grouped aggregations as late as possible.

When it comes to joins, there is more than one strategy to choose from. If one of your data frames is small, consider using a broadcast hash join instead of a regular hash join.
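
A minimal sketch of the broadcast join hint, with made-up fact and dimension tables:

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("broadcast_join_sketch").getOrCreate()

# Large fact table and a small dimension table (both tiny and hypothetical here).
plays = spark.createDataFrame(
    [("user1", "song_a"), ("user2", "song_b")], ["userId", "songId"]
)
songs = spark.createDataFrame(
    [("song_a", "Despacito"), ("song_b", "Havana")], ["songId", "title"]
)

# Hint that the small table should be copied to every executor,
# so the large table does not need to be shuffled for the join.
joined = plays.join(broadcast(songs), on="songId")
joined.show()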


Further reading

Debugging and tuning your Spark application can be a daunting task. There is an ever-growing community out there though, always sharing new ideas and working on improving Spark and its tooling, to make using it easier. So if you have a complicated issue don’t hesitate to reach out to others (via user mailing lists, forums, and Q&A sites).

You can find more information on tuning Spark and Spark SQL in the documentation.


Different types of Spark Functions

Transformations and Actions

There are two types of functions in Spark:

  1. Transformations
  2. Actions

Spark uses lazy evaluation to evaluate RDDs and dataframes. Lazy evaluation means the code is not executed until it is needed. The action functions trigger the lazily evaluated transformations.

For example,

df = spark.read.load("some csv file")
df1 = df.select("some column").filter("some condition")
df1.write.save("to path")
  • In this code, select and filter are transformation functions, and write is an action function.
  • If you execute this code line by line, the second line will be loaded, but you will not see the function being executed in your Spark UI.
  • When you actually execute the write action, then you will see your Spark program being executed:
    • select -> filter -> write chained in the Spark UI
    • but you will only see the write show up under your tasks.

This is significant because you can chain RDD or dataframe transformations as much as you want, but nothing will actually run until you trigger it with an action. And if you have lengthy transformations, it might take your executors quite some time to complete all the tasks.


Lesson Summary


Lesson 6: Machine Learning with Spark

Introduction


Machine Learning in Spark

At the time of this recording, Spark’s latest release - version 2.2.1 - supports two machine learning libraries, spark.ml and spark.mllib. Both libraries are part of Spark’s Machine Learning Library, known as MLlib.

spark.mllib is an RDD-based library and has been in maintenance mode since version 2.0. According to the current plans, spark.ml, the DataFrame-based API, will be feature complete by version 2.3, and then the older spark.mllib will be removed in Spark 3.0. So currently we might need to use a mixture of these two libraries, but as time goes on you should focus on spark.ml, as it’s becoming Spark’s standard machine learning library.

The term “Spark ML” is sometimes used to refer to the Spark Machine Learning library, which is officially called “MLlib”. For further details see the MLlib documentation. In the following tutorials we’ll use the DataFrame-based API.

Machine Learning Parallelization

  • Data Parallelization
  • Task Parallelization

Feature Extraction

You can find more information about the different scaler and indexer options through the links below.

Scalers

Indexers

To learn more about text processing you can read further in the documentation.


Numeric Features

The data comes from StackOverflow data dumps.


Notebook

1_numeric_features.ipynb


Text Processing


Notebook

2_text_processing.ipynb


Quiz: Creating Features

3_creating_features_quiz.ipynb

Select the question with Id = 1112. How many words does its body contain (check the BodyLength column)?

53
63
73
83


Create a new column that concatenates the question title and body. Apply the same functions we used before to compute the number of words in this combined column. What’s the value in this new column for Id = 5123?

75
95
115
135


Using the Normalizer method what’s the normalized value for question Id = 512?

0.5
1.0
1.5
2.0


Using the StandardScaler method (scaling both the mean and the standard deviation) what’s the normalized value for question Id = 512?

-0.73
-0.64
0.29
57


Using the MinMaxScaler method what’s the normalized value for question Id = 512?

0.0062
0.0123
0.1576
0.3211


Dimensionality Reduction


Supervised ML Algorithms


Linear Regression


Quiz: Linear Regression

5_linear_regression_quiz.ipynb

Build a linear regression model using the length of the combined Title + Body fields. What is the value of $r^2$ when fitting a model with maxIter=5, regParam=0.0, fitIntercept=False, solver="normal"?

0.42
0.44
0.52
1.12


Logistic Regression


Unsupervised ML Algorithms


Quiz: K-means

7_k_means_quiz.ipynb

How many times greater is the Description Length of the longest question than the Description Length of the shortest question (rounded to the nearest whole number)?

Tip: Don’t forget to import Spark SQL’s aggregate functions that can operate on DataFrame columns.

67
123
356
753


What is the mean and standard deviation of the Description length?

170, 64
180, 192
180, 213
190, 319


Let’s use K-means to create 5 clusters of Description Lengths. Set the random seed to 42 and fit a 5-class K-means model on the Description Length column (you can use KMeans().setParams(…) ). What length is the center of the cluster representing the longest questions?

92
180
2634
7532


ML Pipelines

  • Two main components (a minimal sketch follows this list):
    • Transformers: transform one DataFrame into another.
    • Estimators: are fit on a DataFrame and produce a Transformer (for example, a trained model).
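
As a preview of the example in the next section, here is a minimal, hypothetical pipeline sketch (the training rows, column names, and stages are made up):

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression

spark = SparkSession.builder.appName("pipeline_sketch").getOrCreate()

# Tiny made-up training set: some text and a binary label.
train = spark.createDataFrame(
    [("how to parse json in python", 1.0), ("best pizza topping", 0.0)],
    ["text", "label"],
)

# Transformers turn one DataFrame into another; the estimator at the end
# (LogisticRegression) is fit and produces a model, which is itself a transformer.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tf = HashingTF(inputCol="words", outputCol="features", numFeatures=1000)
lr = LogisticRegression(maxIter=10)

pipeline = Pipeline(stages=[tokenizer, tf, lr])
model = pipeline.fit(train)

model.transform(train).select("text", "prediction").show()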

ML Pipeline Example


Model Selection and Tuning

Spark supports training and evaluation in parallel (a minimal sketch follows the list below):

  • Train-Validation Split
  • K-Fold Cross Validation
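
Here is a minimal K-fold cross validation sketch with a made-up feature/label DataFrame; the grid values are arbitrary:

from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

spark = SparkSession.builder.appName("model_tuning_sketch").getOrCreate()

# Tiny, made-up dataset with a pre-built features vector and a binary label.
data = spark.createDataFrame(
    [(Vectors.dense([0.0, 1.1]), 0.0),
     (Vectors.dense([0.5, 0.9]), 0.0),
     (Vectors.dense([1.0, 0.2]), 0.0),
     (Vectors.dense([2.0, 1.0]), 1.0),
     (Vectors.dense([2.5, 1.3]), 1.0),
     (Vectors.dense([3.0, 0.8]), 1.0)],
    ["features", "label"],
)

lr = LogisticRegression(maxIter=10)

# Grid of hyperparameter combinations to evaluate.
param_grid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.0, 0.1, 1.0])
    .addGrid(lr.elasticNetParam, [0.0, 0.5])
    .build()
)

# K-fold cross validation; Spark can train and evaluate the candidate models in parallel.
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=param_grid,
    evaluator=MulticlassClassificationEvaluator(metricName="accuracy"),
    numFolds=3,
)

cv_model = cv.fit(data)
print(cv_model.avgMetrics)       # one average score per grid combination
best_model = cv_model.bestModel  # refit on the full data with the best parameters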

Model Selection and Tuning Example


Quiz: Model Tuning

9_model_tuning_quiz.ipynb

What is the accuracy of the best model trained with the parameter grid described above (keeping all other parameters at their default values), computed on the 10% untouched data?

0.23
0.32
0.39
0.46


Summary


Resources

GitHub

Data Analytics
Machine Learning
Streaming
Graph Analytics

Spark SQL, DataFrames and Datasets Guide
Spark Python API Docs.

Spark SQL built-in functions
Spark SQL guide

RDD programming guide.