Authors: Stuart Eudaly, Josh Patterson, Austin Harris
Date: Nov 5th, 2019
In part 2 of our series, we looked at how Big Cloud Dealz would detect items in their Shopping Cart 2.0 using TensorFlow. In this post, we’ll look at setting up Confluent’s distribution of Kafka, creating a MySQL database for our in-store items, and pulling in that data to Kafka using Kafka Connect.
Now that we have a plan for how to generate the shopping cart data to be sent to Kafka, we need to install and configure Kafka using Confluent’s platform so that the data has somewhere to go. Once the Confluent files are downloaded, run the commands listed below in order to start and configure Kafka for this demo. We suggest executing each command in its own terminal window or tab.
# (1) Start Zookeeper
./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
# (2) Start Kafka
./bin/kafka-server-start ./etc/kafka/server.properties
# (3) Start the Schema Registry
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
# (4) Create topic for incoming objects
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic shopping_cart_objects
# (5) Create other topics that will be used later
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mysql_tables_jdbc_inventory
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic inventory_rekeyed
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic windowed_items
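If you'd like to confirm that the topics were created before moving on, you can list them with the same kafka-topics tool (an optional sanity check):
# (Optional) List the topics we just created
./bin/kafka-topics --list --zookeeper localhost:2181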
The previous commands start all the Kafka services we'll need and create the shopping_cart_objects topic, along with the other topics we'll use later in the series. At this point, we're ready to get our code running. To get the code and run it locally, you'll need both Git and Apache Maven installed. Download and prepare the code using the following commands:
git clone https://github.com/pattersonconsulting/BigCloudDealz_GreenLightSpecial.git
cd CartCamApp
mvn package
These commands pull in the files from the repository and then build an uber jar in the ./target subdirectory. You'll actually need to cd into each of the three subdirectories (CartCamApp, TestKafkaProducer, and KafkaProcessingApp) and run mvn package to ensure all three pieces of code we'll use will work.
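One way to do that, from the root of the cloned repository (cd back up first if you're still inside CartCamApp), is a quick loop like the sketch below; building each module by hand works just as well:
# Build the uber jar for each of the three modules
for dir in CartCamApp TestKafkaProducer KafkaProcessingApp; do
  (cd "$dir" && mvn package)
done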
We'll need one additional file to run ObjectDetectionProducer: the pre-trained detection model. The model can be downloaded here, or you can browse models and learn more about them here. To run the producer from Maven, use the following command, making sure to include the location of the downloaded model and the address of the Schema Registry as arguments:
mvn exec:java -Dexec.mainClass="com.pattersonconsultingtn.kafka.examples.tf_object_detection.ObjectDetectionProducer" -Dexec.args="/Users/josh/Downloads/faster_rcnn_resnet101_coco_2018_01_28/saved_model/ http://localhost:8081"
Looking at the output from the above command, you’ll see something like this:
Here we see the TensorFlow code finding objects in the images included in the project's /resources subdirectory. It takes each detected object and sends it to the Kafka cluster as an individual message on the shopping_cart_objects topic. You can check that the messages made it to the topic by executing the command below:
./bin/kafka-avro-console-consumer --topic shopping_cart_objects --from-beginning --property print.key=true --bootstrap-server localhost:9092
You should see console output similar to this:
So, we now have Kafka running and our code producing messages to a topic. Each of these messages contains information about an item in a customer's basket, which is a great start. However, neither the baskets nor Kafka have any notion of which item Big Cloud Dealz wants to pair with the items in customers' baskets. Luckily, Kafka Connect makes it easy to integrate a large number of external systems with Kafka. If we have a Kafka topic that contains the inventory information, we eliminate the need to run a SQL query against MySQL every time a new item is detected in a cart, which in turn prevents excessive table lookups and the scalability issues that come with them.

To demonstrate all of this, we'll create a MySQL database, add a table for inventory and populate it, then use Kafka Connect to ingest that data into Kafka. The end goal here is to have all the information we need inside Kafka so that we can do real-time data enrichment using Kafka Streams, but we'll wait until part 4 to do that. Start by installing MySQL and logging into it. Once you're at the mysql> prompt, use the following commands to build the inventory table and populate it with data:
mysql> CREATE DATABASE big_cloud_dealz;
mysql> USE big_cloud_dealz;
mysql> CREATE TABLE inventory (
id serial NOT NULL PRIMARY KEY,
name varchar(100),
upsell_item varchar(200),
count INT,
modified timestamp default CURRENT_TIMESTAMP NOT NULL,
INDEX `modified_index` (`modified`)
);
mysql> INSERT INTO inventory (name, upsell_item, count) VALUES ('cup', 'plate', 100);
mysql> INSERT INTO inventory (name, upsell_item, count) VALUES ('bowl', 'cup', 10);
mysql> INSERT INTO inventory (name, upsell_item, count) VALUES ('fork', 'spoon', 200);
mysql> INSERT INTO inventory (name, upsell_item, count) VALUES ('spoon', 'fork', 10);
mysql> INSERT INTO inventory (name, upsell_item, count) VALUES ('sportsball', 'soccer goal', 2);
mysql> INSERT INTO inventory (name, upsell_item, count) VALUES ('tennis racket', 'tennis ball', 10);
mysql> INSERT INTO inventory (name, upsell_item, count) VALUES ('frisbees', 'frisbee goal', 100);
The above commands do 4 things:

1. Create a database named big_cloud_dealz
2. Switch to the big_cloud_dealz database in the MySQL command line tool
3. Create a table named inventory in the big_cloud_dealz database
4. Insert 7 records into the inventory table
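If you want to sanity-check the data before moving on, a quick query at the same mysql> prompt should show all seven rows:
mysql> SELECT * FROM inventory;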
Now let's move on to configuring Kafka Connect so we can ingest the inventory table into a topic in the Kafka cluster.
Kafka Connect helps create reliable, high-performance ETL pipelines into Kafka. Here, Kafka Connect uses a predefined connector to communicate with MySQL and writes an Avro message into Kafka for every record in the table. Given that our system is based on the Confluent platform, we already have Kafka Connect installed. Before we start Kafka Connect, we need to configure it so it knows where our database is located, how to connect to it, and what information to ingest. The configuration file for the Kafka Connect system is shown below.
# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=/home/pattersonconsulting/confluent-5.1.2/share/java
This configuration file tells Connect where the bootstrap server for the Kafka cluster is, to use Avro for the messages, and where the connector plugin jars are. Now, let's take a look at how we configure the MySQL connector. We'll use the stock JDBC connector that ships with the Confluent platform, which allows us to connect to any relational database that supports JDBC, as described below:
"The JDBC connector allows you to import data from any relational database with a JDBC driver (such as MySQL, Oracle, or SQL Server) into Kafka. By using JDBC, this connector can support a wide variety of databases without requiring custom code for each one."
Confluent Blog Post: How to Build a Scalable ETL Pipeline with Kafka Connect
Here, you can see the connector configuration file. The most complicated part is making sure the connection.url string is right; in particular, the user and password embedded in it need to match your local MySQL setup. Once the file is set up correctly, you're ready to use Kafka Connect to ingest MySQL data from the inventory table into the Kafka topic mysql_tables_jdbc_inventory.
name=mysql-jdbc
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/big_cloud_dealz?user=root&password=1234&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC
mode=timestamp+incrementing
incrementing.column.name=id
timestamp.column.name=modified
topic.prefix=mysql_tables_jdbc_
table.whitelist=inventory
Now that everything is set up, let’s get Kafka Connect up and running. The command below runs Connect in standalone mode and points to the Connect properties and MySQL connector properties files.
./bin/connect-standalone /home/pattersonconsulting/BigCloudDealz_GreenLightSpecial/CartCamApp/src/main/resources/kafka/connect/connect-avro-standalone.properties /home/pattersonconsulting/BigCloudDealz_GreenLightSpecial/CartCamApp/src/main/resources/kafka/connect/mysql_ingest.properties
This command will output logs to the terminal similar to this:
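Once the worker is running, you can optionally confirm that the connector loaded by querying Kafka Connect's REST interface from another terminal. This assumes the default REST port of 8083, which we did not override in the worker properties above:
# List the connectors registered with this Connect worker
curl http://localhost:8083/connectors
If everything is working, the mysql-jdbc connector defined earlier should appear in the returned list.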
If we launch another terminal window, we can query our inventory data in the mysql_tables_jdbc_inventory topic with the following command:
./bin/kafka-avro-console-consumer --topic mysql_tables_jdbc_inventory --property print.key=true --from-beginning --bootstrap-server localhost:9092
The output should look like what you see below. Kafka Connect successfully converted each row of the table into an individual message in the mysql_tables_jdbc_inventory topic in Kafka. You'll notice that the key for each message is null. This is something we'll need to address in part 4 of our series.
Note that if you want to re-ingest the inventory table in MySQL using Kafka Connect, you'll need to delete the connect.offsets file in the /tmp directory. This file keeps track of what Kafka Connect has already sent to Kafka, so deleting it ensures that Kafka Connect will re-scan the entire table; otherwise, it would only send new data to Kafka.
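With the Connect worker stopped, removing that file is a one-liner:
# Remove the standalone-mode offset file so Connect re-reads the whole inventory table
rm /tmp/connect.offsets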
I STARTED TO DOUBT MYSELF FOR A MOMENT AT LUNCH WHEN I COULDN'T DECIDE BETWEEN THE BURRITO OR THE TACO. AND THEN I REMEMBERED I PUT THE WOOOOOO BACK IN RETAIL. AND THAT MOMENT PASSED. GREATNESS NEVER IN DOUBT.
— BigCloudRon (@BigCloudRon1) August 8, 2018
At this point, we have our system detecting objects, sending them to a Kafka topic, and our inventory table from MySQL being ingested into its own Kafka topic. In our final post in this series (Part 4, coming soon), the Big Cloud Dealz team will join the cart items with the inventory and perform windowed aggregations in real-time to create the "Green Light Special" application.
For a further discussion on architecture ideas around Kafka and Confluent-based infrastructure, please reach out to the Patterson Consulting team. We'd love to talk with your team about topics such as: