YARN tutorial

Now that you have a better understanding of YARN’s capabilities, we’ll put them into practice in this tutorial by:

  • Choosing a set of MapReduce examples from those available in the Apache Hadoop distribution
  • Submitting and running these examples as YARN jobs with yarn jar
  • Monitoring the submitted jobs using the YARN Web UI and the YARN CLI

Requirements

This tutorial assumes you are running a cluster based on TDP getting started, an easy-to-launch TDP environment for testing purposes. This deployment provides you with:

  • tdp_user, a user with the ability to kinit for authentication.
  • An edge node accessible by SSH
  • HDFS directories:
    • /user/tdp_user

Note: If you are using your own TDP deployment, you need to adapt the previous configuration accordingly.

Before beginning the tutorial, connect to the cluster and kinit using the following commands:

# Connect to edge-01.tdp 
vagrant ssh edge-01
# Switch user to tdp_user
sudo su tdp_user
# Authenticate tdp_user with its Kerberos principal and keytab
kinit -kt ~/tdp_user.keytab tdp_user@REALM.TDP

MapReduce examples

The Apache Hadoop distribution comes with a set of MapReduce examples and benchmarks, usually located in hadoop-[hadoop-version]/share/hadoop/mapreduce. This path may change depending on the Hadoop distribution you are using, so to locate the examples jar, run:

find / -name "*hadoop-mapreduce-examples*" 2>/dev/null

This command returns several jars, including /opt/tdp/hadoop-3.1.1-TDP-0.1.0-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.1-TDP-0.1.0-SNAPSHOT.jar, which is the one used in this tutorial. You can explore the other jars later.

To make the following steps easier, create an environment variable holding the path to the examples jar:

export YARN_EXAMPLES=/opt/tdp/hadoop-3.1.1-TDP-0.1.0-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.1-TDP-0.1.0-SNAPSHOT.jar
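If you would rather not copy the path by hand, the find command above can be narrowed down and its first match stored in the variable. This is a sketch: find_examples_jar is a hypothetical helper, and it assumes the jar you want is the only file matching hadoop-mapreduce-examples-*.jar under the search root once sources and tests jars are excluded.

```shell
#!/bin/sh
# Hypothetical helper: print the first MapReduce examples jar found under
# the given directory, skipping sources and tests jars if any are present.
find_examples_jar() {
  find "$1" -type f -name 'hadoop-mapreduce-examples-*.jar' \
    ! -name '*-sources.jar' ! -name '*-tests.jar' 2>/dev/null | head -n 1
}

# Example usage on a TDP node (the /opt/tdp search root is an assumption):
# export YARN_EXAMPLES="$(find_examples_jar /opt/tdp)"
```

Restricting the search root keeps the command fast compared with scanning the whole file system from /.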

Now you can get a list of the available examples by running:

yarn jar $YARN_EXAMPLES

This command returns something similar to this:

An example program must be given as the first argument.
Valid program names are:
  aggregatewordcount: An Aggregate based map/reduce program that counts the words in the input files.
  aggregatewordhist: An Aggregate based map/reduce program that computes the histogram of the words in the input files.
  bbp: A map/reduce program that uses Bailey-Borwein-Plouffe to compute exact digits of Pi.
  dbcount: An example job that counts the pageview counts from a database.
  distbbp: A map/reduce program that uses a BBP-type formula to compute exact bits of Pi.
  grep: A map/reduce program that counts the matches of a regex in the input.
  join: A job that affects a join over sorted, equally partitioned datasets
  multifilewc: A job that counts words from several files.
  pentomino: A map/reduce tile laying program to find solutions to pentomino problems.
  pi: A map/reduce program that estimates Pi using a quasi-Monte Carlo method.
  randomtextwriter: A map/reduce program that writes 10GB of random textual data per node.
  randomwriter: A map/reduce program that writes 10GB of random data per node.
  secondarysort: An example defining a secondary sort to the reduce.
  sort: A map/reduce program that sorts the data written by the random writer.
  sudoku: A sudoku solver.
  teragen: Generate data for the terasort
  terasort: Run the terasort
  teravalidate: Checking results of terasort
  wordcount: A map/reduce program that counts the words in the input files.
  wordmean: A map/reduce program that counts the average length of the words in the input files.
  wordmedian: A map/reduce program that counts the median length of the words in the input files.
  wordstandarddeviation: A map/reduce program that counts the standard deviation of the length of the words in the input files.

The setup is ready! Now you can try some of the examples provided.

Job submission

We chose the Hadoop TeraSort benchmark to demonstrate how to submit a YARN job and extract information about its execution via the YARN UI.

The TeraSort benchmark is often used to test sorting speed. In 2008, Yahoo! used it to set a record by sorting 1 TB of data in 209 seconds. We will do the same on our local cluster, but don’t worry: we will only use 1 GB.

TeraSort aims to sort large volumes of data as fast as possible using Hadoop MapReduce. Because the benchmark exercises almost every part of the Hadoop MapReduce framework as well as HDFS, it is an ideal choice for fine-tuning a Hadoop cluster setup. The benchmark consists of 3 components that run sequentially:

  1. TeraGen: generates random data
  2. TeraSort: sorts using MapReduce
  3. TeraValidate: validates if the output is sorted

The following steps show you how to run the TeraSort benchmark on TDP Hadoop clusters. Don’t forget to clean up your cluster after each run; we show you how at the end of this article.

TeraGen

The first step of the TeraSort benchmark is data generation. The teragen command generates the input data for the TeraSort benchmark. Its first parameter is the number of records (rows) to generate and the second is the HDFS directory where the data is written. The following command generates 1 GB of data, consisting of 10 million records, in the tera-in directory of your HDFS home directory:

yarn jar $YARN_EXAMPLES teragen 10000000 tera-in
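Since teragen takes a row count rather than a size, it helps to convert a target size into rows. Each generated row is 100 bytes (the row format is described further down), so 10,000,000 rows give the 1,000,000,000 bytes reported by the Bytes Written counter in the log below. A small sketch; teragen_rows is a hypothetical helper, not part of Hadoop:

```shell
#!/bin/sh
# Hypothetical helper: number of TeraGen rows needed for a target size in
# bytes, assuming the fixed 100-byte row size.
teragen_rows() {
  echo $(( $1 / 100 ))
}

# Example usage (1 GB = 1,000,000,000 bytes):
# yarn jar $YARN_EXAMPLES teragen "$(teragen_rows 1000000000)" tera-in
```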

Once a YARN application is submitted, you can monitor its execution in the YARN ResourceManager UI, as described in the ResourceManager UI section below. The client also prints log information to the console.

After establishing a connection and passing Kerberos authentication, the logs show the job submission and its progress as the percentage of map and reduce tasks completed. When the job completes, a summary of the work performed and the resources used is displayed:

...
2022-05-20 12:25:39,648 INFO impl.YarnClientImpl: Submitted application application_1653043847641_0004
2022-05-20 12:25:39,717 INFO mapreduce.Job: The url to track the job: https://master-02.tdp:8090/proxy/application_1653043847641_0004/
2022-05-20 12:25:39,720 INFO mapreduce.Job: Running job: job_1653043847641_0004
2022-05-20 12:25:48,938 INFO mapreduce.Job: Job job_1653043847641_0004 running in uber mode : false
2022-05-20 12:25:48,939 INFO mapreduce.Job:  map 0% reduce 0%
2022-05-20 12:26:03,263 INFO mapreduce.Job:  map 100% reduce 0%
2022-05-20 12:26:04,331 INFO mapreduce.Job: Job job_1653043847641_0004 completed successfully
2022-05-20 12:26:04,509 INFO mapreduce.Job: Counters: 34
        File System Counters
                FILE: Number of bytes read=0
                FILE: Number of bytes written=451644
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=167
                HDFS: Number of bytes written=1000000000
                HDFS: Number of read operations=12
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=4
        Job Counters 
                Killed map tasks=1
                Launched map tasks=2
                Other local map tasks=2
                Total time spent by all maps in occupied slots (ms)=25011
                Total time spent by all reduces in occupied slots (ms)=0
                Total time spent by all map tasks (ms)=25011
                Total vcore-milliseconds taken by all map tasks=25011
                Total megabyte-milliseconds taken by all map tasks=25611264
        Map-Reduce Framework
                Map input records=10000000
                Map output records=10000000
                Input split bytes=167
                Spilled Records=0
                Failed Shuffles=0
                Merged Map outputs=0
                GC time elapsed (ms)=271
                CPU time spent (ms)=18630
                Physical memory (bytes) snapshot=723050496
                Virtual memory (bytes) snapshot=5107007488
                Total committed heap usage (bytes)=559939584
                Peak Map Physical memory (bytes)=384413696
                Peak Map Virtual memory (bytes)=2563887104
        org.apache.hadoop.examples.terasort.TeraGen$Counters
                CHECKSUM=21472776955442690
        File Input Format Counters 
                Bytes Read=0
        File Output Format Counters 
                Bytes Written=1000000000

All three steps print similar output. Let’s look at some of the information it provides:

  • The URL to track the job in the web interface.
  • File System Counters: the number of bytes read and written, and of read and write operations, on the local file system (FILE) and HDFS by the map and reduce tasks.
  • Job Counters: MapReduce job-level statistics, such as the number of map tasks launched and the total time spent by all map tasks.
  • Map-Reduce Framework: additional information such as the number of records processed, memory usage, GC and CPU time, and failed shuffles.
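Because this output is plain text, you can capture it and extract values with standard tools, for example to reuse the application ID with the yarn logs command presented later. A minimal sketch, assuming the client output was saved to a file (teragen.log is an illustrative name) and that its lines look like the ones above; app_id_from_log and counter_from_log are hypothetical helpers:

```shell
#!/bin/sh
# Hypothetical helper: print the application ID from a YARN client log
# read on stdin (line "... Submitted application <id>").
app_id_from_log() {
  sed -n 's/.*Submitted application \(application_[0-9_]*\).*/\1/p' | head -n 1
}

# Hypothetical helper: print the value of a named counter, for example
# "HDFS: Number of bytes written", from a job log read on stdin.
counter_from_log() {
  awk -v name="$1" '
    { line = $0; sub(/^[ \t]+/, "", line) }   # drop leading indentation
    index(line, name "=") == 1 { print substr(line, length(name) + 2); exit }'
}

# Example usage (both stdout and stderr are captured):
# yarn jar $YARN_EXAMPLES teragen 10000000 tera-in > teragen.log 2>&1
# app_id_from_log < teragen.log
# counter_from_log "HDFS: Number of bytes written" < teragen.log
```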

Each generated row has the following format:

<10 bytes key><10 bytes rowid><78 bytes filler>\r\n

Here, the key is 10 bytes of random text and is the field TeraSort sorts on, the rowid is the row number stored as an integer, and the filler consists of 7 blocks of 10 characters followed by 1 block of 8 characters.
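To make the layout concrete, the sketch below assembles one row following the format above and lets you check that it is 100 bytes long (10 + 10 + 78, plus 2 bytes for \r\n). The key, rowid and filler values are made up for illustration; the row follows the layout described here and is not meant to be byte-for-byte what TeraGen writes. make_row is a hypothetical helper.

```shell
#!/bin/sh
# Hypothetical helper: build one illustrative row in the
# <10 bytes key><10 bytes rowid><78 bytes filler>\r\n layout.
make_row() {
  key=$1       # 10 characters (random text in real data)
  rowid=$2     # row number, right-justified in 10 characters
  filler=$(printf 'AAAAAAAAAA%.0s' 1 2 3 4 5 6 7)BBBBBBBB   # 7 x 10 + 8 chars
  printf '%-10.10s%10d%s\r\n' "$key" "$rowid" "$filler"
}

# Example usage: count the bytes of one row.
# make_row abcdefghij 42 | wc -c
```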

On a fast cluster, the map phase should complete in a few seconds with the default HDFS block size of 128 MB.

Once the job is completed, let’s take a look at your HDFS home directory by running:

hdfs dfs -ls

The tera-in folder has been created:

Found 1 item
drwxr-xr-x   - tdp_user tdp_user          0 2022-05-26 16:32 tera-in

Let’s see what’s inside that folder by running:

hdfs dfs -ls tera-in

This directory contains 3 files, an empty _SUCCESS marker file and 2 data files, for a total of 1 GB:

Found 3 items
-rw-r--r--   1 tdp_user tdp_user          0 2022-05-26 16:32 tera-in/_SUCCESS
-rw-r--r--   1 tdp_user tdp_user  500000000 2022-05-26 16:32 tera-in/part-m-00000
-rw-r--r--   1 tdp_user tdp_user  500000000 2022-05-26 16:32 tera-in/part-m-00001
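To check the total without adding the sizes by hand, you can sum the size column (the fifth field) of the listing with awk. A minimal sketch; sum_ls_sizes is a hypothetical helper, and it only counts regular files (lines starting with -). HDFS can also report the total directly with hdfs dfs -du -s -h tera-in.

```shell
#!/bin/sh
# Hypothetical helper: sum the size column (field 5) of `hdfs dfs -ls`
# output read on stdin, counting regular files only (lines starting with "-").
sum_ls_sizes() {
  awk '/^-/ { total += $5 } END { printf "%.0f\n", total }'
}

# Example usage:
# hdfs dfs -ls tera-in | sum_ls_sizes
```

The %.0f format avoids integer overflow in awk implementations that clamp %d to 32 bits.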

TeraSort

The second step of the TeraSort benchmark is the execution of the TeraSort MapReduce computation on the data generated in step 1 using the following command:

yarn jar $YARN_EXAMPLES terasort tera-in tera-out

The first parameter of terasort is the HDFS input directory and the second is the HDFS output directory.

TeraSort is implemented as a MapReduce sort job with a custom partitioner. In an initial step, it samples the input keys and builds a sorted list of n-1 split points, where n is the number of reducers; these split points define the key range handled by each reducer. Because records must be moved between nodes to reach the reducer responsible for their key range, TeraSort involves a shuffle phase, which increases latency (the job takes longer to complete).
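The range-partitioning idea can be illustrated with standard tools. The sketch below is a simplified stand-in, not TeraSort’s actual implementation: pick_splits sorts a sample of keys and keeps n-1 evenly spaced split points for n reducers, and partition_of returns the reducer for a key as the number of split points less than or equal to it. Keys sent to reducer i therefore all sort after those sent to reducer i-1, so concatenating the reducers’ sorted outputs gives a globally sorted result. Both helper names are hypothetical, and keys are compared as byte strings (C locale).

```shell
#!/bin/sh
# Simplified illustration of range partitioning (NOT TeraSort's code).

# pick_splits N: read sampled keys on stdin, print N-1 split points
# taken at evenly spaced positions of the sorted sample.
pick_splits() {
  LC_ALL=C sort | LC_ALL=C awk -v n="$1" '
    { keys[NR] = $0 }
    END { for (i = 1; i < n; i++) print keys[int(i * NR / n) + 1] }'
}

# partition_of KEY: read sorted split points on stdin, print the index of
# the reducer for KEY, i.e. how many split points are <= KEY.
partition_of() {
  LC_ALL=C awk -v key="$1" '($0 "") <= (key "") { p++ } END { print p + 0 }'
}

# Example: 8 sampled keys, 4 reducers -> split points c, e, g
# printf '%s\n' h b f a d c g e | pick_splits 4 > splits.txt
# partition_of d < splits.txt    # reducer 1 handles keys in [c, e)
```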

There are now 2 folders in your HDFS home directory:

Found 2 items
drwxr-xr-x   - tdp_user tdp_user          0 2022-05-26 16:32 tera-in
drwx------   - tdp_user tdp_user          0 2022-05-26 16:39 tera-out

TeraSort only reorders existing data (it neither generates nor deletes records), so the tera-out folder also contains 1 GB of data, as shown by hdfs dfs -ls tera-out. The _partition.lst file holds the split points sampled for the partitioner:

Found 3 items
-rw-r--r--   1 tdp_user tdp_user          0 2022-05-26 16:39 tera-out/_SUCCESS
-rw-r--r--  10 tdp_user tdp_user          0 2022-05-26 16:37 tera-out/_partition.lst
-rw-r--r--   1 tdp_user tdp_user 1000000000 2022-05-26 16:39 tera-out/part-r-00000

TeraValidate

The last step of the TeraSort benchmark is the validation of the results. TeraValidate checks the sorted output to ensure that the keys are sorted within each file; if the output is not correctly sorted, the report written by its reducer describes the problem. Run teravalidate as follows, where the first parameter is the directory containing the sorted data and the second is the directory where the report is stored:

yarn jar $YARN_EXAMPLES teravalidate tera-out tera-validate
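Conceptually, the within-file check is similar to asking sort whether its input is already in order. The sketch below uses sort -c, which exits with a non-zero status when its input is out of order, on local text files holding one key per line. It is an analogy on small local files, not how TeraValidate is implemented, and all_sorted is a hypothetical helper.

```shell
#!/bin/sh
# Hypothetical helper: succeed only if every given file is sorted
# (byte order, one key per line); report the first file that is not.
all_sorted() {
  for f in "$@"; do
    if ! LC_ALL=C sort -c "$f" 2>/dev/null; then
      echo "not sorted: $f"
      return 1
    fi
  done
  return 0
}

# Example usage:
# all_sorted keys-0.txt keys-1.txt && echo "all files sorted"
```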

There are now 3 folders in your HDFS home directory:

Found 3 items
drwxr-xr-x   - tdp_user tdp_user          0 2022-05-26 16:32 tera-in
drwx------   - tdp_user tdp_user          0 2022-05-26 16:39 tera-out
drwxr-xr-x   - tdp_user tdp_user          0 2022-05-26 16:44 tera-validate

Through these 3 steps, we submitted 3 YARN applications, one after the other.

ResourceManager UI

Using the ResourceManager UI, you can monitor each application’s ID, the user who submitted it, its name, the queue it was submitted to, its start and end times (for completed applications), and its final status.

In TDP, the ResourceManager web UI is available at https://master-02.tdp:8090/cluster/apps.

Note: The ResourceManager UI is protected by Kerberos, so you must authenticate with Kerberos to access it. Information on how to configure this can be found here.

Monitoring applications

You can search for applications on the Applications page:

Monitoring applications in YARN ResourceManager UI

In the preceding example, we submitted the TeraGen application with the ID application_1653043847641_0006. To view the details, refer to the Application List tab, where we can see all the attempts that were performed while executing this application:

Monitoring applications details in YARN ResourceManager UI
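Application IDs such as the one above have the form application_<cluster timestamp>_<sequence number>: the timestamp is the time at which the ResourceManager started (in milliseconds since the epoch) and the sequence number is incremented for each application submitted to the cluster. MapReduce job IDs reuse the same two parts with a job_ prefix, as in the TeraGen log earlier. A minimal sketch that splits an ID into its parts; the helper names are hypothetical.

```shell
#!/bin/sh
# Hypothetical helpers for IDs of the form application_<timestamp>_<seq>.
app_cluster_ts() {   # print the cluster timestamp part
  id=${1#application_}
  echo "${id%%_*}"
}
app_seq() {          # print the sequence number part (zero-padded)
  echo "${1##*_}"
}
app_to_job_id() {    # application_X_Y -> job_X_Y (MapReduce job ID)
  echo "job_${1#application_}"
}
```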

Detailed information about each attempt is available on the Attempt Info page, where you can explore how many containers were launched and on what nodes specifically:

Monitoring applications attempt in YARN ResourceManager UI

Monitoring nodes

The Nodes page on the YARN Web user interface allows you to view information about the cluster nodes on which the NodeManagers are running.

Monitoring nodes in YARN ResourceManager UI

YARN CLI

We used the YARN CLI throughout this tutorial to submit jobs, but it is also useful for monitoring and managing jobs on the cluster without a web UI, which is common for administrators.

Here are some useful commands that you can use:

  • To get the list of YARN commands:
yarn -help
  • To get the logs of a specified application:
yarn logs -applicationId <app-id>
  • To list the applications:
yarn application -list # can be filtered with -appTypes, -appStates or -appTags
  • To kill an application:
yarn application -kill <app-id>
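These commands can be combined in scripts. For instance, the application IDs can be taken from the first column of the yarn application -list output and passed to yarn application -kill. A sketch, assuming that data rows start with the application ID while header lines do not; app_ids is a hypothetical helper.

```shell
#!/bin/sh
# Hypothetical helper: print the application IDs found in
# `yarn application -list` output read on stdin (first column of data rows).
app_ids() {
  awk '$1 ~ /^application_[0-9]+_[0-9]+$/ { print $1 }'
}

# Example usage: kill every application currently listed as RUNNING.
# yarn application -list -appStates RUNNING | app_ids |
#   while read -r id; do yarn application -kill "$id" </dev/null; done
```

Matching on the first field rather than the start of the line tolerates any padding the CLI adds before the ID.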

For more information about each command and its options, check the official Apache Hadoop YARN Commands page.

Cleaning up the cluster

After performing all these steps, don’t forget to delete the TeraSort input, output and validation directories so that the next benchmark can run without storage problems:

hdfs dfs -rm -r -skipTrash tera-in tera-out tera-validate

What to learn next?

To get more comfortable with YARN, we recommend that you run and monitor the other available MapReduce examples. Thanks to its multi-tenancy, YARN can also run other distributed processing engines, such as Spark, on the same cluster.