Lookup-based Join Support
In this recipe we'll learn how to use Pinot's lookup-based join support to perform joins between fact and dimension tables.
| Pinot Version | 0.9.0 |
| Code | startreedata/pinot-recipes/lookup-joins |
What are lookup joins?
Pinot was designed with querying single tables in mind, but sometimes we'll have queries where we want to bring in data from other tables. Large-scale joins of Pinot tables can be done using the Presto Pinot connector.
In this guide we'll focus on joins based on lookup tables that are small in size. For example, to enrich the final result with the customer name, you can join an orders table with a customers table, which can be used as a lookup table.
Joining a fact table with a small lookup table is made possible by Pinot's lookup UDF. The lookup UDF implements decoration join functionality, retrieving additional dimension data from a dimension table via its primary key.
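For instance, enriching each order with the customer's name takes the following shape. This is just a sketch of the query pattern; the actual tables and queries are built up over the rest of this recipe.
SELECT order_id,
  lookup('customers', 'name', 'customer_id', customer_id) AS customer_name
FROM orders
LIMIT 10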
Use Case
To understand how lookup joins work, let’s take a simple example.
Assume we have a fact table of orders and a dimension table of customers with the following schemas.
Customers and Orders table schemas
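The full schema files ship with the recipe. As a rough sketch, the orders schema might look something like the following, with field names taken from the sample order records used later in this recipe; see orders_schema.json in the /config directory for the authoritative definition.
{
  "schemaName": "orders",
  "dimensionFieldSpecs": [
    {"name": "order_id", "dataType": "INT"},
    {"name": "customer_id", "dataType": "INT"},
    {"name": "order_status", "dataType": "STRING"}
  ],
  "metricFieldSpecs": [
    {"name": "amount", "dataType": "FLOAT"}
  ],
  "dateTimeFieldSpecs": [
    {"name": "ts", "dataType": "LONG", "format": "1:SECONDS:EPOCH", "granularity": "1:SECONDS"}
  ]
}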
The goal is to answer the following questions.
- Find all the orders placed by Gold customers.
- Find the total number of orders by customer’s tier.
- Find the top five countries by sales.
The orders table alone can’t answer the questions above. Hence, we need to do a lookup join with the customers table to enrich the result with additional dimensions such as tier and country.
That requires us to define customers as a dimension table and join with orders table using the customer_id field.
Solution Architecture
The solution is based on two Pinot tables.
- Orders table – This table captures every fact about sales orders and qualifies as a fact table. We can implement it as a real-time table since new orders constantly arrive. Data is ingested from a Kafka topic called orders.
- Customers table – This table captures customer dimensions and has a primary key of customer_id. We can model it as a dimension table of OFFLINE type. Since customer information is not updated as frequently as orders, a daily batch job can be used to populate this table.
Solution architecture
Prerequisites
To follow the code examples in this guide, you must install Docker locally and download the recipes.
Navigate to recipe
- If you haven't already, download the recipes.
- In a terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/lookup-joins
Launch Pinot Cluster
You can spin up a Pinot Cluster by running the following command:
docker compose up
This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Zookeeper, and Kafka. You can find the docker-compose.yml file on GitHub.
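If you want to confirm that all five containers came up before moving on, you can check their status:
docker compose ps
Each container should report a running state.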
Create orders Kafka topic
Execute the following command from the project’s root directory to create a new topic called orders to represent the incoming stream of orders.
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
--create --bootstrap-server kafka:9092 --topic orders
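To verify that the topic was created, you can list the topics on the broker:
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
--list --bootstrap-server kafka:9092
You should see orders in the output.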
Define orders table and ingest order records
Let’s first define a schema and table for the orders table.
You can find the schema definition in the orders_schema.json file and the table definition in the orders_table.json file. They are both located in the /config directory. In terms of configuration, they look similar to any other real-time table, so we won't go through them here.
Go ahead and execute the following command from the project’s root directory.
docker run \
--network lookup-join \
-v $PWD/config:/config \
apachepinot/pinot:1.0.0 AddTable \
-schemaFile /config/orders_schema.json \
-tableConfigFile /config/orders_table.json \
-controllerHost "pinot-controller" \
-exec
The above command connects to the container that runs the Pinot controller and executes pinot-admin.sh to create the schema and table for orders.
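As a quick sanity check, you can ask the controller's REST API for the list of tables. This assumes the controller's default port 9000 is mapped to localhost by docker-compose.yml:
curl http://localhost:9000/tables
The response should include orders in its list of tables.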
Define the customers table and ingest customer records
Let’s create the customers dimension table next.
You can also find the schema and table definitions inside the /config directory. You'll notice that the customer_id field has been marked as a primary key in the customers_schema.json file.
{
"schemaName": "customers",
"primaryKeyColumns": [
"customer_id"
],
"dimensionFieldSpecs": [
{
"dataType": "INT",
"name": "customer_id"
},
{
"dataType": "STRING",
"name": "name"
},
{
"dataType": "STRING",
"name": "tier"
},
{
"dataType": "STRING",
"name": "country"
},
{
"dataType": "STRING",
"name": "dob"
}
]
}
customers_schema.json
Also, you will notice the following unique configurations in the customers_table.json file.
{
"tableName": "customers",
"tableType": "OFFLINE",
"isDimTable": true,
"segmentsConfig": {
"segmentPushType": "REFRESH",
"replication": "1"
},
"tenants": {
},
"tableIndexConfig": {
"loadMode": "MMAP"
},
"metadata": {
"customConfigs": {
}
},
"quota": {
"storage": "200M"
}
}
customers_table.json
The isDimTable directive instructs Pinot to mark this table as a dimension table. Pinot will then replicate a copy of this table on all the servers.
Execute the following command to create the schema and table definition for customers.
docker run \
--network lookup-join \
-v $PWD/config:/config \
apachepinot/pinot:1.0.0 AddTable \
-schemaFile /config/customers_schema.json \
-tableConfigFile /config/customers_table.json \
-controllerHost "pinot-controller" \
-exec
At this point, you should be able to see in the Pinot Query Console that two tables, orders and customers, have been created.
Ingest sample records
Now that we have the tables created inside Pinot, let's ingest some sample records into both of them before writing any join queries.
Produce sample orders to Kafka
You can find two sample data files inside the /data directory. The orders.json file contains a thousand JSON-formatted orders.
A sample would look like this:
{
"order_id":1,
"customer_id":40,
"order_status":"PROCESSING",
"amount":7075.2,
"ts":"1619116750"
}
Execute the following command to publish those events into the orders topic we created earlier.
cat data/orders.json |
kcat -P -b localhost:9092 -t orders
If you check the orders table after a few seconds, you will see it has been populated with data, as shown below:
Orders table
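A quick way to verify this from the Query Console is to count the ingested records:
SELECT COUNT(*) FROM orders
The count should keep growing until all the published events have been consumed.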
Ingest the customers CSV file
In the same /data directory, you can find the customers.csv file containing several customer records.
Execute the following command to ingest them into the customers table.
docker run \
--network lookup-join \
-v $PWD/config:/config \
-v $PWD/data:/data \
apachepinot/pinot:1.0.0 LaunchDataIngestionJob \
-jobSpecFile /config/customers_job-spec.yml
You can find the ingestion spec file inside the /config directory. The above command launches an ingestion job inside the Pinot controller. The /data directory has been mounted as a volume so the job can read the customers.csv file and write the generated segments back to the same folder.
Upon successful ingestion, you should see that the customers table has been populated with data.
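As before, you can verify this from the Query Console:
SELECT COUNT(*) FROM customers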
Write the lookup join queries
Now that we have both tables created and populated with data, what remains is to write SQL queries that answer the questions stated above.
The signature of the lookup UDF looks like this:
lookUp('dimTableName', 'dimColToLookUp', 'dimJoinKey1', factJoinKeyVal1, 'dimJoinKey2', factJoinKeyVal2 ... )
Where:
- dimTableName - Name of the dimension table to perform the lookup on.
- dimColToLookUp - The column name of the dimension table to be retrieved to decorate our result.
- dimJoinKey - The column name on which we want to perform the lookup, i.e., the join column name for the dimension table.
- factJoinKeyVal - The value from the fact table that is matched against the dimension table's join column to retrieve the dimColToLookUp value for that row.
The return type of the UDF matches the type of the dimColToLookUp column. A dimension table can also have multiple primary key columns, in which case you pass a join key/value pair for each of them, as sketched below.
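For example, if the customers table had a composite primary key of customer_id and country (hypothetical here; the customers table in this recipe uses only customer_id), each primary key column would get its own key/value pair:
lookup('customers', 'name', 'customer_id', customer_id, 'country', country)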
Let’s walk through a couple of queries that answer the questions that we initially had.
Find all the orders placed by Gold customers
What are the orders placed by Gold customers?
To answer this question, we need to write a SQL query that performs a lookup join between the orders and customers tables based on customer_id. The joined result can then be filtered on the tier field.
SELECT
orders.order_id,
lookup('customers','name','customer_id',customer_id) as name,
lookup('customers','tier','customer_id',customer_id) as tier,
orders.amount
FROM orders
WHERE tier='Gold'
LIMIT 10
The query returns the following result:
Orders placed by Gold customers
Find the total number of orders by customer’s tier
Which customer tier has made the most sales?
The query looks similar to the above, but we need to aggregate the orders.amount by customers.tier.
SELECT
lookup('customers','tier','customer_id',customer_id) as tier,
SUM(orders.amount) as sales_total
FROM orders
GROUP BY tier
ORDER By sales_total DESC
The above query returns the following:
Sales by tier
Find the top 5 countries by sales
Which five countries have contributed the most sales?
The query is similar to the above, but we use the customers.country field to enrich the results further.
SELECT
lookup('customers','country','customer_id',customer_id) as country,
COUNT(*) as total_orders,
SUM(orders.amount) as sales_total
FROM orders
GROUP BY country
ORDER By sales_total DESC
LIMIT 5
Sales by country
Things to consider
When a table is marked as a dimension table, it is replicated on all the hosts. Hence, dimension tables must be kept small. Pinot allows you to control the size of a dimension table to prevent oversized tables.
The maximum size quota for a dimension table in a cluster is controlled by the controller.dimTable.maxSize controller property. Table creation will fail if the table's storage quota exceeds this maximum size.
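For example, assuming you start the controller with a custom configuration file, you could raise the limit like this:
controller.dimTable.maxSize=500M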
Conclusion
The lookup UDF function in Pinot enables you to join a table with a small lookup table to enrich the final result of a query. These lookup tables must be modeled as dimension tables of type OFFLINE and possess a primary key.
Dimension tables are replicated across all servers of a Pinot cluster, taking advantage of data locality to perform fast and scalable lookups.
You can find more information about lookup joins in the following video.