Spark Tutorial

Reference

LearningSparkV2


Spark Features

Spark Purpose
In particular, data engineers will learn how to use Spark’s Structured APIs to perform complex data exploration and analysis on both batch and streaming data; use Spark SQL for interactive queries; use Spark’s built-in and external data sources to read, refine, and write data in different file formats as part of their extract, transform, and load (ETL) tasks; and build reliable data lakes with Spark and the open source Delta Lake table format.

Speed
Spark builds its query computations as a directed acyclic graph (DAG); its DAG scheduler and query optimizer construct an efficient computational graph that can usually be decomposed into tasks that are executed in parallel across workers on the cluster.

Ease of Use
Spark achieves simplicity by providing a fundamental abstraction of a simple logical data structure called a Resilient Distributed Dataset (RDD) upon which all other higher-level structured data abstractions, such as DataFrames and Datasets, are constructed.

Modularity
Spark SQL, Spark Structured Streaming, Spark MLlib, and GraphX

Extensibility
You can use Spark to read data stored in myriad sources—Apache Hadoop, Apache Cassandra, Apache HBase, MongoDB, Apache Hive, RDBMSs, and more—and process it all in memory. Spark’s DataFrameReaders and DataFrameWriters can also be extended to read data from other sources, such as Apache Kafka, Kinesis, Azure Storage, and Amazon S3, into its logical data abstraction, on which it can operate.


Spark Components


Apache Spark’s Distributed Execution

Spark driver
As the part of the Spark application responsible for instantiating a SparkSession, the Spark driver has multiple roles: it communicates with the cluster manager; it requests resources (CPU, memory, etc.) from the cluster manager for Spark’s executors (JVMs); and it transforms all the Spark operations into DAG computations, schedules them, and distributes their execution as tasks across the Spark executors. Once the resources are allocated, it communicates directly with the executors.

SparkSession
In Spark 2.0, the SparkSession became a unified conduit to all Spark operations and data. Not only did it subsume previous entry points to Spark like the SparkContext, SQLContext, HiveContext, SparkConf, and StreamingContext, but it also made working with Spark simpler and easier.
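
As a minimal sketch (the application name is illustrative), the unified session exposes the older entry points and SQL directly:

# In Python
from pyspark.sql import SparkSession

spark = (SparkSession
    .builder
    .appName("UnifiedEntryPoint")   # illustrative name
    .getOrCreate())

sc = spark.sparkContext                # the SparkContext, no separate construction needed
spark.sql("SELECT 1 AS id").show()     # SQL without a separate SQLContext/HiveContext
print(spark.version)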

Cluster manager
The cluster manager is responsible for managing and allocating resources for the cluster of nodes on which your Spark application runs. Currently, Spark supports four cluster managers: the built-in standalone cluster manager, Apache Hadoop YARN, Apache Mesos, and Kubernetes.

Spark executor
A Spark executor runs on each worker node in the cluster. The executors communicate with the driver program and are responsible for executing tasks on the workers. In most deployment modes, only a single executor runs per node.
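
How many executors you get and how large they are depends on the deployment. As a hedged sketch, common settings such as spark.executor.memory and spark.executor.cores can be supplied when the session is built (or via spark-submit); the values below are placeholders, not recommendations:

# In Python
from pyspark.sql import SparkSession

spark = (SparkSession
    .builder
    .appName("ExecutorConfigSketch")           # illustrative name
    .config("spark.executor.memory", "2g")     # memory per executor JVM (placeholder)
    .config("spark.executor.cores", "2")       # cores per executor (placeholder)
    .getOrCreate())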

Deployment methods

Distributed data and partitions


Spark Shell

scala> val strings = spark.read.text("spark-3.2.1-bin-hadoop3.2/README.md")
strings: org.apache.spark.sql.DataFrame = [value: string]

scala> strings.show(10, false)
+--------------------------------------------------------------------------------+
|value |
+--------------------------------------------------------------------------------+
|# Apache Spark |
| |
|Spark is a unified analytics engine for large-scale data processing. It provides|
|high-level APIs in Scala, Java, Python, and R, and an optimized engine that |
|supports general computation graphs for data analysis. It also supports a |
|rich set of higher-level tools including Spark SQL for SQL and DataFrames, |
|MLlib for machine learning, GraphX for graph processing, |
|and Structured Streaming for stream processing. |
| |
|<https://spark.apache.org/> |
+--------------------------------------------------------------------------------+
only showing top 10 rows


scala> strings.count()
res1: Long = 109
>>> strings = spark.read.text("spark-3.2.1-bin-hadoop3.2/README.md")
>>> strings.show(10, truncate=False)
+--------------------------------------------------------------------------------+
|value |
+--------------------------------------------------------------------------------+
|# Apache Spark |
| |
|Spark is a unified analytics engine for large-scale data processing. It provides|
|high-level APIs in Scala, Java, Python, and R, and an optimized engine that |
|supports general computation graphs for data analysis. It also supports a |
|rich set of higher-level tools including Spark SQL for SQL and DataFrames, |
|MLlib for machine learning, GraphX for graph processing, |
|and Structured Streaming for stream processing. |
| |
|<https://spark.apache.org/> |
+--------------------------------------------------------------------------------+
only showing top 10 rows

>>> strings.count()
109
>>> strings.show(10, truncate=True)
+--------------------+
| value|
+--------------------+
| # Apache Spark|
| |
|Spark is a unifie...|
|high-level APIs i...|
|supports general ...|
|rich set of highe...|
|MLlib for machine...|
|and Structured St...|
| |
|<https://spark.ap...|
+--------------------+
only showing top 10 rows

Spark Application

Spark Application Concepts

Application

  • A user program built on Spark using its APIs. It consists of a driver program and executors on the cluster.

SparkSession

  • An object that provides a point of entry to interact with underlying Spark functionality and allows programming Spark with its APIs. In an interactive Spark shell, the Spark driver instantiates a SparkSession for you, while in a Spark application, you create a SparkSession object yourself.

Job

  • A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g., save(), collect()).

Stage

  • Each job gets divided into smaller sets of tasks called stages that depend on each other.

Task

  • A single unit of work or execution that will be sent to a Spark executor.

Spark Application and SparkSession

At the core of every Spark application is the Spark driver program, which creates a SparkSession object. When you’re working with a Spark shell, the driver is part of the shell and the SparkSession object (accessible via the variable spark) is created for you, as you saw in the earlier examples when you launched the shells.

In those examples, because you launched the Spark shell locally on your laptop, all the operations ran locally, in a single JVM. But you can just as easily launch a Spark shell to analyze data in parallel on a cluster as in local mode. The commands spark-shell --help or pyspark --help will show you how to connect to the Spark cluster manager. Figure 2-2 shows how Spark executes on a cluster once you’ve done this.

Once you have a SparkSession, you can program Spark using the APIs to perform Spark operations.


Spark Jobs

During interactive sessions with Spark shells, the driver converts your Spark application into one or more Spark jobs (Figure 2-3). It then transforms each job into a DAG. This, in essence, is Spark’s execution plan, where each node within a DAG could be a single or multiple Spark stages.


Spark Stage

As part of the DAG nodes, stages are created based on what operations can be performed serially or in parallel (Figure 2-4). Not all Spark operations can happen in a single stage, so they may be divided into multiple stages. Often stages are delineated on the operator’s computation boundaries, where they dictate data transfer among Spark executors.


Spark Tasks

Each stage is comprised of Spark tasks (a unit of execution), which are then federated across each Spark executor; each task maps to a single core and works on a single partition of data (Figure 2-5). As such, an executor with 16 cores can have 16 or more tasks working on 16 or more partitions in parallel, making the execution of Spark’s tasks exceedingly parallel!
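
As a small sketch (assuming the spark session from the shell examples), the number of partitions bounds how many tasks a stage can run in parallel:

# In Python
df = spark.range(1000)                                 # a toy DataFrame
print(df.rdd.getNumPartitions())                       # current number of partitions
print(df.repartition(16).rdd.getNumPartitions())       # 16 partitions -> up to 16 parallel tasks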


Transformations, Actions, and Lazy Evaluation

Spark operations on distributed data can be classified into two types: transformations and actions. Transformations, as the name suggests, transform a Spark DataFrame into a new DataFrame without altering the original data, giving it the property of immutability. Put another way, an operation such as select() or filter() will not change the original DataFrame; instead, it will return the transformed results of the operation as a new DataFrame.

All transformations are evaluated lazily. That is, their results are not computed immediately, but they are recorded or remembered as a lineage. A recorded lineage allows Spark, at a later time in its execution plan, to rearrange certain transformations, coalesce them, or optimize transformations into stages for more efficient execution. Lazy evaluation is Spark’s strategy for delaying execution until an action is invoked or data is “touched” (read from or written to disk).
An action triggers the lazy evaluation of all the recorded transformations. In
Figure 2-6, all transformations T are recorded until the action A is invoked. Each transformation T produces a new DataFrame.

While lazy evaluation allows Spark to optimize your queries by peeking into your chained transformations, lineage and data immutability provide fault tolerance. Because Spark records each transformation in its lineage and the DataFrames are immutable between transformations, it can reproduce its original state by simply replaying the recorded lineage, giving it resiliency in the event of failures.
Table 2-1 lists some examples of transformations and actions.
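
As a small sketch reusing the strings DataFrame from the shell examples above, the transformations below are only recorded; nothing executes until the count() action:

# In Python
from pyspark.sql.functions import col

filtered = (strings
    .filter(col("value").contains("Spark"))   # transformation: recorded in the lineage
    .select("value"))                         # transformation: still nothing has run

print(filtered.count())                       # action: triggers evaluation of the recorded lineage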


Narrow and Wide Transformations

As noted, transformations are operations that Spark evaluates lazily. A huge advantage of the lazy evaluation scheme is that Spark can inspect your computational query and ascertain how it can optimize it. This optimization can be done by either joining or pipelining some operations and assigning them to a stage, or breaking them into stages by determining which operations require a shuffle or exchange of data across clusters.

Transformations can be classified as having either narrow dependencies or wide dependencies. Any transformation where a single output partition can be computed from a single input partition is a narrow transformation. For example, in the previous code snippet, filter() and contains() represent narrow transformations because they can operate on a single partition and produce the resulting output partition without any exchange of data.

However, groupBy() or orderBy() instruct Spark to perform wide transformations, where data from other partitions is read in, combined, and written to disk. Since each partition will have its own count of the rows containing the word “Spark”, a count (groupBy()) forces a shuffle of data from each of the executor’s partitions across the cluster. In this transformation, orderBy() requires output from other partitions to compute the final aggregation.
Figure 2-7 illustrates the two types of dependencies.
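
Continuing the sketch above, the groupBy() and orderBy() below are wide transformations: rows must be shuffled across partitions before the show() action can produce a result:

# In Python
counts = (filtered
    .groupBy("value")                          # wide: identical values must be brought together
    .count()
    .orderBy("count", ascending=False))        # wide: a global ordering needs data from all partitions

counts.show(5, truncate=False)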


The Spark UI

Spark includes a graphical user interface that you can use to inspect or monitor Spark applications in their various stages of decomposition—that is, jobs, stages, and tasks. Depending on how Spark is deployed, the driver launches a web UI, running by default on port 4040, where you can view metrics and details such as:

  • A list of scheduler stages and tasks
  • A summary of RDD sizes and memory usage
  • Information about the environment
  • Information about the running executors
  • All the Spark SQL queries

In local mode, you can access this interface at http://localhost:4040 in a web browser.
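
From the shell, the driver’s UI address for the current session can also be printed directly (spark.ui.port is the standard setting to change the port if 4040 is already in use):

# In Python
print(spark.sparkContext.uiWebUrl)   # e.g. http://<driver-host>:4040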


Set environment variables

How to Set and List Environment Variables in Linux

  • env – The command allows you to run another program in a custom environment without modifying the current one. When used without an argument it will print a list of the current environment variables.

  • printenv – The command prints all or the specified environment variables.

  • set – The command sets or unsets shell variables. When used without an argument it will print a list of all variables including environment and shell variables, and shell functions.

  • unset – The command deletes shell and environment variables.

  • export – The command sets environment variables.

  • /etc/environment - Use this file to set up system-wide environment variables.

  • /etc/profile - Variables set in this file are loaded whenever a bash login shell is entered.

  • Per-user shell-specific configuration files. For example, if you are using Bash, you can declare the variables in ~/.bashrc.

Set your SPARK_HOME environment variable to the root-level directory where you installed Spark on your local machine.

➜  spark-3.2.1-bin-hadoop3.2 pwd
/Users/zacks/Git/Data Science Projects/spark-3.2.1-bin-hadoop3.2
➜  spark-3.2.1-bin-hadoop3.2 echo 'export SPARK_HOME="/Users/zacks/Git/Data Science Projects/spark-3.2.1-bin-hadoop3.2"' >> ~/.zshrc

➜ spark-3.2.1-bin-hadoop3.2 source ~/.zshrc

Review ~/.zshrc

➜  spark-3.2.1-bin-hadoop3.2 tail -1 ~/.zshrc
export SPARK_HOME="/Users/zacks/Git/Data Science Projects/spark-3.2.1-bin-hadoop3.2"

To avoid having verbose INFO messages printed to the console, copy the log4j.properties.template file to log4j.properties and set log4j.rootCategory=WARN in the conf/log4j.properties file.

Content in log4j.properties.template

➜  spark-3.2.1-bin-hadoop3.2 cat conf/log4j.properties.template 
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell/spark-sql log level to WARN. When running the
# spark-shell/spark-sql, the log level for these classes is used to overwrite
# the root logger's log level, so that the user can have different defaults
# for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN
log4j.logger.org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver=WARN

# Settings to quiet third party logs that are too verbose
log4j.logger.org.sparkproject.jetty=WARN
log4j.logger.org.sparkproject.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

# For deploying Spark ThriftServer
# SPARK-34128:Suppress undesirable TTransportException warnings involved in THRIFT-4805
log4j.appender.console.filter.1=org.apache.log4j.varia.StringMatchFilter
log4j.appender.console.filter.1.StringToMatch=Thrift error occurred during processing of message
log4j.appender.console.filter.1.AcceptOnMatch=false
➜  spark-3.2.1-bin-hadoop3.2 cp conf/log4j.properties.template conf/log4j.properties

After editing log4j.rootCategory in log4j.properties:

➜  spark-3.2.1-bin-hadoop3.2 cat conf/log4j.properties | grep log4j.rootCategory
log4j.rootCategory=WARN, console

Spark Job

mnmcount.py

Under LearningSparkV2/chapter2/py/src

➜  src git:(dev) ✗ source ~/.zshrc
➜ src git:(dev) ✗ $SPARK_HOME/bin/spark-submit mnmcount.py data/mnm_dataset.csv

Output:

1st table: the first 5 rows of the DataFrame
2nd table: sum of Count, grouped by State and Color
3rd table: sum of Count, grouped by State and Color, filtered to State = 'CA'

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/Users/zacks/Git/Data%20Science%20Projects/spark-3.2.1-bin-hadoop3.2/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
22/02/17 22:34:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
+-----+------+-----+
|State|Color |Count|
+-----+------+-----+
|TX |Red |20 |
|NV |Blue |66 |
|CO |Blue |79 |
|OR |Blue |71 |
|WA |Yellow|93 |
+-----+------+-----+
only showing top 5 rows

+-----+------+----------+
|State|Color |sum(Count)|
+-----+------+----------+
|CA |Yellow|100956 |
|WA |Green |96486 |
|CA |Brown |95762 |
|TX |Green |95753 |
|TX |Red |95404 |
|CO |Yellow|95038 |
|NM |Red |94699 |
|OR |Orange|94514 |
|WY |Green |94339 |
|NV |Orange|93929 |
|TX |Yellow|93819 |
|CO |Green |93724 |
|CO |Brown |93692 |
|CA |Green |93505 |
|NM |Brown |93447 |
|CO |Blue |93412 |
|WA |Red |93332 |
|WA |Brown |93082 |
|WA |Yellow|92920 |
|NM |Yellow|92747 |
|NV |Brown |92478 |
|TX |Orange|92315 |
|AZ |Brown |92287 |
|AZ |Green |91882 |
|WY |Red |91768 |
|AZ |Orange|91684 |
|CA |Red |91527 |
|WA |Orange|91521 |
|NV |Yellow|91390 |
|UT |Orange|91341 |
|NV |Green |91331 |
|NM |Orange|91251 |
|NM |Green |91160 |
|WY |Blue |91002 |
|UT |Red |90995 |
|CO |Orange|90971 |
|AZ |Yellow|90946 |
|TX |Brown |90736 |
|OR |Blue |90526 |
|CA |Orange|90311 |
|OR |Red |90286 |
|NM |Blue |90150 |
|AZ |Red |90042 |
|NV |Blue |90003 |
|UT |Blue |89977 |
|AZ |Blue |89971 |
|WA |Blue |89886 |
|OR |Green |89578 |
|CO |Red |89465 |
|NV |Red |89346 |
|UT |Yellow|89264 |
|OR |Brown |89136 |
|CA |Blue |89123 |
|UT |Brown |88973 |
|TX |Blue |88466 |
|UT |Green |88392 |
|OR |Yellow|88129 |
|WY |Orange|87956 |
|WY |Yellow|87800 |
|WY |Brown |86110 |
+-----+------+----------+

Total Rows = 60
+-----+------+----------+
|State|Color |sum(Count)|
+-----+------+----------+
|CA |Yellow|100956 |
|CA |Brown |95762 |
|CA |Green |93505 |
|CA |Red |91527 |
|CA |Orange|90311 |
|CA |Blue |89123 |
+-----+------+----------+

Spark’s Structured APIs

Spark: What’s Underneath an RDD?

The RDD is the most basic abstraction in Spark. There are three vital characteristics associated with an RDD:

  • Dependencies
  • Partitions (with some locality information)
  • Compute function: Partition => Iterator[T]

All three are integral to the simple RDD programming API model upon which all higher-level functionality is constructed. First, a list of dependencies that instructs Spark how an RDD is constructed with its inputs is required. When necessary to reproduce results, Spark can recreate an RDD from these dependencies and replicate operations on it. This characteristic gives RDDs resiliency.

Second, partitions provide Spark the ability to split the work to parallelize computation on partitions across executors. In some cases—for example, reading from HDFS—Spark will use locality information to send work to executors close to the data. That way less data is transmitted over the network.

And finally, an RDD has a compute function that produces an Iterator[T] for the data that will be stored in the RDD.
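
A toy sketch of the three characteristics, assuming the shell’s spark session:

# In Python
rdd = spark.sparkContext.parallelize(range(100), 4)   # explicitly request 4 partitions
print(rdd.getNumPartitions())                         # partitions: 4

doubled = rdd.map(lambda x: x * 2)                    # compute function, applied per partition
print(doubled.toDebugString())                        # lineage: this RDD and its recursive dependencies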


Structuring Spark

Key Metrics and Benefits

Structure yields a number of benefits, including better performance and space efficiency across Spark components. We will explore these benefits further when we talk about the use of the DataFrame and Dataset APIs shortly, but for now we’ll concentrate on the other advantages: expressivity, simplicity, composability, and uniformity.

Let’s demonstrate expressivity and composability first, with a simple code snippet. In the following example, we want to aggregate all the ages for each name, group by name, and then average the ages—a common pattern in data analysis and discovery. If we were to use the low-level RDD API for this, the code would look as follows:

# In Python
# Create an RDD of tuples (name, age)
dataRDD = sc.parallelize([("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)])
# Use map and reduceByKey transformations with their lambda
# expressions to aggregate and then compute average

agesRDD = (dataRDD
.map(lambda x: (x[0], (x[1], 1)))
.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
.map(lambda x: (x[0], x[1][0]/x[1][1])))

No one would dispute that this code, which tells Spark how to aggregate keys and compute averages with a string of lambda functions, is cryptic and hard to read. In other words, the code is instructing Spark how to compute the query. It’s completely opaque to Spark, because it doesn’t communicate the intention. Furthermore, the equivalent RDD code in Scala would look very different from the Python code shown here.

By contrast, what if we were to express the same query with high-level DSL operators and the DataFrame API, thereby instructing Spark what to do? Have a look:

# In Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
# Create a DataFrame using SparkSession
spark = (SparkSession
.builder
.appName("AuthorsAges")
.getOrCreate())

# Create a DataFrame
data_df = spark.createDataFrame([("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)], ["name", "age"])

# Group the same names together, aggregate their ages, and compute an average
avg_df = data_df.groupBy("name").agg(avg("age"))

# Show the results of the final execution
avg_df.show()
+------+--------+
| name|avg(age)|
+------+--------+
|Brooke| 22.5|
| Denny| 31.0|
| Jules| 30.0|
| TD| 35.0|
+------+--------+

The DataFrame API


Spark’s Basic Data Types

Data Types


Spark’s Structured and Complex Data Types


Schemas and Creating DataFrames

A schema in Spark defines the column names and associated data types for a DataFrame.

Defining a schema up front as opposed to taking a schema-on-read approach offers three benefits:

  • You relieve Spark from the onus of inferring data types.
  • You prevent Spark from creating a separate job just to read a large portion of your file to ascertain the schema, which for a large data file can be expensive and time-consuming.
  • You can detect errors early if data doesn’t match the schema.

So, we encourage you to always define your schema up front whenever you want to read a large file from a data source.

Two ways to define a schema

Spark allows you to define a schema in two ways. One is to define it programmatically, and the other is to employ a Data Definition Language (DDL) string, which is much simpler and easier to read.

To define a schema programmatically for a DataFrame with three named columns, author, title, and pages, you can use the Spark DataFrame API. For example:

# In Python
from pyspark.sql.types import *
schema = StructType([StructField("author", StringType(), False),
StructField("title", StringType(), False),
StructField("pages", IntegerType(), False)])

Defining the same schema using DDL is much simpler:

# In Python
schema = "author STRING, title STRING, pages INT"

You can choose whichever way you like to define a schema. For many examples, we will use both:

Example-3_6-DataFrame_API.ipynb
Example-3_6-DDL.ipynb

# define schema by using Spark DataFrame API
schema = StructType([
StructField("Id", IntegerType(), False),
StructField("First", StringType(), False),
StructField("Last", StringType(), False),
StructField("Url", StringType(), False),
StructField("Published", StringType(), False),
StructField("Hits", IntegerType(), False),
StructField("Campaigns", ArrayType(StringType()), False)])
# define schema by using Data Definition Language (DDL)
schema = "Id INT, First STRING, Last STRING, Url STRING, Published STRING, Hits INT, Campaigns ARRAY<STRING>"
# avoid reserved word conflict
# schema = "`Id` INT, `First` STRING, `Last` STRING, `Url` STRING, `Published` STRING, `Hits` INT, `Campaigns` ARRAY<STRING>"

Columns and Expressions

Named columns in DataFrames are conceptually similar to named columns in pandas or R DataFrames or in an RDBMS table: they describe a type of field. You can list all the columns by their names, and you can perform operations on their values using relational or computational expressions. In Spark’s supported languages, columns are objects with public methods (represented by the Column type).

You can also use logical or mathematical expressions on columns. For example, you could create a simple expression using expr("columnName * 5") or (expr("columnName - 5") > col(anothercolumnName)), where columnName is a Spark type (integer, string, etc.). expr() is part of the pyspark.sql.functions (Python) and org.apache.spark.sql.functions (Scala) packages. Like any other function in those packages, expr() takes arguments that Spark will parse as an expression, computing the result.

Scala, Java, and Python all have public methods associated with columns. You’ll note that the Spark documentation refers to both col and Column. Column is the name of the object, while col() is a standard built-in function that returns a Column.
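
A small sketch of these expressions on a toy DataFrame (the column names here are made up for illustration):

# In Python
from pyspark.sql.functions import col, expr

df = spark.createDataFrame([("a", 1, 2), ("b", 9, 3)], ["name", "value", "other"])

df.select(expr("value * 5").alias("value_x5")).show()
df.where(expr("value - 5") > col("other")).show()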

Example-3_7.ipynb


Rows

A row in Spark is a generic Row object, containing one or more columns. Each column may be of the same data type (e.g., integer or string), or they can have different types (integer, string, map, array, etc.). Because Row is an object in Spark and an ordered collection of fields, you can instantiate a Row in each of Spark’s supported languages and access its fields by an index starting at 0:

>>> from pyspark.sql import Row
>>> blog_row = Row(6, "Reynold", "Xin", "https://tinyurl.6", 255568, "3/2/2015", ["twitter", "LinkedIn"])
>>> # access using index for individual items
>>> blog_row[1]
'Reynold'

Row objects can be used to create DataFrames if you need them for quick interactivity and exploration:

>>> rows = [Row("Matei Zaharia", "CA"), Row("Reynold Xin", "CA")]
>>> authors_df = spark.createDataFrame(rows, ["Authors", "State"])
>>> authors_df.show()
+-------------+-----+
| Authors|State|
+-------------+-----+
|Matei Zaharia| CA|
| Reynold Xin| CA|
+-------------+-----+

Common DataFrame Operations

To perform common data operations on DataFrames, you’ll first need to load a DataFrame from a data source that holds your structured data. Spark provides an interface, DataFrameReader, that enables you to read data into a DataFrame from myriad data sources in formats such as JSON, CSV, Parquet, Text, Avro, ORC, etc. Likewise, to write a DataFrame back to a data source in a particular format, Spark uses DataFrameWriter.

Using DataFrameReader and DataFrameWriter

Dataset: Fire-Incidents

Define a schema and use the DataFrameReader class and its methods to tell Spark what to do; it’s more efficient to define a schema than to have Spark infer it.

  • inferSchema

If you don’t want to specify the schema, Spark can infer it from a sample at a lesser cost. For example, you can use the samplingRatio option (under the chapter3 directory):

sampleDF = (spark
.read
.option("sampleingRatio", 0.001)
.option("header", True)
.csv("data/sf-fire-calls.csv"))

The following example shows how to read the CSV file into a DataFrame with a schema:

from pyspark.sql.types import *

# Programmatic way to define a schema
fire_schema = StructType([StructField('CallNumber', IntegerType(), True),
StructField('UnitID', StringType(), True),
StructField('IncidentNumber', IntegerType(), True),
StructField('CallType', StringType(), True),
StructField('CallDate', StringType(), True),
StructField('WatchDate', StringType(), True),
StructField('CallFinalDisposition', StringType(), True),
StructField('AvailableDtTm', StringType(), True),
StructField('Address', StringType(), True),
StructField('City', StringType(), True),
StructField('Zipcode', IntegerType(), True),
StructField('Battalion', StringType(), True),
StructField('StationArea', StringType(), True),
StructField('Box', StringType(), True),
StructField('OriginalPriority', StringType(), True),
StructField('Priority', StringType(), True),
StructField('FinalPriority', IntegerType(), True),
StructField('ALSUnit', BooleanType(), True),
StructField('CallTypeGroup', StringType(), True),
StructField('NumAlarms', IntegerType(), True),
StructField('UnitType', StringType(), True),
StructField('UnitSequenceInCallDispatch', IntegerType(), True),
StructField('FirePreventionDistrict', StringType(), True),
StructField('SupervisorDistrict', StringType(), True),
StructField('Neighborhood', StringType(), True),
StructField('Location', StringType(), True),
StructField('RowID', StringType(), True),
StructField('Delay', FloatType(), True)])

fire_df = (spark
.read
.csv("data/sf-fire-calls.csv", header=True, schema=fire_schema))

The spark.read.csv() function reads in the CSV file and returns a DataFrame of rows and named columns with the types dictated in the schema.

To write the DataFrame into an external data source in your format of choice, you can use the DataFrameWriter interface. Like DataFrameReader, it supports multiple data sources. Parquet, a popular columnar format, is the default format; it uses snappy compression to compress the data. If the DataFrame is written as Parquet, the schema is preserved as part of the Parquet metadata. In this case, subsequent reads back into a DataFrame do not require you to manually supply a schema.

Saving a DataFrame as a Parquet file or SQL table. A common data operation is to explore and transform your data, and then persist the DataFrame in Parquet format or save it as a SQL table. Persisting a transformed DataFrame is as easy as reading it. For example, to persist the DataFrame we were just working with as a file after reading it, you would do the following:

# In Python to save as Parquet files
parquet_path = "..."
fire_df.write.format("parquet").save(parquet_path)

Alternatively, you can save it as a table, which registers metadata with the Hive metastore (we will cover SQL managed and unmanaged tables, metastores, and DataFrames in the next chapter):

# In Python to save as a table in the Hive metastore
parquet_table = "..." # name of the table
fire_df.write.format("parquet").saveAsTable(parquet_table)

Transformations and actions

Example-3_8.ipynb

Projections and filters. A projection in relational parlance is a way to return only the rows matching a certain relational condition by using filters. In Spark, projections are done with the select() method, while filters can be expressed using the filter() or where() method. We can use this technique to examine specific aspects of our SF Fire Department data set:

# fire_parquet: DataFrame of the SF fire calls data (see Example-3_8.ipynb)
from pyspark.sql.functions import col, countDistinct

fire_parquet.select("IncidentNumber", "AvailableDtTm", "CallType") \
.where(col("CallType") != "Medical Incident") \
.orderBy("IncidentNumber") \
.show(5, truncate=False)
+--------------+----------------------+-----------------------------+
|IncidentNumber|AvailableDtTm |CallType |
+--------------+----------------------+-----------------------------+
|30636 |04/12/2000 10:18:53 PM|Alarms |
|30773 |04/13/2000 10:34:32 AM|Citizen Assist / Service Call|
|30781 |04/13/2000 10:53:48 AM|Alarms |
|30840 |04/13/2000 01:39:00 PM|Structure Fire |
|30942 |04/13/2000 07:42:53 PM|Outside Fire |
+--------------+----------------------+-----------------------------+
only showing top 5 rows

What if we want to know how many distinct CallTypes were recorded as the causes of the fire calls? These simple and expressive queries do the job:

(fire_parquet
.select("CallType")
.where(col("CallType").isNotNull())
.agg(countDistinct("CallType").alias("DistinctCallTypes"))
.show())
+-----------------+
|DistinctCallTypes|
+-----------------+
| 30|
+-----------------+

We can list the distinct call types in the data set using these queries:

# In Python, filter for only distinct non-null CallTypes from all the rows
(fire_parquet
.select("CallType")
.where(col("CallType").isNotNull())
.distinct()
.show(10, False))

Renaming, adding, and dropping columns. Sometimes you want to rename particular columns for reasons of style or convention, and at other times for readability or brevity. The original column names in the SF Fire Department data set had spaces in them. For example, the column name IncidentNumber was Incident Number. Spaces in column names can be problematic, especially when you want to write or save a DataFrame as a Parquet file (which prohibits this).

By specifying the desired column names in the schema with StructField, as we did, we effectively changed all names in the resulting DataFrame.

Alternatively, you could selectively rename columns with the withColumnRenamed() method. For instance, let’s change the name of our Delay column to ResponseDelayedinMins and take a look at the response times that were longer than five minutes:

# create a new dataframe new_fire_parquet from fire_parquet
# rename "Delay" with "ResponseDelayedinMins" in new_fire_parquet
new_fire_parquet = fire_parquet.withColumnRenamed("Delay",
"ResponseDelayedinMins")
# select ResponseDelayedinMins > 5 mins
(new_fire_parquet
.select("ResponseDelayedinMins")
.where(col("ResponseDelayedinMins") > 5)
.show(5, False))
+---------------------+
|ResponseDelayedinMins|
+---------------------+
|5.0833335 |
|7.2166667 |
|8.666667 |
|5.7166667 |
|16.016666 |
+---------------------+
only showing top 5 rows

Because DataFrame transformations are immutable, when we rename a column using withColumnRenamed() we get a new DataFrame while retaining the original with the old column name.

Modifying the contents of a column or its type is a common operation during data exploration. In some cases the data is raw or dirty, or its types are not amenable to