Tutorial: Kafka and Spark Structured Streaming
Spark Structured Streaming is a scalable, fault-tolerant stream processing engine built on Spark SQL. It lets you express a streaming computation the same way you would express a batch computation on static data, and the engine runs it incrementally and continuously as new data arrives.
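To make the idea of incremental execution concrete, here is a minimal sketch in plain Python; it is not Spark code. It keeps a running count and sum per key across micro-batches, then checks that the final state matches a one-shot batch computation over the same rows. The function names and the sample (borough, fare) records are hypothetical.

```python
from collections import defaultdict


def batch_aggregate(rows):
    """Aggregate all rows at once, as a batch query over a static table would."""
    totals = defaultdict(lambda: [0, 0.0])  # key -> [count, sum]
    for key, value in rows:
        totals[key][0] += 1
        totals[key][1] += value
    return {k: (c, s) for k, (c, s) in totals.items()}


class IncrementalAggregator:
    """Keeps per-key state and folds in each new micro-batch as it arrives."""

    def __init__(self):
        self.state = defaultdict(lambda: [0, 0.0])  # key -> [count, sum]

    def process_batch(self, rows):
        for key, value in rows:
            self.state[key][0] += 1
            self.state[key][1] += value
        # The current result table after this micro-batch
        return {k: (c, s) for k, (c, s) in self.state.items()}


# Hypothetical (borough, fare) records arriving in three micro-batches
micro_batches = [
    [("Queens", 10.0), ("Bronx", 7.5)],
    [("Queens", 12.5)],
    [("Brooklyn", 20.0), ("Bronx", 2.5)],
]

agg = IncrementalAggregator()
for batch in micro_batches:
    result = agg.process_batch(batch)

all_rows = [row for batch in micro_batches for row in batch]
print(result == batch_aggregate(all_rows))  # True
```

The incremental version never revisits old rows: it only needs its per-key state. That is why Structured Streaming stores aggregation state between micro-batches.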
Key features and concepts of Spark Structured Streaming include:
- Streaming Data Sources: Structured Streaming reads data from streaming sources such as Kafka, or from files landing in storage such as HDFS or S3. It treats the stream as an unbounded table that continuously receives new rows.
- Event-Time Processing: Computations can be based on timestamps carried in the data (event time) rather than arrival time. This makes it possible to handle late and out-of-order data and to aggregate over event-time windows.
- Time Windows and Watermarking: Structured Streaming supports tumbling (fixed-size, non-overlapping), sliding, and session windows. A watermark sets how late data may arrive and still be taken into account, so that older state can eventually be discarded.
- Streaming Queries: Structured Streaming provides a high-level API for expressing streaming computations through SQL-like queries, DataFrame operations, and UDFs, supporting filtering, aggregation, joining, and windowing operations.
- Output Modes: The output mode determines which rows of the result table are written to the sink after each trigger: the whole table (complete), only new rows (append), or only rows that changed (update).
- Fault Tolerance and Exactly-Once Semantics: Structured Streaming tracks the progress of each query with checkpointing and write-ahead logs. Combined with replayable sources (such as Kafka) and idempotent sinks, this lets it recover from failures and provide end-to-end exactly-once processing.
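The difference between the three output modes can be illustrated with a toy result table of per-window counts. This is a conceptual model in plain Python, not Spark's implementation; the function `emit` and the data are hypothetical. In append mode, a windowed aggregate is written exactly once, after the watermark has passed the window's end, because only then is its value final.

```python
def emit(result_table, changed_keys, watermark, already_emitted, mode):
    """Return the rows a sink would receive after one trigger.

    result_table: dict mapping window end time -> aggregate value
    changed_keys: windows whose value changed during this trigger
    watermark: current event-time watermark
    already_emitted: windows already written in append mode (updated in place)
    """
    if mode == "complete":
        # The whole result table is rewritten after every trigger.
        return dict(result_table)
    if mode == "update":
        # Only rows that changed during this trigger are written.
        return {k: result_table[k] for k in changed_keys}
    if mode == "append":
        # A window is written once, when the watermark guarantees it is final.
        final = {k: v for k, v in result_table.items()
                 if k <= watermark and k not in already_emitted}
        already_emitted.update(final)
        return final
    raise ValueError(f"unknown output mode: {mode}")


table = {60: 5, 120: 3}  # window end (minutes) -> trip count
changed = {120}          # only the second window changed in this trigger
emitted = set()
print(emit(table, changed, 70, emitted, "complete"))  # {60: 5, 120: 3}
print(emit(table, changed, 70, emitted, "update"))    # {120: 3}
print(emit(table, changed, 70, emitted, "append"))    # {60: 5}
```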
In this tutorial, we will explore how to ingest data into Kafka using Python and apply transformations and aggregations using Spark Structured Streaming.
Requirements
To follow along with this tutorial, you will need a cluster based on TDP getting started, which provides an easily deployable TDP environment for testing. This deployment provides you with:
- tdp_user, a user with the ability to kinit for authentication.
- An edge node accessible by SSH.
Note: When using a TDP deployment other than tdp-getting-started, some commands may need to be adapted to your environment.
Before beginning the tutorial, connect to the cluster and authenticate yourself with kinit using the following commands:
# Connect to edge-01.tdp
vagrant ssh edge-01
# Switch user to tdp_user
sudo su tdp_user
# Authenticate as tdp_user with its Kerberos principal and keytab
kinit -kt ~/tdp_user.keytab tdp_user@REALM.TDP
Downloading the dataset
This tutorial is based on the NYC Green Taxi Trip dataset, which must be accessible in HDFS at /user/tdp_user/data/nyc_green_taxi_trip/.
For ease of use, we provide a script and instructions to download the dataset on the dataset page.
Preparing the Environment
To set up the working environment, we need to install the following Python packages:
- PyArrow: This library enables efficient data interchange and processing. It will be used to read Parquet files from HDFS.
- pandas: Used to turn the Parquet data into a DataFrame whose rows are sent to Kafka as JSON.
- Kafka-Python: A Python client library that simplifies interactions with Apache Kafka.
- GSSAPI (Generic Security Service API): It provides a standard interface for working with security services and is commonly used with Kerberos. This package requires the system package krb5-devel.
Run the following commands to prepare the environment:
# Create a directory named nyc-taxi-stream and change into it
mkdir nyc-taxi-stream && cd nyc-taxi-stream
# Create a virtual environment named venv
python3 -m venv venv
# Activate the virtual environment
source venv/bin/activate
# Upgrade pip to the latest version
pip install --upgrade pip
# Install the required Python packages
pip install kafka-python pyarrow gssapi pandas
1. Creating a Kafka Topic
To begin, let’s create a Kafka topic named tdp-user-nyc-taxi with three partitions. Each partition will have three replicas. We will point the command at the brokers with the --bootstrap-server option and use the --command-config option to specify the client properties file for Kafka authentication.
Run the following command to create the topic:
# Create the tdp-user-nyc-taxi topic
/bin/kafka-topics.sh --create \
--bootstrap-server worker-01.tdp:9092,worker-02.tdp:9092,worker-03.tdp:9092 \
--topic tdp-user-nyc-taxi \
--replication-factor 3 \
--partitions 3 \
--command-config /etc/kafka/conf/client.properties
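With three brokers, three partitions, and a replication factor of 3, every broker ends up holding a copy of every partition. The sketch below shows this with a simplified round-robin placement. Kafka's real assignment also randomizes the starting broker and the replica shift, so the exact layout on your cluster may differ. The broker names come from this tutorial's cluster, and `assign_replicas` is a hypothetical helper.

```python
def assign_replicas(brokers, num_partitions, replication_factor):
    """Simplified round-robin replica placement; the first replica is the preferred leader."""
    if replication_factor > len(brokers):
        raise ValueError("replication factor cannot exceed the number of brokers")
    n = len(brokers)
    return {
        p: [brokers[(p + r) % n] for r in range(replication_factor)]
        for p in range(num_partitions)
    }


brokers = ["worker-01.tdp", "worker-02.tdp", "worker-03.tdp"]
layout = assign_replicas(brokers, num_partitions=3, replication_factor=3)
for partition, replicas in layout.items():
    print(partition, "leader:", replicas[0], "replicas:", replicas)
```

Because each broker leads one partition and follows the other two, the topic stays readable even if two of the three brokers are down. Writes may still be rejected depending on the topic's min.insync.replicas setting.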
2. Ingesting Taxi Data into Kafka
In this step, we will simulate the data flow by using a data generator that reads Parquet files stored in HDFS and streams the data to a Kafka topic. In real-world scenarios, the data sources would typically be devices installed in taxi cabs.
To interact with files stored in HDFS, we’ll use PyArrow’s HadoopFileSystem class. We’ll create a Kafka producer configured with the brokers and the Kerberos (SASL/GSSAPI) security settings. Then, we’ll read the Parquet file into a pandas DataFrame and send each row as JSON to the Kafka topic. For the sake of simplicity, this tutorial streams only one Parquet file.
Create a new file named stream_taxi_data_kafka_producer.py and paste the following code into it.
import os
import time

import pyarrow.parquet as pq
from pyarrow import fs
from kafka import KafkaProducer

# Set Hadoop and Arrow environment variables so that PyArrow can load libhdfs
os.environ['ARROW_LIBHDFS_DIR'] = '/opt/tdp/hadoop/lib/native/'
os.environ["CLASSPATH"] = os.popen("/usr/bin/hadoop classpath --glob").read().strip()

# Connect to HDFS: 'default' uses fs.defaultFS from core-site.xml (port 0 = default).
# The ticket cache is the one created by kinit, assuming the default FILE cache.
hdfs = fs.HadoopFileSystem('default', port=0, kerb_ticket=f'/tmp/krb5cc_{os.getuid()}')

# Pick the first file of the dataset directory
selector = fs.FileSelector('/user/tdp_user/data/nyc_green_taxi_trip/')
files = sorted(info.path for info in hdfs.get_file_info(selector)
               if info.type == fs.FileType.File)
file_path = files[0]

# Set Kafka brokers and topic
kafka_brokers = "worker-01.tdp:9092,worker-02.tdp:9092,worker-03.tdp:9092"
kafka_topic = "tdp-user-nyc-taxi"

# Create a Kafka producer that authenticates with Kerberos over SSL
producer = KafkaProducer(
    bootstrap_servers=kafka_brokers,
    security_protocol='SASL_SSL',
    sasl_mechanism='GSSAPI'
)

# Open the Parquet file and read it into a pandas DataFrame
with hdfs.open_input_file(file_path) as f:
    print('File path:', file_path)
    df = pq.read_table(f).to_pandas()

# Send each row to the Kafka topic as JSON (timestamps become epoch milliseconds)
for _, row in df.iterrows():
    message = row.to_json().encode('utf-8')
    producer.send(kafka_topic, value=message)
    print(message)
    # Add a small delay between messages to simulate a live stream
    time.sleep(0.1)

# Flush pending messages and close the Kafka producer
producer.flush()
producer.close()
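The Spark job later treats the pickup and dropoff times as epoch milliseconds. This is because pandas' `Series.to_json()` encodes timestamps that way by default (`date_format='epoch'`, `date_unit='ms'`), and it writes missing values as `null`. The standard-library sketch below reproduces that message shape without pandas, so the format is easy to inspect. The helper `row_to_message` and the sample row are hypothetical, and the helper only handles the value types shown.

```python
import json
import math
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def row_to_message(row):
    """Encode one record as UTF-8 JSON, mimicking pandas' default to_json output."""
    encoded = {}
    for name, value in row.items():
        if isinstance(value, datetime):
            if value.tzinfo is None:
                # pandas converts naive timestamps to epoch values as if they were UTC
                value = value.replace(tzinfo=timezone.utc)
            # Integer epoch milliseconds, like date_unit='ms'
            encoded[name] = (value - EPOCH) // timedelta(milliseconds=1)
        elif isinstance(value, float) and math.isnan(value):
            # pandas writes NaN as null
            encoded[name] = None
        else:
            encoded[name] = value
    return json.dumps(encoded, separators=(",", ":")).encode("utf-8")


sample = {
    "VendorID": 2,
    "lpep_pickup_datetime": datetime(2021, 1, 1, 0, 15),
    "ehail_fee": float("nan"),
    "total_amount": 12.3,
}
print(row_to_message(sample))
```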
3. Spark Structured Streaming integration with Kafka
Once data ingestion into Kafka is ready, the next step is to use Spark Structured Streaming to transform and aggregate the data. Our goal is to calculate an hourly summary of all rides by pickup borough.
Create a new file named stream_taxi_data_spark.py and copy the following code snippets into it.
3.1. Creating a Spark Session
To use Spark, we first need to create a SparkSession, which serves as the entry point for our application.
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("NycTaxiStreaming").getOrCreate()
spark.sparkContext.setLogLevel('WARN')
3.2. Reading data from Kafka
Next, we’ll create a streaming DataFrame by reading data from the Kafka topic. We’ll specify the Kafka brokers, the topic, and the required security settings. We’ll set the startingOffsets option to “latest” so that, on its first start, the query only reads messages published after it begins. Finally, we’ll keep only the “value” column, which holds the message payload, cast from binary to a string.
# Read data from Kafka
kafka_brokers = "worker-01.tdp:9092,worker-02.tdp:9092,worker-03.tdp:9092"
kafka_topic = "tdp-user-nyc-taxi"
df = spark.readStream \
.format("kafka") \
.option("subscribe", kafka_topic) \
.option("kafka.bootstrap.servers", kafka_brokers) \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.kerberos.service.name", "kafka") \
.option("startingOffsets", "latest") \
.load() \
.selectExpr("CAST(value AS STRING)")
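Besides “earliest” and “latest”, the Kafka source also accepts a JSON string that fixes the starting offset of each partition. In that string, -2 stands for earliest and -1 for latest. The small helper below builds such a string. It is hypothetical (not part of PySpark), and the offsets are made up for illustration.

```python
import json

EARLIEST = -2  # Spark's Kafka source reads -2 as "earliest"
LATEST = -1    # and -1 as "latest"


def starting_offsets_json(topic, offsets_by_partition):
    """Build the per-partition JSON accepted by the startingOffsets option."""
    return json.dumps({topic: {str(p): o for p, o in offsets_by_partition.items()}})


value = starting_offsets_json("tdp-user-nyc-taxi", {0: 23, 1: EARLIEST, 2: LATEST})
print(value)  # {"tdp-user-nyc-taxi": {"0": 23, "1": -2, "2": -1}}
```

The resulting string would be passed as `.option("startingOffsets", value)` in place of “latest”. startingOffsets only applies when a query starts for the first time: a restarted query resumes from the offsets recorded in its checkpoint.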
3.3. Preparing the Schema and Parsing the Data
To process the data stored in the “value” column of Kafka messages, we need to define a schema that matches the JSON produced by the ingestion script. We’ll use the StructType and StructField classes from pyspark.sql.types to define it.
Then, we’ll use the from_json function to parse the JSON data in the “value” column with this schema. This produces a DataFrame with one column per field of the JSON data. Finally, we’ll convert the pickup and dropoff columns, which hold epoch milliseconds, into Spark timestamps.
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType
from pyspark.sql.functions import from_json, col

# Define the schema for the data. Pickup and dropoff times arrive as epoch
# milliseconds. DoubleType is used for columns that pandas may encode as
# floats (for example 1.0), since it also accepts integer JSON values.
schema = StructType([
    StructField("VendorID", LongType()),
    StructField("lpep_pickup_datetime", LongType()),
    StructField("lpep_dropoff_datetime", LongType()),
    StructField("store_and_fwd_flag", StringType()),
    StructField("RatecodeID", DoubleType()),
    StructField("PULocationID", LongType()),
    StructField("DOLocationID", LongType()),
    StructField("passenger_count", DoubleType()),
    StructField("trip_distance", DoubleType()),
    StructField("fare_amount", DoubleType()),
    StructField("extra", DoubleType()),
    StructField("mta_tax", DoubleType()),
    StructField("tip_amount", DoubleType()),
    StructField("tolls_amount", DoubleType()),
    StructField("ehail_fee", DoubleType()),
    StructField("improvement_surcharge", DoubleType()),
    StructField("total_amount", DoubleType()),
    StructField("payment_type", DoubleType()),
    StructField("trip_type", DoubleType()),
    StructField("congestion_surcharge", DoubleType())
])

# Parse the JSON data and apply the schema
parsed_df = df.withColumn("data", from_json(col("value"), schema)) \
    .select("data.*")

# Convert epoch milliseconds to timestamps: Spark reads a number cast to a
# timestamp as seconds since the epoch, hence the division by 1000
parsed_df = parsed_df \
    .withColumn("lpep_pickup_datetime", (col("lpep_pickup_datetime") / 1000).cast("timestamp")) \
    .withColumn("lpep_dropoff_datetime", (col("lpep_dropoff_datetime") / 1000).cast("timestamp"))
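Dividing by 1000 turns milliseconds into the seconds since the Unix epoch that Spark expects when it converts a number to a timestamp. The same arithmetic in plain Python shows what a single pickup time turns into. The sample value is hypothetical, and `epoch_millis_to_utc` is an illustrative helper.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_millis_to_utc(millis):
    """Convert epoch milliseconds to an aware UTC datetime.

    Equivalent to millis / 1000 seconds after the Unix epoch, but computed with
    integer milliseconds to avoid floating-point rounding.
    """
    return EPOCH + timedelta(milliseconds=millis)


print(epoch_millis_to_utc(1609460100000))  # 2021-01-01 00:15:00+00:00
```

Spark displays the resulting timestamps in the session time zone (spark.sql.session.timeZone), so the console output may be shifted from UTC.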
3.4. Performing Stream-Static Joins
Structured Streaming allows us to join a streaming DataFrame with a static DataFrame. In our scenario, we want to join the streaming DataFrame (parsed_df) with a static DataFrame containing taxi zone IDs, boroughs, and zone names.
First, we’ll download a CSV file that contains the taxi zone lookup data and create a DataFrame named location_df from it. Then, we’ll perform a left join between the streaming DataFrame and the static DataFrame on the PULocationID and LocationID columns, so that rides with an unknown pickup zone are kept.
To optimize the join, we’ll use the broadcast function. It sends a copy of the static DataFrame to every executor, so the streaming data does not need to be shuffled. This works well when the static DataFrame is small, as this lookup table is.
from pyspark import SparkFiles
from pyspark.sql.functions import broadcast

# Download the CSV file and read it into a DataFrame
# (the file:// path exists on the driver host, which is enough in local mode)
url = "https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv"
spark.sparkContext.addFile(url)
location_df = spark.read.option("header", True).csv("file://" + SparkFiles.get("taxi+_zone_lookup.csv"))

# Left-join each ride with its pickup zone and rename Borough to PickUpBorough
joined_df = parsed_df.join(broadcast(location_df), parsed_df["PULocationID"] == location_df["LocationID"], "left") \
    .withColumnRenamed("Borough", "PickUpBorough")
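Conceptually, a broadcast join ships the small lookup table to every worker, where it is held in memory as a hash map. Each streaming row then needs only a local lookup, and the stream is never shuffled. The plain-Python sketch below mimics a left join using that strategy. Spark's real operator is a broadcast hash join, not this code. The helper is hypothetical, and the lookup rows are illustrative rows in the shape of taxi+_zone_lookup.csv.

```python
def broadcast_left_join(stream_rows, lookup_rows, left_key, right_key):
    """Left-join streaming rows against a small lookup table held in memory."""
    # Build the hash map once; in Spark, each executor receives its own copy.
    # Keys are compared as strings because the CSV column is read as text.
    index = {str(row[right_key]): row for row in lookup_rows}
    lookup_columns = list(lookup_rows[0]) if lookup_rows else []
    joined = []
    for row in stream_rows:
        match = index.get(str(row[left_key]))
        if match is None:
            # Left join: keep the ride and fill the lookup columns with None
            match = {column: None for column in lookup_columns}
        joined.append({**row, **match})
    return joined


zones = [
    {"LocationID": "7", "Borough": "Queens"},
    {"LocationID": "43", "Borough": "Manhattan"},
]
rides = [
    {"PULocationID": 43, "fare_amount": 12.5},
    {"PULocationID": 999, "fare_amount": 8.0},  # hypothetical ID with no zone
]
for r in broadcast_left_join(rides, zones, "PULocationID", "LocationID"):
    print(r)
```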
3.5. Aggregating and Summarizing Data with Windows
To calculate summary statistics for taxi rides by pickup borough on an hourly basis, we’ll use:
- Windowing: We’ll aggregate rides within 1-hour windows based on the dropoff time. This produces statistics such as total trip count, average passenger count, and total distance for each hour.
- Tumbling windows: Because the slide duration equals the window duration (1 hour), the windows do not overlap: 00:00 - 01:00, 01:00 - 02:00, and so on. A slide shorter than the window would produce overlapping, sliding windows.
- Watermarking: To handle late-arriving data, we’ll set a watermark threshold of 10 minutes. Data arriving up to 10 minutes behind the latest event time seen is still added to its window’s aggregates. Data arriving later may be dropped, which keeps the state from growing indefinitely. Note that Spark only uses the watermark to drop late data and old state in append or update output mode. With the complete mode used later in this tutorial, all aggregate state is kept.
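The windowing and watermark rules above can be modeled in a few lines of plain Python. This is a simplified sketch, not Spark's implementation. It assigns each event to a 1-hour tumbling window and tracks the watermark as the maximum event time seen minus 10 minutes. It then ignores events whose window has already closed, meaning the window end is at or before the watermark. Spark actually advances the watermark between micro-batches rather than per event, and it applies this dropping only in append or update output mode. Times are in minutes, and the event list is hypothetical.

```python
WINDOW = 60     # window length in minutes (1 hour)
THRESHOLD = 10  # watermark delay in minutes


def window_start(event_minute):
    """Start of the tumbling window that contains the event."""
    return event_minute - event_minute % WINDOW


def process(events):
    """Count events per window, ignoring events whose window the watermark has closed."""
    counts = {}
    dropped = []
    max_event_time = None
    for t in events:
        watermark = None if max_event_time is None else max_event_time - THRESHOLD
        start = window_start(t)
        if watermark is not None and start + WINDOW <= watermark:
            dropped.append(t)  # too late: its window is already finalized
            continue
        counts[start] = counts.get(start, 0) + 1
        max_event_time = t if max_event_time is None else max(max_event_time, t)
    return counts, dropped


print(process([5, 50, 65, 58, 75, 55]))  # ({0: 3, 60: 2}, [55])
```

The event at minute 58 arrives after one at 65 but is still counted, because the watermark (55) has not reached the end of its window (60). The event at 55 arrives after one at 75, when the watermark is 65, so its window is closed and the event is ignored.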
# Note: importing sum and round shadows Python's built-ins in this script
from pyspark.sql.functions import avg, col, count, round, sum, window

windowDuration = "1 hour"
slideDuration = "1 hour"  # equal to windowDuration, so the windows are tumbling
watermarkThreshold = "10 minutes"

# Perform windowed aggregations on the joined DataFrame
summary_df = joined_df.withWatermark("lpep_dropoff_datetime", watermarkThreshold) \
    .groupBy(window("lpep_dropoff_datetime", windowDuration, slideDuration), "PickUpBorough") \
    .agg(
        count("*").alias("TotalTripCount"),
        sum(col("passenger_count").cast("int")).alias("TotalPassengerCount"),
        round(avg("passenger_count"), 2).alias("AvgPassengerCount"),
        round(sum("trip_distance"), 2).alias("TotalDistance"),
        round(avg("trip_distance"), 2).alias("AvgDistance"),
        round(sum("fare_amount"), 2).alias("TotalFareAmount"),
        round(avg("fare_amount"), 2).alias("AvgFareAmount"),
        round(sum("total_amount"), 2).alias("TotalTripAmount"),
        round(avg("total_amount"), 2).alias("AvgTripAmount")
    )

# Order the summary results by the window start time
# (sorting a streaming DataFrame is only supported after an aggregation in complete mode)
summary_df = summary_df.orderBy("window.start")
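The windowDuration and slideDuration parameters decide how many windows each event falls into. With equal values, as in this tutorial, each ride lands in exactly one hourly window. A shorter slide makes windows overlap, so each event is counted in windowDuration / slideDuration windows. The sketch below enumerates the windows for one event time, in minutes. It uses a hypothetical helper and assumes windows aligned to the epoch (no start offset). Window starts are inclusive and ends exclusive.

```python
def windows_for(event_minute, window, slide):
    """Return (start, end) of every window of length `window`, sliding by `slide`, containing the event."""
    # Latest window start at or before the event, aligned to multiples of `slide`
    start = event_minute - event_minute % slide
    result = []
    # A window [start, start + window) contains the event while start > event - window
    while start > event_minute - window:
        result.append((start, start + window))
        start -= slide
    return sorted(result)


print(windows_for(95, window=60, slide=60))  # [(60, 120)]
print(windows_for(95, window=60, slide=15))  # [(45, 105), (60, 120), (75, 135), (90, 150)]
```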
3.6. Starting the Streaming Query and Outputting the Results
Now that the aggregated summary is defined, it’s time to start the streaming query and output the results. We call writeStream on the summary_df DataFrame with the output mode set to “complete”, so the whole result table is printed after every micro-batch. Complete mode is also required here because the results are sorted. We set the output format to “console” and disable truncation so that column values are displayed in full. Finally, we start the query with the start() method and keep the application running by calling awaitTermination().
# Start the streaming query and output the results
query = summary_df.writeStream.outputMode("complete").format("console").option("truncate", False).start()
# Wait for the query to finish
query.awaitTermination()
4. Submitting the Application
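To submit the application, run the following command. The --packages option adds the Spark Kafka connector. The --driver-java-options option points the Kafka client at the JAAS configuration used for Kerberos authentication. Since the application runs in local mode (--master local), --num-executors and --executor-memory, which apply to cluster deployments such as YARN, have no effect here.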
/bin/spark3-submit \
--master local --driver-memory 2g \
--num-executors 2 --executor-memory 2g \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.2 \
--driver-java-options "-Djava.security.auth.login.config=/etc/kafka/conf/kafka_client_jaas.conf" \
stream_taxi_data_spark.py
Once the Spark application is running and you see an empty “Batch: 0” table with the DataFrame headers in the console output, start the data ingestion with the stream_taxi_data_kafka_producer.py script. To do this, open a new terminal window, connect to edge-01 as tdp_user as described in the Requirements section, and run the following commands:
# Change directory to nyc-taxi-stream
cd nyc-taxi-stream
# Activate the virtual environment
source venv/bin/activate
# Start data ingestion
python3 stream_taxi_data_kafka_producer.py
Note: To stop the data ingestion, press Ctrl + Z and run the following command: kill -9 $(pgrep -f stream_taxi_data_kafka_producer.py)
5. Clean up
To delete the tdp-user-nyc-taxi topic associated with this tutorial, run the following command:
/bin/kafka-topics.sh --delete \
--bootstrap-server worker-01.tdp:9092,worker-02.tdp:9092,worker-03.tdp:9092 \
--topic tdp-user-nyc-taxi \
--command-config /etc/kafka/conf/client.properties
Further reading
To learn about Spark architecture and its use cases, refer to Spark Basics.