LearningSpark

Reference

LearningSparkV2


Spark Features

Spark Purpose
In particular, data engineers will learn how to use Spark’s Structured APIs to perform complex data exploration and analysis on both batch and streaming data; use Spark SQL for interactive queries; use Spark’s built-in and external data sources to read, refine, and write data in different file formats as part of their extract, transform, and load (ETL) tasks; and build reliable data lakes with Spark and the open source Delta Lake table format.

Speed
Second, Spark builds its query computations as a directed acyclic graph (DAG); its DAG scheduler and query optimizer construct an efficient computational graph that can usually be decomposed into tasks that are executed in parallel across workers on the cluster.

Ease of Use
Spark achieves simplicity by providing a fundamental abstraction of a simple logical data structure called a Resilient Distributed Dataset (RDD) upon which all other higher-level structured data abstractions, such as DataFrames and Datasets, are constructed.

Modularity
Spark SQL, Spark Structured Streaming, Spark MLlib, and GraphX

Extensibility
You can use Spark to read data stored in myriad sources—Apache Hadoop, Apache Cassandra, Apache HBase, MongoDB, Apache Hive, RDBMSs, and more—and process it all in memory. Spark’s DataFrameReaders and DataFrameWriters can also be extended to read data from other sources, such as Apache Kafka, Kinesis, Azure Storage, and Amazon S3, into its logical data abstraction, on which it can operate.


Spark Components


Apache Spark’s Distributed Execution

Spark driver
As the part of the Spark application responsible for instantiating a SparkSession, the Spark driver has multiple roles: it communicates with the cluster manager; it requests resources (CPU, memory, etc.) from the cluster manager for Spark’s executors (JVMs); and it transforms all the Spark operations into DAG computations, schedules them, and distributes their execution as tasks across the Spark executors. Once the resources are allocated, it communicates directly with the executors.

SparkSession
In Spark 2.0, the SparkSession became a unified conduit to all Spark operations and data. Not only did it subsume previous entry points to Spark like the SparkContext, SQLContext, HiveContext, SparkConf, and StreamingContext, but it also made working with Spark simpler and easier.
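As a minimal sketch (the application name and local master URL here are illustrative, not from the book), creating your own SparkSession in a standalone PySpark program looks like this:

# Create a SparkSession yourself when you are not in a shell
from pyspark.sql import SparkSession

spark = (SparkSession
    .builder
    .appName("MyApp")            # illustrative application name
    .master("local[*]")          # run locally, using all available cores
    .getOrCreate())

df = spark.range(5)              # a tiny DataFrame to confirm the session works
df.show()
spark.stop()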

Cluster manager
The cluster manager is responsible for managing and allocating resources for the cluster of nodes on which your Spark application runs. Currently, Spark supports four cluster managers: the built-in standalone cluster manager, Apache Hadoop YARN, Apache Mesos, and Kubernetes.

Spark executor
A Spark executor runs on each worker node in the cluster. The executors communicate with the driver program and are responsible for executing tasks on the workers. In most deployment modes, only a single executor runs per node.

Deployment methods

Distributed data and partitions


Spark Shell

scala> val strings = spark.read.text("spark-3.2.1-bin-hadoop3.2/README.md")
strings: org.apache.spark.sql.DataFrame = [value: string]

scala> strings.show(10, false)
+--------------------------------------------------------------------------------+
|value |
+--------------------------------------------------------------------------------+
|# Apache Spark |
| |
|Spark is a unified analytics engine for large-scale data processing. It provides|
|high-level APIs in Scala, Java, Python, and R, and an optimized engine that |
|supports general computation graphs for data analysis. It also supports a |
|rich set of higher-level tools including Spark SQL for SQL and DataFrames, |
|MLlib for machine learning, GraphX for graph processing, |
|and Structured Streaming for stream processing. |
| |
|<https://spark.apache.org/> |
+--------------------------------------------------------------------------------+
only showing top 10 rows


scala> strings.count()
res1: Long = 109
>>> strings = spark.read.text("spark-3.2.1-bin-hadoop3.2/README.md")
>>> strings.show(10, truncate=False)
+--------------------------------------------------------------------------------+
|value |
+--------------------------------------------------------------------------------+
|# Apache Spark |
| |
|Spark is a unified analytics engine for large-scale data processing. It provides|
|high-level APIs in Scala, Java, Python, and R, and an optimized engine that |
|supports general computation graphs for data analysis. It also supports a |
|rich set of higher-level tools including Spark SQL for SQL and DataFrames, |
|MLlib for machine learning, GraphX for graph processing, |
|and Structured Streaming for stream processing. |
| |
|<https://spark.apache.org/> |
+--------------------------------------------------------------------------------+
only showing top 10 rows

>>> strings.count()
109
>>> strings.show(10, truncate=True)
+--------------------+
| value|
+--------------------+
| # Apache Spark|
| |
|Spark is a unifie...|
|high-level APIs i...|
|supports general ...|
|rich set of highe...|
|MLlib for machine...|
|and Structured St...|
| |
|<https://spark.ap...|
+--------------------+
only showing top 10 rows

Every computation expressed in high-level Structured APIs is decomposed into low-level optimized and generated RDD operations and then converted into Scala bytecode for the executors’ JVMs. This generated RDD operation code is not accessible to users, nor is it the same as the user-facing RDD APIs.


Spark Application

Spark Application Concepts

Application

  • A user program built on Spark using its APIs. It consists of a driver program and executors on the cluster.

SparkSession

  • An object that provides a point of entry to interact with underlying Spark functionality and allows programming Spark with its APIs. In an interactive Spark shell, the Spark driver instantiates a SparkSession for you, while in a Spark application, you create a SparkSession object yourself.

Job

  • A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g., save(), collect()).

Stage

  • Each job gets divided into smaller sets of tasks called stages that depend on each other.

Task

  • A single unit of work or execution that will be sent to a Spark executor.

Spark Application and SparkSession

At the core of every Spark application is the Spark driver program, which creates a SparkSession object. When you’re working with a Spark shell, the driver is part of the shell and the SparkSession object (accessible via the variable spark) is created for you, as you saw in the earlier examples when you launched the shells.

In those examples, because you launched the Spark shell locally on your laptop, all the operations ran locally, in a single JVM. But you can just as easily launch a Spark shell to analyze data in parallel on a cluster as in local mode. The commands spark-shell --help or pyspark --help will show you how to connect to the Spark cluster manager. Figure 2-2 shows how Spark executes on a cluster once you’ve done this.

Once you have a SparkSession, you can program Spark using the APIs to perform Spark operations.


Spark Jobs

During interactive sessions with Spark shells, the driver converts your Spark application into one or more Spark jobs (Figure 2-3). It then transforms each job into a DAG. This, in essence, is Spark’s execution plan, where each node within a DAG could be a single or multiple Spark stages.


Spark Stage

As part of the DAG nodes, stages are created based on what operations can be performed serially or in parallel (Figure 2-4). Not all Spark operations can happen in a single stage, so they may be divided into multiple stages. Often stages are delineated on the operator’s computation boundaries, where they dictate data transfer among Spark executors.


Spark Tasks

Each stage is composed of Spark tasks (a unit of execution), which are then federated across each Spark executor; each task maps to a single core and works on a single partition of data (Figure 2-5). As such, an executor with 16 cores can have 16 or more tasks working on 16 or more partitions in parallel, making the execution of Spark’s tasks exceedingly parallel!
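As a rough sketch of that relationship (assuming a spark session is already available, as in the shells above), you can inspect how many partitions a DataFrame has, and therefore how many tasks each stage can run in parallel:

# Number of partitions determines how many tasks a stage can run in parallel
df = spark.range(0, 10000)
print(df.rdd.getNumPartitions())   # e.g., one partition per available core

# Repartitioning changes the degree of parallelism for subsequent stages
df8 = df.repartition(8)
print(df8.rdd.getNumPartitions())  # 8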


Transformations, Actions, and Lazy Evaluation

Spark operations on distributed data can be classified into two types: transformations and actions. Transformations, as the name suggests, transform a Spark DataFrame into a new DataFrame without altering the original data, giving it the property of immutability. Put another way, an operation such as select() or filter() will not change the original DataFrame; instead, it will return the transformed results of the operation as a new DataFrame.

All transformations are evaluated lazily. That is, their results are not computed immediately, but they are recorded or remembered as a lineage. A recorded lineage allows Spark, at a later time in its execution plan, to rearrange certain transformations, coalesce them, or optimize transformations into stages for more efficient execution. Lazy evaluation is Spark’s strategy for delaying execution until an action is invoked or data is “touched” (read from or written to disk).
An action triggers the lazy evaluation of all the recorded transformations. In
Figure 2-6, all transformations T are recorded until the action A is invoked. Each transformation T produces a new DataFrame.

While lazy evaluation allows Spark to optimize your queries by peeking into your chained transformations, lineage and data immutability provide fault tolerance. Because Spark records each transformation in its lineage and the DataFrames are immutable between transformations, it can reproduce its original state by simply replaying the recorded lineage, giving it resiliency in the event of failures.
Table 2-1 lists some examples of transformations and actions.
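To make this concrete, here is a small sketch (reusing the README.md read in the shell examples above) of transformations being recorded lazily until an action runs:

# Transformations are only recorded in the lineage; nothing executes yet
strings = spark.read.text("spark-3.2.1-bin-hadoop3.2/README.md")
filtered = strings.filter(strings.value.contains("Spark"))

# The action triggers a job that executes the recorded lineage
print(filtered.count())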


Narrow and Wide Transformations

As noted, transformations are operations that Spark evaluates lazily. A huge advantage of the lazy evaluation scheme is that Spark can inspect your computational query and ascertain how it can optimize it. This optimization can be done by either joining or pipelining some operations and assigning them to a stage, or breaking them into stages by determining which operations require a shuffle or exchange of data across clusters.

Transformations can be classified as having either narrow dependencies or wide dependencies. Any transformation where a single output partition can be computed from a single input partition is a narrow transformation. For example, in the previous code snippet, filter() and contains() represent narrow transformations because they can operate on a single partition and produce the resulting output partition without any exchange of data.

However, groupBy() or orderBy() instruct Spark to perform wide transformations, where data from other partitions is read in, combined, and written to disk. Since each partition has its own local count of rows containing the word “Spark”, a count (groupBy()) will force a shuffle of data from each of the executor’s partitions across the cluster. Similarly, orderBy() requires output from other partitions to compute the final ordering.
Figure 2-7 illustrates the two types of dependencies.
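As a sketch of the difference (again assuming the shell’s spark session and the README.md used earlier), a filter() stays within partitions, while a groupBy() introduces an Exchange (shuffle) in the physical plan:

strings = spark.read.text("spark-3.2.1-bin-hadoop3.2/README.md")

# Narrow dependency: each output partition is computed from a single input partition
narrow = strings.filter(strings.value.contains("Spark"))

# Wide dependency: grouping forces a shuffle; look for an Exchange in the plan
wide = strings.groupBy("value").count()
wide.explain()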


The Spark UI

Spark includes a graphical user interface that you can use to inspect or monitor Spark applications in their various stages of decomposition—that is, jobs, stages, and tasks. Depending on how Spark is deployed, the driver launches a web UI, running by default on port 4040, where you can view metrics and details such as:

  • A list of scheduler stages and tasks
  • A summary of RDD sizes and memory usage
  • Information about the environment
  • Information about the running executors
  • All the Spark SQL queries

In local mode, you can access this interface at http://localhost:4040 in a web browser.


Set environment variables

How to Set and List Environment Variables in Linux

  • env – The command allows you to run another program in a custom environment without modifying the current one. When used without an argument it will print a list of the current environment variables.

  • printenv – The command prints all or the specified environment variables.

  • set – The command sets or unsets shell variables. When used without an argument it will print a list of all variables including environment and shell variables, and shell functions.

  • unset – The command deletes shell and environment variables.

  • export – The command sets environment variables.

  • /etc/environment - Use this file to set up system-wide environment variables.

  • /etc/profile - Variables set in this file are loaded whenever a bash login shell is entered.

  • Per-user shell-specific configuration files. For example, if you are using Bash, you can declare the variables in ~/.bashrc.

Set your SPARK_HOME environment variable to the root-level directory where you installed Spark on your local machine.

➜  spark-3.2.1-bin-hadoop3.2 pwd
/Users/zacks/Git/Data Science Projects/spark-3.2.1-bin-hadoop3.2
➜  spark-3.2.1-bin-hadoop3.2 echo 'export SPARK_HOME="/Users/zacks/Git/Data Science Projects/spark-3.2.1-bin-hadoop3.2"' >> ~/.zshrc

➜ spark-3.2.1-bin-hadoop3.2 source ~/.zshrc

Review ~/.zshrc

➜  spark-3.2.1-bin-hadoop3.2 tail -1 ~/.zshrc
export SPARK_HOME="/Users/zacks/Git/Data Science Projects/spark-3.2.1-bin-hadoop3.2"

To avoid having verbose INFO messages printed to the console, copy the log4j.properties.template file to log4j.properties and set log4j.rootCategory=WARN in the conf/log4j.properties file.

Content in log4j.properties.template

➜  spark-3.2.1-bin-hadoop3.2 cat conf/log4j.properties.template 
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell/spark-sql log level to WARN. When running the
# spark-shell/spark-sql, the log level for these classes is used to overwrite
# the root logger's log level, so that the user can have different defaults
# for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN
log4j.logger.org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver=WARN

# Settings to quiet third party logs that are too verbose
log4j.logger.org.sparkproject.jetty=WARN
log4j.logger.org.sparkproject.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

# For deploying Spark ThriftServer
# SPARK-34128:Suppress undesirable TTransportException warnings involved in THRIFT-4805
log4j.appender.console.filter.1=org.apache.log4j.varia.StringMatchFilter
log4j.appender.console.filter.1.StringToMatch=Thrift error occurred during processing of message
log4j.appender.console.filter.1.AcceptOnMatch=false
➜  spark-3.2.1-bin-hadoop3.2 cp conf/log4j.properties.template conf/log4j.properties

After editing log4j.rootCategory in log4j.properties:

➜  spark-3.2.1-bin-hadoop3.2 cat conf/log4j.properties | grep log4j.rootCategory
log4j.rootCategory=WARN, console

Spark Job

mnmcount.py

Under LearningSparkV2/chapter2/py/src

➜  src git:(dev) ✗ source ~/.zshrc
➜ src git:(dev) ✗ $SPARK_HOME/bin/spark-submit mnmcount.py data/mnm_dataset.csv

Output:

1st table: head of df
2nd table: sum Count group by State, Color
3rd table: sum Count group by State, Color where State = 'CA'

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/Users/zacks/Git/Data%20Science%20Projects/spark-3.2.1-bin-hadoop3.2/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
22/02/17 22:34:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
+-----+------+-----+
|State|Color |Count|
+-----+------+-----+
|TX |Red |20 |
|NV |Blue |66 |
|CO |Blue |79 |
|OR |Blue |71 |
|WA |Yellow|93 |
+-----+------+-----+
only showing top 5 rows

+-----+------+----------+
|State|Color |sum(Count)|
+-----+------+----------+
|CA |Yellow|100956 |
|WA |Green |96486 |
|CA |Brown |95762 |
|TX |Green |95753 |
|TX |Red |95404 |
|CO |Yellow|95038 |
|NM |Red |94699 |
|OR |Orange|94514 |
|WY |Green |94339 |
|NV |Orange|93929 |
|TX |Yellow|93819 |
|CO |Green |93724 |
|CO |Brown |93692 |
|CA |Green |93505 |
|NM |Brown |93447 |
|CO |Blue |93412 |
|WA |Red |93332 |
|WA |Brown |93082 |
|WA |Yellow|92920 |
|NM |Yellow|92747 |
|NV |Brown |92478 |
|TX |Orange|92315 |
|AZ |Brown |92287 |
|AZ |Green |91882 |
|WY |Red |91768 |
|AZ |Orange|91684 |
|CA |Red |91527 |
|WA |Orange|91521 |
|NV |Yellow|91390 |
|UT |Orange|91341 |
|NV |Green |91331 |
|NM |Orange|91251 |
|NM |Green |91160 |
|WY |Blue |91002 |
|UT |Red |90995 |
|CO |Orange|90971 |
|AZ |Yellow|90946 |
|TX |Brown |90736 |
|OR |Blue |90526 |
|CA |Orange|90311 |
|OR |Red |90286 |
|NM |Blue |90150 |
|AZ |Red |90042 |
|NV |Blue |90003 |
|UT |Blue |89977 |
|AZ |Blue |89971 |
|WA |Blue |89886 |
|OR |Green |89578 |
|CO |Red |89465 |
|NV |Red |89346 |
|UT |Yellow|89264 |
|OR |Brown |89136 |
|CA |Blue |89123 |
|UT |Brown |88973 |
|TX |Blue |88466 |
|UT |Green |88392 |
|OR |Yellow|88129 |
|WY |Orange|87956 |
|WY |Yellow|87800 |
|WY |Brown |86110 |
+-----+------+----------+

Total Rows = 60
+-----+------+----------+
|State|Color |sum(Count)|
+-----+------+----------+
|CA |Yellow|100956 |
|CA |Brown |95762 |
|CA |Green |93505 |
|CA |Red |91527 |
|CA |Orange|90311 |
|CA |Blue |89123 |
+-----+------+----------+

Spark’s Structured APIs

Spark: What’s Underneath an RDD?

The RDD is the most basic abstraction in Spark. There are three vital characteristics associated with an RDD:

  • Dependencies
  • Partitions (with some locality information)
  • Compute function: Partition => Iterator[T]

All three are integral to the simple RDD programming API model upon which all higher-level functionality is constructed. First, a list of dependencies that instructs Spark how an RDD is constructed with its inputs is required. When necessary to reproduce results, Spark can recreate an RDD from these dependencies and replicate operations on it. This characteristic gives RDDs resiliency.

Second, partitions provide Spark the ability to split the work to parallelize computation on partitions across executors. In some cases—for example, reading from HDFS—Spark will use locality information to send work to executors close to the data. That way less data is transmitted over the network.

And finally, an RDD has a compute function that produces an Iterator[T] for the data that will be stored in the RDD.


Structuring Spark

Key Metrics and Benefits

Structure yields a number of benefits, including better performance and space efficiency across Spark components. We will explore these benefits further when we talk about the use of the DataFrame and Dataset APIs shortly, but for now we’ll concentrate on the other advantages: expressivity, simplicity, composability, and uniformity.

Let’s demonstrate expressivity and composability first, with a simple code snippet. In the following example, we want to aggregate all the ages for each name, group by name, and then average the ages—a common pattern in data analysis and discovery. If we were to use the low-level RDD API for this, the code would look as follows:

# In Python
# Create an RDD of tuples (name, age)
dataRDD = sc.parallelize([("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)])
# Use map and reduceByKey transformations with their lambda
# expressions to aggregate and then compute average

agesRDD = (dataRDD
.map(lambda x: (x[0], (x[1], 1)))
.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
.map(lambda x: (x[0], x[1][0]/x[1][1])))

No one would dispute that this code, which tells Spark how to aggregate keys and compute averages with a string of lambda functions, is cryptic and hard to read. In other words, the code is instructing Spark how to compute the query. It’s completely opaque to Spark, because it doesn’t communicate the intention. Furthermore, the equivalent RDD code in Scala would look very different from the Python code shown here.

By contrast, what if we were to express the same query with high-level DSL operators and the DataFrame API, thereby instructing Spark what to do? Have a look:

# In Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
# Create a DataFrame using SparkSession
spark = (SparkSession
.builder
.appName("AuthorsAges")
.getOrCreate())

# Create a DataFrame
data_df = spark.createDataFrame([("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)], ["name", "age"])

# Group the same names together, aggregate their ages, and compute an average
avg_df = data_df.groupBy("name").agg(avg("age"))

# Show the results of the final execution
avg_df.show()
+------+--------+
| name|avg(age)|
+------+--------+
|Brooke| 22.5|
| Denny| 31.0|
| Jules| 30.0|
| TD| 35.0|
+------+--------+

The DataFrame API


Spark’s Basic Data Types

Data Types


Spark’s Structured and Complex Data Types


Schemas and Creating DataFrames

A schema in Spark defines the column names and associated data types for a DataFrame.

Defining a schema up front as opposed to taking a schema-on-read approach offers three benefits:

  • You relieve Spark from the onus of inferring data types.
  • You prevent Spark from creating a separate job just to read a large portion of your file to ascertain the schema, which for a large data file can be expensive and time-consuming.
  • You can detect errors early if data doesn’t match the schema.

So, we encourage you to always define your schema up front whenever you want to read a large file from a data source.

Two ways to define a schema

Spark allows you to define a schema in two ways. One is to define it programmatically, and the other is to employ a Data Definition Language (DDL) string, which is much simpler and easier to read.

To define a schema programmatically for a DataFrame with three named columns, author, title, and pages, you can use the Spark DataFrame API. For example:

# In Python
from pyspark.sql.types import *
schema = StructType([StructField("author", StringType(), False),
StructField("title", StringType(), False),
StructField("pages", IntegerType(), False)])

Defining the same schema using DDL is much simpler:

# In Python
schema = "author STRING, title STRING, pages INT"

You can choose whichever way you like to define a schema. For many examples, we will use both:

Example-3_6-DataFrame_API.ipynb
Example-3_6-DDL.ipynb

# define schema by using Spark DataFrame API
schema = StructType([
StructField("Id", IntegerType(), False),
StructField("First", StringType(), False),
StructField("Last", StringType(), False),
StructField("Url", StringType(), False),
StructField("Published", StringType(), False),
StructField("Hits", IntegerType(), False),
StructField("Campaigns", ArrayType(StringType()), False)])
# define schema by using Definition Language (DDL)
schema = "Id INT, First STRING, Last STRING, Url STRING, Published STRING, Hits INT, Campaigns ARRAY<STRING>"
# avoid reserved word conflict
# schema = "`Id` INT, `First` STRING, `Last` STRING, `Url` STRING, `Published` STRING, `Hits` INT, `Campaigns` ARRAY<STRING>"

Columns and Expressions

Named columns in DataFrames are conceptually similar to named columns in pandas or R DataFrames or in an RDBMS table: they describe a type of field. You can list all the columns by their names, and you can perform operations on their values using relational or computational expressions. In Spark’s supported languages, columns are objects with public methods (represented by the Column type).

You can also use logical or mathematical expressions on columns. For example, you could create a simple expression using expr("columnName * 5") or (expr("columnName - 5") > col(anothercolumnName)), where columnName is a Spark type (integer, string, etc.). expr() is part of the pyspark.sql.functions (Python) and org.apache.spark.sql.functions (Scala) packages. Like any other function in those packages, expr() takes arguments that Spark will parse as an expression, computing the result.

Scala, Java, and Python all have public methods associated with columns. You’ll note that the Spark documentation refers to both col and Column. Column is the name of the object, while col() is a standard built-in function that returns a Column.
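As a short sketch (the DataFrame and column names here are hypothetical), expr() and col() can be used interchangeably to build column expressions:

from pyspark.sql.functions import col, expr

df = spark.createDataFrame([(1, 10), (2, 20)], ["Id", "Hits"])
df.select(expr("Hits * 2").alias("DoubledHits")).show()   # expression parsed from a string
df.select((col("Hits") * 2).alias("DoubledHits")).show()  # equivalent, using the Column API
df.where(col("Hits") > 10).show()                         # a relational expression on a column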

Example-3_7.ipynb

We have only scratched the surface here, and employed just a couple of methods on Column objects. For a complete list of all public methods for Column objects, we refer you to the Spark documentation.

Column objects in a DataFrame can’t exist in isolation; each column is part of a row in a record and all the rows together constitute a DataFrame, which as we will see later in the chapter is really a Dataset[Row] in Scala.


Rows

A row in Spark is a generic Row object, containing one or more columns. Each column may be of the same data type (e.g., integer or string), or they can have different types (integer, string, map, array, etc.). Because Row is an object in Spark and an ordered collection of fields, you can instantiate a Row in each of Spark’s supported languages and access its fields by an index starting at 0:

>>> from pyspark.sql import Row
>>> blog_row = Row(6, "Reynold", "Xin", "https://tinyurl.6", 255568, "3/2/2015", ["twitter", "LinkedIn"])
>>> # access using index for individual items
>>> blog_row[1]
'Reynold'

Row objects can be used to create DataFrames if you need them for quick interactivity and exploration:

>>> rows = [Row("Matei Zaharia", "CA"), Row("Reynold Xin", "CA")]
>>> authors_df = spark.createDataFrame(rows, ["Authors", "State"])
>>> authors_df.show()
+-------------+-----+
| Authors|State|
+-------------+-----+
|Matei Zaharia| CA|
| Reynold Xin| CA|
+-------------+-----+

Common DataFrame Operations

To perform common data operations on DataFrames, you’ll first need to load a DataFrame from a data source that holds your structured data. Spark provides an interface, DataFrameReader, that enables you to read data into a DataFrame from myriad data sources in formats such as JSON, CSV, Parquet, Text, Avro, ORC, etc. Likewise, to write a DataFrame back to a data source in a particular format, Spark uses DataFrameWriter.

Using DataFrameReader and DataFrameWriter

Dataset: Fire-Incidents

By defining a schema and using the DataFrameReader class and its methods, you tell Spark what to do; it’s more efficient to define a schema than to have Spark infer it.

  • inferSchema

If you don’t want to specify the schema, Spark can infer it from a sample at a lesser cost. For example, you can use the samplingRatio option (run under the chapter3 directory):

sampleDF = (spark
.read
.option("sampleingRatio", 0.001)
.option("header", True)
.csv("data/sf-fire-calls.csv"))

The following example shows how to read a DataFrame with a defined schema:

from pyspark.sql.types import *

# Programmatic way to define a schema
fire_schema = StructType([StructField('CallNumber', IntegerType(), True),
StructField('UnitID', StringType(), True),
StructField('IncidentNumber', IntegerType(), True),
StructField('CallType', StringType(), True),
StructField('CallDate', StringType(), True),
StructField('WatchDate', StringType(), True),
StructField('CallFinalDisposition', StringType(), True),
StructField('AvailableDtTm', StringType(), True),
StructField('Address', StringType(), True),
StructField('City', StringType(), True),
StructField('Zipcode', IntegerType(), True),
StructField('Battalion', StringType(), True),
StructField('StationArea', StringType(), True),
StructField('Box', StringType(), True),
StructField('OriginalPriority', StringType(), True),
StructField('Priority', StringType(), True),
StructField('FinalPriority', IntegerType(), True),
StructField('ALSUnit', BooleanType(), True),
StructField('CallTypeGroup', StringType(), True),
StructField('NumAlarms', IntegerType(), True),
StructField('UnitType', StringType(), True),
StructField('UnitSequenceInCallDispatch', IntegerType(), True),
StructField('FirePreventionDistrict', StringType(), True),
StructField('SupervisorDistrict', StringType(), True),
StructField('Neighborhood', StringType(), True),
StructField('Location', StringType(), True),
StructField('RowID', StringType(), True),
StructField('Delay', FloatType(), True)])

fire_df = (spark
.read
.csv("data/sf-fire-calls.csv", header=True, schema=fire_schema))

The spark.read.csv() function reads in the CSV file and returns a DataFrame of rows and named columns with the types dictated in the schema.

To write the DataFrame into an external data source in your format of choice, you can use the DataFrameWriter interface. Like DataFrameReader, it supports multiple data sources. Parquet, a popular columnar format, is the default format; it uses snappy compression to compress the data. If the DataFrame is written as Parquet, the schema is preserved as part of the Parquet metadata. In this case, subsequent reads back into a DataFrame do not require you to manually supply a schema.

Saving a DataFrame as a Parquet file or SQL table. A common data operation is to explore and transform your data, and then persist the DataFrame in Parquet format or save it as a SQL table. Persisting a transformed DataFrame is as easy as reading it. For example, to persist the DataFrame we were just working with as a file after reading it you would do the following:

# In Python to save as Parquet files
parquet_path = "..."
fire_df.write.format("parquet").save(parquet_path)

Alternatively, you can save it as a table, which registers metadata with the Hive metastore (we will cover SQL managed and unmanaged tables, metastores, and DataFrames in the next chapter):

# In Python to save as a table in the Hive metastore
parquet_table = "..." # name of the table
fire_df.write.format("parquet").saveAsTable(parquet_table)

Transformations and actions

Example-3_8.ipynb

Projections and filters

Projections and filters. A projection in relational parlance returns only a subset of columns, while a filter (selection) returns only the rows matching a certain relational condition. In Spark, projections are done with the select() method, while filters can be expressed using the filter() or where() method. We can use this technique to examine specific aspects of our SF Fire Department data set:

fire_parquet.select("IncidentNumber", "AvailableDtTm", "CallType") \
.where(col("CallType") != "Medical Incident") \
.orderBy("IncidentNumber") \
.show(5, truncate=False)
+--------------+----------------------+-----------------------------+
|IncidentNumber|AvailableDtTm |CallType |
+--------------+----------------------+-----------------------------+
|30636 |04/12/2000 10:18:53 PM|Alarms |
|30773 |04/13/2000 10:34:32 AM|Citizen Assist / Service Call|
|30781 |04/13/2000 10:53:48 AM|Alarms |
|30840 |04/13/2000 01:39:00 PM|Structure Fire |
|30942 |04/13/2000 07:42:53 PM|Outside Fire |
+--------------+----------------------+-----------------------------+
only showing top 5 rows

What if we want to know how many distinct CallTypes were recorded as the causes of the fire calls? These simple and expressive queries do the job:

(fire_parquet
.select("CallType")
.where(col("CallType").isNotNull())
.agg(countDistinct("CallType").alias("DistinctCallTypes"))
.show())
+-----------------+
|DistinctCallTypes|
+-----------------+
| 30|
+-----------------+

We can list the distinct call types in the data set using these queries:

# In Python, filter for only distinct non-null CallTypes from all the rows
(fire_parquet
.select("CallType")
.where(col("CallType").isNotNull())
.distinct()
.show(10, False))

Renaming, adding, and dropping columns

Renaming, adding, and dropping columns. Sometimes you want to rename particular columns for reasons of style or convention, and at other times for readability or brevity. The original column names in the SF Fire Department data set had spaces in them. For example, the column name IncidentNumber was Incident Number. Spaces in column names can be problematic, especially when you want to write or save a DataFrame as a Parquet file (which prohibits this).

By specifying the desired column names in the schema with StructField, as we did, we effectively changed all names in the resulting DataFrame.

Alternatively, you could selectively rename columns with the withColumnRenamed() method. For instance, let’s change the name of our Delay column to ResponseDelayedinMins and take a look at the response times that were longer than five minutes:

# create a new dataframe new_fire_parquet from fire_parquet
# rename "Delay" with "ResponseDelayedinMins" in new_fire_parquet
new_fire_parquet = fire_parquet.withColumnRenamed("Delay",
"ResponseDelayedinMins")
# select ResponseDelayedinMins > 5 mins
(new_fire_parquet
.select("ResponseDelayedinMins")
.where(col("ResponseDelayedinMins") > 5)
.show(5, False))
+---------------------+
|ResponseDelayedinMins|
+---------------------+
|5.0833335 |
|7.2166667 |
|8.666667 |
|5.7166667 |
|16.016666 |
+---------------------+
only showing top 5 rows

Because DataFrame transformations are immutable, when we rename a column using withColumnRenamed() we get a new DataFrame while retaining the original with the old column name.

Modifying the contents of a column or its type are common operations during data exploration. In some cases the data is raw or dirty, or its types are not amenable to being supplied as arguments to relational operators. For example, in our SF Fire Department data set, the columns CallDate, WatchDate, and AvailableDtTm are strings rather than either Unix timestamps or SQL dates, both of which Spark supports and can easily manipulate during transformations or actions (e.g., during a date- or time-based analysis of the data).

So how do we convert them into a more usable format? It’s quite simple, thanks to some high-level API methods. spark.sql.functions has a set of to/from date/timestamp functions such as to_timestamp() and to_date() that we can use for just this purpose:

# transform CallDate from "MM/dd/yyyy" to timestamp with name IncidentDate and drop CallDate
# transform WatchDate from "MM/dd/yyyy" to timestamp with name OnWatchDate and drop WatchDate
# transform AvailableDtTm from "MM/dd/yyyy hh:mm:ss a" to timestamp with name AvailableDtTS and drop AvailableDtTm
fire_ts_parquet = (new_fire_parquet
.withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
.drop("CallDate")
.withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))
.drop("WatchDate")
.withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"),
"MM/dd/yyyy hh:mm:ss a"))
.drop("AvailableDtTm"))
# Select the converted columns
(fire_ts_parquet
.select("IncidentDate", "OnWatchDate", "AvailableDtTS")
.show(5, False))
  1. Convert the existing column’s data type from string to a Spark-supported timestamp.
  2. Use the new format specified in the format string “MM/dd/yyyy” or “MM/dd/yyyy hh:mm:ss a” where appropriate.
  3. After converting to the new data type, drop() the old column and append the new one specified in the first argument to the withColumn() method.
  4. Assign the new modified DataFrame to fire_ts_parquet.
+-------------------+-------------------+-------------------+
|IncidentDate |OnWatchDate |AvailableDtTS |
+-------------------+-------------------+-------------------+
|2000-07-03 00:00:00|2000-07-02 00:00:00|2000-07-03 07:38:35|
|2000-07-03 00:00:00|2000-07-02 00:00:00|2000-07-03 07:05:25|
|2000-07-03 00:00:00|2000-07-02 00:00:00|2000-07-03 08:03:06|
|2000-07-03 00:00:00|2000-07-03 00:00:00|2000-07-03 09:01:03|
|2000-07-03 00:00:00|2000-07-03 00:00:00|2000-07-03 11:26:57|
+-------------------+-------------------+-------------------+
only showing top 5 rows

Now that we have modified the dates, we can query using functions from spark.sql.functions like month(), year(), and day() to explore our data further. We could find out how many calls were logged in the last seven days, or we could see how many years’ worth of Fire Department calls are included in the data set with this query:

# select distinct year(IncidentDate) from fire_ts_parquet order by 1
(fire_ts_parquet
.select(year("IncidentDate"))
.distinct()
.orderBy(year("IncidentDate"))
.show())
+------------------+
|year(IncidentDate)|
+------------------+
| 2000|
| 2001|
| 2002|
| 2003|
| 2004|
| 2005|
| 2006|
| 2007|
| 2008|
| 2009|
| 2010|
| 2011|
| 2012|
| 2013|
| 2014|
| 2015|
| 2016|
| 2017|
| 2018|
+------------------+

Aggregations

A handful of transformations and actions on DataFrames, such as groupBy(), orderBy(), and count(), offer the ability to aggregate by column names and then aggregate counts across them.

For larger DataFrames on which you plan to conduct frequent or repeated queries, you could benefit from caching. We will cover DataFrame caching strategies and their benefits in later chapters.
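A minimal caching sketch (caching strategies are covered in detail later):

fire_ts_parquet.cache()    # mark the DataFrame for caching
fire_ts_parquet.count()    # an action materializes the cache for subsequent queries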

# select CallType, count(*) as `count` from fire_ts_parquet where CallType is not null group by CallType
(fire_ts_parquet
.select("CallType")
.where(col("CallType").isNotNull())
.groupBy("CallType")
.count()
.orderBy("count", ascending=False)
.show(n=10, truncate=False))
+-------------------------------+------+
|CallType |count |
+-------------------------------+------+
|Medical Incident |113794|
|Structure Fire |23319 |
|Alarms |19406 |
|Traffic Collision |7013 |
|Citizen Assist / Service Call |2524 |
|Other |2166 |
|Outside Fire |2094 |
|Vehicle Fire |854 |
|Gas Leak (Natural and LP Gases)|764 |
|Water Rescue |755 |
+-------------------------------+------+
only showing top 10 rows

The DataFrame API also offers the collect() method, but for extremely large DataFrames this is resource-heavy (expensive) and dangerous, as it can cause out-of-memory (OOM) exceptions. Unlike count(), which returns a single number to the driver, collect() returns a collection of all the Row objects in the entire DataFrame or Dataset. If you want to take a peek at some Row records you’re better off with take(n), which will return only the first n Row objects of the DataFrame.
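For example, a small sketch of peeking at a few records without collecting the whole DataFrame to the driver:

rows = fire_ts_parquet.take(3)   # returns a list with the first 3 Row objects
for row in rows:
    print(row)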


Other common DataFrame operations

Other common DataFrame operations. Along with all the others we’ve seen, the DataFrame API provides descriptive statistical methods like min(), max(), sum(), and avg(). Let’s take a look at some examples showing how to compute them with our SF Fire Department data set.

Here we compute the sum of alarms, the average response time, and the minimum and maximum response times to all fire calls in our data set, importing the PySpark functions in a Pythonic way so as not to conflict with the built-in Python functions:

import pyspark.sql.functions as F

(fire_ts_parquet
.select(F.sum("NumAlarms"),
F.avg("ResponseDelayedinMins"),
F.min("ResponseDelayedinMins"),
F.max("ResponseDelayedinMins"))
.show())
+--------------+--------------------------+--------------------------+--------------------------+
|sum(NumAlarms)|avg(ResponseDelayedinMins)|min(ResponseDelayedinMins)|max(ResponseDelayedinMins)|
+--------------+--------------------------+--------------------------+--------------------------+
| 176170| 3.892364154521585| 0.016666668| 1844.55|
+--------------+--------------------------+--------------------------+--------------------------+

For more advanced statistical needs common with data science workloads, read the API documentation for methods like stat(), describe(), correlation(), covariance(), sampleBy(), approxQuantile(), frequentItems(), and so on.

As you can see, it’s easy to compose and chain expressive queries with DataFrames’ high-level API and DSL operators. We can’t imagine the opacity and comparative unreadability of the code if we were to try to do the same with RDDs!


End-to-End DataFrame example

There are many possibilities for exploratory data analysis, ETL, and common data operations on the San Francisco Fire Department public data set, above and beyond what we’ve shown here.
For brevity we won’t include all the example code here, but the book’s GitHub repo provides Python and Scala notebooks for you to try to complete an end-to-end DataFrame example using this data set. The notebooks explore and answer the following common questions that you might ask, using the DataFrame API and DSL relational operators:

  • What were all the different types of fire calls in 2018?
  • What months within the year 2018 saw the highest number of fire calls?
  • Which neighborhood in San Francisco generated the most fire calls in 2018?
  • Which neighborhoods had the worst response times to fire calls in 2018?
  • Which week in the year in 2018 had the most fire calls?
  • Is there a correlation between neighborhood, zip code, and number of fire calls?
  • How can we use Parquet files or SQL tables to store this data and read it back?

So far we have extensively discussed the DataFrame API, one of the Structured APIs that span Spark’s MLlib and Structured Streaming components, which we cover later in the book.

Next, we’ll shift our focus to the Dataset API and explore how the two APIs provide a unified, structured interface to developers for programming Spark. We’ll then examine the relationship between the RDD, DataFrame, and Dataset APIs, and help you determine when to use which API and why.


The Dataset API

Spark 2.0 unified the DataFrame and Dataset APIs as Structured APIs with similar interfaces so that developers would only have to learn a single set of APIs. Datasets take on two characteristics: typed and untyped APIs, as shown in Figure 3-1.

Conceptually, you can think of a DataFrame in Scala as an alias for a collection of generic objects, Dataset[Row], where a Row is a generic untyped JVM object that may hold different types of fields. A Dataset, by contrast, is a collection of strongly typed JVM objects in Scala or a class in Java. Or, as the Dataset documentation puts it, a Dataset is:

a strongly typed collection of domain-specific objects that can be transformed in parallel using functional or relational operations. Each Dataset [in Scala] also has an untyped view called a DataFrame, which is a Dataset of Row.

Typed Objects, Untyped Objects, and Generic Rows

In Spark’s supported languages, Datasets make sense only in Java and Scala, whereas in Python and R only DataFrames make sense. This is because Python and R are not compile-time type-safe; types are dynamically inferred or assigned during execution, not during compile time. The reverse is true in Scala and Java: types are bound to variables and objects at compile time. In Scala, however, a DataFrame is just an alias for untyped Dataset[Row]. Table 3-6 distills it in a nutshell.

Skip for now, since the Dataset API only supports Java and Scala.


DataFrames Versus Datasets

By now you may be wondering why and when you should use DataFrames or Datasets. In many cases either will do, depending on the languages you are working in, but there are some situations where one is preferable to the other. Here are a few examples:

  • If you want to tell Spark what to do, not how to do it, use DataFrames or Datasets.
  • If you want rich semantics, high-level abstractions, and DSL operators, use DataFrames or Datasets.
  • If you want strict compile-time type safety and don’t mind creating multiple case classes for a specific Dataset[T], use Datasets.
  • If your processing demands high-level expressions, filters, maps, aggregations, computing averages or sums, SQL queries, columnar access, or use of relational operators on semi-structured data, use DataFrames or Datasets.
  • If your processing dictates relational transformations similar to SQL-like queries, use DataFrames.
  • If you want to take advantage of and benefit from Tungsten’s efficient serialization with Encoders, use Datasets.
  • If you want unification, code optimization, and simplification of APIs across Spark components, use DataFrames.
  • If you are an R user, use DataFrames.
  • If you are a Python user, use DataFrames and drop down to RDDs if you need more control.
  • If you want space and speed efficiency, use DataFrames.
  • If you want errors caught during compilation rather than at runtime, choose the
    appropriate API as depicted in Figure 3-2.


When to Use RDDs

You may ask: Are RDDs being relegated to second-class citizens? Are they being deprecated? The answer is a resounding no! The RDD API will continue to be supported, although all future development work in Spark 2.x and Spark 3.0 will continue to have a DataFrame interface and semantics rather than using RDDs.

There are some scenarios where you’ll want to consider using RDDs, such as when you:

  • Are using a third-party package that’s written using RDDs
  • Can forgo the code optimization, efficient space utilization, and performance benefits available with DataFrames and Datasets
  • Want to precisely instruct Spark how to do a query

What’s more, you can seamlessly move between DataFrames or Datasets and RDDs at will using a simple API method call, df.rdd. (Note, however, that this does have a cost and should be avoided unless necessary.) After all, DataFrames and Datasets are built on top of RDDs, and they get decomposed to compact RDD code during whole-stage code generation.
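A small sketch of that conversion (the DataFrame here is hypothetical):

df = spark.createDataFrame([("Brooke", 20), ("Denny", 31)], ["name", "age"])
rdd = df.rdd                                     # drop down to an RDD of Row objects
print(rdd.map(lambda row: row.name).collect())   # ['Brooke', 'Denny']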


Spark SQL and the Underlying Engine

At a programmatic level, Spark SQL allows developers to issue ANSI SQL:2003–compatible queries on structured data with a schema. Since its introduction in Spark 1.3, Spark SQL has evolved into a substantial engine upon which many high-level structured functionalities have been built. Apart from allowing you to issue SQL-like queries on your data, the Spark SQL engine:

  • Unifies Spark components and permits abstraction to DataFrames/Datasets in Java, Scala, Python, and R, which simplifies working with structured data sets.
  • Connects to the Apache Hive metastore and tables.
  • Reads and writes structured data with a specific schema from structured file formats (JSON, CSV, Text, Avro, Parquet, ORC, etc.) and converts data into temporary tables.
  • Offers an interactive Spark SQL shell for quick data exploration.
  • Provides a bridge to (and from) external tools via standard database JDBC/ ODBC connectors.
  • Generates optimized query plans and compact code for the JVM, for final execution.

Figure 3-3 shows the components that Spark SQL interacts with to achieve all of this.

At the core of the Spark SQL engine are the Catalyst optimizer and Project Tungsten. Together, these support the high-level DataFrame and Dataset APIs and SQL queries.

The Catalyst Optimizer

The Catalyst optimizer takes a computational query and converts it into an execution plan. It goes through four transformational phases, as shown in Figure 3-4:

  1. Analysis
  2. Logical optimization
  3. Physical planning
  4. Code generation

For example, consider one of the queries from our M&Ms example in Chapter 2. Both of the following sample code blocks will go through the same process, eventually ending up with a similar query plan and identical bytecode for execution. That is, regardless of the language you use, your computation undergoes the same journey and the resulting bytecode is likely the same:

# In Python
from pyspark.sql.functions import count
count_mnm_df = (mnm_df
.select("State", "Color", "Count")
.groupBy("State", "Color")
.agg(count("Count")
.alias("Total"))
.orderBy("Total", ascending=False))
-- In SQL
SELECT State, Color, Count, sum(Count) AS Total
FROM MNM_TABLE_NAME
GROUP BY State, Color, Count
ORDER BY Total DESC

To see the different stages the Python code goes through, you can use the count_mnm_df.explain(True) method on the DataFrame. Or, to get a look at the different logical and physical plans, in Scala you can call df.queryExecution.logical or df.queryExecution.optimizedPlan. (In Chapter 7, we will discuss more about tuning and debugging Spark and how to read query plans.) This gives us the following output:

Under directory chapter2/py/src:

pyspark
import pyspark.sql.functions as F

mnm_df = spark.read.csv("data/mnm_dataset.csv", header=True)
count_mnm_df = (mnm_df
.select("State", "Color", "Count")
.groupBy("State", "Color")
.agg(F.count("Count")
.alias("Total"))
.orderBy("Total", ascending=False))
count_mnm_df.show(5)
+-----+------+-----+
|State| Color|Total|
+-----+------+-----+
| CA|Yellow| 1807|
| WA| Green| 1779|
| OR|Orange| 1743|
| TX| Green| 1737|
| TX| Red| 1725|
+-----+------+-----+
only showing top 5 rows
== Parsed Logical Plan ==
'Sort ['Total DESC NULLS LAST], true
+- Aggregate [State#201, Color#202], [State#201, Color#202, count(Count#203) AS Total#214L]
+- Project [State#201, Color#202, Count#203]
+- Relation [State#201,Color#202,Count#203] csv

== Analyzed Logical Plan ==
State: string, Color: string, Total: bigint
Sort [Total#214L DESC NULLS LAST], true
+- Aggregate [State#201, Color#202], [State#201, Color#202, count(Count#203) AS Total#214L]
+- Project [State#201, Color#202, Count#203]
+- Relation [State#201,Color#202,Count#203] csv

== Optimized Logical Plan ==
Sort [Total#214L DESC NULLS LAST], true
+- Aggregate [State#201, Color#202], [State#201, Color#202, count(Count#203) AS Total#214L]
+- Relation [State#201,Color#202,Count#203] csv

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [Total#214L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(Total#214L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#259]
+- HashAggregate(keys=[State#201, Color#202], functions=[count(Count#203)], output=[State#201, Color#202, Total#214L])
+- Exchange hashpartitioning(State#201, Color#202, 200), ENSURE_REQUIREMENTS, [id=#256]
+- HashAggregate(keys=[State#201, Color#202], functions=[partial_count(Count#203)], output=[State#201, Color#202, count#228L])
+- FileScan csv [State#201,Color#202,Count#203] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/zacks/Git/Data Science Projects/LearningSparkV2/chapter2/p..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<State:string,Color:string,Count:string>

After going through an initial analysis phase, the query plan is transformed and rearranged by the Catalyst optimizer as shown in Figure 3-5.

Let’s go through each of the four query optimization phases.

Phase 1: Analysis
The Spark SQL engine begins by generating an abstract syntax tree (AST) for the SQL or DataFrame query. In this initial phase, any columns or table names will be resolved by consulting an internal Catalog, a programmatic interface to Spark SQL that holds a list of names of columns, data types, functions, tables, databases, etc. Once they’ve all been successfully resolved, the query proceeds to the next phase.
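The Catalog mentioned here is also exposed programmatically via spark.catalog; a brief sketch (the commented-out view name is only an example of a registered view):

spark.catalog.listDatabases()    # databases known to the Catalog
spark.catalog.listTables()       # tables and temporary views
# spark.catalog.listColumns("some_registered_view")   # columns of a registered table/view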

Phase 2: Logical optimization
As Figure 3-4 shows, this phase comprises two internal stages. Applying a standard rule-based optimization approach, the Catalyst optimizer will first construct a set of multiple plans and then, using its cost-based optimizer (CBO), assign costs to each plan. These plans are laid out as operator trees (like in Figure 3-5); they may include, for example, the process of constant folding, predicate pushdown, projection pruning, Boolean expression simplification, etc. This logical plan is the input into the physical plan.

Phase 3: Physical planning
In this phase, Spark SQL generates an optimal physical plan for the selected logical plan, using physical operators that match those available in the Spark execution engine.

Phase 4: Code generation
The final phase of query optimization involves generating efficient Java bytecode to run on each machine. Because Spark SQL can operate on data sets loaded in memory, Spark can use state-of-the-art compiler technology for code generation to speed up execution. In other words, it acts as a compiler. Project Tungsten, which facilitates whole-stage code generation, plays a role here.

Just what is whole-stage code generation? It’s a physical query optimization phase that collapses the whole query into a single function, getting rid of virtual function calls and employing CPU registers for intermediate data. The second-generation Tungsten engine, introduced in Spark 2.0, uses this approach to generate compact RDD code for final execution. This streamlined strategy significantly improves CPU efficiency and performance.
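As a sketch, you can peek at the generated code for a query from PySpark (Spark 3.x supports explain modes):

count_mnm_df.explain(mode="codegen")   # prints the whole-stage generated Java code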


Summary

In this chapter, we took a deep dive into Spark’s Structured APIs, beginning with a look at the history and merits of structure in Spark.

Through illustrative common data operations and code examples, we demonstrated that the high-level DataFrame and Dataset APIs are far more expressive and intuitive than the low-level RDD API. Designed to make processing of large data sets easier, the Structured APIs provide domain-specific operators for common data operations, increasing the clarity and expressiveness of your code.
We explored when to use RDDs, DataFrames, and Datasets, depending on your use case scenarios.

And finally, we took a look under the hood to see how the Spark SQL engine’s main components—the Catalyst optimizer and Project Tungsten—support structured high-level APIs and DSL operators. As you saw, no matter which of the Spark-supported languages you use, a Spark query undergoes the same optimization journey, from logical and physical plan construction to final compact code generation.

The concepts and code examples in this chapter have laid the groundwork for the next two chapters, in which we will further illustrate the seamless interoperability between DataFrames, Datasets, and Spark SQL.


Spark SQL and DataFrames: Introduction to Built-in Data Sources

In particular, Spark SQL:

  • Provides the engine upon which the high-level Structured APIs we explored in Chapter 3 are built.
  • Can read and write data in a variety of structured formats (e.g., JSON, Hive tables, Parquet, Avro, ORC, CSV).
  • Lets you query data using JDBC/ODBC connectors from external business intelligence (BI) data sources such as Tableau, Power BI, Talend, or from RDBMSs such as MySQL and PostgreSQL.
  • Provides a programmatic interface to interact with structured data stored as tables or views in a database from a Spark application.
  • Offers an interactive shell to issue SQL queries on your structured data.
  • Supports ANSI SQL:2003-compliant commands and HiveQL.


Using Spark SQL in Spark Applications

The SparkSession provides a unified entry point for programming Spark with the Structured APIs. You can use a SparkSession to access Spark functionality: just import the class and create an instance in your code.

To issue any SQL query, use the sql() method on the SparkSession instance, spark, such as spark.sql("SELECT * FROM myTableName"). All spark.sql queries executed in this manner return a DataFrame on which you may perform further Spark operations if you desire.


Basic Query Examples

Spark SQL
Dataset: Airline On-Time Performance and Causes of Flight Delays

To use Spark SQL:

  1. Read dataset into a DataFrame.
  2. Register the DataFrame as a temporary view (more on temporary views shortly) so we can query it with SQL.

Normally, in a standalone Spark application, you will create a SparkSession instance manually, as shown in the following example. However, in a Spark shell (or Databricks notebook), the SparkSession is created for you and accessible via the appropriately named variable spark.

Example-4_1.py

from pyspark.sql import SparkSession

# Create a SparkSession
spark = (SparkSession
.builder
.appName("SparkSQLExampleApp")
.getOrCreate())

# Path to data set
csv_file = "../../../databricks-datasets/learning-spark-v2/flights/departuredelays.csv"

# Read and create a temporary view
# Infer schema (note that for larger files you may want to specify the schema)
df = (spark.read.format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load(csv_file))
df.createOrReplaceTempView("us_delay_flights_tbl")

Now that we have a temporary view, we can issue SQL queries using Spark SQL. These queries are no different from those you might issue against a SQL table in, say, a MySQL or PostgreSQL database. The point here is to show that Spark SQL offers an ANSI:2003–compliant SQL interface, and to demonstrate the interoperability between SQL and DataFrames.

The US flight delays data set has five columns:

  • The date column contains a string like 02190925. When converted, this maps to 02-19 09:25 am.
  • The delay column gives the delay in minutes between the scheduled and actual departure times. Early departures show negative numbers.
  • The distance column gives the distance in miles from the origin airport to the destination airport.
  • The origin column contains the origin IATA airport code.
  • The destination column contains the destination IATA airport code.

With that in mind, let’s try some example queries against this data set. First, we’ll find all flights whose distance is greater than 1,000 miles:

sqlquery = """
select distance, origin, destination
from us_delay_flights_tbl
where distance > 1000
order by distance desc
"""

spark.sql(sqlquery).show(10)
+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
| 4330| HNL| JFK|
| 4330| HNL| JFK|
| 4330| HNL| JFK|
| 4330| HNL| JFK|
| 4330| HNL| JFK|
| 4330| HNL| JFK|
| 4330| HNL| JFK|
| 4330| HNL| JFK|
| 4330| HNL| JFK|
| 4330| HNL| JFK|
+--------+------+-----------+
only showing top 10 rows

As the results show, all of the longest flights were between Honolulu (HNL) and New York (JFK). Next, we’ll find all flights between San Francisco (SFO) and Chicago (ORD) with at least a two-hour delay:

sqlquery = """
select date, delay, origin, destination
from us_delay_flights_tbl
where delay > 120
and origin = 'SFO'
and destination = 'ORD'
order by delay desc
"""

spark.sql(sqlquery).show(10)
+-------+-----+------+-----------+
| date|delay|origin|destination|
+-------+-----+------+-----------+
|2190925| 1638| SFO| ORD|
|1031755| 396| SFO| ORD|
|1022330| 326| SFO| ORD|
|1051205| 320| SFO| ORD|
|1190925| 297| SFO| ORD|
|2171115| 296| SFO| ORD|
|1071040| 279| SFO| ORD|
|1051550| 274| SFO| ORD|
|3120730| 266| SFO| ORD|
|1261104| 258| SFO| ORD|
+-------+-----+------+-----------+
only showing top 10 rows

It seems there were many significantly delayed flights between these two cities, on different dates. (As an exercise, convert the date column into a readable format and find the days or months when these delays were most common. Were the delays related to winter months or holidays?)
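As a starting point for that exercise, here is a hedged sketch that derives a month column from the packed date string using lpad() and substring(), then counts the 2+ hour SFO-to-ORD delays by month. The column names date_str and month are illustrative, and the approach assumes the date column was inferred as an integer (so the leading zeros must be restored first):

from pyspark.sql.functions import col, lpad, substring

# Restore a fixed-width MMddHHmm string, slice out the month, and count delays per month
(spark.table("us_delay_flights_tbl")
.where((col("delay") > 120) & (col("origin") == "SFO") & (col("destination") == "ORD"))
.withColumn("date_str", lpad(col("date").cast("string"), 8, "0"))
.withColumn("month", substring("date_str", 1, 2))
.groupBy("month")
.count()
.orderBy("month")
.show())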

Let’s try a more complicated query where we use the CASE clause in SQL. In the following example, we want to label all US flights, regardless of origin and destination, with an indication of the delays they experienced: Very Long Delays (> 6 hours), Long Delays (2–6 hours), etc. We’ll add these human-readable labels in a new column called Flight_Delays:

# Spark SQL API
sqlquery = """
select
delay, origin, destination,
case
when delay > 360 then 'Very Long Delays'
when delay > 120 and delay <= 360 then 'Long Delays'
when delay > 60 and delay <= 120 then 'Short Delays'
when delay > 0 and delay <= 60 then 'Tolerable Delays'
when delay = 0 then 'No Delays'
else 'Early'
end as Flight_Delays
from us_delay_flights_tbl
order by origin, delay desc
"""

spark.sql(sqlquery).show(10)
+-----+------+-----------+-------------+
|delay|origin|destination|Flight_Delays|
+-----+------+-----------+-------------+
| 333| ABE| ATL| Long Delays|
| 305| ABE| ATL| Long Delays|
| 275| ABE| ATL| Long Delays|
| 257| ABE| ATL| Long Delays|
| 247| ABE| ATL| Long Delays|
| 247| ABE| DTW| Long Delays|
| 219| ABE| ORD| Long Delays|
| 211| ABE| ATL| Long Delays|
| 197| ABE| DTW| Long Delays|
| 192| ABE| ORD| Long Delays|
+-----+------+-----------+-------------+
only showing top 10 rows
# Spark SQL API
sqlquery = """
with cte as
(
select
delay, origin, destination,
case
when delay > 360 then 'Very Long Delays'
when delay > 120 and delay <= 360 then 'Long Delays'
when delay > 60 and delay <= 120 then 'Short Delays'
when delay > 0 and delay <= 60 then 'Tolerable Delays'
when delay = 0 then 'No Delays'
else 'Early'
end as Flight_Delays
from us_delay_flights_tbl
order by origin, delay desc
)
select Flight_Delays, count(*) as counts
from cte
group by Flight_Delays
order by 2 desc
"""

spark.sql(sqlquery).show(10)
+----------------+------+
| Flight_Delays|counts|
+----------------+------+
| Early|668729|
|Tolerable Delays|497742|
| No Delays|131122|
| Short Delays| 61039|
| Long Delays| 31447|
|Very Long Delays| 1499|
+----------------+------+

As with the DataFrame and Dataset APIs, with the spark.sql interface you can conduct common data analysis operations like those we explored in the previous chapter. The computations undergo an identical journey in the Spark SQL engine (see “The Catalyst Optimizer” on page 77 in Chapter 3 for details), giving you the same results.
All three of the preceding SQL queries can be expressed with an equivalent DataFrame API query. For example, the first query can be expressed in the Python DataFrame API as:

# DataFrame API
from pyspark.sql.functions import col, desc

(df.select("distance", "origin", "destination")
.where(col("distance") > 1000)
.orderBy(desc("distance"))).show(10)

Or

# DataFrame API
from pyspark.sql.functions import col, desc

(df.select("distance", "origin", "destination")
.where("distance > 1000")
.orderBy("distance", ascending=False)).show(10)

This produces the same results as the SQL query:

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
| 4330| HNL| JFK|
| 4330| HNL| JFK|
| 4330| HNL| JFK|
| 4330| HNL| JFK|
| 4330| HNL| JFK|
| 4330| HNL| JFK|
| 4330| HNL| JFK|
| 4330| HNL| JFK|
| 4330| HNL| JFK|
| 4330| HNL| JFK|
+--------+------+-----------+
only showing top 10 rows

Now let’s perform the same CASE-expression query through the DataFrame API:

from pyspark.sql.functions import expr, asc, desc

# Spark SQL API
case = """
case
when delay > 360 then 'Very Long Delays'
when delay > 120 and delay <= 360 then 'Long Delays'
when delay > 60 and delay <= 120 then 'Short Delays'
when delay > 0 and delay <= 60 then 'Tolerable Delays'
when delay = 0 then 'No Delays'
else 'Early'
end as Flight_Delays
"""

(df.select("delay", "origin", "destination", expr(case))
.orderBy(asc("origin"), desc("delay"))).show(10)

Or

(df.select("delay", "origin", "destination", expr(case))
.orderBy(col("origin").asc(), col("delay").desc())).show(10)
+-----+------+-----------+-------------+
|delay|origin|destination|Flight_Delays|
+-----+------+-----------+-------------+
| 333| ABE| ATL| Long Delays|
| 305| ABE| ATL| Long Delays|
| 275| ABE| ATL| Long Delays|
| 257| ABE| ATL| Long Delays|
| 247| ABE| ATL| Long Delays|
| 247| ABE| DTW| Long Delays|
| 219| ABE| ORD| Long Delays|
| 211| ABE| ATL| Long Delays|
| 197| ABE| DTW| Long Delays|
| 192| ABE| ORD| Long Delays|
+-----+------+-----------+-------------+

SQL Tables and Views

Tables hold data. Associated with each table in Spark is its relevant metadata, which is information about the table and its data: the schema, description, table name, database name, column names, partitions, physical location where the actual data resides, etc. All of this is stored in a central metastore.

Instead of having a separate metastore for Spark tables, Spark by default uses the Apache Hive metastore, located at /user/hive/warehouse, to persist all the metadata about your tables. However, you may change the default location by setting the Spark config variable spark.sql.warehouse.dir to another location, which can point to local or external distributed storage.


Managed Versus Unmanaged Tables

Spark allows you to create two types of tables: managed and unmanaged.

  • For a managed table, Spark manages both the metadata and the data in the file store. This could be a local filesystem, HDFS, or an object store such as Amazon S3 or Azure Blob.
  • For an unmanaged table, Spark only manages the metadata, while you manage the data yourself in an external data source such as Cassandra.

With a managed table, because Spark manages everything, a SQL command such as DROP TABLE table_name deletes both the metadata and the data. With an unmanaged table, the same command will delete only the metadata, not the actual data. We will look at some examples of how to create managed and unmanaged tables in the next section.


Creating SQL Databases and Tables

Example-4_2.py

Tables reside within a database. By default, Spark creates tables under the default database. To create a database of your own, you can issue a SQL command from your Spark application or notebook. Using the US flight delays data set, let’s create both a managed and an unmanaged table. To begin, we’ll create a database called learn_spark_db and tell Spark we want to use that database:

from os.path import abspath
from pyspark.sql import SparkSession

# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')

spark = (SparkSession
.builder
.appName("SparkSQLHiveExample")
.config("spark.sql.warehouse.dir", warehouse_location)
.enableHiveSupport()
.getOrCreate())
# Path to data set
csv_file = "../../../databricks-datasets/learning-spark-v2/flights/departuredelays.csv"

# Read and create a temporary view
# Infer schema (note that for larger files you may want to specify the schema)
df = (spark.read.format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load(csv_file))
df.createOrReplaceTempView("us_delay_flights_tbl")
# create database and use database
spark.sql("create database learn_spark_db")
spark.sql("use learn_spark_db")

From this point, any commands we issue in our application to create tables will result in the tables being created in this database and residing under the database name learn_spark_db.

Creating a managed table

To create a managed table within the database learn_spark_db, you can issue a SQL query like the following:

sqlQuery = """
create table if not exists managed_us_delay_flights_tbl as
select * from us_delay_flights_tbl
"""

spark.sql(sqlQuery)

You can do the same thing using the DataFrame API like this:

# drop managed_us_delay_flights_tbl we create through Spark SQL API
spark.sql("drop table if exists managed_us_delay_flights_tbl")
# Schema as defined in the preceding example
schema="date STRING, delay INT, distance INT, origin STRING, destination STRING"
flights_df = spark.read.csv(csv_file, schema=schema)
flights_df.write.saveAsTable("managed_us_delay_flights_tbl")

Both of these statements will create the managed table managed_us_delay_flights_tbl in the learn_spark_db database.


Creating an unmanaged table

By contrast, you can create unmanaged tables from your own data sources—say, Parquet, CSV, or JSON files stored in a file store accessible to your Spark application.

To create an unmanaged table from a data source such as a CSV file, in SQL use:

sqlQuery = f"""
CREATE TABLE us_delay_flights_tbl(date STRING, delay INT, distance INT, origin STRING, destination STRING)
USING csv OPTIONS (PATH "{csv_file}")
"""

spark.sql(sqlQuery)

And within the DataFrame API use:

# drop table we created in previous steps
spark.sql("drop table if exists us_delay_flights_tbl")
# create table and specify where to store the data
(flights_df
.write
.option("path", "/tmp/data/us_flights_delay")
.saveAsTable("us_delay_flights_tbl"))

Creating Views

In addition to creating tables, Spark can create views on top of existing tables. Views can be global (visible across all SparkSessions within the same Spark application) or session-scoped (visible only to a single SparkSession), and they are temporary: they disappear after your Spark application terminates.

Creating views has a similar syntax to creating tables within a database. Once you create a view, you can query it as you would a table. The difference between a view and a table is that views don’t actually hold the data; tables persist after your Spark application terminates, but views disappear.

You can create a view from an existing table using SQL. For example, if you wish to work on only the subset of the US flight delays data set with origin airports of New York (JFK) and San Francisco (SFO), the following queries will create global temporary and temporary views consisting of just that slice of the table:

sqlQuery = """
CREATE OR REPLACE GLOBAL TEMP VIEW us_origin_airport_SFO_global_tmp_view AS
SELECT date, delay, origin, destination
from us_delay_flights_tbl
WHERE origin = 'SFO';
"""

spark.sql(sqlQuery)

sqlQuery = """
CREATE OR REPLACE TEMP VIEW us_origin_airport_JFK_tmp_view AS
SELECT date, delay, origin, destination
from us_delay_flights_tbl
WHERE origin = 'JFK'
"""

spark.sql(sqlQuery)

Once you’ve created these views, you can issue queries against them just as you would against a table. Keep in mind that when accessing a global temporary view you must use the prefix global_temp.<view_name>, because Spark creates global temporary views in a global temporary database called global_temp. For example:

# query global view from global_temp
spark.sql("select * from global_temp.us_origin_airport_SFO_global_tmp_view").show(5)
+--------+-----+------+-----------+
| date|delay|origin|destination|
+--------+-----+------+-----------+
|01011250| 55| SFO| JFK|
|01012230| 0| SFO| JFK|
|01010705| -7| SFO| JFK|
|01010620| -3| SFO| MIA|
|01010915| -3| SFO| LAX|
+--------+-----+------+-----------+
only showing top 5 rows

By contrast, you can access the normal temporary view without the global_temp prefix:

# query temporary view from current database
spark.sql("select * from us_origin_airport_JFK_tmp_view").show(5)
+--------+-----+------+-----------+
| date|delay|origin|destination|
+--------+-----+------+-----------+
|02010900| -1| JFK| LAX|
|02011200| -5| JFK| LAX|
|02011030| -6| JFK| LAX|
|02011900| -1| JFK| LAX|
|02011700| -3| JFK| LAS|
+--------+-----+------+-----------+
only showing top 5 rows

You can accomplish the same thing with the DataFrame API as follows:

df_sfo = spark.sql("SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE origin = 'SFO'")
df_jfk = spark.sql("SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE origin = 'JFK'")
# Create a temporary and global temporary view
df_sfo.createOrReplaceGlobalTempView("us_origin_airport_SFO_global_tmp_view")
df_jfk.createOrReplaceTempView("us_origin_airport_JFK_tmp_view")

You can also query these views through the DataFrame API:

# query global view from global_temp
spark.read.table("global_temp.us_origin_airport_SFO_global_tmp_view").show(5)
# query temporary view from current database
spark.read.table("us_origin_airport_JFK_tmp_view").show(5)

To show views:

# List all views in global temp view database and current database
spark.sql("show views in global_temp;").show(truncate=False)

Or:

# List all views in global temp view database and current database
spark.catalog.listTables(dbName="global_temp")

To drop views:

# drop views
spark.sql("drop view if exists us_origin_airport_sfo_global_tmp_view")
spark.sql("drop view if exists us_origin_airport_jfk_tmp_view")

Or

# drop views
spark.catalog.dropGlobalTempView("us_origin_airport_sfo_global_tmp_view")
spark.catalog.dropTempView("us_origin_airport_jfk_tmp_view")

Temporary views versus global temporary views

The difference between temporary and global temporary views is subtle, and it can be a source of mild confusion among developers new to Spark. A temporary view is tied to a single SparkSession within a Spark application. In contrast, a global temporary view is visible across multiple SparkSessions within a Spark application. Yes, you can create multiple SparkSessions within a single Spark application—this can be handy, for example, in cases where you want to access (and combine) data from two different SparkSessions that don’t share the same Hive metastore configurations.
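A minimal sketch of this behavior, assuming the two views created earlier in this section still exist: a second SparkSession in the same application can read the global temporary view through the global_temp database, but not the session-scoped view of the original session.

# Create a second SparkSession in the same application
other_session = spark.newSession()

# Works: global temp views are shared across sessions via the global_temp database
other_session.sql("select count(*) from global_temp.us_origin_airport_SFO_global_tmp_view").show()

# Would fail with a "table or view not found" error: this view is scoped to the original session
# other_session.sql("select count(*) from us_origin_airport_JFK_tmp_view").show()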


Viewing the Metadata

As mentioned previously, Spark manages the metadata associated with each managed or unmanaged table. This is captured in the Catalog, a high-level abstraction in Spark SQL for storing metadata. The Catalog’s functionality was expanded in Spark 2.x with new public methods enabling you to examine the metadata associated with your databases, tables, and views. Spark 3.0 extends it to work with external catalogs (which we briefly discuss in Chapter 12).

For example, within a Spark application, after creating the SparkSession variable spark, you can access all the stored metadata through methods like these:

spark.catalog.listDatabases()
# List all views in global temp view database and current database
spark.catalog.listTables(dbName="global_temp")
spark.catalog.listTables()
spark.catalog.listColumns("us_delay_flights_tbl")

Caching SQL Tables

Although we will discuss table caching strategies in the next chapter, it’s worth mentioning here that, like DataFrames, you can cache and uncache SQL tables and views. In Spark 3.0, in addition to other options, you can specify a table as LAZY, meaning that it should only be cached when it is first used instead of immediately:

-- In SQL
CACHE [LAZY] TABLE <table-name>
UNCACHE TABLE <table-name>
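You can issue the same statements from PySpark through spark.sql(). This is a small sketch using the temporary view created earlier; the count query simply forces the lazy cache to materialize:

# Cache lazily: the view is materialized on first use
spark.sql("cache lazy table us_delay_flights_tbl")
spark.sql("select count(*) from us_delay_flights_tbl").show()  # first use triggers the cache
spark.sql("uncache table us_delay_flights_tbl")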

Reading Tables into DataFrames

Often, data engineers build data pipelines as part of their regular data ingestion and ETL processes. They populate Spark SQL databases and tables with cleansed data for consumption by applications downstream.

Let’s assume you have an existing database, learn_spark_db, and table, us_delay_flights_tbl, ready for use. Instead of reading from an external JSON file, you can simply use SQL to query the table and assign the returned result to a DataFrame:

pyspark
spark.catalog.listDatabases()
[Database(name='default', description='Default Hive database', locationUri='file:/Users/zacks/Git/Data%20Science%20Projects/LearningSparkV2/spark-warehouse'), Database(name='learn_spark_db', description='', locationUri='file:/Users/zacks/Git/Data%20Science%20Projects/LearningSparkV2/spark-warehouse/learn_spark_db.db')]

The second database, learn_spark_db, was created in the previous steps.

spark.catalog.listTables('learn_spark_db')
[Table(name='managed_us_delay_flights_tbl', database='learn_spark_db', description=None, tableType='MANAGED', isTemporary=False)]
# In Python
spark.sql("use learn_spark_db")
us_flights_df = spark.sql("SELECT * FROM managed_us_delay_flights_tbl")
us_flights_df2 = spark.table("managed_us_delay_flights_tbl")

Data Sources for DataFrames and SQL Tables

As shown in Figure 4-1, Spark SQL provides an interface to a variety of data sources. It also provides a set of common methods for reading and writing data to and from these data sources using the Data Sources API.

In this section we will cover some of the built-in data sources, available file formats, and ways to load and write data, along with specific options pertaining to these data sources. But first, let’s take a closer look at two high-level Data Source API constructs that dictate the manner in which you interact with different data sources: DataFrameReader and DataFrameWriter.


DataFrameReader

DataFrameReader is the core construct for reading data from a data source into a DataFrame. It has a defined format and a recommended pattern for usage:

DataFrameReader.format(args).option("key", "value").schema(args).load()

This pattern of stringing methods together is common in Spark, and easy to read.

Note that you can only access a DataFrameReader through a SparkSession instance. That is, you cannot create an instance of DataFrameReader. To get an instance handle to it, use:

# read static data
SparkSession.read
# read streaming data
SparkSession.readStream

While read returns a handle to DataFrameReader to read into a DataFrame from a static data source, readStream returns an instance to read from a streaming source.

Arguments to each of the public methods to DataFrameReader take different values. Table 4-1 enumerates these, with a subset of the supported arguments.

The documentation for Python, Scala, R, and Java offers suggestions and guidance.

In general, no schema is needed when reading from a static Parquet data source—the Parquet metadata usually contains the schema, so it’s inferred. However, for streaming data sources you will have to provide a schema.
Parquet is the default and preferred data source for Spark because it’s efficient, uses columnar storage, and employs a fast compression algorithm. You will see additional benefits later (such as columnar pushdown), when we cover the Catalyst optimizer in greater depth.


DataFrameWriter

DataFrameWriter does the reverse of its counterpart: it saves or writes data to a specified built-in data source. Unlike with DataFrameReader, you access its instance not from a SparkSession but from the DataFrame you wish to save. It has a few recommended usage patterns:

DataFrameWriter.format(args)
.option(args)
.bucketBy(args)
.partitionBy(args)
.save(path)

DataFrameWriter.format(args).option(args).sortBy(args).saveAsTable(table)

To get an instance handle, use:

# write static data
DataFrame.write
# write streaming data
DataFrame.writeStream


Parquet

Parquet is the default data source in Spark. Supported and widely used by many big data processing frameworks and platforms, Parquet is an open source columnar file format that offers many I/O optimizations (such as compression, which saves storage space and allows for quick access to data columns).

Because of its efficiency and these optimizations, we recommend that after you have transformed and cleansed your data, you save your DataFrames in the Parquet format for downstream consumption. (Parquet is also the default table open format for Delta Lake, which we will cover in Chapter 9.)


Reading Parquet files into a DataFrame

Parquet files are stored in a directory structure that contains the data files, metadata, a number of compressed files, and some status files. Metadata in the footer contains the version of the file format, the schema, and column data such as the path, etc.
For example, a directory in a Parquet file might contain a set of files like this:

_SUCCESS
_committed_1799640464332036264
_started_1799640464332036264
part-00000-tid-1799640464332036264-91273258-d7ef-4dc7-<...>-c000.snappy.parquet

There may be a number of part-XXXX compressed files in a directory (the names shown here have been shortened to fit on the page).

To read Parquet files into a DataFrame, you simply specify the format and path:

file = """databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet/"""
df = spark.read.format("parquet").load(file)
df.show(5)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| United States| Romania| 1|
| United States| Ireland| 264|
| United States| India| 69|
| Egypt| United States| 24|
|Equatorial Guinea| United States| 1|
+-----------------+-------------------+-----+
only showing top 5 rows

Unless you are reading from a streaming data source there’s no need to supply the schema, because Parquet saves it as part of its metadata.


Reading Parquet files into a Spark SQL table

As well as reading Parquet files into a Spark DataFrame, you can also create a Spark SQL unmanaged table or view directly using SQL:

sqlQuery = """
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl USING parquet
OPTIONS (path "databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet/")
"""

spark.sql(sqlQuery)
spark.sql("select * from us_delay_flights_tbl").show(5)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| United States| Romania| 1|
| United States| Ireland| 264|
| United States| India| 69|
| Egypt| United States| 24|
|Equatorial Guinea| United States| 1|
+-----------------+-------------------+-----+
only showing top 5 rows

Writing DataFrames to Parquet files

Writing or saving a DataFrame as a table or file is a common operation in Spark. To write a DataFrame you simply use the methods and arguments to the DataFrameWriter, supplying the location to save the Parquet files to. For example:

# In Python
(df.write.format("parquet")
.mode("overwrite")
.option("compression", "snappy")
.save("/tmp/data/parquet/df_parquet"))

Recall that Parquet is the default file format. If you don’t include the format() method, the DataFrame will still be saved as a Parquet file.

This will create a set of compact and compressed Parquet files at the specified path. Since we used snappy as our compression choice here, we’ll have snappy compressed files. For brevity, this example generated only one file; normally, there may be a dozen or so files created:

-rw-r--r--  1 jules  wheel    0 May 19 10:58 _SUCCESS
-rw-r--r-- 1 jules wheel 966 May 19 10:58 part-00000-<...>-c000.snappy.parquet

Writing DataFrames to Spark SQL tables

Writing a DataFrame to a SQL table is as easy as writing to a file—just use saveAsTable() instead of save(). This will create a managed table called us_delay_flights_tbl:

# In Python
(df.write
.mode("overwrite")
.saveAsTable("us_delay_flights_tbl"))

To sum up, Parquet is the preferred and default built-in data source file format in Spark, and it has been adopted by many other frameworks. We recommend that you use this format in your ETL and data ingestion processes.


JSON

JavaScript Object Notation (JSON) is also a popular data format. It came to prominence as an easy-to-read and easy-to-parse format compared to XML. It has two representational formats: single-line mode and multiline mode. Both modes are supported in Spark.

You can read JSON files in single-line or multi-line mode. In single-line mode, a file can be split into many parts and read in parallel. In multi-line mode, a file is loaded as a whole entity and cannot be split.

In single-line mode each line denotes a single JSON object, whereas in multiline mode the entire multiline object constitutes a single JSON object. To read in this mode, set multiLine to true in the option() method.
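As a hedged sketch of multiline mode (the file path here is hypothetical), you only need to add the multiLine option to the usual reader pattern:

# Read a multiline JSON file; the path is illustrative
multiline_df = (spark.read.format("json")
.option("multiLine", "true")  # treat each file as one JSON document (or JSON array)
.load("/tmp/data/json/multiline_example.json"))
multiline_df.show(5)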


Reading JSON files into a DataFrame

You can read a JSON file into a DataFrame the same way you did with Parquet—just specify "json" in the format() method:

➜  LearningSparkV2 git:(dev) ✗ ll databricks-datasets/learning-spark-v2/flights/summary-data/json
total 280
-rwxr-xr-x 1 zacks staff 21K Feb 17 20:09 2010-summary.json
-rwxr-xr-x 1 zacks staff 21K Feb 17 20:09 2011-summary.json
-rwxr-xr-x 1 zacks staff 20K Feb 17 20:09 2012-summary.json
-rwxr-xr-x 1 zacks staff 20K Feb 17 20:09 2013-summary.json
-rwxr-xr-x 1 zacks staff 20K Feb 17 20:09 2014-summary.json
-rwxr-xr-x 1 zacks staff 21K Feb 17 20:09 2015-summary.json
# read multiple JSON files
>>> file = "databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
>>> df = spark.read.format("json").load(file)
>>> df.show(5)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| United States| Romania| 15|
| United States| Croatia| 1|
| United States| Ireland| 344|
| Egypt| United States| 15|
| United States| India| 62|
+-----------------+-------------------+-----+
only showing top 5 rows

Reading JSON files into a Spark SQL table

You can also create a SQL table from a JSON file just like you did with Parquet:

sqlQuery = """
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl USING json
OPTIONS (path "databricks-datasets/learning-spark-v2/flights/summary-data/json/*")
"""

spark.sql(sqlQuery)
spark.sql("select * from us_delay_flights_tbl").show(5)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| United States| Romania| 15|
| United States| Croatia| 1|
| United States| Ireland| 344|
| Egypt| United States| 15|
| United States| India| 62|
+-----------------+-------------------+-----+
only showing top 5 rows

Writing DataFrames to JSON files

Saving a DataFrame as a JSON file is simple. Specify the appropriate DataFrameWriter methods and arguments, and supply the location to save the JSON files to:

(df.write.format("json")
.mode("overwrite")
.option("compression", "snappy")
.save("/tmp/data/json/df_json"))

This creates a directory at the specified path populated with a set of compact JSON files:

-rw-r--r--  1 jules  wheel   0 May 16 14:44 _SUCCESS
-rw-r--r-- 1 jules wheel 71 May 16 14:44 part-00000-<...>-c000.json

JSON data source options

Table 4-3 describes common JSON options for DataFrameReader and DataFrameWriter. For a comprehensive list, we refer you to the documentation.


CSV

As widely used as plain text files, this common text file format captures each datum or field delimited by a comma; each line with comma-separated fields represents a record. Even though a comma is the default separator, you may use other delimiters to separate fields in cases where commas are part of your data. Popular spreadsheets can generate CSV files, so it’s a popular format among data and business analysts.
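If your fields are separated by something other than a comma, set the sep option on the reader. The following is a small sketch for a pipe-delimited file; the path is hypothetical:

# Read a pipe-delimited file by overriding the default comma separator
pipe_df = (spark.read.format("csv")
.option("header", "true")
.option("sep", "|")            # use "|" instead of "," as the field delimiter
.option("inferSchema", "true")
.load("/tmp/data/csv/pipe_delimited.csv"))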


Reading a CSV file into a DataFrame

As with the other built-in data sources, you can use the DataFrameReader methods and arguments to read a CSV file into a DataFrame:

file = "databricks-datasets/learning-spark-v2/flights/summary-data/csv/*"
schema = "DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count INT"
df = (spark.read.format("csv")
.option("header", "true")
.schema(schema)
.option("mode", "FAILFAST") # Exit if any errors
.option("nullValue", "") # Replace any null data field with quotes
.load(file))
df.show(5)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| United States| Romania| 1|
| United States| Ireland| 264|
| United States| India| 69|
| Egypt| United States| 24|
|Equatorial Guinea| United States| 1|
+-----------------+-------------------+-----+
only showing top 5 rows

Reading a CSV file into a Spark SQL table

Creating a SQL table from a CSV data source is no different from using Parquet or JSON:

sqlQuery = """
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
USING csv
OPTIONS (
path "databricks-datasets/learning-spark-v2/flights/summary-data/csv/*",
header "true",
inferSchema "true",
mode "FAILFAST"
)
"""

spark.sql(sqlQuery)
spark.sql("select * from us_delay_flights_tbl").show(5)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| United States| Romania| 1|
| United States| Ireland| 264|
| United States| India| 69|
| Egypt| United States| 24|
|Equatorial Guinea| United States| 1|
+-----------------+-------------------+-----+
only showing top 5 rows

Once you’ve created the table, you can read data into a DataFrame using SQL as before:


Writing DataFrames to CSV files

Saving a DataFrame as a CSV file is simple. Specify the appropriate DataFrameWriter methods and arguments, and supply the location to save the CSV files to:

df.write.format("csv").mode("overwrite").save("/tmp/data/csv/df_csv")

This generates a folder at the specified location, populated with a bunch of compressed and compact files:

-rw-r--r--  1 jules  wheel   0 May 16 12:17 _SUCCESS
-rw-r--r-- 1 jules wheel 36 May 16 12:17 part-00000-251690eb-<...>-c000.csv

CSV data source options

Table 4-4 describes some of the common CSV options for DataFrameReader and DataFrameWriter. Because CSV files can be complex, many options are available; for a comprehensive list we refer you to the documentation.


Avro

Apache Avro Data Source Guide
The spark-avro module is external and not included in spark-submit or spark-shell by default.

Read & Write Avro files using Spark DataFrame

Introduced in Spark 2.4 as a built-in data source, the Avro format is used, for example, by Apache Kafka for message serializing and deserializing. It offers many benefits, including direct mapping to JSON, speed and efficiency, and bindings available for many programming languages.
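Because the spark-avro module ships separately, you typically add it when launching Spark with the --packages flag. The coordinates below are a sketch assuming Spark 3.2.1 built against Scala 2.12 (match them to your own Spark and Scala versions), and my_app.py is a placeholder:

./bin/pyspark --packages org.apache.spark:spark-avro_2.12:3.2.1
./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.2.1 my_app.py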


Reading an Avro file into a DataFrame

Reading an Avro file into a DataFrame using DataFrameReader is consistent in usage with the other data sources we have discussed in this section:

➜  LearningSparkV2 git:(dev) ✗ ll databricks-datasets/learning-spark-v2/flights/summary-data/avro/*
total 40
-rwxr-xr-x 1 zacks staff 0B Feb 17 20:09 _SUCCESS
-rwxr-xr-x 1 zacks staff 214B Feb 17 20:09 _committed_4810919456678777831
-rwxr-xr-x 1 zacks staff 204B Feb 17 20:09 _committed_7090025417172081385
-rwxr-xr-x 1 zacks staff 205B Feb 17 20:09 _committed_7128780539805330008
-rwxr-xr-x 1 zacks staff 114B Feb 17 20:09 _committed_8540998754385237107
-rwxr-xr-x 1 zacks staff 0B Feb 17 20:09 _started_4810919456678777831
-rwxr-xr-x 1 zacks staff 0B Feb 17 20:09 _started_7090025417172081385
-rwxr-xr-x 1 zacks staff 0B Feb 17 20:09 _started_7128780539805330008
-rwxr-xr-x 1 zacks staff 0B Feb 17 20:09 _started_8540998754385237107
-rwxr-xr-x 1 zacks staff 3.1K Feb 17 20:09 part-00000-tid-7128780539805330008-467d814d-6f80-4951-a951-f9f7fb8e3930-1434-1-c000.avro
df = (spark.read.format("avro").load("databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"))
df.show(truncate=False)

Reading an Avro file into a Spark SQL table

Again, creating SQL tables using an Avro data source is no different from using Parquet, JSON, or CSV:

# In Python
sqlQuery = """CREATE OR REPLACE TEMPORARY VIEW episode_tbl USING avro
OPTIONS (path "databricks-datasets/learning-spark-v2/flights/summary-data/avro/*")
"""

spark.sql(sqlQuery)
spark.sql("select * from episode_tbl").show(5)

Writing DataFrames to Avro files

Writing a DataFrame as an Avro file is simple. As usual, specify the appropriate DataFrameWriter methods and arguments, and supply the location to save the Avro files to:

(df.write
.format("avro")
.mode("overwrite")
.save("/tmp/data/avro/df_avro"))

This generates a folder at the specified location, populated with a bunch of compressed and compact files:

-rw-r--r--  1 jules  wheel    0 May 17 11:54 _SUCCESS
-rw-r--r-- 1 jules wheel 526 May 17 11:54 part-00000-ffdf70f4-<...>-c000.avro

Avro data source options

Table 4-5 describes common options for DataFrameReader and DataFrameWriter. A comprehensive list of options is in the documentation.


ORC

As an additional optimized columnar file format, Spark 2.x supports a vectorized ORC reader. Two Spark configurations dictate which ORC implementation to use. When spark.sql.orc.impl is set to native and spark.sql.orc.enableVectorizedReader is set to true, Spark uses the vectorized ORC reader. A vectorized reader reads blocks of rows (often 1,024 per block) instead of one row at a time, streamlining operations and reducing CPU usage for intensive operations like scans, filters, aggregations, and joins.

For Hive ORC SerDe (serialization and deserialization) tables created with the SQL command USING HIVE OPTIONS (fileFormat ‘ORC’), the vectorized reader is used when the Spark configuration parameter spark.sql.hive.convertMetastoreOrc is set to true.
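A short sketch of checking and setting these configurations from PySpark (the printed defaults may differ by Spark version):

# Inspect the ORC-related configs, then set them explicitly
print(spark.conf.get("spark.sql.orc.impl"))                    # typically 'native' in recent versions
print(spark.conf.get("spark.sql.orc.enableVectorizedReader"))  # typically 'true'

spark.conf.set("spark.sql.orc.impl", "native")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")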


Reading an ORC file into a DataFrame

To read in a DataFrame using the ORC vectorized reader, you can just use the normal DataFrameReader methods and options:

# In Python
file = "databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
df = spark.read.format("orc").option("path", file).load()
df.show(5, False)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States |Romania |1 |
|United States |Ireland |264 |
|United States |India |69 |
|Egypt |United States |24 |
|Equatorial Guinea|United States |1 |
+-----------------+-------------------+-----+
only showing top 5 rows

Reading an ORC file into a Spark SQL table

There is no difference from Parquet, JSON, CSV, or Avro when creating a SQL view using an ORC data source:

sqlQuery = """
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl USING orc
OPTIONS (path "databricks-datasets/learning-spark-v2/flights/summary-data/orc/*")
"""

spark.sql(sqlQuery)
spark.sql("select * from us_delay_flights_tbl").show(5)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| United States| Romania| 1|
| United States| Ireland| 264|
| United States| India| 69|
| Egypt| United States| 24|
|Equatorial Guinea| United States| 1|
+-----------------+-------------------+-----+
only showing top 5 rows

Writing DataFrames to ORC files

Writing back a transformed DataFrame after reading is equally simple using the DataFrameWriter methods:

(df.write.format("orc")
.mode("overwrite")
.option("compression", "snappy")
.save("/tmp/data/orc/flights_orc"))

The result will be a folder at the specified location containing some compressed ORC files:

-rw-r--r--  1 jules  wheel    0 May 16 17:23 _SUCCESS
-rw-r--r-- 1 jules wheel 547 May 16 17:23 part-00000-<...>-c000.snappy.orc

Images

In Spark 2.4 the community introduced a new data source, image files, to support deep learning and machine learning frameworks such as TensorFlow and PyTorch. For computer vision–based machine learning applications, loading and processing image data sets is important.


Reading an image file into a DataFrame

As with all of the previous file formats, you can use the DataFrameReader methods and options to read in an image file as shown here:

from pyspark.ml import image

#image_dir = "databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
image_dir = "/tmp/cctvVideos/train_images/"
images_df = spark.read.format("image").load(image_dir)
images_df.printSchema()
from pyspark.ml import image

image_dir = "databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
images_df = spark.read.format("image").load(image_dir)
images_df.printSchema()
root
|-- image: struct (nullable = true)
| |-- origin: string (nullable = true)
| |-- height: integer (nullable = true)
| |-- width: integer (nullable = true)
| |-- nChannels: integer (nullable = true)
| |-- mode: integer (nullable = true)
| |-- data: binary (nullable = true)
|-- label: integer (nullable = true)
images_df.select("image.height", "image.width", "image.nChannels", "image.mode", "label").show(5, truncate=False)
+------+-----+---------+----+-----+
|height|width|nChannels|mode|label|
+------+-----+---------+----+-----+
|288 |384 |3 |16 |0 |
|288 |384 |3 |16 |1 |
|288 |384 |3 |16 |0 |
|288 |384 |3 |16 |0 |
|288 |384 |3 |16 |0 |
+------+-----+---------+----+-----+
only showing top 5 rows

Binary Files

Spark 3.0 adds support for binary files as a data source. The DataFrameReader converts each binary file into a single DataFrame row (record) that contains the raw content and metadata of the file. The binary file data source produces a DataFrame with the following columns:

  • path: StringType
  • modificationTime: TimestampType
  • length: LongType
  • content: BinaryType

Reading a binary file into a DataFrame

To read binary files, specify the data source format as binaryFile. You can load files with paths matching a given glob pattern while preserving the behavior of partition discovery with the data source option pathGlobFilter. For example, the following code reads all JPG files from the input directory, including any partitioned directories:

path = "databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
binary_files_df = (spark.read.format("binaryFile")
.option("pathGlobFilter", "*.jpg")
.load(path))
binary_files_df.show(5)
+--------------------+--------------------+------+--------------------+-----+
| path| modificationTime|length| content|label|
+--------------------+--------------------+------+--------------------+-----+
|file:/Users/zacks...|2022-02-17 20:09:...| 55037|[FF D8 FF E0 00 1...| 0|
|file:/Users/zacks...|2022-02-17 20:09:...| 54634|[FF D8 FF E0 00 1...| 1|
|file:/Users/zacks...|2022-02-17 20:09:...| 54624|[FF D8 FF E0 00 1...| 0|
|file:/Users/zacks...|2022-02-17 20:09:...| 54505|[FF D8 FF E0 00 1...| 0|
|file:/Users/zacks...|2022-02-17 20:09:...| 54475|[FF D8 FF E0 00 1...| 0|
+--------------------+--------------------+------+--------------------+-----+
only showing top 5 rows

To ignore partitioning data discovery in a directory, you can set recursiveFileLookup to "true":

path = "databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
binary_files_df = (spark.read.format("binaryFile")
.option("pathGlobFilter", "*.jpg")
.option("recursiveFileLookup", "true")
.load(path))
binary_files_df.show(5)
+--------------------+--------------------+------+--------------------+
| path| modificationTime|length| content|
+--------------------+--------------------+------+--------------------+
|file:/Users/zacks...|2022-02-17 20:09:...| 55037|[FF D8 FF E0 00 1...|
|file:/Users/zacks...|2022-02-17 20:09:...| 54634|[FF D8 FF E0 00 1...|
|file:/Users/zacks...|2022-02-17 20:09:...| 54624|[FF D8 FF E0 00 1...|
|file:/Users/zacks...|2022-02-17 20:09:...| 54505|[FF D8 FF E0 00 1...|
|file:/Users/zacks...|2022-02-17 20:09:...| 54475|[FF D8 FF E0 00 1...|
+--------------------+--------------------+------+--------------------+
only showing top 5 rows

Note that the label column is absent when the recursiveFileLookup option is set to "true".

Currently, the binary file data source does not support writing a DataFrame back to the original file format.


Summary

To recap, this chapter explored the interoperability between the DataFrame API and Spark SQL. In particular, you got a flavor of how to use Spark SQL to:

  • Create managed and unmanaged tables using Spark SQL and the DataFrame API.
  • Read from and write to various built-in data sources and file formats.
  • Employ the spark.sql programmatic interface to issue SQL queries on structured data stored as Spark SQL tables or views.
  • Peruse the Spark Catalog to inspect metadata associated with tables and views.
  • Use the DataFrameWriter and DataFrameReader APIs.

Spark SQL and DataFrames: Interacting with External Data Sources

In this chapter, we will focus on how Spark SQL interfaces with external components. Specifically, we discuss how Spark SQL allows you to:

  • Use user-defined functions for both Apache Hive and Apache Spark.
  • Connect with external data sources such as JDBC and SQL databases, PostgreSQL, MySQL, Tableau, Azure Cosmos DB, and MS SQL Server.
  • Work with simple and complex types, higher-order functions, and common relational operators.
    We’ll also look at some different options for querying Spark using Spark SQL, such as the Spark SQL shell, Beeline, and Tableau.

Spark SQL and Apache Hive

Spark SQL is a foundational component of Apache Spark that integrates relational processing with Spark’s functional programming API. Its genesis was in previous work on Shark. Shark was originally built on the Hive codebase on top of Apache Spark and became one of the first interactive SQL query engines on Hadoop systems. It demonstrated that it was possible to have the best of both worlds: to be as fast as an enterprise data warehouse while scaling as well as Hive/MapReduce.

Spark SQL lets Spark programmers leverage the benefits of faster performance and relational programming (e.g., declarative queries and optimized storage), as well as call complex analytics libraries (e.g., machine learning). As discussed in the previous chapter, as of Apache Spark 2.x, the SparkSession provides a single unified entry point to manipulate data in Spark.


User-Defined Functions

While Apache Spark has a plethora of built-in functions, the flexibility of Spark allows for data engineers and data scientists to define their own functions too. These are known as user-defined functions (UDFs).


Spark SQL UDFs

The benefit of creating your own PySpark or Scala UDFs is that you (and others) will be able to make use of them within Spark SQL itself. For example, a data scientist can wrap an ML model within a UDF so that a data analyst can query its predictions in Spark SQL without necessarily understanding the internals of the model.

Here’s a simplified example of creating a Spark SQL UDF. Note that UDFs operate per session and they will not be persisted in the underlying metastore:

# In Python
from pyspark.sql.types import LongType

# Create cubed function
def cubed(s):
    return s * s * s

# Register UDF
spark.udf.register("cubed", cubed, LongType())

# Generate temporary view
spark.range(1, 9).createOrReplaceTempView("udf_test")

# Query the cubed UDF
spark.sql("SELECT id, cubed(id) AS id_cubed FROM udf_test").show()
+---+--------+                                                                  
| id|id_cubed|
+---+--------+
| 1| 1|
| 2| 8|
| 3| 27|
| 4| 64|
| 5| 125|
| 6| 216|
| 7| 343|
| 8| 512|
+---+--------+

Evaluation order and null checking in Spark SQL

Spark SQL (this includes SQL, the DataFrame API, and the Dataset API) does not guarantee the order of evaluation of subexpressions. For example, the following query does not guarantee that the s is NOT NULL clause is executed prior to the strlen(s) > 1 clause:

spark.sql("SELECT s FROM test1 WHERE s IS NOT NULL AND strlen(s) > 1")

Therefore, to perform proper null checking, it is recommended that you do the following:

  1. Make the UDF itself null-aware and do null checking inside the UDF.
  2. Use IF or CASE WHEN expressions to do the null check and invoke the UDF in a conditional branch (a sketch follows below).
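Here is a hedged sketch of option 2, assuming the strlen UDF and the test1 table referenced above exist; the IF expression guards the UDF so it only runs on non-null input:

spark.sql("""
select s
from test1
where if(s is not null, strlen(s) > 1, false)
""").show()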

Speeding up and distributing PySpark UDFs with Pandas UDFs

One of the previous prevailing issues with using PySpark UDFs was that they had slower performance than Scala UDFs. This was because the PySpark UDFs required data movement between the JVM and Python, which was quite expensive. To resolve this problem, Pandas UDFs (also known as vectorized UDFs) were introduced as part of Apache Spark 2.3. A Pandas UDF uses Apache Arrow to transfer data and Pandas to work with the data. You define a Pandas UDF using the keyword pandas_udf as the decorator, or to wrap the function itself. Once the data is in Apache Arrow format, there is no longer the need to serialize/pickle the data as it is already in a format consumable by the Python process. Instead of operating on individual inputs row by row, you are operating on a Pandas Series or DataFrame (i.e., vectorized execution).

From Apache Spark 3.0 with Python 3.6 and above, Pandas UDFs were split into two API categories: Pandas UDFs and Pandas Function APIs.

Pandas UDFs
With Apache Spark 3.0, Pandas UDFs infer the Pandas UDF type from Python type hints in Pandas UDFs such as pandas.Series, pandas.DataFrame, Tuple, and Iterator. Previously you needed to manually define and specify each Pandas UDF type. Currently, the supported cases of Python type hints in Pandas UDFs are Series to Series, Iterator of Series to Iterator of Series, Iterator of Multiple Series to Iterator of Series, and Series to Scalar (a single value).

Pandas Function APIs
Pandas Function APIs allow you to directly apply a local Python function to a PySpark DataFrame where both the input and output are Pandas instances. For Spark 3.0, the supported Pandas Function APIs are grouped map, map, and cogrouped map.
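Before the scalar Pandas UDF example below, here is a hedged sketch of one Pandas Function API, mapInPandas, assuming Spark 3.0+. The function receives an iterator of pandas DataFrames (one per batch) and yields pandas DataFrames; keep_even_ids is an illustrative name:

# Keep only even ids, operating on pandas DataFrame batches
def keep_even_ids(batches):
    for pdf in batches:              # each batch is a pandas.DataFrame
        yield pdf[pdf.id % 2 == 0]

df = spark.range(1, 7)
df.mapInPandas(keep_even_ids, schema="id long").show()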

The following is an example of a scalar Pandas UDF for Spark 3.0:

# In Python
# Import pandas
import pandas as pd

# Import various pyspark SQL functions including pandas_udf
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the cubed function
def cubed(a: pd.Series) -> pd.Series:
    return a * a * a

# Create the pandas UDF for the cubed function
cubed_udf = pandas_udf(cubed, returnType=LongType())

The preceding code snippet declares a function called cubed() that performs a cubed operation. This is a regular Pandas function with the additional cubed_udf = pandas_udf() call to create our Pandas UDF.

Let’s start with a simple Pandas Series (as defined for x) and then apply the local function cubed() for the cubed calculation:

# Create a Pandas Series
x = pd.Series([1, 2, 3])

# The function for a pandas_udf executed with local Pandas data
print(cubed(x))
0     1
1     8
2    27
dtype: int64

Now let’s switch to a Spark DataFrame. We can execute this function as a Spark vectorized UDF as follows:

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.range(1, 4)

# Execute function as a Spark vectorized UDF
df.select("id", cubed_udf(col("id"))).show()
+---+---------+                                                                 
| id|cubed(id)|
+---+---------+
| 1| 1|
| 2| 8|
| 3| 27|
+---+---------+

Notice that the resulting column is named cubed(id), after the underlying Python function, even though we invoked it through cubed_udf.

For SQL:

df.createOrReplaceTempView("_df")
spark.udf.register("cubed_udf", cubed_udf)

spark.sql("select id, cubed_udf(id) from _df").show()
+---+-------------+                                                             
| id|cubed_udf(id)|
+---+-------------+
| 1| 1|
| 2| 8|
| 3| 27|
+---+-------------+

As opposed to a local function, using a vectorized UDF will result in the execution of Spark jobs; the previous local function is a Pandas function executed only on the Spark driver. This becomes more apparent when viewing the Spark UI for one of the stages of this pandas_udf function (Figure 5-1).

For a deeper dive into Pandas UDFs, refer to pandas user-defined functions documentation.

Figure 5-1. Spark UI stages for executing a Pandas UDF on a Spark DataFrame

Like many Spark jobs, the job starts with parallelize() to send local data (Arrow binary batches) to executors and calls mapPartitions() to convert the Arrow binary batches to Spark’s internal data format, which can be distributed to the Spark workers. There are a number of WholeStageCodegen steps, which represent a fundamental step up in performance (thanks to Project Tungsten’s whole-stage code generation, which significantly improves CPU efficiency and performance). But it is the ArrowEvalPython step that identifies that (in this case) a Pandas UDF is being executed.


Querying with the Spark SQL Shell, Beeline, and Tableau

There are various mechanisms to query Apache Spark, including the Spark SQL shell, the Beeline CLI utility, and reporting tools like Tableau and Power BI.

In this section, we include instructions for Tableau; for Power BI, please refer to the documentation.


Using the Spark SQL Shell

A convenient tool for executing Spark SQL queries is the spark-sql CLI. While this utility communicates with the Hive metastore service in local mode, it does not talk to the Thrift JDBC/ODBC server (a.k.a. Spark Thrift Server or STS). The STS allows JDBC/ODBC clients to execute SQL queries over JDBC and ODBC protocols on Apache Spark.

To start the Spark SQL CLI, execute the following command in the $SPARK_HOME folder:

./bin/spark-sql

Or

$SPARK_HOME/bin/spark-sql

Once you’ve started the shell, you can use it to interactively perform Spark SQL queries. Let’s take a look at a few examples.


Create a table

To create a new permanent Spark SQL table, execute the following statement:

-- we created learn_spark_db in previous chapters
spark-sql> USE learn_spark_db;
spark-sql> CREATE TABLE people (name STRING, age int);

The output should note the creation of the Spark SQL table people as well as its file location (e.g., /user/hive/warehouse/people).


Insert data into the table

You can insert data into a Spark SQL table by executing a statement similar to:

INSERT INTO people SELECT name, age FROM ...

As you’re not dependent on loading data from a preexisting table or file, you can insert data into the table using INSERT...VALUES statements. These three statements insert three individuals (their names and ages, if known) into the people table:

spark-sql> INSERT INTO people VALUES ("Michael", NULL);
Time taken: 0.837 seconds
spark-sql> INSERT INTO people VALUES ("Andy", 30);
Time taken: 0.352 seconds
spark-sql> INSERT INTO people VALUES ("Samantha", 19);
Time taken: 0.285 seconds

Running a Spark SQL query

Now that you have data in your table, you can run Spark SQL queries against it. Let’s start by viewing what tables exist in our metastore:

spark-sql> SHOW TABLES;
people
Time taken: 0.019 seconds, Fetched 1 row(s)

Next, let’s find out how many people in our table are younger than 20 years of age:

spark-sql> SELECT * FROM people WHERE age < 20;
Samantha 19
Time taken: 0.127 seconds, Fetched 1 row(s)

Finally, let’s see which individuals did not specify their age:

spark-sql> SELECT name FROM people WHERE age IS NULL;
Michael
Time taken: 0.164 seconds, Fetched 1 row(s)

Working with Beeline

If you’ve worked with Apache Hive you may be familiar with the command-line tool Beeline, a common utility for running HiveQL queries against HiveServer2. Beeline is a JDBC client based on the SQLLine CLI. You can use this same utility to execute Spark SQL queries against the Spark Thrift server. Note that the currently implemented Thrift JDBC/ODBC server corresponds to HiveServer2 in Hive 1.2.1. You can test the JDBC server with the following Beeline script that comes with either Spark or Hive 1.2.1.


Start the Thrift server

To start the Spark Thrift JDBC/ODBC server, execute the following command from the $SPARK_HOME folder:

./sbin/start-thriftserver.sh

Or

$SPARK_HOME/sbin/start-thriftserver.sh

If you have not already started your Spark driver and worker, execute the following command prior to start-thriftserver.sh:

./sbin/start-all.sh

Or

$SPARK_HOME/sbin/start-all.sh

Connect to the Thrift server via Beeline

To test the Thrift JDBC/ODBC server using Beeline, execute the following command:

./bin/beeline

Or

$SPARK_HOME/bin/beeline
Beeline version 2.3.9 by Apache Hive

Then configure Beeline to connect to the local Thrift server:

!connect jdbc:hive2://localhost:10000

By default, Beeline is in non-secure mode. Thus, the username is your login (e.g., user@learningspark.org) and the password is blank.

For example:

beeline> !connect jdbc:hive2://localhost:10000
Connecting to jdbc:hive2://localhost:10000
Enter username for jdbc:hive2://localhost:10000: zacks
Enter password for jdbc:hive2://localhost:10000:
Connected to: Spark SQL (version 3.2.1)
Driver: Hive JDBC (version 2.3.9)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://localhost:10000>

Execute a Spark SQL query with Beeline

From here, you can run a Spark SQL query similar to how you would run a Hive query with Beeline. Here are a few sample queries and their output:

0: jdbc:hive2://localhost:10000> USE learn_spark_db;
0: jdbc:hive2://localhost:10000> SHOW tables;
+-----------------+------------+--------------+
| namespace | tableName | isTemporary |
+-----------------+------------+--------------+
| learn_spark_db | people | false |
+-----------------+------------+--------------+
1 row selected (0.081 seconds)
0: jdbc:hive2://localhost:10000> SELECT * FROM people;
+-----------+-------+
| name | age |
+-----------+-------+
| Michael | NULL |
| Samantha | 19 |
| Andy | 30 |
+-----------+-------+
3 rows selected (1.373 seconds)

Stop the Thrift server

Once you’re done, you can stop the Thrift server with the following command:

./sbin/stop-thriftserver.sh

Or

$SPARK_HOME/sbin/stop-thriftserver.sh

Working with Tableau

Similar to running queries through Beeline or the Spark SQL CLI, you can connect your favorite BI tool to Spark SQL via the Thrift JDBC/ODBC server. In this section, we will show you how to connect Tableau Desktop (version 2019.2) to your local Apache Spark instance.

You will need Tableau's Spark ODBC driver (version 1.2.0 or above) installed. If you have installed (or upgraded to) Tableau 2018.1 or greater, this driver should already be preinstalled.


Start the Thrift server

To start the Spark Thrift JDBC/ODBC server, execute the following command from the $SPARK_HOME folder:

./sbin/start-thriftserver.sh

Or

$SPARK_HOME/sbin/start-thriftserver.sh

If you have not already started your Spark driver and worker, execute the following command prior to start-thriftserver.sh:

./sbin/start-all.sh

Or

$SPARK_HOME/sbin/start-all.sh

Skipping the Tableau walkthrough for now, since I don't have a Tableau license.


External Data Sources

JDBC and SQL Databases

Spark SQL includes a data source API that can read data from other databases using
JDBC. It simplifies querying these data sources as it returns the results as a DataFrame, thus providing all of the benefits of Spark SQL (including performance and the ability to join with other data sources).
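
For example, since the JDBC read simply returns a DataFrame, it can be joined against data loaded from any other source. A minimal sketch, assuming a hypothetical MySQL orders table and a Parquet file of items sharing an item_id column (all bracketed values and paths are placeholders):

# Hypothetical: join a JDBC-backed DataFrame with a Parquet-backed one
orders = spark.read.jdbc(
    "jdbc:mysql://[DBSERVER]:3306/[DATABASE]", "orders",
    properties={"user": "[USERNAME]", "password": "[PASSWORD]",
                "driver": "com.mysql.cj.jdbc.Driver"})

items = spark.read.parquet("/path/to/items.parquet")

# The join runs in Spark; only the table scan goes over JDBC
orders.join(items, "item_id").show(5)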

To get started, you will need to specify the JDBC driver for your JDBC data source and it will need to be on the Spark classpath. From the $SPARK_HOME folder, you’ll issue a command like the following:

./bin/spark-shell --driver-class-path $database.jar --jars $database.jar

Using the data source API, the tables from the remote database can be loaded as a DataFrame or Spark SQL temporary view. Users can specify the JDBC connection properties in the data source options. Table 5-1 contains some of the more common connection properties (case-insensitive) that Spark supports.

For the full list of connection properties, see the Spark SQL documentation.
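
As a quick illustration of supplying these options, the sketch below loads a JDBC table and also exposes it as a temporary view so it can be queried with plain SQL (all bracketed values are placeholders; the view name remote_table is arbitrary):

# Load a remote table via JDBC and register it as a temporary view
jdbcDF = (spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .option("dbtable", "[TABLENAME]")
  .option("user", "[USERNAME]")
  .option("password", "[PASSWORD]")
  .load())

jdbcDF.createOrReplaceTempView("remote_table")

# The view can now be queried like any other Spark SQL table
spark.sql("SELECT * FROM remote_table LIMIT 10").show()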


Example: PySpark & MySQL

Download JDBC Connector

How to use JDBC source to write and read data in (Py)Spark?

For macOS, download and unzip the MySQL JDBC connector (Platform Independent) from the MySQL website.

After unzipping the MySQL JDBC archive, you will get a connector directory. Inside that directory is a .jar file.


Save JDBC Connector inside Spark root directory

Make a directory named jdbc under your Spark root directory:

➜  spark-3.2.1-bin-hadoop3.2 mkdir jdbc

Move the whole MySQL JDBC directory (e.g., mine was in my Downloads directory) to jdbc:

➜  spark-3.2.1-bin-hadoop3.2 mv ~/Downloads/mysql-connector-java-8.0.28 jdbc

Check that the MySQL JDBC connector .jar file's path is jdbc/mysql-connector-java-8.0.28/mysql-connector-java-8.0.28.jar:

➜  spark-3.2.1-bin-hadoop3.2 ll jdbc/mysql-connector-java-8.0.28 
total 5792
-rw-r--r--@ 1 zacks staff 267K Dec 15 16:25 CHANGES
-rw-r--r--@ 1 zacks staff 185B Dec 15 16:25 INFO_BIN
-rw-r--r--@ 1 zacks staff 136B Dec 15 16:25 INFO_SRC
-rw-r--r--@ 1 zacks staff 102K Dec 15 16:25 LICENSE
-rw-r--r--@ 1 zacks staff 1.2K Dec 15 16:25 README
-rw-r--r--@ 1 zacks staff 88K Dec 15 16:25 build.xml
-rw-r--r--@ 1 zacks staff 2.4M Dec 15 16:25 mysql-connector-java-8.0.28.jar
drwxr-xr-x@ 8 zacks staff 256B Dec 15 16:25 src

Specify JDBC Connector for Spark classpath

Include the applicable JDBC driver when you submit the application or start the shell:

bin/pyspark --driver-class-path jdbc/mysql-connector-java-8.0.28/mysql-connector-java-8.0.28.jar --jars jdbc/mysql-connector-java-8.0.28/mysql-connector-java-8.0.28.jar

Test: PySpark
# Loading data from a JDBC source using load 
jdbcDF = (spark
.read
.format("jdbc")
.option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
.option("driver", "com.mysql.cj.jdbc.Driver")
.option("dbtable", "[TABLENAME]")
.option("user", "[USERNAME]")
.option("password", "[PASSWORD]")
.load())
>>> jdbcDF.show()
+--------+----------+-------+--------+---------+
|order_id|order_date|item_id|buyer_id|seller_id|
+--------+----------+-------+--------+---------+
| 1|2019-08-01| 4| 1| 2|
| 2|2019-08-02| 2| 1| 3|
| 3|2019-08-03| 3| 2| 3|
| 4|2019-08-04| 1| 4| 2|
| 5|2019-08-04| 1| 3| 4|
| 6|2019-08-05| 2| 2| 4|
+--------+----------+-------+--------+---------+

The importance of partitioning

When transferring large amounts of data between Spark SQL and a JDBC external source, it is important to partition your data source. All of your data is going through one driver connection, which can saturate and significantly slow down the performance of your extraction, as well as potentially saturate the resources of your source system. While these JDBC properties are optional, for any large-scale operations it is highly recommended to use the properties shown in Table 5-2.

Let’s take a look at an example to help you understand how these properties work. Suppose we use the following settings:

  • numPartitions: 10
  • lowerBound: 1000
  • upperBound: 10000

Then the stride is equal to 1,000, and 10 partitions will be created. This is the equivalent of executing these 10 queries (one for each partition):

  • SELECT * FROM table WHERE partitionColumn BETWEEN 1000 and 2000
  • SELECT * FROM table WHERE partitionColumn BETWEEN 2000 and 3000
  • ...
  • SELECT * FROM table WHERE partitionColumn BETWEEN 9000 and 10000
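
These settings map directly onto the JDBC data source options. A minimal sketch of such a partitioned read (connection details are placeholders; partitionColumn must be a numeric, date, or timestamp column in the table):

# Partitioned JDBC read: 10 concurrent queries, one per partition
df = (spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://[DBSERVER]")
  .option("dbtable", "[SCHEMA].[TABLENAME]")
  .option("user", "[USERNAME]")
  .option("password", "[PASSWORD]")
  .option("partitionColumn", "[PARTITION_COLUMN]")  # e.g., a numeric ID column
  .option("lowerBound", 1000)
  .option("upperBound", 10000)
  .option("numPartitions", 10)
  .load())

# Each of the 10 tasks issues its own range query against the source
print(df.rdd.getNumPartitions())   # 10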

While not all-encompassing, the following are some hints to keep in mind when using these properties:

  • A good starting point for numPartitions is to use a multiple of the number of Spark workers. For example, if you have four Spark worker nodes, then perhaps start with 4 or 8 partitions. But it is also important to note how well your source system can handle the read requests. For systems that have processing windows, you can maximize the number of concurrent requests to the source system; for systems lacking processing windows (e.g., an OLTP system continuously processing data), you should reduce the number of concurrent requests to prevent saturation of the source system.
  • Initially, calculate the lowerBound and upperBound based on the actual minimum and maximum values of partitionColumn. For example, if you choose {numPartitions: 10, lowerBound: 1000, upperBound: 10000}, but all of the values are between 2000 and 4000, then only 2 of the 10 queries (one for each partition) will be doing all of the work. In this scenario, a better configuration would be {numPartitions: 10, lowerBound: 2000, upperBound: 4000}; a sketch of deriving the bounds from the actual data follows this list.
  • Choose a partitionColumn that can be uniformly distributed to avoid data skew. For example, if the majority of your partitionColumn has the value 2500, with {numPartitions:10, lowerBound: 1000, upperBound: 10000} most of the work will be performed by the task requesting the values between 2000 and 3000. Instead, choose a different partitionColumn, or if possible generate a new one (perhaps a hash of multiple columns) to more evenly distribute your partitions.
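
As mentioned in the second hint, one pragmatic way to pick lowerBound and upperBound is to ask the source database for the actual min/max of the partition column before the main read. A rough sketch with placeholder connection details; the column name id is an assumption, and the bounds query is pushed down to the database via the query option:

# Step 1: fetch the actual min/max of the partition column from the source
# ('id' is assumed to be the numeric partition column)
bounds = (spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://[DBSERVER]")
  .option("query", "SELECT MIN(id) AS lo, MAX(id) AS hi FROM [SCHEMA].[TABLENAME]")
  .option("user", "[USERNAME]")
  .option("password", "[PASSWORD]")
  .load()
  .first())

# Step 2: use those values as the partition bounds for the real read
df = (spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://[DBSERVER]")
  .option("dbtable", "[SCHEMA].[TABLENAME]")
  .option("user", "[USERNAME]")
  .option("password", "[PASSWORD]")
  .option("partitionColumn", "id")
  .option("lowerBound", bounds["lo"])
  .option("upperBound", bounds["hi"])
  .option("numPartitions", 10)
  .load())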

PostgreSQL

To connect to a PostgreSQL database, build or download the JDBC jar from Maven and add it to your classpath. Then start a Spark shell (spark-shell or pyspark), specifying that jar:

bin/spark-shell --jars postgresql-42.2.6.jar

The following examples show how to load from and save to a PostgreSQL database using the Spark SQL data source API and JDBC in PySpark:

# Read Option 1: Loading data from a JDBC source using load method
jdbcDF1 = (spark
.read
.format("jdbc")
.option("url", "jdbc:postgresql://[DBSERVER]")
.option("dbtable", "[SCHEMA].[TABLENAME]")
.option("user", "[USERNAME]")
.option("password", "[PASSWORD]")
.load())

# Read Option 2: Loading data from a JDBC source using jdbc method
jdbcDF2 = (spark
.read
.jdbc("jdbc:postgresql://[DBSERVER]", "[SCHEMA].[TABLENAME]",
properties={"user": "[USERNAME]", "password": "[PASSWORD]"}))

# Write Option 1: Saving data to a JDBC source using save method
(jdbcDF1
.write
.format("jdbc")
.option("url", "jdbc:postgresql://[DBSERVER]")
.option("dbtable", "[SCHEMA].[TABLENAME]")
.option("user", "[USERNAME]")
.option("password", "[PASSWORD]")
.save())

# Write Option 2: Saving data to a JDBC source using jdbc method
(jdbcDF2
.write
.jdbc("jdbc:postgresql://[DBSERVER]", "[SCHEMA].[TABLENAME]",
properties={"user": "[USERNAME]", "password": "[PASSWORD]"}))

MySQL

To connect to a MySQL database, build or download the JDBC jar from Maven or MySQL (the latter is easier!) and add it to your classpath. Then start a Spark shell (spark-shell or pyspark), specifying that jar:

bin/spark-shell --jars mysql-connector-java_8.0.16-bin.jar

The following examples show how to load data from and save it to a MySQL database using the Spark SQL data source API and JDBC in PySpark:

# In Python
# Loading data from a JDBC source using load
jdbcDF = (spark
.read
.format("jdbc")
.option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
.option("driver", "com.mysql.cj.jdbc.Driver")
.option("dbtable", "[TABLENAME]")
.option("user", "[USERNAME]")
.option("password", "[PASSWORD]")
.load())

# Saving data to a JDBC source using save
(jdbcDF
.write
.format("jdbc")
.option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
.option("driver", "com.mysql.cj.jdbc.Driver")
.option("dbtable", "[TABLENAME]")
.option("user", "[USERNAME]")
.option("password", "[PASSWORD]")
.save())

Azure Cosmos DB

To connect to an Azure Cosmos DB database, build or download the JDBC jar from Maven or GitHub and add it to your classpath. Then start a Scala or PySpark shell, specifying this jar (note that this example is using Spark 2.4):

bin/spark-shell --jars azure-cosmosdb-spark_2.4.0_2.11-1.3.5-uber.jar

You also have the option of using --packages to pull the connector from Spark Packages using its Maven coordinates:

export PKG="com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.3.5"
bin/spark-shell --packages $PKG

The following example shows how to load data from and save it to an Azure Cosmos DB database using the Spark SQL data source API in PySpark. Note that it is common to use the query_custom configuration to make use of the various indexes within Cosmos DB:

# In Python
# Loading data from Azure Cosmos DB
# Read configuration
query = "SELECT c.colA, c.coln FROM c WHERE c.origin = 'SEA'"
readConfig = {
"Endpoint" : "https://[ACCOUNT].documents.azure.com:443/",
"Masterkey" : "[MASTER KEY]",
"Database" : "[DATABASE]",
"preferredRegions" : "Central US;East US2",
"Collection" : "[COLLECTION]",
"SamplingRatio" : "1.0",
"schema_samplesize" : "1000",
"query_pagesize" : "2147483647",
"query_custom" : query
}

# Connect via azure-cosmosdb-spark to create Spark DataFrame
df = (spark
.read
.format("com.microsoft.azure.cosmosdb.spark")
.options(**readConfig)
.load())

# Count the number of flights
df.count()

# Saving data to Azure Cosmos DB
# Write configuration
writeConfig = {
"Endpoint" : "https://[ACCOUNT].documents.azure.com:443/",
"Masterkey" : "[MASTER KEY]",
"Database" : "[DATABASE]",
"Collection" : "[COLLECTION]",
"Upsert" : "true"
}

# Upsert the DataFrame to Azure Cosmos DB
(df.write
.format("com.microsoft.azure.cosmosdb.spark")
.options(**writeConfig)
.save())

For more information, please refer to the Azure Cosmos DB documentation.


MS SQL Server

To connect to an MS SQL Server database, download the JDBC jar and add it to your classpath. Then start a Scala or PySpark shell, specifying this jar:

bin/spark-shell --jars mssql-jdbc-7.2.2.jre8.jar

The following example shows how to load data from and save it to an MS SQL Server database using the Spark SQL data source API and JDBC in PySpark:

# Configure jdbcUrl
jdbcUrl = "jdbc:sqlserver://[DBSERVER]:1433;database=[DATABASE]"

# Loading data from a JDBC source
jdbcDF = (spark
.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "[TABLENAME]")
.option("user", "[USERNAME]")
.option("password", "[PASSWORD]")
.load())

# Saving data to a JDBC source
(jdbcDF
.write
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "[TABLENAME]")
.option("user", "[USERNAME]")
.option("password", "[PASSWORD]")
.save())

Other External Sources

These are just some of the many external data sources Apache Spark can connect to; other popular data sources include: