Default value for Geospatial columns


In this recipe we'll learn how to set a default value for a Geospatial point column. To learn more about using Geospatial in Apache Pinot, see the Geospatial objects developer guide or Geospatial indexing developer guide.

Prerequisites

To follow the code examples in this guide, you must install Docker locally and download the recipes.

Navigate to recipe

  1. If you haven't already, download the recipes.
  2. In terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/geospatial-default

Launch Pinot Cluster

Spin up a Pinot Cluster by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and Zookeeper. You can find the docker-compose.yml file on GitHub.

Generating Geospatial data

This recipe contains a data generator that produces JSON documents containing Geospatial points, but occasionally null values instead.

You'll need to first install the following dependencies:

pip install geofactory faker geojson shapely

Once that's done, you can run the data generator and grab the first two generated documents by running the following command:

python datagen.py 2>/dev/null | head -n2 | jq

Output is shown below:

{
  "tsString": 1679996635152,
  "uuid": "23251ad1-2748-4203-a5ab-8113788d426b",
  "count": 621,
  "pointString": null
}
{
  "tsString": 1679996637331,
  "uuid": "ed51e74f-851d-4245-8003-efb453bea090",
  "count": 378,
  "pointString": "POINT (165.139969 -79.693387)"
}

You can see from this output that we have a null value in the first event and a geospatial point in the second one.
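
The generator itself lives in datagen.py in the recipe directory. Purely as an illustration, a minimal sketch of the same idea might look like the following (the function name and the 10% null rate are assumptions, not the recipe's actual code):

import json
import random
import time
import uuid

from faker import Faker              # random latitude/longitude values
from shapely.geometry import Point   # WKT serialization

fake = Faker()

def generate_event():
    """Return one event; occasionally the point is null."""
    lng, lat = float(fake.longitude()), float(fake.latitude())
    point = Point(lng, lat).wkt if random.random() < 0.9 else None
    return {
        "tsString": int(time.time() * 1000),   # epoch millis
        "uuid": str(uuid.uuid4()),
        "count": random.randint(0, 1000),
        "pointString": point,
    }

if __name__ == "__main__":
    while True:
        print(json.dumps(generate_event()), flush=True)
        time.sleep(0.1)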

Kafka ingestion

We're going to ingest this data into an Apache Kafka topic using the kcat command line tool. We'll also use jq to structure the data in the key:payload structure that Kafka expects:

python datagen.py --sleep 0.0001 2>/dev/null |
jq -cr --arg sep ø '[.uuid, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø
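
If you'd rather not use kcat, a small producer script can publish the same key:payload structure. The sketch below uses the kafka-python client and the hypothetical generate_event function from the generator sketch above; neither is part of the recipe:

import json

from kafka import KafkaProducer       # pip install kafka-python

from datagen import generate_event    # hypothetical helper, see the sketch above

producer = KafkaProducer(bootstrap_servers="localhost:9092")

while True:
    event = generate_event()
    producer.send(
        "events",                                  # same topic as the kcat command
        key=event["uuid"].encode("utf-8"),         # uuid becomes the message key
        value=json.dumps(event).encode("utf-8"),   # JSON document as the payload
    )
    producer.flush()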

We can check that Kafka has some data by running the following command:

docker exec -it kafka-geospatial kafka-run-class.sh \
  kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic events

We'll see something like the following:

events:0:138780

Pinot Schema and Table

Now let's create a Pinot Schema and Table.

First, the schema:

{
    "schemaName": "events",
    "dimensionFieldSpecs": [
      {"name": "uuid", "dataType": "STRING"},
      {"name": "point", "dataType": "BYTES", "defaultNullValue": "003fe5f4a42008f90c4054e004d205fbe4"}
    ],
    "metricFieldSpecs": [{"name": "count", "dataType": "INT"}],
    "dateTimeFieldSpecs": [
      {
        "name": "ts",
        "dataType": "TIMESTAMP",
        "format": "1:MILLISECONDS:EPOCH",
        "granularity": "1:MILLISECONDS"
      }
    ]
  }

Note that the column for point has a data type of BYTES. Geospatial columns must use the BYTES type because Pinot will serialize the Geospatial objects into bytes for storage purposes.

We are also passing in a defaultNullValue which must be a Hex encoded representation of a point. In this case the point is a location in the Arctic.
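
You don't need to build that Hex string by hand (the tip below shows how to get one from a query), but it helps to see what it contains. The value above appears to be a one-byte header followed by the point's x and y coordinates as big-endian doubles, which the following hypothetical snippet decodes:

import struct

# defaultNullValue from the schema above
hex_value = "003fe5f4a42008f90c4054e004d205fbe4"
raw = bytes.fromhex(hex_value)

# Assumption: byte 0 is a header/type flag and the remaining 16 bytes are
# the longitude (x) and latitude (y) as big-endian IEEE-754 doubles.
x, y = struct.unpack(">dd", raw[1:])
print(x, y)  # 0.6861134172138761 83.5002942140996 - the point in the Arctic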

💡

You can get back a Hex encoded representation of a Geospatial object by running a query that returns the object. For example:

SELECT ST_GeomFromText('POINT (0.6861134172138761 83.5002942140996)')
FROM ignoreMe
003fe5f4a42008f90c4054e004d205fbe4

Query Results

Now for the table config:

{
    "tableName": "events_geo",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "timeColumnName": "ts",
      "schemaName": "events_geo",
      "replication": "1",
      "replicasPerPartition": "1"
    },
    "tableIndexConfig": {
      "loadMode": "MMAP",
      "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.topic.name": "events",
        "stream.kafka.broker.list": "kafka-geospatial:9093",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
        "realtime.segment.flush.threshold.rows":"100000",
        "realtime.segment.flush.threshold.time":"1h"
      }
    },
    "ingestionConfig": {
      "transformConfigs": [
        {
          "columnName": "point",
          "transformFunction": "toSphericalGeography(ST_GeomFromText(pointString))"
        }
      ]
    },
    "tenants": {},
    "metadata": {}
}

When using a default value for a BYTES column we'll need to create the schema and table separately, rather than using the AddTable command. If we try to use the AddTable command, we'll end up with double decoding of the defaultNullValue, resulting in Pinot trying to store an invalid value.

Instead, we'll create the schema with the AddSchema command:

docker run \
   --network geospatial \
   -v $PWD/config:/config \
   apachepinot/pinot:1.0.0 AddSchema \
     -schemaFile /config/schema.json \
     -controllerHost "pinot-controller-geospatial" \
     -exec

And then we'll create the table via the REST API:

curl -X POST http://localhost:9000/tables --data @config/table.json
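
To sanity check that both the schema and the table were created, you can ask the controller's REST API for them. Here's a small sketch using Python's requests library, assuming the controller is reachable on localhost:9000 as in the commands above:

import requests

CONTROLLER = "http://localhost:9000"

# Fetch the schema and table config back from the controller
schema = requests.get(f"{CONTROLLER}/schemas/events").json()
table = requests.get(f"{CONTROLLER}/tables/events").json()

# The point column should still carry the Hex-encoded defaultNullValue
print([f for f in schema["dimensionFieldSpecs"] if f["name"] == "point"])
print(list(table.keys()))  # expect the REALTIME table config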

Querying for defaults

We can then run the following query to check how many times the default value has been used:

SELECT STDistance(
           ST_GeomFromText('POINT (0.6861134172138761 83.5002942140996)'),
           toGeometry(point)
       ) AS distance,
       COUNT(*)
FROM events
GROUP BY distance
ORDER BY COUNT(*) DESC, distance DESC
LIMIT 10
 
distance            count(*)
0                   3147529
250.4421880885498   1
250.35032602023736  1
250.3445873086931   1
250.32023987252305  1
250.28500644222484  1
250.26623501229875  1
250.26271972684384  1
250.25918510024783  1
250.2511511361528   1
Query Results
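
If you want to run that check from a script rather than the query console, one option (not part of the recipe) is to post the same SQL to the controller's /sql endpoint, the same endpoint the query console uses, assuming the controller is still on localhost:9000:

import requests

query = """
SELECT STDistance(
           ST_GeomFromText('POINT (0.6861134172138761 83.5002942140996)'),
           toGeometry(point)
       ) AS distance,
       COUNT(*)
FROM events
GROUP BY distance
ORDER BY COUNT(*) DESC, distance DESC
LIMIT 10
"""

response = requests.post("http://localhost:9000/sql", json={"sql": query})
for row in response.json()["resultTable"]["rows"]:
    print(row)  # [distance, count] - a distance of 0 means the default value was used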