Now that you have a better understanding of YARN’s capabilities, we’ll put them into practice in this tutorial by:
- Choosing a set of MapReduce examples from those available in the Apache Hadoop distribution
- Submitting and running these examples as YARN jobs
- Monitoring the submitted jobs using the YARN Web UI and the YARN CLI
This tutorial assumes you are running a cluster based on TDP getting started, an easy-to-launch TDP environment for testing purposes. This deployment provides you with:
- tdp_user, a user with permission to submit YARN jobs
- An edge node accessible by SSH
- HDFS directories, including a home directory for tdp_user
Note: If you are using your own TDP deployment, you need to adapt the previous configuration accordingly.
Before beginning the tutorial, connect to the cluster and authenticate with kinit using the following commands:
```shell
# Connect to edge-01.tdp
vagrant ssh edge-01
# Switch user to tdp_user
sudo su tdp_user
# Authenticate the user with their Kerberos principal and keytab
kinit -kt ~/tdp_user.keytab tdp_user@REALM.TDP
```
The Apache Hadoop distribution comes with a set of MapReduce examples and benchmarks, usually located in hadoop-[hadoop-version]/share/hadoop/mapreduce. This path may change depending on the Hadoop distribution you are using, so to locate the examples, run:

```shell
find / -name "*hadoop-mapreduce-examples*" 2>/dev/null
```
This command returns several jars, including /opt/tdp/hadoop-3.1.1-TDP-0.1.0-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.1-TDP-0.1.0-SNAPSHOT.jar, the one used in this tutorial. The other jars contain further benchmarks that you can explore in the future.
To make things easier in the following steps, create an environment variable with the Hadoop examples path:
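Using the jar path found above (adapt it to your own Hadoop distribution and version), this could look like:

```shell
# Path returned by the find command above; adapt to your distribution and version
export YARN_EXAMPLES=/opt/tdp/hadoop-3.1.1-TDP-0.1.0-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.1-TDP-0.1.0-SNAPSHOT.jar
```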
Now you can get a list of the available examples by running:

```shell
yarn jar $YARN_EXAMPLES
```
This command returns something similar to this:
```
An example program must be given as the first argument.
Valid program names are:
  aggregatewordcount: An Aggregate based map/reduce program that counts the words in the input files.
  aggregatewordhist: An Aggregate based map/reduce program that computes the histogram of the words in the input files.
  bbp: A map/reduce program that uses Bailey-Borwein-Plouffe to compute exact digits of Pi.
  dbcount: An example job that counts the pageview counts from a database.
  distbbp: A map/reduce program that uses a BBP-type formula to compute exact bits of Pi.
  grep: A map/reduce program that counts the matches of a regex in the input.
  join: A job that effects a join over sorted, equally partitioned datasets
  multifilewc: A job that counts words from several files.
  pentomino: A map/reduce tile laying program to find solutions to pentomino problems.
  pi: A map/reduce program that estimates Pi using a quasi-Monte Carlo method.
  randomtextwriter: A map/reduce program that writes 10GB of random textual data per node.
  randomwriter: A map/reduce program that writes 10GB of random data per node.
  secondarysort: An example defining a secondary sort to the reduce.
  sort: A map/reduce program that sorts the data written by the random writer.
  sudoku: A sudoku solver.
  teragen: Generate data for the terasort
  terasort: Run the terasort
  teravalidate: Checking results of terasort
  wordcount: A map/reduce program that counts the words in the input files.
  wordmean: A map/reduce program that counts the average length of the words in the input files.
  wordmedian: A map/reduce program that counts the median length of the words in the input files.
  wordstandarddeviation: A map/reduce program that counts the standard deviation of the length of the words in the input files.
```
The setup is ready! Now you can try some of the examples provided.
We choose the Hadoop TeraSort benchmark to demonstrate how to submit a YARN job and extract information about its running parameters via the YARN UI.
The TeraSort benchmark is often used to test sorting speed. In 2008, Yahoo! used it to set a record by sorting 1 TB of data in 209 seconds. We will try the same in our local cluster, but don’t worry! We will only use 1 GB instead.
TeraSort aims to sort large volumes of data as fast as possible using Hadoop MapReduce. The benchmark exercises almost every part of the Hadoop MapReduce framework as well as HDFS, making it an ideal choice for fine-tuning a Hadoop cluster setup. It consists of 3 components that run sequentially:
- TeraGen: generates random data
- TeraSort: sorts using MapReduce
- TeraValidate: validates if the output is sorted
The following steps show you how to run the TeraSort benchmark on Hadoop TDP clusters. Don’t forget to clean up your cluster after each run; we show you how to do it at the end of the article.
The first step of the TeraSort benchmark is data generation. The teragen command generates the input data for the TeraSort benchmark. Its first parameter is the number of records and the second is the HDFS directory in which to generate the data. The following command generates 1 GB of data, consisting of 10 million records, in the tera-in directory of HDFS:

```shell
yarn jar $YARN_EXAMPLES teragen 10000000 tera-in
```
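As a sanity check on the 1 GB figure: each TeraGen row is exactly 100 bytes (the record layout is detailed later in this article), so the record count maps directly to the output size:

```shell
# 10 million records at exactly 100 bytes each
records=10000000
bytes=$(( records * 100 ))
echo "$bytes"   # 1000000000 bytes, i.e. 1 GB
```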
Once a YARN application is submitted, it is possible to monitor its execution via the YARN ResourceManager UI as described in the next section. It also sends log information to the console.
After the connection is established and Kerberos authentication passes, the logs show the job submission and its execution progress, with the percentage of Map and Reduce tasks completed. When the job is complete, a summary of the work performed and the resources used is displayed:
```
...
2022-05-20 12:25:39,648 INFO impl.YarnClientImpl: Submitted application application_1653043847641_0004
2022-05-20 12:25:39,717 INFO mapreduce.Job: The url to track the job: https://master-02.tdp:8090/proxy/application_1653043847641_0004/
2022-05-20 12:25:39,720 INFO mapreduce.Job: Running job: job_1653043847641_0004
2022-05-20 12:25:48,938 INFO mapreduce.Job: Job job_1653043847641_0004 running in uber mode : false
2022-05-20 12:25:48,939 INFO mapreduce.Job:  map 0% reduce 0%
2022-05-20 12:26:03,263 INFO mapreduce.Job:  map 100% reduce 0%
2022-05-20 12:26:04,331 INFO mapreduce.Job: Job job_1653043847641_0004 completed successfully
2022-05-20 12:26:04,509 INFO mapreduce.Job: Counters: 34
    File System Counters
        FILE: Number of bytes read=0
        FILE: Number of bytes written=451644
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=167
        HDFS: Number of bytes written=1000000000
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Job Counters
        Killed map tasks=1
        Launched map tasks=2
        Other local map tasks=2
        Total time spent by all maps in occupied slots (ms)=25011
        Total time spent by all reduces in occupied slots (ms)=0
        Total time spent by all map tasks (ms)=25011
        Total vcore-milliseconds taken by all map tasks=25011
        Total megabyte-milliseconds taken by all map tasks=25611264
    Map-Reduce Framework
        Map input records=10000000
        Map output records=10000000
        Input split bytes=167
        Spilled Records=0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=271
        CPU time spent (ms)=18630
        Physical memory (bytes) snapshot=723050496
        Virtual memory (bytes) snapshot=5107007488
        Total committed heap usage (bytes)=559939584
        Peak Map Physical memory (bytes)=384413696
        Peak Map Virtual memory (bytes)=2563887104
    org.apache.hadoop.examples.terasort.TeraGen$Counters
        CHECKSUM=21472776955442690
    File Input Format Counters
        Bytes Read=0
    File Output Format Counters
        Bytes Written=1000000000
```
Similar output is produced by all three steps. Let’s look at some of the information provided:
- The URL to track the specific job on a web interface.
- File System Counters: the number of bytes read and written by the map and reduce tasks on each file system (local and HDFS).
- Job Counters: MapReduce job-level statistics, such as the number of maps launched and the time spent by map tasks.
- Map-Reduce Framework: additional information on the job's memory usage, the number of shuffles performed and failed, etc.
TeraGen generates the following data format per row:

```
<10 bytes key><10 bytes rowid><78 bytes filler>\r\n
```

The key consists of 10 bytes of random characters, the rowid is the integer row id used for sorting, and the filler consists of 7 blocks of 10 characters and 1 block of 8 characters.
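The fixed-width layout can be illustrated by slicing a record at byte offsets 10 and 20 (the record below is a made-up placeholder, not real TeraGen output):

```shell
# Build a fake 98-byte record body: 10-byte key + 10-byte rowid + 78-byte filler
record="AbCdEfGhIj0000000001$(printf 'X%.0s' $(seq 1 78))"
key=$(printf '%s' "$record" | cut -c1-10)
rowid=$(printf '%s' "$record" | cut -c11-20)
filler=$(printf '%s' "$record" | cut -c21-98)
echo "${#key} ${#rowid} ${#filler}"   # 10 10 78
```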
A fast cluster should complete the mapping in a few seconds using the default block size of 128MB.
Once the job is completed, take a look at your HDFS home directory by running:

```shell
hdfs dfs -ls
```
A tera-in folder has been created:

```
Found 1 item
drwxr-xr-x   - tdp_user tdp_user          0 2022-05-26 16:32 tera-in
```
Let’s see what’s inside that folder by running:
```shell
hdfs dfs -ls tera-in
```

There are 3 files in this directory (1 metadata file and 2 data files), totaling 1 GB:

```
Found 3 items
-rw-r--r--   1 tdp_user tdp_user          0 2022-05-26 16:32 tera-in/_SUCCESS
-rw-r--r--   1 tdp_user tdp_user  500000000 2022-05-26 16:32 tera-in/part-m-00000
-rw-r--r--   1 tdp_user tdp_user  500000000 2022-05-26 16:32 tera-in/part-m-00001
```
The second step of the TeraSort benchmark is the execution of the TeraSort MapReduce computation on the data generated in step 1, using the following command:

```shell
yarn jar $YARN_EXAMPLES terasort tera-in tera-out
```
The first parameter of the terasort command is the HDFS input directory and the second is the HDFS output directory.
TeraSort is implemented as a MapReduce sort job with a custom partitioner that uses a sorted list of n-1 sampled keys, computed in an initial step, to define the key range assigned to each reducer. As data moves between nodes to be sorted, terasort causes some shuffling, which increases latency (the job takes longer to complete).
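To build intuition for the partitioner, here is a toy shell sketch of the idea: with n-1 sampled cut keys, each key is routed to the reducer whose range it falls into. The cut keys below are made up for illustration, not sampled by TeraSort:

```shell
# Two sampled cut keys split the key space into 3 reducer ranges
cutpoints=("g" "p")
partition() {
  # Return the index of the first cut key greater than the given key
  local key=$1 i=0
  for c in "${cutpoints[@]}"; do
    [[ "$key" < "$c" ]] && break
    i=$((i+1))
  done
  echo "$i"
}
partition "apple"   # range 0 (before "g")
partition "house"   # range 1 (between "g" and "p")
partition "zebra"   # range 2 (after "p")
```

Because each reducer receives a contiguous, disjoint key range, concatenating the sorted reducer outputs in range order yields a globally sorted dataset.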
There are now 2 folders in your HDFS directory:
```
Found 2 items
drwxr-xr-x   - tdp_user tdp_user          0 2022-05-26 16:32 tera-in
drwx------   - tdp_user tdp_user          0 2022-05-26 16:39 tera-out
```
TeraSort only orders previously created data (it does not generate or delete data), which means that the tera-out folder also contains 1 GB of data, as shown here:

```
Found 3 items
-rw-r--r--   1 tdp_user tdp_user          0 2022-05-26 16:39 tera-out/_SUCCESS
-rw-r--r--  10 tdp_user tdp_user          0 2022-05-26 16:37 tera-out/_partition.lst
-rw-r--r--   1 tdp_user tdp_user 1000000000 2022-05-26 16:39 tera-out/part-r-00000
```
The last step of the TeraSort benchmark is the validation of the results. TeraValidate checks that the keys are sorted within each output file; if something is wrong with the sorted output, its reducer reports the problem. Run the teravalidate application as follows, where the first parameter is the directory containing the sorted data and the second is the directory in which to store the validation report:

```shell
yarn jar $YARN_EXAMPLES teravalidate tera-out tera-validate
```
There are now 3 folders in your HDFS home directory:
```
Found 3 items
drwxr-xr-x   - tdp_user tdp_user          0 2022-05-26 16:32 tera-in
drwx------   - tdp_user tdp_user          0 2022-05-26 16:39 tera-out
drwxr-xr-x   - tdp_user tdp_user          0 2022-05-26 16:44 tera-validate
```
Over the course of these 3 steps, we submitted 3 YARN applications sequentially.
Using the ResourceManager UI, you can monitor each application's submission ID, the user who submitted it, its name, the queue in which it was submitted, the start and end times of completed applications, and its final status.
To access the ResourceManager web UI in TDP, use this link: https://master-02.tdp:8090/cluster/apps.
Note: The ResourceManager UI is Kerberos protected, and you must pass the Kerberos authentication to access it. Information on how to configure this can be found here.
You can search for applications on the Applications page:
In the preceding example, we submitted the TeraGen application with the ID application_1653043847641_0006. To view the details, refer to the Application List tab, where we can see all the attempts that were performed while executing this application:
Detailed information about each attempt is available on the Attempt Info page, where you can explore how many containers were launched and on what nodes specifically:
The Nodes page on the YARN Web user interface allows you to view information about the cluster nodes on which the NodeManagers are running.
We used the YARN CLI during this tutorial to submit the various jobs, but it is also a useful tool when it comes to monitoring and managing jobs on the cluster without having to access a web UI (common for administrators).
Here are some useful commands that you can use:
- To get the list of YARN commands, run yarn without arguments
- To get the logs of a specified application:

```shell
yarn logs -applicationId <app-id>
```

- To list the applications:

```shell
yarn application -list # can be used with -appTypes, -appStates or -appTags for filtering
```

- To kill an application:

```shell
yarn application -kill <app-id>
```
For more information about the different commands and their specificities, check the official Apache Hadoop YARN Commands page.
After performing all these steps, don’t forget to delete the TeraSort input and output directories, so that the next benchmark can run without storage problems:

```shell
hdfs dfs -rm -r -skipTrash tera*
```
What to learn next?
To get more comfortable with YARN, we recommend that you run and monitor the other MapReduce examples available. YARN’s multi-tenancy allows you to run other distributed processing engines, such as Spark.