YARN tutorial

Now that you have a better understanding of YARN’s capabilities, we’ll put them into practice in this tutorial by:

  • Choosing a set of MapReduce examples from those available in the Apache Hadoop distribution
  • Submitting and running these examples as YARN jobs with yarn jar
  • Monitoring the submitted jobs using the YARN Web UI and the YARN CLI


This tutorial assumes you are running a cluster based on TDP getting started, an easy-to-launch TDP environment for testing purposes. This deployment provides you with:

  • tdp_user, a user with the ability to kinit for authentication.
  • An edge node accessible by SSH
  • HDFS directories:
    • /user/tdp_user

Note: If you are using your own TDP deployment, you need to adapt the previous configuration accordingly.

Before beginning the tutorial, connect to the cluster and kinit using the following commands:

# Connect to edge-01.tdp 
vagrant ssh edge-01
# Switch user to tdp_user
sudo su tdp_user
# Authenticate as tdp_user using the Kerberos keytab
kinit -kt ~/tdp_user.keytab tdp_user@REALM.TDP

MapReduce examples

The Apache Hadoop distribution comes with a set of MapReduce examples and benchmarks, usually located in hadoop-[hadoop-version]/share/hadoop/mapreduce. This path may change depending on the Hadoop distribution you are using, so to locate the examples jar, run:

find / -name "*hadoop-mapreduce-examples*" 2>/dev/null

This command returns several jars, including /opt/tdp/hadoop-3.1.1-TDP-0.1.0-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.1-TDP-0.1.0-SNAPSHOT.jar (the one used in this tutorial). The other jars contain additional benchmarks that you can explore later.

To make things easier in the following steps, create an environment variable pointing to the Hadoop examples jar:

export YARN_EXAMPLES=/opt/tdp/hadoop-3.1.1-TDP-0.1.0-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.1-TDP-0.1.0-SNAPSHOT.jar

Now you can get a list of the available examples by running the jar without arguments:

yarn jar $YARN_EXAMPLES
This command returns something similar to this:

An example program must be given as the first argument.
Valid program names are:
  aggregatewordcount: An Aggregate based map/reduce program that counts the words in the input files.
  aggregatewordhist: An Aggregate based map/reduce program that computes the histogram of the words in the input files.
  bbp: A map/reduce program that uses Bailey-Borwein-Plouffe to compute exact digits of Pi.
  dbcount: An example job that counts the pageview counts from a database.
  distbbp: A map/reduce program that uses a BBP-type formula to compute exact bits of Pi.
  grep: A map/reduce program that counts the matches of a regex in the input.
  join: A job that effects a join over sorted, equally partitioned datasets
  multifilewc: A job that counts words from several files.
  pentomino: A map/reduce tile laying program to find solutions to pentomino problems.
  pi: A map/reduce program that estimates Pi using a quasi-Monte Carlo method.
  randomtextwriter: A map/reduce program that writes 10GB of random textual data per node.
  randomwriter: A map/reduce program that writes 10GB of random data per node.
  secondarysort: An example defining a secondary sort to the reduce.
  sort: A map/reduce program that sorts the data written by the random writer.
  sudoku: A sudoku solver.
  teragen: Generate data for the terasort
  terasort: Run the terasort
  teravalidate: Checking results of terasort
  wordcount: A map/reduce program that counts the words in the input files.
  wordmean: A map/reduce program that counts the average length of the words in the input files.
  wordmedian: A map/reduce program that counts the median length of the words in the input files.
  wordstandarddeviation: A map/reduce program that counts the standard deviation of the length of the words in the input files.

The setup is ready! Now you can try some of the examples provided.
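Before running the distributed versions, it may help to recall the map/reduce pattern that examples like wordcount implement. The following is a toy single-process Python analogue of that logic, not Hadoop's implementation:

```python
from collections import defaultdict

def map_phase(lines):
    # Map: emit a (word, 1) pair for every word in the input.
    for line in lines:
        for word in line.split():
            yield (word, 1)

def reduce_phase(pairs):
    # Reduce: sum the counts per distinct word (the shuffle is implicit here).
    counts = defaultdict(int)
    for word, count in pairs:
        counts[word] += count
    return dict(counts)

lines = ["the quick brown fox", "the lazy dog", "the fox"]
print(reduce_phase(map_phase(lines)))
# {'the': 3, 'quick': 1, 'brown': 1, 'fox': 2, 'lazy': 1, 'dog': 1}
```

In the real framework, the map and reduce phases run as parallel tasks on different nodes, and the shuffle moves each word's pairs to the reducer responsible for it.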

Job submission

We choose the Hadoop TeraSort benchmark to demonstrate how to submit a YARN job and extract information about its running parameters via the YARN UI.

The TeraSort benchmark is often used to test sorting speed. Yahoo! used it in 2008 to set a record, sorting 1 TB of data in 209 seconds. We will try the same in our local cluster, but don't worry: we will only use 1 GB.

TeraSort aims to sort large volumes of data as fast as possible using Hadoop MapReduce. The benchmark exercises almost every part of the Hadoop MapReduce framework as well as HDFS, making it an ideal choice for fine-tuning a Hadoop cluster setup. It consists of 3 components that run sequentially:

  1. TeraGen: generates random data
  2. TeraSort: sorts using MapReduce
  3. TeraValidate: validates if the output is sorted

The following steps show you how to run the TeraSort benchmark on Hadoop TDP clusters. Don't forget to clean up your cluster after each run; we show you how at the end of the article.


TeraGen

The first step of the TeraSort benchmark is data generation. The teragen command generates the input data for the TeraSort benchmark: its first parameter is the number of records and its second is the HDFS directory in which to generate the data. The following command generates 1 GB of data, consisting of 10 million records, in the tera-in directory of HDFS:

yarn jar $YARN_EXAMPLES teragen 10000000 tera-in
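Each TeraGen record is exactly 100 bytes, so the record count for a target data size is the size divided by 100. The arithmetic behind the command above:

```python
RECORD_SIZE = 100                      # bytes per TeraGen record
target_bytes = 1_000_000_000           # 1 GB of input data
records = target_bytes // RECORD_SIZE
print(records)                         # 10000000, the first argument to teragen
```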

Once a YARN application is submitted, you can monitor its execution via the YARN ResourceManager UI, as described in the next section. The application also sends log information to the console.

After establishing a connection and passing Kerberos authentication, the logs show the job submission and its execution process with the percentage of the Map and Reduce tasks performed. When the job is complete, a summary of the process performed and resources used is displayed:

2022-05-20 12:25:39,648 INFO impl.YarnClientImpl: Submitted application application_1653043847641_0004
2022-05-20 12:25:39,717 INFO mapreduce.Job: The url to track the job: https://master-02.tdp:8090/proxy/application_1653043847641_0004/
2022-05-20 12:25:39,720 INFO mapreduce.Job: Running job: job_1653043847641_0004
2022-05-20 12:25:48,938 INFO mapreduce.Job: Job job_1653043847641_0004 running in uber mode : false
2022-05-20 12:25:48,939 INFO mapreduce.Job:  map 0% reduce 0%
2022-05-20 12:26:03,263 INFO mapreduce.Job:  map 100% reduce 0%
2022-05-20 12:26:04,331 INFO mapreduce.Job: Job job_1653043847641_0004 completed successfully
2022-05-20 12:26:04,509 INFO mapreduce.Job: Counters: 34
        File System Counters
                FILE: Number of bytes read=0
                FILE: Number of bytes written=451644
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=167
                HDFS: Number of bytes written=1000000000
                HDFS: Number of read operations=12
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=4
        Job Counters 
                Killed map tasks=1
                Launched map tasks=2
                Other local map tasks=2
                Total time spent by all maps in occupied slots (ms)=25011
                Total time spent by all reduces in occupied slots (ms)=0
                Total time spent by all map tasks (ms)=25011
                Total vcore-milliseconds taken by all map tasks=25011
                Total megabyte-milliseconds taken by all map tasks=25611264
        Map-Reduce Framework
                Map input records=10000000
                Map output records=10000000
                Input split bytes=167
                Spilled Records=0
                Failed Shuffles=0
                Merged Map outputs=0
                GC time elapsed (ms)=271
                CPU time spent (ms)=18630
                Physical memory (bytes) snapshot=723050496
                Virtual memory (bytes) snapshot=5107007488
                Total committed heap usage (bytes)=559939584
                Peak Map Physical memory (bytes)=384413696
                Peak Map Virtual memory (bytes)=2563887104
        File Input Format Counters 
                Bytes Read=0
        File Output Format Counters 
                Bytes Written=1000000000

A similar output is produced by all three steps. Let's check some of the information provided:

  • The URL to track the specific job on a web interface.
  • File System Counters: the number of bytes read and written by the map and reduce tasks, on the local file system and on HDFS.
  • Job Counters: MapReduce job-level statistics, such as the number of map tasks launched and the time they consumed.
  • Map-Reduce Framework: additional information on the job's memory usage, the number of shuffles performed and failed, etc.

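The Job Counters above are internally consistent, which makes for a quick sanity check: megabyte-milliseconds equals the total map task milliseconds multiplied by the memory allocated per container, so we can recover the container size from the log:

```python
map_task_ms = 25011      # "Total time spent by all map tasks (ms)" from the log
mb_ms = 25611264         # "Total megabyte-milliseconds taken by all map tasks"

# megabyte-milliseconds = task milliseconds x container memory (MB)
container_mb = mb_ms // map_task_ms
print(container_mb)      # 1024 -> each map container was allocated 1 GB
```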
Each generated row has the following format:

<10 bytes key><10 bytes rowid><78 bytes filler>\r\n

The key consists of 10 bytes of random text, the rowid is the integer row id used for sorting, and the filler consists of 7 blocks of 10 characters and 1 block of 8 characters.
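To make the layout concrete, here is a minimal Python sketch that splits one such 100-byte record into its fields. The field widths come from the format above; this is an illustration, not Hadoop's parser:

```python
def split_record(record: bytes):
    # 10-byte key + 10-byte rowid + 78-byte filler + "\r\n" = 100 bytes
    assert len(record) == 100
    key = record[0:10]
    rowid = record[10:20]
    filler = record[20:98]
    return key, rowid, filler

# A hypothetical record built to match the layout:
rec = b"K" * 10 + b"0000000042" + b"F" * 78 + b"\r\n"
key, rowid, filler = split_record(rec)
print(rowid)  # b'0000000042'
```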

A fast cluster should complete the mapping in a few seconds using the default block size of 128MB.

Once the job is completed, let’s take a look at your HDFS home directory by running:

hdfs dfs -ls

The tera-in folder has been created:

Found 1 item
drwxr-xr-x   - tdp_user tdp_user          0 2022-05-26 16:32 tera-in

Let’s see what’s inside that folder by running:

hdfs dfs -ls tera-in

There are 3 files in this directory (1 metadata file and 2 data files) of 1 GB in total:

Found 3 items
-rw-r--r--   1 tdp_user tdp_user          0 2022-05-26 16:32 tera-in/_SUCCESS
-rw-r--r--   1 tdp_user tdp_user  500000000 2022-05-26 16:32 tera-in/part-m-00000
-rw-r--r--   1 tdp_user tdp_user  500000000 2022-05-26 16:32 tera-in/part-m-00001


TeraSort

The second step of the TeraSort benchmark is the execution of the TeraSort MapReduce computation on the data generated in step 1, using the following command:

yarn jar $YARN_EXAMPLES terasort tera-in tera-out

The first parameter of the terasort command is the input HDFS data directory, and the second is the output HDFS data directory.

TeraSort is implemented as a MapReduce sort job with a custom partitioner that uses a sorted list of n-1 sampled keys (collected in an initial sampling step) to define the key range of each reducer. As data is moved between nodes to be sorted, the terasort command causes some shuffling, which increases latency (the job takes longer to complete).
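The idea behind that partitioner can be sketched in a few lines of Python: given n-1 sorted split keys, each record is routed to the reducer whose key range contains its key. This is a simplified illustration, not the actual Hadoop code (which samples real keys and uses a trie for speed):

```python
import bisect

def partition(key, split_keys):
    # n-1 sorted split keys define n reducer key ranges;
    # bisect finds which range (i.e. which reducer) the key falls into.
    return bisect.bisect_right(split_keys, key)

split_keys = ["g", "p"]  # 2 hypothetical split keys -> 3 reducers
for key in ["apple", "mango", "zebra"]:
    print(key, "-> reducer", partition(key, split_keys))
# apple -> reducer 0, mango -> reducer 1, zebra -> reducer 2
```

Because reducer 0 receives only keys below reducer 1's range, and so on, simply concatenating the sorted reducer outputs yields a globally sorted dataset.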

There are now 2 folders in your HDFS directory:

Found 2 items
drwxr-xr-x   - tdp_user tdp_user          0 2022-05-26 16:32 tera-in
drwx------   - tdp_user tdp_user          0 2022-05-26 16:39 tera-out

TeraSort only sorts previously created data (it does not generate or delete data), which means that the tera-out folder also contains 1 GB of data, as shown here:

Found 3 items
-rw-r--r--   1 tdp_user tdp_user          0 2022-05-26 16:39 tera-out/_SUCCESS
-rw-r--r--  10 tdp_user tdp_user          0 2022-05-26 16:37 tera-out/_partition.lst
-rw-r--r--   1 tdp_user tdp_user 1000000000 2022-05-26 16:39 tera-out/part-r-00000


TeraValidate

The last step of the TeraSort benchmark is the validation of the results. TeraValidate checks that the keys of the sorted output are in order within each file; if something is wrong with the sorted output, the reducer reports the problem. Run the teravalidate application as follows: the first parameter is the directory containing the sorted data and the second is the directory in which to store the validation report.

yarn jar $YARN_EXAMPLES teravalidate tera-out tera-validate
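TeraValidate's core check can be sketched as follows: within each file the keys must be non-decreasing, and the first key of each file must not be smaller than the last key of the previous file. This is a simplified Python illustration; the real job also verifies a checksum of the data:

```python
def validate(files):
    # files: lists of keys, one list per output file, in file order
    # (part-r-00000, part-r-00001, ...)
    prev_last = None
    for keys in files:
        for i in range(1, len(keys)):
            if keys[i] < keys[i - 1]:
                return False  # out of order within a file
        if keys and prev_last is not None and keys[0] < prev_last:
            return False      # boundary between two files out of order
        if keys:
            prev_last = keys[-1]
    return True

print(validate([["a", "b"], ["c", "d"]]))  # True: globally sorted
print(validate([["b", "a"]]))              # False: out of order
```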

There are now 3 folders in your HDFS home directory:

Found 3 items
drwxr-xr-x   - tdp_user tdp_user          0 2022-05-26 16:32 tera-in
drwx------   - tdp_user tdp_user          0 2022-05-26 16:39 tera-out
drwxr-xr-x   - tdp_user tdp_user          0 2022-05-26 16:44 tera-validate

As a result of these 3 steps, we submitted 3 YARN applications sequentially.

ResourceManager UI

The ResourceManager UI lets you monitor each application's ID, the user who submitted it, its name, the queue it was submitted to, its start and end times (for completed applications), and its final status.

To access the ResourceManager web UI in TDP, open https://master-02.tdp:8090/cluster/apps.

Note: The ResourceManager UI is Kerberos protected, and you must pass the Kerberos authentication to access it. Information on how to configure this can be found here.

Monitoring applications

You can search for applications on the Applications page:

Monitoring applications in YARN ResourceManager UI

In the preceding example, we submitted the TeraGen application with the ID application_1653043847641_0006. To view the details, refer to the Application List tab, where we can see all the attempts that were performed while executing this application:

Monitoring applications details in YARN ResourceManager UI

Detailed information about each attempt is available on the Attempt Info page, where you can explore how many containers were launched and on what nodes specifically:

Monitoring applications attempt in YARN ResourceManager UI

Monitoring nodes

The Nodes page on the YARN Web user interface allows you to view information about the cluster nodes on which the NodeManagers are running.

Monitoring nodes in YARN ResourceManager UI


YARN CLI

We used the YARN CLI throughout this tutorial to submit the various jobs, but it is also a useful tool for monitoring and managing jobs on the cluster without accessing a web UI (common for administrators).

Here are some useful commands that you can use:

  • To get the list of YARN commands
yarn -help
  • To get the logs of a specified application
yarn logs -applicationId <app-id>
  • To list the applications
yarn application -list # can be combined with -appTypes, -appStates or -appTags for filtering
  • To kill an application
yarn application -kill <app-id>

For more information about the different commands and their options, check the official Apache Hadoop YARN Commands page.

Cleaning clusters

After performing all these steps, don’t forget to delete the TeraSort input and output directories, so that the next benchmark can be run without storage problems.

hdfs dfs -rm -r -skipTrash tera*

What to learn next?

To get more comfortable with YARN, we recommend that you run and monitor the other MapReduce examples available. YARN’s multi-tenancy allows you to run other distributed processing engines, such as Spark.