Apache Spark tutorial
Apache Spark is an open-source distributed computing framework with its own processing engine, designed to perform transformations and analyses over big data in both batch and streaming modes. It offers APIs for Scala, Python, Java, R, and SQL.
Spark is designed to split a job into tasks running on several servers at the same time, turning a long task on a large dataset into smaller tasks on portions of data.
This tutorial covers how to use spark-submit, spark-shell, and pyspark to develop and execute Spark code on TDP.
Requirements
This tutorial assumes you are running a cluster based on TDP getting started, an easy-to-launch TDP environment for testing purposes. This deployment provides you with:
- tdp_user, a user with the ability to kinit for authentication.
- An edge node accessible by SSH.
- HDFS directories: /user/tdp_user
Note: When using a TDP deployment other than tdp-getting-started, some commands require adaptation to your deployment.
Before beginning the tutorial, connect to the cluster and authenticate yourself with kinit
using the following commands:
# Connect to edge-01.tdp
vagrant ssh edge-01
# Switch user to tdp_user
sudo su tdp_user
# Authenticate the user with their Kerberos principal and keytab
kinit -kt ~/tdp_user.keytab tdp_user@REALM.TDP
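You can verify that a ticket was granted by inspecting the credential cache with klist:
# List the tickets in the Kerberos credential cache
klist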
Running a Spark application on TDP
Spark commands are executed using a Spark client located on an edge node of the cluster. An edge node is a machine, a VM in the case of the getting-started cluster, used to interact with the rest of the cluster. Among other things, it provides a shell environment provisioned with all the binaries and configurations required to communicate with the cluster's services. You connect to it securely with SSH.
The spark-submit command launches a distributed application on a cluster via a resource manager, YARN in the case of TDP. Spark can also run on Kubernetes and even in standalone mode. The spark-submit command takes a range of parameters that define where, how, and with what resources the application is executed.
The following commands illustrate how to submit an application, check its status, and how to stop it.
Run a Spark application locally
YARN is not required to run Spark locally. For testing purposes, the application can be executed in local mode with the following spark-submit command:
# Move to the Spark directory
cd /opt/tdp/spark
# Run the application locally
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[4] \
./examples/jars/spark-examples_2.11-2.3.5-TDP-0.1.0-SNAPSHOT.jar \
100
The options are the following:
- --class: the name of the Spark application class to run (e.g. org.apache.spark.examples.SparkPi).
- --master: the master URL for the cluster (here local[4], i.e. execute on localhost using 4 threads).
- application-jar: the JAR containing the Spark application class.
- application-arguments: arguments passed to the main method of the main class, in this case 100.
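If you prefer a Python example, Spark distributions usually ship an equivalent pi.py script. A sketch, assuming the standard examples layout is present under /opt/tdp/spark:
# Run the bundled Python Pi estimator locally (path assumed from a standard Spark distribution)
./bin/spark-submit \
--master local[4] \
./examples/src/main/python/pi.py \
100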
Submit a Spark application to YARN
The following spark-submit command submits a distributed Spark application to YARN:
# Move to the Spark directory
cd /opt/tdp/spark
# Submit the application to YARN
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue default \
./examples/jars/spark-examples_2.11-2.3.5-TDP-0.1.0-SNAPSHOT.jar \
100
In this case, additional options are used to tune resources, such as the memory requested for the driver and executors, the number of cores per executor, and the deploy mode. Note that these parameters won’t override the limits set on the user or the YARN queue.
The options are the following:
- --master yarn: execute on a TDP cluster using YARN.
- --deploy-mode cluster: host the driver program on a worker (not localhost); its output is then retrieved from the YARN logs, as shown below.
- --driver-memory 4g: requested memory for the driver program.
- --executor-memory 2g: requested memory for each executor.
- --executor-cores 1: requested cores for each executor.
- --queue default: the YARN queue to submit to.
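In cluster deploy mode, the driver runs on the cluster, so its output does not appear in your terminal. Provided log aggregation is enabled on the cluster, the application logs can be retrieved once it finishes:
# Fetch the aggregated logs of a finished application
yarn logs -applicationId <application_id>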
Status of application
The yarn command is used to see the status of running applications:
yarn application --list
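By default, the list only includes submitted, accepted, and running applications. The -appStates option filters by state, for example to also display applications that already completed:
# List applications in every state, including finished ones
yarn application -list -appStates ALL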
Stop a Spark job
The application ID of running jobs can be retrieved with the previous command. It can be used to perform operations on a particular job, such as stopping it:
yarn application --kill <application_id>
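The same application ID can also be used to check the details and final state of a specific application:
# Print the report of a specific application
yarn application -status <application_id>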
Spark Shell with Scala
The spark-shell command is used to quickly explore or analyze data from the command line.
Execute the commands below to add test data to HDFS and launch the spark-shell.
# Download moby-dick to the edge
curl --create-dirs -o /home/tdp_user/moby-dick.txt https://www.gutenberg.org/files/2701/2701-0.txt
# Copy moby-dick.txt to hdfs
hdfs dfs \
-copyFromLocal /home/tdp_user/moby-dick.txt \
/user/tdp_user
# Launch the spark-shell client
spark-shell
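Depending on the Spark configuration deployed on the edge node, the shell starts with a local or a YARN master. The master and resources can also be set explicitly, for example:
# Launch spark-shell on YARN with explicit resources (adjust to your cluster)
spark-shell \
--master yarn \
--deploy-mode client \
--num-executors 2 \
--executor-memory 2g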
RDD operations
Transformations and Actions are operations on RDDs (resilient distributed datasets). RDDs are a core feature of Spark: they allow data to be distributed across the cluster and processed in parallel.
- A Transformation is a function that produces a new RDD from the existing RDDs.
- An Action is performed when you want to work with the actual dataset. Unlike a transformation, an action returns a result but not a new RDD.
Transformation operation
The following command is a transformation. It describes a computation to be done in the future: it sets up a future Spark action by reading the contents of the book into a variable called textFile. It needs to be run within the spark-shell client opened previously:
val textFile = spark.read.textFile("/user/tdp_user/moby-dick.txt")
Action operation
Actions can be used on the textFile variable. Actions trigger computation and return a result.
// Returns the line count
textFile.count()
// Returns the first line
textFile.first()
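Other actions work the same way; for instance, take brings a fixed number of lines back to the driver:
// Returns the first five lines
textFile.take(5)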
Combine Transformations and Actions
Transformations and actions can be chained together to create complex yet concise code. For example, the next command returns the number of words in the largest line.
textFile
.map(line => line.split(" ").size)
.reduce((a, b) => if (a > b) a else b)
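Another classic combination is a word count. A sketch using the same Dataset API (the spark-shell imports the required implicits automatically):
// Transformations: split each line into words, then group identical words
val wordCounts = textFile
  .flatMap(line => line.split(" "))
  .groupByKey(identity)
  .count()
// Action: bring the (word, count) pairs back to the driver
wordCounts.collect()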
Spark Optimization - Caching
The Spark driver optimizes the query execution plan to prune redundant operations and increase performance. Users can also force the caching of small, regularly queried datasets to further improve performance.
textFile.cache()
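The first action after cache() materializes the dataset in memory; subsequent actions reuse it. The cache can be released when it is no longer needed:
// First action after cache(): reads the file and fills the cache
textFile.count()
// Subsequent actions are served from the in-memory data
textFile.count()
// Release the cached data
textFile.unpersist()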
Spark Shell with Python
For users who prefer the Python syntax, Spark provides the pyspark shell as an alternative to spark-shell.
pyspark
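Like spark-shell, pyspark accepts the same master and resource options, for example to start the session on YARN:
# Launch pyspark on YARN with explicit resources (adjust to your cluster)
pyspark \
--master yarn \
--deploy-mode client \
--executor-memory 2g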
The following commands perform the same transformation and actions as in the previous section.
# transformation: read the text in a variable
textFile = spark.read.text("/user/tdp_user/moby-dick.txt")
# action: returns the line count
textFile.count()
# action: returns the first line
textFile.first()
# combination of action and transformation: returns the maximum number of words in a line
from pyspark.sql.functions import *
textFile \
.select(size(split(textFile.value, r"\s+")) \
.name("numWords")) \
.agg(max(col("numWords"))) \
.collect()
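The word count shown earlier in Scala can also be sketched with the PySpark DataFrame API, reusing the functions imported above:
# Transformations: split each line into words, one row per word, then count occurrences
wordCounts = textFile \
    .select(explode(split(textFile.value, r"\s+")).alias("word")) \
    .groupBy("word") \
    .count()
# Action: bring the (word, count) rows back to the driver
wordCounts.collect()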
Further reading
For a deeper dive into Spark, please review our Spark Structured Streaming documentation page. To learn about Spark architecture and its use cases, refer to Spark Basics.