Apache Spark tutorial
Apache Spark is an open-source distributed computing framework with its own processing engine, performing transformations and analyses over big data in both batch and streaming mode. It offers APIs for Scala, Python, Java, R, and SQL.
Spark is designed to split a job into tasks running on several servers at the same time, turning a long task on a large dataset into smaller tasks on portions of data.
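This divide-and-combine principle can be sketched in plain Python (this is illustrative only, not Spark code): a dataset is split into partitions, each "task" computes a partial result on its partition, and the partial results are combined. Spark would run each task on a different executor in parallel.

```python
# Illustrative sketch (not Spark API): split a job into smaller tasks
# over partitions of the data, then combine the partial results.

data = list(range(1, 101))  # the "large" dataset
num_partitions = 4

# Split the dataset into partitions
size = len(data) // num_partitions
partitions = [data[i:i + size] for i in range(0, len(data), size)]

# Each task computes a partial result on its own partition
# (Spark would execute these in parallel on different workers)
partial_sums = [sum(p) for p in partitions]

# Combine the partial results into the final answer
total = sum(partial_sums)
print(total)  # 5050
```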
This tutorial covers how to use pyspark to develop and execute Spark code on TDP.
This tutorial assumes you are running a cluster based on TDP getting started, an easy-to-launch TDP environment for testing purposes. This deployment provides you with:
- tdp_user, a pre-provisioned user
- An edge node accessible by SSH
- HDFS directories
Note: When using a TDP deployment other than tdp-getting-started, some commands need to be adapted to your environment.
Before beginning the tutorial, connect to the cluster and authenticate yourself with kinit using the following commands:

```shell
# Connect to edge-01.tdp
vagrant ssh edge-01
# Switch user to tdp_user
sudo su tdp_user
# Authenticate the user with the Kerberos principal and keytab
kinit -kt ~/tdp_user.keytab tdp_user@REALM.TDP
```
Running a Spark application on TDP
Spark commands are executed using a Spark client located on an edge node of a cluster. An edge node is a space, a VM in the case of the getting-started cluster, used to interact with the rest of the cluster. Among other things, one of its uses is to provide a shell environment provisioned with all the binaries and configurations required to communicate with the services. You securely connect to it with SSH.
The spark-submit command launches a distributed application on a cluster via a resource manager, YARN in the case of TDP. Spark can also run on Kubernetes or in standalone mode. spark-submit takes a range of parameters that define where, how, and with what resources the application is executed.
The following commands illustrate how to submit an application, check its status, and how to stop it.
Run a Spark application locally
YARN is not required to run a Spark application. For demonstration purposes, the example below runs Spark in local mode with the following commands:
```shell
# Move to the Spark directory
cd /opt/tdp/spark
# Run the application locally
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local \
  ./examples/jars/spark-examples_2.11-2.3.5-TDP-0.1.0-SNAPSHOT.jar \
  100
```
The options are the following:
- --class: the name of the Spark application class to run (e.g. org.apache.spark.examples.SparkPi).
- --master: the master URL for the cluster (e.g. local[4] executes it on localhost using 4 threads).
- application-jar: the jar containing the Spark application class.
- application-arguments: arguments passed to the main method of the main class, in this case 100.
Submit a Spark application to YARN
The spark-submit command allows you to submit a distributed Spark application to YARN:
```shell
# Move to the Spark directory
cd /opt/tdp/spark
# Submit the application to YARN
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 4g \
  --executor-memory 2g \
  --executor-cores 1 \
  --queue default \
  ./examples/jars/spark-examples_2.11-2.3.5-TDP-0.1.0-SNAPSHOT.jar \
  100
```
In this case, additional options tune the resources, such as the memory requested for the driver and executors, the cores per executor, and the deploy mode. Note that these parameters won’t override the limits set on the user or the YARN queue.
The options are the following:
- --master yarn: execute on a TDP cluster using YARN.
- --deploy-mode cluster: host the driver program on a worker node (not localhost).
- --driver-memory 4g: requested memory for the driver program.
- --executor-memory 2g: requested memory per executor.
- --executor-cores 1: requested cores per executor.
- --queue default: the YARN queue to submit to.
Status of application
The yarn command is used to see the status of running applications:

```shell
yarn application -list
```
Stop a Spark job
The applicationId of a running job can be retrieved with the previous command. It can be used to perform operations on a particular job, such as stopping it:

```shell
yarn application -kill <application_id>
```
Spark Shell with Scala
The spark-shell command is used to quickly explore or analyze data from the command line. Execute the commands below to add test data to HDFS and launch the spark-shell client:

```shell
# Download moby-dick to the edge node
curl --create-dirs -o /home/tdp_user/moby-dick.txt https://www.gutenberg.org/files/2701/2701-0.txt
# Copy moby-dick.txt to HDFS
hdfs dfs \
  -copyFromLocal /home/tdp_user/moby-dick.txt \
  /user/tdp_user
# Launch the spark-shell client
spark-shell
```
Transformations and Actions are operations on RDDs (resilient distributed datasets), a core abstraction of Spark that enables data distribution and parallel processing.
- A Transformation is a function that produces a new RDD from the existing RDDs.
- An Action is performed when you want to work with the actual dataset. Unlike a transformation, an action returns a result but not a new RDD.
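The split between lazy transformations and eager actions can be illustrated with a plain-Python analogy using generators (illustrative only, not Spark API): a generator expression, like a transformation, only describes work to be done, while consuming it, like an action, triggers the actual computation.

```python
# Plain-Python analogy for Spark's lazy evaluation (not Spark API):
# transformations build a recipe, actions force evaluation.

lines = ["Call me Ishmael.", "Some years ago", "never mind how long"]

# "Transformation": nothing is computed yet, only a recipe is built
word_counts = (len(line.split()) for line in lines)

# "Action": consuming the generator triggers the actual computation
result = max(word_counts)
print(result)  # 4, the word count of the longest line
```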
The following command is a transformation. It describes a computation to be done in the future: it sets up a future Spark action by reading the contents of the book into a variable called textFile. It needs to be run within the spark-shell client opened previously:

```scala
val textFile = spark.read.textFile("/user/tdp_user/moby-dick.txt")
```
Actions can then be applied to the textFile variable. Actions trigger computation and return a result.

```scala
// Returns the line count
textFile.count()
// Returns the first line
textFile.first()
```
Combine Transformations and Actions
Transformations and actions can be chained together to create complex yet concise code. For example, the next command returns the number of words in the largest line.
```scala
textFile
  .map(line => line.split(" ").size)
  .reduce((a, b) => if (a > b) a else b)
```
Spark Optimization - Caching
The Spark driver optimizes the query execution plan to prune redundant operations and increase performance. Users can also force the caching of certain small and regularly queried datasets to further optimize.
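Why caching pays off can be sketched in plain Python (illustrative only, not the Spark API): without a cache, every "action" re-runs the whole upstream computation, while a cached result is materialized once and reused.

```python
# Plain-Python sketch of why caching helps (not Spark API): count how
# many times an expensive derivation is actually executed.

calls = {"count": 0}

def expensive_transformation(data):
    calls["count"] += 1  # track how many times the work is done
    return [x * x for x in data]

data = range(1000)

# Without caching: two "actions" cause two full recomputations
len(expensive_transformation(data))
sum(expensive_transformation(data))
print(calls["count"])  # 2

# With "caching": compute once, reuse the materialized result
calls["count"] = 0
cached = expensive_transformation(data)  # analogous to caching a dataset
len(cached)
sum(cached)
print(calls["count"])  # 1
```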
Spark Shell with Python
For users who prefer the Python syntax, Spark provides the pyspark shell instead of spark-shell. The following commands perform the same transformation and actions performed in the previous section.
```python
# Transformation: read the text into a variable
textFile = spark.read.text("/user/tdp_user/moby-dick.txt")
# Action: returns the line count
textFile.count()
# Action: returns the first line
textFile.first()
# Combination of transformations and an action: returns the maximum
# number of words in a line
from pyspark.sql.functions import *
textFile \
    .select(size(split(textFile.value, "\s+")).name("numWords")) \
    .agg(max(col("numWords"))) \
    .collect()
```
For a deeper dive into Spark please review our Spark Structured Streaming documentation page. To learn about Spark architecture and its use cases, refer to Spark Basics.