Default value for Geospatial columns
In this recipe we'll learn how to set a default value for a Geospatial point column. To learn more about using Geospatial in Apache Pinot, see the Geospatial objects developer guide or Geospatial indexing developer guide.
Pinot Version | 1.0.0 |
---|---|
Code | startreedata/pinot-recipes/geospatial-default |
Prerequisites
To follow the code examples in this guide, you must install Docker locally and download the recipes.
Navigate to recipe
- If you haven't already, download the recipes.
- In a terminal, go to the recipe directory by running the following command:
cd pinot-recipes/recipes/geospatial-default
Launch Pinot Cluster
Spin up a Pinot Cluster by running the following command:
docker-compose up
This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and Zookeeper. You can find the docker-compose.yml file on GitHub.
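You can check that all the containers came up by running the following command:
docker-compose ps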
Generating Geospatial data
This recipe contains a data generator that produces JSON documents that usually contain Geospatial points, but occasionally contain null values instead.
You'll need to first install the following dependencies:
pip install geofactory faker geojson shapely
Once that's done, you can run the data generator and grab the first two generated documents by running the following command:
python datagen.py 2>/dev/null | head -n2 | jq
Output is shown below:
{
  "tsString": 1679996635152,
  "uuid": "23251ad1-2748-4203-a5ab-8113788d426b",
  "count": 621,
  "pointString": null
}
{
  "tsString": 1679996637331,
  "uuid": "ed51e74f-851d-4245-8003-efb453bea090",
  "count": 378,
  "pointString": "POINT (165.139969 -79.693387)"
}
You can see from this output that we have a null value in the first event and a geospatial point in the second one.
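If you want a rough idea of how often nulls are generated, one way is to sample some documents and count the nulls with jq. The sample size below is arbitrary, and the exact count will vary from run to run:
python datagen.py 2>/dev/null |
  head -n1000 |
  jq -s 'map(select(.pointString == null)) | length'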
Kafka ingestion
We're going to ingest this data into an Apache Kafka topic using the kcat command line tool. We'll also use jq to structure the data in the key:payload format that Kafka expects:
python datagen.py --sleep 0.0001 2>/dev/null |
jq -cr --arg sep ø '[.uuid, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø
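To spot-check what landed in the topic, you can also consume a couple of messages with kcat. This is an optional sanity check using standard kcat flags (-C to consume, -c 2 to exit after two messages):
kcat -C -b localhost:9092 -t events -o beginning -c 2 \
  -f 'key: %k\npayload: %s\n'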
We can check that Kafka has some data by running the following command:
docker exec -it kafka-geospatial kafka-run-class.sh \
kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic events
We'll see something like the following, in topic:partition:latest-offset format:
events:0:138780
Pinot Schema and Table
Now let's create a Pinot Schema and Table.
First, the schema:
{
  "schemaName": "events",
  "dimensionFieldSpecs": [
    {"name": "uuid", "dataType": "STRING"},
    {
      "name": "point",
      "dataType": "BYTES",
      "defaultNullValue": "003fe5f4a42008f90c4054e004d205fbe4"
    }
  ],
  "metricFieldSpecs": [
    {"name": "count", "dataType": "INT"}
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
Note that the point column has a data type of BYTES. Geospatial columns must use the BYTES type because Pinot serializes Geospatial objects into bytes for storage purposes. We're also passing in a defaultNullValue, which must be a Hex-encoded representation of a point. In this case the point is a location in the Arctic.
You can get back a Hex encoded representation of a Geospatial object by running a query that returns the object. For example:
SELECT ST_GeomFromText('POINT (0.6861134172138761 83.5002942140996)')
FROM ignoreMe
003fe5f4a42008f90c4054e004d205fbe4 |
---|
003fe5f4a42008f90c4054e004d205fbe4 |
Query Results
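Out of curiosity, you can decode that hex string back into coordinates. Inspecting this particular value suggests a one-byte header followed by two big-endian IEEE-754 doubles for longitude and latitude; note that this is an observation about this value, not a documented Pinot serialization contract:
import struct

# The defaultNullValue from the schema above
hex_value = "003fe5f4a42008f90c4054e004d205fbe4"
raw = bytes.fromhex(hex_value)

# Assumed layout: 1 header byte, then longitude and latitude as
# big-endian doubles (an observation, not a documented contract)
header = raw[0]
longitude, latitude = struct.unpack(">dd", raw[1:])
print(header, longitude, latitude)
# 0 0.6861134172138761 83.5002942140996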
Now for the table config:
{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "events",
      "stream.kafka.broker.list": "kafka-geospatial:9093",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "realtime.segment.flush.threshold.rows": "100000",
      "realtime.segment.flush.threshold.time": "1h"
    }
  },
  "ingestionConfig": {
    "transformConfigs": [
      {
        "columnName": "point",
        "transformFunction": "toSphericalGeography(ST_GeomFromText(pointString))"
      }
    ]
  },
  "tenants": {},
  "metadata": {}
}
When using a default value for a BYTES column, we need to create the schema and table separately rather than using the AddTable command. If we try to use the AddTable command, we'll end up with double decoding of the defaultNullValue, resulting in Pinot trying to store an invalid value. Instead, we'll create the schema with the AddSchema command:
docker run \
--network geospatial \
-v $PWD/config:/config \
apachepinot/pinot:1.0.0 AddSchema \
-schemaFile /config/schema.json \
-controllerHost "pinot-controller-geospatial" \
-exec
And then we'll create the table via the REST API:
curl -X POST http://localhost:9000/tables --data @config/table.json
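To double-check that both were created, you can ask the controller's REST API for them (these are standard Pinot controller endpoints):
curl http://localhost:9000/schemas/events
curl http://localhost:9000/tables/events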
Querying for defaults
We can then run the following query to check how many times the default value has been used:
SELECT STDistance(
ST_GeomFromText('POINT (0.6861134172138761 83.5002942140996)'),
toGeometry(point)
) AS distance,
COUNT(*)
FROM events
GROUP BY distance
ORDER BY COUNT(*) DESC, distance DESC
LIMIT 10
distance | count(*) |
---|---|
0 | 3147529 |
250.4421880885498 | 1 |
250.35032602023736 | 1 |
250.3445873086931 | 1 |
250.32023987252305 | 1 |
250.28500644222484 | 1 |
250.26623501229875 | 1 |
250.26271972684384 | 1 |
250.25918510024783 | 1 |
250.2511511361528 | 1 |
Query Results
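As a variant, assuming ST_AsText is available in your Pinot version, you can group by the point itself to see the default Arctic location show up directly in the results:
SELECT ST_AsText(toGeometry(point)) AS location,
       COUNT(*)
FROM events
GROUP BY location
ORDER BY COUNT(*) DESC
LIMIT 5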