Tutorial: Kafka and Spark Structured Streaming

Spark Structured Streaming is a stream processing engine built on Spark SQL, designed for scalable and fault-tolerant processing of streaming data. It lets you express streaming computations the same way you would express batch computations on static data, and executes them incrementally and continuously as new streaming data arrives.

Key features and concepts of Spark Structured Streaming include the following (a short code sketch after the list illustrates several of them):

  1. Streaming Data Sources: Structured Streaming reads data from streaming sources such as Kafka, sockets, and files on HDFS or S3, treating each input stream as an unbounded table that continuously receives new data.
  2. Event-Time Processing: Structured Streaming supports event-time processing, allowing handling of late and out-of-order data, and performing window-based aggregations based on timestamps in the data.
  3. Time Windows and Watermarking: Structured Streaming defines time-based windows (fixed-size, sliding, session) and uses watermarking to handle late data by setting a threshold for considering and processing late-arriving data.
  4. Streaming Queries: Structured Streaming provides a high-level API for expressing streaming computations through SQL-like queries, DataFrame operations, and UDFs, supporting filtering, aggregation, joining, and windowing operations.
  5. Output Modes: Structured Streaming offers output modes (complete, append, update) for writing query results to an output sink, specifying how incremental updates are handled.
  6. Fault Tolerance and Exactly-Once Semantics: Structured Streaming ensures fault tolerance and exactly-once processing by leveraging Spark’s built-in mechanisms for lineage information, data checkpointing, and recovery, guaranteeing durable and consistent results.
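
To make these concepts concrete, below is a minimal, self-contained sketch that combines an event-time window, a watermark, and an output mode. It uses Spark’s built-in rate source and the console sink purely for illustration; they are not part of the pipeline built in this tutorial.

from pyspark.sql import SparkSession
from pyspark.sql.functions import window, count

# Create a SparkSession for this standalone example
spark = SparkSession.builder.appName("StructuredStreamingConcepts").getOrCreate()

# The rate source emits (timestamp, value) rows, treated as an unbounded table
events = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

# 1-minute event-time windows with a 30-second watermark for late data
counts = events.withWatermark("timestamp", "30 seconds") \
    .groupBy(window("timestamp", "1 minute")) \
    .agg(count("*").alias("events"))

# "update" output mode: only windows whose aggregates changed are written
query = counts.writeStream.outputMode("update").format("console").start()
query.awaitTermination()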

In this tutorial, we will explore how to ingest data into Kafka using Python and apply transformations and aggregations using Spark Structured Streaming.

Requirements

To follow along with this tutorial, you need a cluster based on tdp-getting-started, which provides an easily deployable TDP environment for testing. This deployment provides you with:

  • tdp_user, a user with the ability to kinit for authentication.
  • An edge node accessible via SSH.

Note: When using a TDP deployment other than tdp-getting-started, some commands may need to be adapted to your environment.

Before beginning the tutorial, connect to the cluster and authenticate yourself with kinit using the following commands:

# Connect to edge-01.tdp 
vagrant ssh edge-01
# Switch user to tdp_user
sudo su tdp_user
# Authenticate the user with their Kerberos principal and keytab
kinit -kt ~/tdp_user.keytab tdp_user@REALM.TDP

Downloading the dataset

This tutorial is based on the NYC Green Taxi Trip dataset, which must be accessible in HDFS at /user/tdp_user/data/nyc_green_taxi_trip/.

For ease of use, we provide a script and instructions to download the dataset on the dataset page.

Preparing the Environment

To set up the working environment, we need to install the following Python packages:

  • PyArrow: This library enables efficient data interchange and processing. It will be used to read Parquet files from HDFS.
  • Kafka-Python: A Python client library that simplifies interactions with Apache Kafka.
  • GSSAPI (Generic Security Service API): It provides a standard interface for working with security services. It is commonly used with Kerberos. This package requires the system package krb5-devel.
  • Pandas: Used by the producer script to convert the Parquet data into a DataFrame before sending each row to Kafka.

Run the following commands to prepare the environment:

# Create a directory named nyc-taxi-stream and change to it
mkdir nyc-taxi-stream && cd nyc-taxi-stream
# Create a virtual environment named venv
python3 -m venv venv
# Activate the virtual environment
source venv/bin/activate
# Upgrade pip to the latest version
pip install --upgrade pip
# Install the required Python packages
pip install kafka-python pyarrow gssapi pandas

1. Creating a Kafka Topic

To begin, let’s create a Kafka topic named tdp-user-nyc-taxi with three partitions. Each partition will have three replicas. We will use the --command-config option to specify the client properties file for Kafka authentication.

Run the following command to create the topic:

# Create the tdp-user-nyc-taxi topic
/bin/kafka-topics.sh --create \
--topic tdp-user-nyc-taxi \
--bootstrap-server worker-01.tdp:9092,worker-02.tdp:9092,worker-03.tdp:9092 \
--replication-factor 3 \
--partitions 3 \
--command-config /etc/kafka/conf/client.properties
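
If you want to confirm that the topic was created, you can list topics from Python with kafka-python’s admin client. The snippet below is an optional sketch; it assumes the same brokers and SASL_SSL/GSSAPI settings that the producer uses in the next step.

# Optional: verify that the topic exists (assumes the brokers and security settings used by the producer below)
from kafka.admin import KafkaAdminClient

admin = KafkaAdminClient(
    bootstrap_servers="worker-01.tdp:9092,worker-02.tdp:9092,worker-03.tdp:9092",
    security_protocol="SASL_SSL",
    sasl_mechanism="GSSAPI"
)
print("tdp-user-nyc-taxi" in admin.list_topics())
admin.close()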

2. Ingesting Taxi Data into Kafka

In this step, we will simulate the data flow by using a data generator that reads Parquet files stored in HDFS and streams the data to a Kafka topic. In real-world scenarios, the data sources would typically be devices installed in taxi cabs.

To interact with files stored in HDFS, we’ll use PyArrow’s HadoopFileSystem class. We’ll create a Kafka producer object, configuring it with the necessary settings. Then, we’ll read each Parquet file, convert it to a DataFrame, and send each row as JSON to the Kafka topic using the producer. Please note that for the sake of simplicity in this tutorial, we will only stream one Parquet file.

Create a new file named stream_taxi_data_kafka_producer.py and paste the following code into it.

import os
import time
import pyarrow as pa
import pyarrow.parquet as pq
from kafka import KafkaProducer

# Set Hadoop and Arrow environment variables
os.environ['ARROW_LIBHDFS_DIR'] = '/opt/tdp/hadoop/lib/native/'
os.environ["CLASSPATH"] = os.popen("/usr/bin/hadoop classpath --glob").read().strip()

# Connect to HadoopFileSystem and get the path of the first Parquet file
hdfs = pa.HadoopFileSystem('default', port=9871, kerb_ticket='/tmp/krb5cc_1101')
file_path = hdfs.ls('/user/tdp_user/data/nyc_green_taxi_trip/')[0]

# Set Kafka brokers and topic
kafka_brokers = "worker-01.tdp:9092,worker-02.tdp:9092,worker-03.tdp:9092"
kafka_topic = "tdp-user-nyc-taxi"

# Create a Kafka producer
producer = KafkaProducer(
    bootstrap_servers=kafka_brokers,
    security_protocol='SASL_SSL',
    sasl_mechanism='GSSAPI'
)

# Open the Parquet file and read data
with hdfs.open(file_path, 'rb') as f:
    print('File path:', file_path)
    df = pq.read_pandas(f).to_pandas()

    # Send each row to the Kafka topic
    for _, row in df.iterrows():
        message = row.to_json().encode('utf-8')
        producer.send(kafka_topic, value=message)
        print(message)
        # Add a small delay between messages
        time.sleep(0.1)

# Close the Kafka producer
producer.close()

3. Spark Structured Streaming integration with Kafka

Once the data ingestion into Kafka is prepared, the next step is to leverage the power of Spark Structured Streaming for data transformation and aggregation. Our goal is to calculate an aggregated summary of all rides by pickup borough every hour.

Create a new file named stream_taxi_data_spark.py and copy the following code snippets into it.

3.1. Creating a Spark Session

To interact with Spark’s functionality, we first need to create a Spark Session. This will serve as the entry point for our application.

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("NycTaxiStreaming").getOrCreate()
spark.sparkContext.setLogLevel('WARN')

3.2. Reading data from Kafka

Next, we’ll create a streaming DataFrame by reading data from the Kafka topic. We’ll specify the Kafka brokers, the topic, and the required security settings. To ensure that we read the most recent data available in the Kafka topic, we’ll set the startingOffsets option to “latest”. Finally, we’ll select only the “value” column, which contains the actual data of interest stored by Kafka.

# Read data from Kafka
kafka_brokers = "worker-01.tdp:9092,worker-02.tdp:9092,worker-03.tdp:9092"
kafka_topic = "tdp-user-nyc-taxi"
df = spark.readStream \
    .format("kafka") \
    .option("subscribe", kafka_topic) \
    .option("kafka.bootstrap.servers", kafka_brokers) \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.kerberos.service.name", "kafka") \
    .option("startingOffsets", "latest") \
    .load() \
    .selectExpr("CAST(value AS STRING)")

3.3. Preparing the Schema and Parsing the Data

To process the data stored in the “value” column of Kafka messages, we need to define a schema that matches the desired structure. We’ll use the StructType and StructField classes from pyspark.sql.types to define the schema based on the JSON format of the Kafka messages.

Then, we’ll use the from_json function to parse the JSON data within the “value” column based on the defined schema. This produces a DataFrame with individual columns corresponding to each field in the JSON data. We’ll also convert the pickup and drop-off columns from epoch milliseconds to proper timestamps, dividing by 1000 before calling to_timestamp since it expects seconds.

from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType, IntegerType
from pyspark.sql.functions import from_json, to_timestamp, col

# Define the schema for the data
schema = StructType([
    StructField("VendorID", LongType()),
    StructField("lpep_pickup_datetime", StringType()),
    StructField("lpep_dropoff_datetime", StringType()),
    StructField("store_and_fwd_flag", StringType()),
    StructField("RatecodeID", LongType()),
    StructField("PULocationID", LongType()),
    StructField("DOLocationID", LongType()),
    StructField("passenger_count", LongType()),
    StructField("trip_distance", DoubleType()),
    StructField("fare_amount", DoubleType()),
    StructField("extra", DoubleType()),
    StructField("mta_tax", DoubleType()),
    StructField("tip_amount", DoubleType()),
    StructField("tolls_amount", DoubleType()),
    StructField("ehail_fee", IntegerType()),
    StructField("improvement_surcharge", DoubleType()),
    StructField("total_amount", DoubleType()),
    StructField("payment_type", LongType()),
    StructField("trip_type", DoubleType()),
    StructField("congestion_surcharge", IntegerType())
])

# Parse the JSON data and apply the schema
parsed_df = df.withColumn("data", from_json(df["value"], schema)) \
    .select("data.*")
# Convert the timestamp columns from milliseconds to seconds
parsed_df = parsed_df.withColumn("lpep_pickup_datetime", to_timestamp(col("lpep_pickup_datetime") / 1000)) \
                     .withColumn("lpep_dropoff_datetime", to_timestamp(col("lpep_dropoff_datetime") / 1000))

3.4. Performing Stream-Static Joins

Structured Streaming allows us to join a streaming DataFrame with a static DataFrame. In our scenario, we want to join the streaming DataFrame (parsed_df) with a static DataFrame containing taxi location codes and names.

First, we’ll download a CSV file that contains the taxi zone lookup data. We’ll create a DataFrame named location_df from this CSV file. Then, we’ll perform the join operation between the streaming DataFrame and the static DataFrame using the PULocationID and LocationID columns.

To optimize the join process, we’ll use the broadcast function, which broadcasts the static DataFrame to all the worker nodes, reducing data shuffling and enhancing performance. This approach is particularly beneficial when working with smaller datasets.

from pyspark import SparkFiles
from pyspark.sql.functions import broadcast
# Download and read the CSV file into a DataFrame
url = "https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv"
spark.sparkContext.addFile(url)
location_df = spark.read.option("header", True).csv("file://" + SparkFiles.get("taxi+_zone_lookup.csv"))
# Perform the join
joined_df = parsed_df.join(broadcast(location_df), parsed_df["PULocationID"] == location_df["LocationID"], "left") \
    .withColumnRenamed("Borough", "PickUpBorough")

3.5. Aggregating and Summarizing Data with Windows

To calculate summary statistics for taxi rides by pickup borough on an hourly basis, we’ll use:

  • Windowing: We’ll perform aggregations within 1-hour windows, generating summarized statistics for each hour. This will allow us to calculate metrics like total trip count, average passenger count, and total distance.
  • Sliding: We’ll set the slide duration equal to the window duration, so the windows are consecutive and non-overlapping, such as 00:00 - 01:00, 01:00 - 02:00, and so on.
  • Watermarking: To handle late-arriving data, we’ll implement watermarking. We’ll set a watermark threshold of 10 minutes, which means that data arriving within this threshold will be considered for updating the corresponding window’s aggregates. Data arriving later than the threshold will be dropped to prevent unnecessary state accumulation.

from pyspark.sql.functions import count, sum, avg, round, col, window

windowDuration = "1 hour"
slideDuration = "1 hour"
watermarkThreshold = "10 minutes"

# Perform windowed aggregations on the joined DataFrame
summary_df = joined_df.withWatermark("lpep_dropoff_datetime", watermarkThreshold) \
    .groupBy(window("lpep_dropoff_datetime", windowDuration, slideDuration), "PickUpBorough") \
    .agg(
        count("*").alias("TotalTripCount"),
        sum(col("passenger_count").cast("int")).alias("TotalPassengerCount"),
        round(avg("passenger_count"), 2).alias("AvgPassengerCount"),
        round(sum("trip_distance"), 2).alias("TotalDistance"),
        round(avg("trip_distance"), 2).alias("AvgDistance"),
        round(sum("fare_amount"), 2).alias("TotalFareAmount"),
        round(avg("fare_amount"), 2).alias("AvgFareAmount"),
        round(sum("total_amount"), 2).alias("TotalTripAmount"),
        round(avg("total_amount"), 2).alias("AvgTripAmount")
    )

# Order the summary results by the window start time
summary_df = summary_df.orderBy("window.start")

3.6. Starting the Streaming Query and Outputting the Results

Now that we have prepared the aggregated summary data, it’s time to start the streaming query and output the results. We use the writeStream method on the summary_df DataFrame with the output mode set to “complete”, so that the full result is emitted for each batch. We specify the output format as “console” and disable truncation to display the results in full. Finally, we start the query using the start() method and keep it running by calling awaitTermination().

# Start the streaming query and output the results
query = summary_df.writeStream.outputMode("complete").format("console").option("truncate", False).start()

# Wait for the query to finish
query.awaitTermination()

4. Submitting the Application

To submit our application, you can run the following command:

/bin/spark3-submit \
--master local --driver-memory 2g \
--num-executors 2 --executor-memory 2g \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.2 \
--driver-java-options "-Djava.security.auth.login.config=/etc/kafka/conf/kafka_client_jaas.conf" \
stream_taxi_data_spark.py

Once the Spark application is running and you see an empty “Batch: 0” message with DataFrame headers in the console output, it’s time to execute the data ingestion using the stream_taxi_data_kafka_producer.py script. To do this, open a new terminal window and execute the following command:

# Change directory to nyc-taxi-stream
cd nyc-taxi-stream
# Activate the virtual environment
source venv/bin/activate
# Start data ingestion
python3 stream_taxi_data_kafka_producer.py

Note: To stop the data ingestion, press Ctrl + Z and run the following command: kill -9 $(pgrep -f stream_taxi_data_kafka_producer.py)

5. Clean up

To delete the tdp-user-nyc-taxi topic associated with this tutorial, run the following command:

/bin/kafka-topics.sh --delete \
--topic tdp-user-nyc-taxi \
--bootstrap-server worker-01.tdp:9092,worker-02.tdp:9092,worker-03.tdp:9092 \
--command-config /etc/kafka/conf/client.properties

Further reading

To learn about Spark architecture and its use cases, refer to Spark Basics.