Spark performance tuning

Apache Spark offers many parameters that can be tuned to improve the execution time of your application. They range from cluster sizing and resource management to configuration settings. Many of them are interconnected: changing one influences the behavior of others. For example, if you increase the number of nodes, you get higher parallelism, but also more data transferred over the network during shuffles. Performance optimization is therefore often a trial-and-error process in which we look for the best compromise. In the end, we also have to keep in mind the quality of the code and the volume of data read by the job. Even though Spark SQL uses the Catalyst Optimizer to produce an optimized execution plan, be aware that this does not replace well-written code.

How to pass parameters

The general way of launching a Spark application is shown in the following example, where:

  • class is the class containing the main method of the application
  • master is the master URL
  • deploy-mode is the driver deployment mode
  • conf are additional options, passed as key-value pairs
  • application-jar contains the code of the application
  • application-arguments are the input arguments for the application
./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

Note that there are many more options, which you can look up in the Spark documentation. Below are some of the ones most frequently tuned to speed up an application.

Executors

The resources need to be provisioned with the task and dataset properties in mind. Total resource capacity is important, but so is its distribution among the executors. Co-locating executors on worker nodes reduces data transfer over the network, which has a major impact on performance.

For example, ETL jobs don’t require much shuffling. Therefore, it is not important whether the CPUs are located on the same or different nodes, as long as there are enough of them to achieve a high level of parallelism. Such jobs also don’t require much memory. In contrast, data analytics jobs with lots of joins, and consequently lots of shuffling, might run faster on fewer executors with more memory, due to less data transfer between the workers. In summary, resource locality, together with data locality, speeds up the process.

The number of executors we want in a cluster is configured using the --num-executors option. It depends on the number of available cores. One core per worker node should be dedicated to background processes; the rest can be divided among the executors. It has been shown that the optimal number of cores per executor is 5, which is configured with the --executor-cores option. Therefore, the maximum number of executors is (total worker CPUs - number of workers) / 5.
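
As a worked example, assume a hypothetical cluster of 10 worker nodes with 16 cores each; the numbers are illustrative, not a recommendation:

// Hypothetical cluster: 10 workers with 16 cores each.
val workers = 10
val coresPerWorker = 16

// Reserve one core per worker for background processes, then divide the rest.
val usableCores = workers * (coresPerWorker - 1)    // 150
val coresPerExecutor = 5                            // passed as --executor-cores 5
val maxExecutors = usableCores / coresPerExecutor   // 30, passed as --num-executors 30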

Memory size and garbage collection

Increasing the memory size increases the volume of data that can be stored and transformed at a given time. On the other hand, it also increases the garbage collection (GC) time, which might completely outweigh the gains of the additional RAM. Cloudera advises 64 GB as the upper limit per executor. You should also leave about 1 GB of RAM per node for system processes. Besides carefully selecting the memory size per executor (more is not always better), we can also select the garbage collection algorithm. The amount of time Spark spends on garbage collection appears either in the Spark UI, in the Executors tab under Task Time (GC Time), or in the logs located on the individual nodes.
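
Continuing the hypothetical sizing from the Executors section (3 executors per worker, 64 GB of RAM per node), the memory per executor could be derived as follows; the figures are assumptions for illustration:

// Hypothetical worker: 64 GB of RAM, 3 executors (15 usable cores / 5 cores per executor).
val ramPerWorkerGb = 64
val executorsPerWorker = 3
val systemReserveGb = 1                             // ~1 GB per node for system processes

val rawSharePerExecutorGb = (ramPerWorkerGb - systemReserveGb) / executorsPerWorker   // 21 GB

// spark.executor.memoryOverhead (off-heap, 10% of the heap by default, at least 384 MB)
// comes on top of the heap, so request a slightly smaller heap, e.g. --executor-memory 19g.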

Three different JVM GC algorithms are commonly used with Spark:

  • Parallel (default), set with -XX:+UseParallelGC
  • CMS (Concurrent Mark Sweep), set with -XX:+UseConcMarkSweepGC
  • G1 (Garbage First), set with -XX:+UseG1GC

We select one of them using the spark.executor.extraJavaOptions setting (e.g. spark.executor.extraJavaOptions=-XX:+UseG1GC). They have different throughput and latency characteristics, so they perform differently depending on the size and longevity of the objects created by your application. To understand better how they work and how to tune them, read this article.

To be able to compare their performance, we add the -XX:+PrintGCDetails -XX:+PrintGCTimeStamps options to print out the garbage collection events.

# Use G1 garbage collector and print out the details of garbage collection events for every node
./bin/spark-submit --name "My app" \
                   --master local[4] \
                   --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
                   myApp.jar

CPUs and partitions

CPUs and data partitions are intimately related. Generally, one CPU core executes one task at a time, i.e. it processes one partition. To fully utilize the cluster resources, we want at least as many partitions as cores; a common suggestion is 2 to 3 times more partitions than cores.
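
A quick way to compare the two, assuming a SparkSession named spark and a DataFrame df (both hypothetical):

// Total number of cores known to the scheduler vs. the current partitioning of df.
val cores = spark.sparkContext.defaultParallelism
val partitions = df.rdd.getNumPartitions
println(s"$partitions partitions for $cores cores")   // aim for roughly 2-3 partitions per core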

In Spark we distinguish three different notions of partitions:

  • Input partitions: When reading from HDFS, if the spark.sql.files.maxPartitionBytes setting is equal to the HDFS block size, each block is read into one Spark partition. If, after the read, you end up with fewer partitions than available cores, you will not exploit all the resources. In this case, you can decrease spark.sql.files.maxPartitionBytes in favor of a higher number of partitions and higher parallelism.
  • Output partitions: Spark persists one partition to one part-file by default. Even though the number and size of the files do not affect the writing process, they have a great effect on future reads (read about skewed data below). If needed, repartition() before writing. The maxRecordsPerFile option can also be used to limit the size of the files (there is no limit by default).
  • Shuffle partitions: Shuffle partitions are the partitions Spark uses when shuffling data during wide transformations. The default number is 200. Having too many of them produces small or even empty partitions; processing time is then short while network I/O is comparatively high. Conversely, having too few of them produces large partitions, resulting in long-running tasks or even out-of-memory errors. To best exploit the parallelism, set the number of shuffle partitions to a multiple of the number of cores, i.e. spark.sql.shuffle.partitions = X * number of cores. However, determining a good number of partitions is a trial-and-error process. The sketch after this list illustrates these three settings.
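
A minimal sketch of the three kinds of partition settings, assuming a SparkSession named spark and a DataFrame df (both hypothetical, as are the numbers and the output path):

// Input partitions: read each 128 MB HDFS block into one Spark partition;
// lower this value to get more, smaller partitions and higher parallelism.
spark.conf.set("spark.sql.files.maxPartitionBytes", 128 * 1024 * 1024)

// Shuffle partitions: e.g. 3 partitions per available core.
spark.conf.set("spark.sql.shuffle.partitions", 3 * spark.sparkContext.defaultParallelism)

// Output partitions: control the number and maximum size of the written part-files.
df.repartition(100)
  .write
  .option("maxRecordsPerFile", 1000000)
  .parquet("/tmp/output")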

Some transformations (e.g. group by) result in data skew. Imagine having sales data: when you group by article id, the few top-selling articles create several huge partitions, while the less popular articles (the majority of the inventory) give many smaller partitions. In the next transformation, the small partitions are processed quickly, while the big ones take longer and require more RAM; if they are too big, they even cause out-of-memory errors. In any case, the big partitions determine the duration of the whole stage and might slow down the job. If you end up with big partitions, use repartitioning to change the number of partitions to an arbitrary (larger) number. If you have a large number of small partitions, coalescing (reducing the number of partitions) might help. Know that the repartition() command executes a full shuffle and is time-consuming, while the coalesce() command is faster; a short sketch of both commands follows below. A good illustration of this phenomenon and other skew-related issues can be found in this article.
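
A minimal sketch of both commands, assuming a hypothetical skewed DataFrame named sales (the partition counts are illustrative):

// Full shuffle: redistributes the rows evenly across 400 partitions;
// expensive, but breaks up oversized partitions.
val rebalanced = sales.repartition(400)

// No full shuffle: merges existing partitions down to 50;
// cheaper, but only reduces the number of partitions and cannot fix skew.
val merged = sales.coalesce(50)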

Execution

Execution in Spark is tuned with the following:

  • Broadcasting: when joining two tables where one is small, Spark broadcasts the small table, i.e. copies it to and persists it locally on all of the nodes, so the join requires no shuffle. The default size limit for a broadcast table is 10 MB. Depending on the size of your tables and the available memory, this threshold may need to be increased. For example, to broadcast a 500 MB table, set spark.sql.autoBroadcastJoinThreshold=524288000 (500 * 1024 * 1024 bytes). Since the driver is the one doing the broadcasting, increase the driver’s memory and its result size limit beforehand, for example with spark.driver.maxResultSize=10g.
  • Adaptive Query Execution (Spark 3.0): AQE is a new feature in Spark 3.0 that provides a number of optimizations at the query execution level. To be able to use them, you first need to enable the framework globally with the setting spark.sql.adaptive.enabled=true. Then, you need to enable each feature you want to use individually (the sketch after this list shows how these settings can be applied):
    • Dynamically coalescing shuffle partitions: the previously described problem of small or empty shuffle partitions is mitigated by coalescing multiple small partitions into bigger ones. The process creates a smaller number of partitions of balanced size. Use the following setting to enable dynamically coalescing shuffle partitions: spark.sql.adaptive.coalescePartitions.enabled=true.
    • Dynamically switching join strategies: in Spark 2.x, the optimized execution plan is built before the execution and doesn’t change during runtime. In contrast, in Spark 3.x, the execution plan is re-evaluated and adapted during runtime. This can lead to a changed join strategy, since an initially large table might be pre-processed (e.g. filtered) and subsequently fall under the broadcast threshold. Use the following setting to enable dynamically switching join strategies: spark.sql.adaptive.localShuffleReader.enabled=true.
    • Dynamically optimizing skew joins: joins are particularly sensitive to skewed data. To mitigate the skew, AQE splits the larger partitions into multiple smaller ones before the join. Use the following setting to enable dynamically optimizing skew joins: spark.sql.adaptive.skewJoin.enabled=true.
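
A minimal sketch of these execution settings, assuming a SparkSession named spark and illustrative values:

// Broadcasting: allow tables of up to 500 MB to be broadcast (the default threshold is 10 MB).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 500L * 1024 * 1024)
// spark.driver.maxResultSize is typically passed at submit time,
// e.g. --conf spark.driver.maxResultSize=10g.

// Adaptive Query Execution (Spark 3.0+): enable the framework, then the individual features.
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", true)   // coalesce small shuffle partitions
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", true)   // switch join strategies at runtime
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", true)             // split skewed partitions before joins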

What to learn next?

Now that you know how to improve the performance of Spark jobs, try to tune the examples from the familiar Spark tutorial.

Additionally, here is a list of articles to complement your knowledge on TDP components: