Apache Spark tutorial

Apache Spark is an open-source distributed computing framework with its own processing engine that performs transformations and analyses over big data, in both batch and streaming modes. It offers APIs for Scala, Python, Java, R, and SQL.

Spark is designed to split a job into tasks running on several servers in parallel, turning one long computation over a large dataset into many smaller tasks, each processing a portion of the data.

This tutorial covers the use of spark-submit, spark-shell, and pyspark to develop and execute Spark code on TDP.

Requirements

This tutorial assumes you are running a cluster based on TDP getting started, an easy-to-launch TDP environment for testing purposes. This deployment provides you with:

  • tdp_user, a user with the ability to kinit for authentication.
  • An edge node accessible via SSH.
  • HDFS directories:
    • /user/tdp_user

Note: When using a TDP deployment other than tdp-getting-started, some commands need to be adapted to your environment.

Before beginning the tutorial, connect to the cluster and authenticate yourself with kinit using the following commands:

# Connect to edge-01.tdp 
vagrant ssh edge-01
# Switch user to tdp_user
sudo su tdp_user
# Authenticate the user with their Kerberos keytab
kinit -kt ~/tdp_user.keytab tdp_user@REALM.TDP

Running a Spark application on TDP

Spark commands are executed using a Spark client located on an edge node of the cluster. An edge node, a VM in the case of the getting-started cluster, is used to interact with the rest of the cluster. Among other things, it provides a shell environment provisioned with all the binaries and configurations required to communicate with the cluster's services. You connect to it securely with SSH.

The spark-submit command launches a distributed application on a cluster via a resource manager, YARN in the case of TDP. Spark can also be run on Kubernetes and even in standalone mode. The spark-submit command takes a range of parameters that define where, how, and with what resources the application needs to be executed.

The following commands illustrate how to submit an application, check its status, and stop it.

Run a Spark application locally

YARN is not required: for testing purposes, a Spark application can be run locally on the edge node with the following spark-submit command:

# Move to the Spark directory
cd /opt/tdp/spark
# Run the application locally
./bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master local[4] \
    ./examples/jars/spark-examples_2.11-2.3.5-TDP-0.1.0-SNAPSHOT.jar \
    100

The options are the following:

  • --class: the name of the Spark application class to run (e.g. org.apache.spark.examples.SparkPi).
  • --master: the master URL for the cluster (here local[4]: run on the local machine using 4 threads).
  • application-jar: the jar containing the Spark application class.
  • application-arguments: arguments passed to the main method of the main class, in this case 100.

Submit a Spark application to YARN

The following spark-submit command submits a distributed Spark application to YARN:

# Move to the Spark directory
cd /opt/tdp/spark
# Submit the application to YARN
./bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 4g \
    --executor-memory 2g \
    --executor-cores 1 \
    --queue default \
    ./examples/jars/spark-examples_2.11-2.3.5-TDP-0.1.0-SNAPSHOT.jar \
    100

In this case, additional options are used to tune the resources, such as the memory requested for the driver and executors, the number of cores per executor, and the deploy mode. Note that these parameters cannot override the limits set on the user or the YARN queue (a way to check the queue limits is shown after the options below).

The options are the following:

  • --master yarn: execute on a TDP cluster using YARN.
  • --deploy-mode cluster: run the driver program on a cluster node managed by YARN instead of on the edge node.
  • --driver-memory 4g: requested memory for driver program.
  • --executor-memory 2g: requested memory for executors.
  • --executor-cores 1: requested cores for executors.
  • --queue default: the YARN queue to submit the application to.
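
The queue limits can be checked from the edge node with the yarn CLI. A quick sketch, assuming the application is submitted to the default queue as above:

# Display the state, capacity, and maximum capacity of the default queue
yarn queue -status default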

Status of application

The yarn command is used to see the status of running applications:

yarn application --list
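
The application ID returned by the listing can also be passed back to the yarn command to print a more detailed report (state, progress, tracking URL). A sketch:

# Print the report of a single application
yarn application -status <application_id>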

Stop a Spark job

The application ID of a running job, retrieved with the commands above, can be used to perform operations on a particular job, such as stopping it:

yarn application --kill <application_id>

Spark Shell with Scala

The spark-shell command is used to quickly explore or analyze data from the command line.

Execute the commands below to add test data to HDFS and launch spark-shell.

# Download moby-dick to the edge
curl --create-dirs -o /home/tdp_user/moby-dick.txt https://www.gutenberg.org/files/2701/2701-0.txt
# Copy moby-dick.txt to hdfs
hdfs dfs \
  -copyFromLocal /home/tdp_user/moby-dick.txt \
  /user/tdp_user
# Launch the spark-shell client
spark-shell

RDD operations

Transformations and actions are operations on RDDs (resilient distributed datasets), a core feature of Spark that enables data distribution and parallel processing.

  • A Transformation is a function that produces a new RDD from the existing RDDs.
  • An Action is performed when you want to work with the actual dataset. Unlike a transformation, an action returns a result but not a new RDD.

Transformation operation

The following command is a transformation. It describes a computation to be done in the future. The transformation sets up a future Spark action by reading the contents of the book into a variable called textFile. It needs to be run within the spark-shell client opened previously:

val textFile = spark.read.textFile("/user/tdp_user/moby-dick.txt")

Action operation

Actions can be used on the textFile variable. Actions trigger computation and return a result.

// Returns the line count
textFile.count()
// Returns the first line
textFile.first()

Combine Transformations and Actions

Transformations and actions can be chained together to create complex yet concise code. For example, the next command returns the number of words in the longest line.

textFile
  .map(line => line.split(" ").size)
  .reduce((a, b) => if (a > b) a else b)

Spark Optimization - Caching

The Spark driver optimizes the query execution plan to prune redundant operations and increase performance. Users can also cache small, regularly queried datasets in memory to optimize further.

textFile.cache()
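
Note that cache() is lazy: the dataset is only materialized in memory the next time an action runs on it. Reusing the textFile variable from above:

// The first action after cache() reads the file and populates the cache
textFile.count()
// Subsequent actions are served from memory
textFile.count()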

Spark Shell with Python

For users who prefer the Python syntax, Spark provides the pyspark shell as an alternative to spark-shell.

pyspark

The following commands perform the same transformation and actions as in the previous section.

# transformation: read the text in a variable
textFile = spark.read.text("/user/tdp_user/moby-dick.txt")
# action: returns the line count
textFile.count()
# action: returns the first line
textFile.first()
# combination of transformations and an action: returns the maximum number of words in a line
from pyspark.sql.functions import col, max, size, split
textFile \
  .select(size(split(textFile.value, r"\s+")).alias("numWords")) \
  .agg(max(col("numWords"))) \
  .collect()
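
Caching is available from pyspark as well. A short sketch, reusing the textFile DataFrame defined above:

# Mark the DataFrame for caching; the next action materializes it in memory
textFile.cache()
textFile.count()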

Further reading

For a deeper dive into Spark please review our Spark Structured Streaming documentation page. To learn about Spark architecture and its use cases, refer to Spark Basics.