SparkR

R is a programming language and statistical environment widely used in data science and statistical analysis. It offers a wide range of advanced statistical features and specialized packages for complex analyses, and is particularly well suited to statistical modelling, data mining and visualization. R also provides a rich ecosystem of machine learning algorithms, and its graphical capabilities enable high-quality visualizations such as scatter plots and histograms, facilitating effective data exploration and presentation.

SparkR is an R package that provides an interface to Apache Spark, allowing users to leverage Spark's distributed computing capabilities from the R programming language.

Requirements

SparkR is not currently integrated into the cluster. If you wish to use it, you must first install it with the script provided in the documentation.

Note: if you build your own Spark distribution, you need to activate the R profile and use an R version higher than 3.5.

This tutorial assumes you are running a cluster based on TDP getting started, an easy-to-launch TDP environment for testing purposes. This deployment provides you with:

  • tdp_user, a user with the ability to kinit for authentication.
  • An edge node accessible by SSH.
  • HDFS directories:
    • /user/tdp_user

Note: When using a TDP deployment other than tdp-getting-started, some commands need to be adapted to your deployment.

Before beginning the tutorial, connect to the cluster and authenticate yourself with kinit using the following commands:

# Connect to edge-01.tdp 
vagrant ssh edge-01
# Switch user to tdp_user
sudo su tdp_user
# Authenticate the user with the Kerberos keytab and principal
kinit -kt ~/tdp_user.keytab tdp_user@REALM.TDP

Tutorial

SparkR provides a DataFrame API that supports operations such as SQL queries, DataFrame transformations and statistical functions. A SparkDataFrame is the equivalent of a table in a relational database. It can be built from various sources: structured data files, Hive tables and external databases.
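
For example, besides reading files, a SparkDataFrame can be created from a local R data.frame with as.DataFrame, or from a Hive table with sql or tableToDF. A minimal sketch; the table name some_hive_table is only an illustration and must exist in your Hive metastore:

# Create a SparkDataFrame from a local R data.frame (faithful is a built-in R dataset)
local_df <- as.DataFrame(faithful)

# Create a SparkDataFrame from a Hive table (hypothetical table name)
hive_df <- sql("SELECT * FROM some_hive_table")
# Or equivalently
hive_df <- tableToDF("some_hive_table")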

Most Spark connectors directly provide a DataFrame API, without an RDD equivalent. It is nevertheless still possible to obtain an RDD from a DataFrame:

converted.rdd <- SparkR:::toRDD(dataframe)

In the following example, the data is loaded from a CSV file into a DataFrame.

First, create a data.csv file with the following content:

id,author,genre,quantity
1,hunter.fields,romance,15
2,leonard.lewis,thriller,81
3,jason.dawson,thriller,90
4,andre.grant,thriller,25
5,earl.walton,romance,40
6,alan.hanson,romance,24
7,clyde.matthews,thriller,31
8,josephine.leonard,thriller,1
9,owen.boone,sci-fi,27
10,max.mcBride,romance,75

# Copy data.csv to HDFS
hdfs dfs -put /home/tdp_user/data.csv /user/tdp_user

You can use SparkR from the sparkr3-shell:

# Launch the sparkr3-shell
sparkr3-shell

Now that you are in the SparkR shell, you can load the data into a DataFrame:

# Create a DataFrame
df <- read.df("/user/tdp_user/data.csv", source = "csv", header = "true", inferSchema = "true")

Here are some examples of commands to display the data:

# Display the first row
first(df)

# Display the first N rows
head(df, N)

# Display all the data
collect(df)

# Examine the structure of the DataFrame
str(df)
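
To also check the schema inferred by Spark and the number of rows, the printSchema and count functions can be used (a small addition to the commands above):

# Print the Spark schema inferred from the CSV header
printSchema(df)

# Count the number of rows
count(df)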

Spark DataFrame operations:

# Selection
head(select(df, df$genre))

# Count the number of items by genre
head(summarize(groupBy(df, df$genre), count = n(df$genre)))

# Convert the 'quantity' column to an integer
df2 <- withColumn(df, "quantity", cast(df$quantity, "integer"))

# Find the minimum, maximum and average of 'quantity'
result <- agg(df2, min(df2$quantity), max(df2$quantity), avg(df2$quantity))

# Show the result
showDF(result)

To obtain data that meets certain conditions, use filters. There are two equivalent ways of filtering data:

filter(df, condition)
# Or
where(df, condition)

Here is an example of applying them:

# Filter when the quantity is greater than 80
filtered_df <- filter(df, df$quantity > 80)
# Or
filtered_df <- where(df, df$quantity > 80)

showDF(filtered_df)
## id  author         genre     quantity
## 2   leonard.lewis  thriller  81
## 3   jason.dawson   thriller  90

With SparkR, SQL queries can be executed by registering the SparkDataFrame as a temporary view:

# Create the temporary view
createOrReplaceTempView(df, "df")

# Running the SQL query
result_sql <- sql("SELECT * FROM df WHERE quantity >= 80")

showDF(result_sql)
## id  author         genre     quantity
## 2   leonard.lewis  thriller  81
## 3   jason.dawson   thriller  90

Machine learning

K-means

SparkR gives access to MLlib machine learning models such as K-means, Naive Bayes, SVM and LDA. Here, we focus on testing a clustering algorithm on a given dataset. Clustering is an unsupervised machine learning technique that groups similar data points together based on their characteristics.
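
These models are exposed as spark.* functions that take a SparkDataFrame and, for most of them, an R formula. A minimal sketch, assuming a SparkDataFrame named training with a label column and feature columns (not part of this tutorial):

# Naive Bayes classifier on a hypothetical 'training' SparkDataFrame
nb_model <- spark.naiveBayes(training, label ~ .)

# Linear SVM classifier with the same formula interface
svm_model <- spark.svmLinear(training, label ~ ., maxIter = 10)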

Create a second data.csv file with the following content and copy it to HDFS under /user/tdp_user/dataspark/, the path used below:

branch_size,trunk_perimeter,tree_type
5.1,4.9,Oak
4.9,4.2,Oak
4.7,4.7,Oak
5.0,4.6,Oak
5.2,5.0,Oak
4.9,4.5,Oak
5.4,5.1,Oak
4.8,4.9,Oak
5.8,5.1,Oak
5.0,4.7,Oak
6.4,6.1,Pine
5.8,5.2,Pine
6.3,5.0,Pine
5.9,5.1,Pine
6.5,5.9,Pine
6.2,5.4,Pine
6.4,5.5,Pine
6.7,5.7,Pine
6.3,5.6,Pine
6.2,5.2,Pine
7.9,6.4,Maple
7.7,6.3,Maple
7.7,6.1,Maple
7.2,5.9,Maple
7.1,5.8,Maple
7.6,6.6,Maple
7.3,6.3,Maple
7.2,6.1,Maple
7.4,6.4,Maple
7.9,6.3,Maple

Now go back to the R shell:

# Load SparkR library
library(SparkR)

# Initialize SparkR session
sparkR.session()

# Load the CSV file
data <- read.df("/user/tdp_user/dataspark/data.csv", source = "csv", header = "true", inferSchema = "true")

# Split the data into training and validation datasets
splits <- randomSplit(data, c(0.7, 0.3), seed = 42)
trainData <- splits[[1]]
validationData <- splits[[2]]

# Apply k-means algorithm on the training data
model <- spark.kmeans(trainData, ~ branch_size + trunk_perimeter, k = 3, maxIter = 20, initMode = "k-means||", seed = 42)

# Predict on the validation data
predictions <- predict(model, validationData)

# Show the results
showDF(predictions)
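
# Optionally, inspect the fitted model: summary() returns the cluster centers and sizes
summary(model)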

# Stop the SparkR session
sparkR.stop()