Editor’s note: Mark Needham is a speaker for ODSC Europe this June. Be sure to check out his talk, “Building a Real-time Analytics Application for a Pizza Delivery Service,” there!
Gartner defines Real-Time Analytics as follows:
Real-time analytics is the discipline that applies logic and mathematics to data to provide insights for making better decisions quickly.
The part about making better decisions quickly is, in my opinion, the most important bit of the definition.
To understand what that means, we should start by thinking of the world in terms of events, where an event is a thing that happens. It could be someone placing an order, an item being shipped – effectively any action that happened at some point in time.
We want to become aware of those events, understand what happened, and then take action based on that understanding right now!
The fact that we want to take action immediately is what distinguishes real-time analytics from batch-based/historical analytics.
Real-time analytics (RTA) has become so ubiquitous in recent years that you might not even know that you’ve used an app that implements it. One of the most popular uses of RTA is LinkedIn’s ‘Who viewed your profile’ feature, which shows you the number of people who have viewed your profile recently along with their names.
LinkedIn also obfuscates some of the viewers to nudge you towards signing up for LinkedIn Premium. More importantly, though, the reason for showing users this data in real time is to prompt them to take action and message each other. Maybe the person looking at your profile has a job or opportunity that might be interesting – you’ve just got to find out!
This and other RTA applications are built using some combination of tools that form the RTA stack.
The RTA stack
The RTA stack describes a series of tools and processes that you can use to derive insights from unbounded data and build applications like LinkedIn’s ‘Who viewed your profile’. A diagram showing the stack is included below:
Let’s quickly go through each of the components:
- Event producers – Detect state changes in source systems and trigger events that can be consumed and processed by downstream applications (a minimal producer sketch follows this list).
- Streaming platform – Acts as the source of truth for event data and must therefore handle high volume and concurrency of data being produced and consumed. Stores events in a durable manner so that downstream components can process them.
- Stream processor – Reads events from the streaming data platform and then takes some action on that event.
- Serving layer – The primary access point to consume real-time analytics produced in the event streaming platform.
- Front end – The thing that end users interact with.
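To make the first of those components concrete, here is a minimal sketch of an event producer in Python. It assumes the confluent-kafka client and the orders topic and localhost:29092 broker that appear later in this post; the pizza shop’s real producer would live inside its ordering system, but the shape is the same.

# Minimal event-producer sketch (assumptions: confluent-kafka client,
# plus the orders topic and localhost:29092 broker used later in this post).
import json
import uuid
from datetime import datetime

from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:29092"})

# A state change (someone placed an order) becomes an event...
event = {
    "id": str(uuid.uuid4()),
    "createdAt": datetime.now().isoformat(),
    "userId": 2783,
    "price": 1241,
    "items": [{"productId": "56", "quantity": 3, "price": 155}],
    "deliveryLat": "12.95916643",
    "deliveryLon": "77.54959169",
}

# ...that is published to the streaming platform for downstream consumers.
producer.produce("orders", key=event["id"], value=json.dumps(event))
producer.flush()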
Building a Pizza Delivery Service
So that’s the theory, but how do we put it into practice? I think it’s always fun to build an end-to-end example, so we’re going to look at how an online pizza service might use Real-Time Analytics to manage its business.
We’re going to assume that the pizza service already captures orders in Apache Kafka and is also keeping a record of its customers and the products that it sells in MySQL.
We’ve been brought in as consultants to help them get a real-time view of what’s happening in the business. The first requirement is to create an operations dashboard that shows the number of orders, total revenue, and average order value in the last few minutes. We also need to indicate which way this is trending.
We can solve this problem using just the data in the orders topic. Let’s have a look at the contents of that topic using the kcat CLI tool:
kcat -C -b localhost:29092 -t orders -c1 | jq

{
  "id": "80ad8992-dc34-463a-a5c9-e4cf8d79fa13",
  "createdAt": "2023-05-22T14:25:56.565681",
  "userId": 2783,
  "price": 1241,
  "items": [
    {"productId": "56", "quantity": 3, "price": 155},
    {"productId": "36", "quantity": 3, "price": 60},
    {"productId": "50", "quantity": 4, "price": 149}
  ],
  "deliveryLat": "12.95916643",
  "deliveryLon": "77.54959169"
}
Each event contains an order ID, a user ID, the time the order was placed, the total price, delivery coordinates, and an array of order items. This all looks like it’s working well, so let’s look at how to ingest those events into Apache Pinot.
Apache Pinot is a real-time OLAP database built at LinkedIn to deliver scalable real-time analytics with low latency. It can ingest from batch data sources (such as Hadoop HDFS, Amazon S3, and Google Cloud Storage) as well as stream data sources (such as Apache Kafka and Redpanda).
Pinot stores data in tables, each of which must first define a schema. The schema for our table is defined below:
{ "schemaName": "orders", "dimensionFieldSpecs": [ {"name": "id", "dataType": "STRING"}, {"name": "userId", "dataType": "INT"}, {"name": "deliveryLat", "dataType": "DOUBLE"}, {"name": "deliveryLon", "dataType": "DOUBLE"}, {"name": "items", "dataType": "JSON"} ], "metricFieldSpecs": [ {"name": "productsOrdered", "dataType": "INT"}, {"name": "totalQuantity", "dataType": "INT"}, {"name": "price", "dataType": "DOUBLE"} ], "dateTimeFieldSpecs": [ { "name": "ts", "dataType": "TIMESTAMP", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" } ] }
Pinot will automatically map fields from the data source to columns in the schema if they have the same name. This is the case for id, userId, deliveryLat, deliveryLon, items, and price. The other fields will be populated by transformation functions described in the table config:
{ "tableName": "orders", "tableType": "REALTIME", "segmentsConfig": { "timeColumnName": "ts", "timeType": "MILLISECONDS", "retentionTimeUnit": "DAYS", "retentionTimeValue": "1", "schemaName": "orders", "replicasPerPartition": "1" }, "tenants": {}, "tableIndexConfig": { "loadMode": "MMAP", "streamConfigs": { "streamType": "kafka", "stream.kafka.consumer.type": "lowLevel", "stream.kafka.topic.name": "orders", "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder", "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", "stream.kafka.broker.list": "kafka:9092", "stream.kafka.consumer.prop.auto.offset.reset": "smallest" } }, "ingestionConfig": { "transformConfigs": [ { "columnName": "ts", "transformFunction": "FromDateTime(createdAt, 'yyyy-MM-dd''T''HH:mm:ss.SSSSSS')" }, { "columnName": "totalQuantity", "transformFunction": "JSONPATHLONG(items, '$.sum($.[*].quantity)')" }, { "columnName": "productsOrdered", "transformFunction": "JSONPATHARRAY(items, '$.length()')" } ] } }
Transformation functions are defined under ingestionConfig.transformConfigs. These functions populate a field based on values that exist elsewhere in the source data.
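To see what those transforms compute for the sample order shown earlier, here is a rough Python equivalent. This is just an illustration of the logic, not how Pinot actually evaluates the functions:

from datetime import datetime

# The items array from the sample order event shown earlier
items = [
    {"productId": "56", "quantity": 3, "price": 155},
    {"productId": "36", "quantity": 3, "price": 60},
    {"productId": "50", "quantity": 4, "price": 149},
]

# totalQuantity: sum of the quantities across all order items -> 3 + 3 + 4 = 10
total_quantity = sum(item["quantity"] for item in items)

# productsOrdered: the number of line items in the order (as I read the transform) -> 3
products_ordered = len(items)

# ts: the createdAt string parsed into an epoch-millisecond timestamp
created_at = "2023-05-22T14:25:56.565681"
ts = int(datetime.fromisoformat(created_at).timestamp() * 1000)

print(total_quantity, products_ordered, ts)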
This table has a table type of REALTIME, which means it is going to ingest data from a streaming data platform. The config to connect to Kafka is defined under tableIndexConfig.streamConfigs.
We can then create the table and schema by running the following command:
docker run \
  -v $PWD/pinot/config:/config \
  --network pizza-shop \
  apachepinot/pinot:0.12.0-arm64 \
  AddTable \
  -schemaFile /config/orders/schema.json \
  -tableConfigFile /config/orders/table.json \
  -controllerHost pinot-controller \
  -exec
That will take a few seconds to run and once it’s done, we’ll navigate to http://localhost:9000, click on ‘Query Console’, and then click on the ‘orders’ table. We should see something like this:
Our orders are successfully making their way into Pinot. We could then write a query to see how many orders have been placed and how much revenue has been generated in the last minute, and in the minute before that:
select count(*) FILTER(WHERE ts > ago('PT1M')) AS events1Min,
       count(*) FILTER(WHERE ts <= ago('PT1M') AND ts > ago('PT2M')) AS events1Min2Min,
       sum(price) FILTER(WHERE ts > ago('PT1M')) AS total1Min,
       sum(price) FILTER(WHERE ts <= ago('PT1M') AND ts > ago('PT2M')) AS total1Min2Min
from orders
where ts > ago('PT2M');
It looks like one of the two one-minute windows has slightly more events than the other, but there’s not much in it.
The Pinot UI is good for running ad hoc queries, but eventually we’ll want to build those queries into a front-end application. We’ll be using Streamlit, an open-source Python library that lets developers create interactive web applications for machine learning and data science without needing extensive knowledge of web development.
A visualization of our data is shown below:
The corresponding code is shown below:
import streamlit as st
import pandas as pd
from pinotdb import connect
from datetime import datetime
import os

# Connect to the Pinot broker (host/port can be overridden via environment variables)
pinot_host = os.environ.get("PINOT_SERVER", "pinot-broker")
pinot_port = os.environ.get("PINOT_PORT", 8099)
conn = connect(pinot_host, pinot_port)

st.set_page_config(layout="wide")
st.title("All About That Dough Dashboard 🍕")

# Show when the dashboard was last refreshed
now = datetime.now()
dt_string = now.strftime("%d %B %Y %H:%M:%S")
st.write(f"Last update: {dt_string}")

# Compare the last minute with the minute before that
curs = conn.cursor()
query = """
select count(*) FILTER(WHERE ts > ago('PT1M')) AS events1Min,
       count(*) FILTER(WHERE ts <= ago('PT1M') AND ts > ago('PT2M')) AS events1Min2Min,
       sum(price) FILTER(WHERE ts > ago('PT1M')) AS total1Min,
       sum(price) FILTER(WHERE ts <= ago('PT1M') AND ts > ago('PT2M')) AS total1Min2Min
from orders
where ts > ago('PT2M')
limit 1
"""
curs.execute(query)
df = pd.DataFrame(curs, columns=[item[0] for item in curs.description])

st.subheader("Orders in the last minute")
metric1, metric2, metric3 = st.columns(3)

# Number of orders in the last minute, with the change vs the previous minute
metric1.metric(
    label="# of Orders",
    value="{:,}".format(int(df['events1Min'].values[0])),
    delta="{:,}".format(int(df['events1Min'].values[0] - df['events1Min2Min'].values[0]))
    if df['events1Min2Min'].values[0] > 0 else None
)

# Revenue in the last minute, with the change vs the previous minute
metric2.metric(
    label="Revenue in ₹",
    value="{:,.2f}".format(df['total1Min'].values[0]),
    delta="{:,.2f}".format(df['total1Min'].values[0] - df['total1Min2Min'].values[0])
    if df['total1Min2Min'].values[0] > 0 else None
)

# Average order value (assumes at least one order arrived in the last minute)
average_order_value_1min = df['total1Min'].values[0] / int(df['events1Min'].values[0])
average_order_value_1min_2min = (df['total1Min2Min'].values[0] / int(df['events1Min2Min'].values[0])
                                 if int(df['events1Min2Min'].values[0]) > 0 else 0)

metric3.metric(
    label="Average order value in ₹",
    value="{:,.2f}".format(average_order_value_1min),
    delta="{:,.2f}".format(average_order_value_1min - average_order_value_1min_2min)
    if average_order_value_1min_2min > 0 else None
)
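Assuming the code is saved as app.py and the streamlit, pandas, and pinotdb packages are installed, you can launch the dashboard with streamlit run app.py – Streamlit serves it on port 8501 by default.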
Summary
This is only the first part of the application. We will do several more iterations, using Debezium to bring data from MySQL into Kafka and RisingWave to join streams together. We’ll also update our Streamlit dashboard to show the top-selling products and categories.
If this has piqued your curiosity, I’d love to see you at the ODSC Europe conference where I’ll be hosting a workshop building the rest of this application!
About the author/ODSC Europe speaker:
Mark Needham is an Apache Pinot advocate and developer relations engineer at StarTree. As a developer relations engineer, Mark helps users learn how to use Apache Pinot to build real-time, user-facing analytics applications. He also works on developer experience, simplifying the getting-started journey through product tweaks and improvements to the documentation. Mark writes about his experiences working with Pinot at markhneedham.com. He tweets at @markhneedham.