Authors: Stuart Eudaly, Josh Patterson, Austin Harris
Date: January 7th, 2020
In the last article, we set up Confluent’s distribution of Kafka. We also used Kafka Connect to import the inventory table from a MySQL database and store it as a topic in the Kafka cluster. In this article, we'll actually join detected objects with inventory and perform real-time windowed aggregations to create the Green Light Special application.
Other parts of the series:
In order to do the joins and windowed aggregations needed to get the Green Light Special application up and running, we’re going to utilize Kafka Streams. Essentially, Kafka Streams takes data from a Kafka topic as input, performs some operation on it, then outputs that data to another topic. The Kafka Streams library makes it relatively easy to apply just about any logic to data in Kafka in real time. The join we’re doing between streaming data and an inventory table is commonly seen in real-time applications:
"I’ll argue that the fundamental problem of an asynchronous application is combining tables that represent the current state of the world with streams of events about what is happening right now."
Jay Kreps' Blog Article: "Introducing Kafka Streams: Stream Processing Made Simple"
Two of the core classes used consistently in the design of streaming applications are the KStream and KTable classes. Below we describe their function and how they fit in with the rest of the streaming pipeline.
"A KStream is an abstraction of a record stream, where each data record represents a self-contained datum in the unbounded data set. Using the table analogy, data records in a record stream are always interpreted as an “INSERT” – think: adding more entries to an append-only ledger – because no record replaces an existing row with the same key. Examples are a credit card transaction, a page view event, or a server log entry."
"A KTable is an abstraction of a changelog stream, where each data record represents an update. More precisely, the value in a data record is interpreted as an “UPDATE” of the last value for the same record key, if any (if a corresponding key doesn’t exist yet, the update will be considered an INSERT). Using the table analogy, a data record in a changelog stream is interpreted as an UPSERT aka INSERT/UPDATE because any existing row with the same key is overwritten."
So, a KTable gives us the latest view of a stream of events by key, while a KStream shows every record, as each is considered an independent piece of information. We'll be using KStreams and KTables (among other things) extensively in our join and windowed aggregation code.
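To make the distinction concrete, here is a minimal sketch of how the two abstractions are created with the Kafka Streams DSL. The topic names and String serdes are placeholders for illustration only; the topics we work with in this article actually carry Avro records.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class StreamVsTableSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // KStream: every record is an independent event ("INSERT" in the table analogy),
        // so two detections of the same item are two separate records.
        KStream<String, String> cartEvents =
                builder.stream("cart_events", Consumed.with(Serdes.String(), Serdes.String()));

        // KTable: only the latest value per key is kept ("UPSERT"), so a new record for an
        // existing key replaces the previous row - the right model for an inventory table.
        KTable<String, String> latestInventory =
                builder.table("current_inventory", Consumed.with(Serdes.String(), Serdes.String()));
    }
}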
Kafka, as well as just about any other system, requires the keys (both the key type and the actual value of the key) to match before a join will occur. If we take a look at the schemas for both the shopping_cart_objects and mysql_tables_jdbc_inventory topics, we notice that the keys don't match up. In fact, mysql_tables_jdbc_inventory has null keys! As it turns out, when you ingest data from a database table using the JDBC connector, the records will have null keys by default. Here's example output from each topic:
./bin/kafka-avro-console-consumer --topic shopping_cart_objects --from-beginning --property print.key=true --bootstrap-server localhost:9092
./bin/kafka-avro-console-consumer --topic mysql_tables_jdbc_inventory --property print.key=true --from-beginning --bootstrap-server localhost:9092
As you can see, shopping_cart_objects already has the name of the item as the key. However, because mysql_tables_jdbc_inventory has null keys, we'll need to rekey the data in that topic so that Kafka Streams knows which items to join. For the keys to match up, we'll change the key of each record in mysql_tables_jdbc_inventory from null to the name of the item. We will also need to send the newly rekeyed data to a new topic in Kafka - inventory_rekeyed. Below is the chunk of code where we accomplish this:
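The original snippet isn't reproduced here, but a minimal sketch of that rekeying step might look like the following. The application ID, the localhost addresses, and the "name" field on the inventory records are assumptions about the setup, not the exact code from the repo.

import java.util.Properties;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class RekeyInventorySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "inventory-rekey-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        props.put("schema.registry.url", "http://localhost:8081");

        StreamsBuilder builder = new StreamsBuilder();

        // The JDBC connector writes inventory rows with a null key, so read the topic as a
        // KStream and derive a new key from the record's "name" field (field name assumed).
        KStream<String, GenericRecord> inventory = builder.stream("mysql_tables_jdbc_inventory");
        inventory
                .map((nullKey, row) -> KeyValue.pair(row.get("name").toString(), row))
                .to("inventory_rekeyed");

        new KafkaStreams(builder.build(), props).start();
    }
}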
The code above first pulls in the inventory from the mysql_tables_jdbc_inventory topic using a KStream. Then, the .map function is used to assign the new key, and the data is sent to the inventory_rekeyed topic. Now, we should be ready to do the join on the two topics and enrich our detected objects in real time!
Now that we have the keys matching between the two topics, it’s time to perform the join. This join will enrich the detected objects with data from inventory. There are three basic types of joins available in Kafka Streams: KStream-KStream, KStream-KTable, and KTable-KTable.
Because the objects in carts are independently detected and output to Kafka, a KStream fits best for shopping_cart_objects. On the other hand, inventory from mysql_tables_jdbc_inventory fits best in a KTable, as each record represents a row of the original table from MySQL. That means we’ll be utilizing the KStream-KTable join. Here are a few things to note about a KStream-KTable join:
- Only records arriving on the KStream side trigger the join; each new record is looked up against the current state of the KTable.
- The output of the join is another KStream.
Below is the code where the join occurs:
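Again, the original snippet isn't reproduced here; a minimal sketch of that join might look like the following. The "name" and "upsell_item" field names and the shopping_cart_enriched output topic are assumptions for illustration.

import java.util.Properties;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class CartInventoryJoinSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cart-inventory-join-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        props.put("schema.registry.url", "http://localhost:8081");

        StreamsBuilder builder = new StreamsBuilder();

        // Latest inventory row per item name (rekeyed in the previous step).
        KTable<String, GenericRecord> inventory = builder.table("inventory_rekeyed");

        // Each detected object in a cart arrives as an independent event, keyed by item name.
        KStream<String, GenericRecord> cartObjects = builder.stream("shopping_cart_objects");

        // KStream-KTable join: each cart event is looked up against the current inventory row
        // and enriched with the upsell item; the count is always 1 for a single detection.
        KStream<String, String> enriched = cartObjects.join(inventory,
                (detectedObject, inventoryRow) ->
                        inventoryRow.get("name") + " / upsell: "
                                + inventoryRow.get("upsell_item") + " / count: 1");

        enriched.to("shopping_cart_enriched", Produced.with(Serdes.String(), Serdes.String()));

        new KafkaStreams(builder.build(), props).start();
    }
}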
This code creates a KTable from the inventory_rekeyed topic, creates a KStream from the shopping_cart_objects topic, then performs the join. The join returns the name of the item in the cart, the item that should be paired with it (the upsell item), and the number of items. Because each cart item is a separate record in the KStream, the count for each will always be 1. This wouldn’t matter, except that we’ll be using those counts in the windowed aggregations in the next section. Read on!
At this point, our code has joined the data from shopping_cart_objects and mysql_tables_jdbc_inventory using a KStream and a KTable. However, the result of that operation is another KStream. That means the Big Cloud Dealz team would see a constant stream of individual items detected in carts (and enriched with data from inventory), rather than a sum of all items in all carts in the store. This can be fixed by converting the output KStream to a KTable, which can be used to show an ever-increasing aggregate of the items. That solves half the problem, but it never drops items that are no longer in the carts. To accomplish this, we’ll utilize a Windowed KTable so that we only see aggregates for a specified time window. As this is a proof of concept, we’ll assume that an item is only “valid” in a cart for one minute. In other words, we want to see the totals of all items in all carts every 60 seconds. Let’s take a look at how we do it:
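The original snippet isn't reproduced here, but a minimal sketch of the windowed aggregation might look like the following. For clarity it consumes simple (item, count) records from a hypothetical cart_item_counts topic rather than the full joined values, and the ten-second grace period is an assumption; the windowing, grouping, reduce, suppress, and console output steps mirror what is described below.

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class WindowedCartCountsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-cart-counts-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // One record per detected cart item, keyed by item name, with a count of 1.
        KStream<String, Long> cartItemCounts = builder.stream("cart_item_counts");

        // One-minute window; the grace period tells .suppress() when a window is finally closed.
        TimeWindows windowLength = TimeWindows.of(Duration.ofSeconds(60))
                .grace(Duration.ofSeconds(10));

        KTable<Windowed<String>, Long> itemsPerWindow = cartItemCounts
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
                .windowedBy(windowLength)
                // Sum the running aggregate and the newly counted item.
                .reduce(Long::sum)
                // Emit one result per item per window, only once the window closes.
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

        // Print one line per item per closed window.
        itemsPerWindow.toStream().foreach((windowedKey, count) ->
                System.out.println(windowedKey.key() + " : " + count
                        + " in carts during " + windowedKey.window()));

        new KafkaStreams(builder.build(), props).start();
    }
}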
The code above:
- Creates the time window (with a grace period) that will be used for the Windowed KTable.
- Uses .groupByKey to (you guessed it) group the items by key.
- Windows the KTable by the window length we defined earlier.
- Uses a .reduce that simply sums the aggregate and the newly counted item.
- Uses .suppress so that we get one output per time window instead of every time an object is detected. The .suppress operator requires a defined “grace period” on the time window in order to know when the window is closed (which is why we added .grace to the time window earlier). For more information on the .suppress operator, read this.
- Outputs the results to System.out using a KStream (and some hacked-together string formatting to make it look nice).

To see this code in action, first run StreamingJoin_CartCountsAndInventoryTopics.java, then start ObjectDetectionProducer.java in a separate terminal (or TestDetectionProducer.java).
# 1. Start join and aggregate code
mvn exec:java -Dexec.mainClass="com.pattersonconsultingtn.kafka.examples.tf_object_detection.StreamingJoin_CartCountsAndInventoryTopics"
# 2. Start producer code
mvn exec:java -Dexec.mainClass="com.pattersonconsultingtn.kafka.examples.test_producer.TestDetectionProducer"
Here is some sample output from the streaming application, driven by our TestDetectionProducer.java code that randomly outputs an item every 15 seconds:
As you can see, every minute we get output that includes the items in carts, how many there are across all carts, and the upsell items that should be paired with those items. Because our TestDetectionProducer.java code outputs an item every 15 seconds, we see a total of 4 items every 1-minute time window (with the exception of the first window, where the code was started halfway through the window). Because the windowing operation only includes items seen in that particular 1-minute time window, it also drops items from older time windows (rather than an ever-increasing aggregate). So, the end result is that we only see items from the defined time window each time a window closes. Sweet!
If you want to re-run the demo from a clean slate, you can reset the streaming application's committed offsets and intermediate state with the application reset tool:
./bin/kafka-streams-application-reset --application-id pct-cv-streaming-join-counts-inventory-app-3 --input-topics mysql_tables_jdbc_inventory,shopping_cart_objects,inventory_rekeyed
In this post, we utilized Kafka Streams to join data from two Kafka topics and perform windowed aggregations on the results. This has been a fairly involved process to get the Big Cloud Dealz team the desired functionality in their Green Light Special application. While this application involves many pieces working together to function, Kafka and the Kafka Streams library enable the manipulation of real-time streaming data that would be rather difficult otherwise.
At this point we have put together the original concept laid out in the beginning of this series (Part 1), as reflected in the diagram above. The core components we built in this series are:
- A Kafka cluster based on Confluent's distribution of Kafka
- The object detection producer that watches shopping carts and writes detected objects to the shopping_cart_objects topic
- Kafka Connect with the JDBC connector, importing the inventory table from MySQL into the mysql_tables_jdbc_inventory topic
- The Kafka Streams application that rekeys, joins, and windows the data to produce the Green Light Special output
With this prototype running, the Big Cloud Dealz team is able to view their brick and mortar shopping floor much the same way as an online retailer views the aggregate activity of their online users. It sets the store operations team up to dynamically recommend upsell items to shoppers strategically at the right time based on what is happening inside the shopping carts. The Big Cloud Dealz team is able to roll out the proof of concept Green Light Special system to one of their locations to begin live testing. Big Cloud Ron is exceptionally pleased as well:
IF I COULD LOVE THE GREEN LIGHT SPECIAL ANY MORE IT WOULD (LIKELY) NOT BE LEGAL IN CERTAIN STATES
— BigCloudRon (@BigCloudRon1) January 7, 2020
For a further discussion on architecture ideas around Kafka and Confluent-based infrastructure, please reach out to the Patterson Consulting team or check out our Kafka Offerings page. We'd love to talk with your team about topics such as: