Converting DateTime strings to Timestamps

In this recipe we'll learn how to import DateTime strings into Pinot.

Pre-requisites

You will need to install Docker locally to follow the code examples in this guide.

Download Recipe

First, clone the GitHub repository to your local machine and navigate to this recipe:

git clone git@github.com:startreedata/pinot-recipes.git
cd pinot-recipes/recipes/datetime-string-to-timestamp

If you don't have a Git client, you can also download a zip file that contains the code and then navigate to the recipe.

Launch Pinot Cluster and Kafka

You can spin up a Pinot Cluster and Kafka Broker by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and Zookeeper. You can find the docker-compose.yml file on GitHub.

DateTime Strings in Pinot

Pinot automatically creates Timestamps from DateTime strings that are in one of the following formats:

  • yyyy-mm-dd hh:mm:ss[.fffffffff]
  • Millis since epoch

If we have a DateTime value in another format, we'll need to convert it using a transformation function.

We're going to import the following message into Kafka:

{
  "timestamp1": "2019-10-09 22:25:25",
  "timestamp2": "1570656325000",
  "timestamp3": "10/09/2019T22:25:25"
}
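Of these three values, only timestamp3 needs a transformation function: timestamp1 matches the supported string format and timestamp2 is millis since epoch. As a sanity check (Python's datetime stands in for Pinot's own parsing here, which it only approximates), we can confirm which values fit the automatic formats:

```python
from datetime import datetime, timezone

msg = {
    "timestamp1": "2019-10-09 22:25:25",
    "timestamp2": "1570656325000",
    "timestamp3": "10/09/2019T22:25:25",
}

# timestamp1 matches yyyy-mm-dd hh:mm:ss, which Pinot converts automatically
auto1 = datetime.strptime(msg["timestamp1"], "%Y-%m-%d %H:%M:%S")
print(auto1)  # 2019-10-09 22:25:25

# timestamp2 is millis since epoch, also handled automatically (shown in UTC)
auto2 = datetime.fromtimestamp(int(msg["timestamp2"]) / 1000, tz=timezone.utc)
print(auto2)  # 2019-10-09 21:25:25+00:00

# timestamp3 fits neither format, so Pinot needs a transformation function
try:
    datetime.strptime(msg["timestamp3"], "%Y-%m-%d %H:%M:%S")
    needs_transform = False
except ValueError:
    needs_transform = True
print(needs_transform)  # True
```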

Add Schema and Table

Now let's create a Pinot Schema and Table.

config/schema.json
{
  "schemaName": "dates",
  "dimensionFieldSpecs": [
    {"name": "Date1", "dataType": "TIMESTAMP"},
    {"name": "Date2", "dataType": "TIMESTAMP"},
    {"name": "Date3", "dataType": "TIMESTAMP"}
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "Date4",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
config/table.json
{
  "tableName": "dates",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "Date4",
    "schemaName": "dates",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    },
    "transformConfigs": [
      {"columnName": "Date1", "transformFunction": "timestamp1"},
      {"columnName": "Date2", "transformFunction": "timestamp2"},
      {"columnName": "Date3", "transformFunction": "FromDateTime(timestamp3, 'MM/dd/yyyy''T''HH:mm:ss')"},
      {"columnName": "Date4", "transformFunction": "timestamp1"}
    ]
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "dates",
      "stream.kafka.broker.list": "kafka-datetime:9093",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
    }
  },
  "tenants": {},
  "metadata": {}
}

This table has multiple transform configs defined that:

  • Map timestamp1 from the data source into the Date1 column
  • Map timestamp2 from the data source into the Date2 column
  • Map timestamp3 from the data source into the Date3 column, with help from the FromDateTime function
  • Map timestamp1 from the data source into the Date4 column, which serves as the table's time column
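The Date3 transform uses the Joda-style pattern MM/dd/yyyy'T'HH:mm:ss; the doubled single quotes in the config escape the literal T inside the transform function's single-quoted string. A rough Python equivalent of that pattern (an illustration only, not Pinot's actual Joda-Time parser):

```python
from datetime import datetime

# Python analogue of the Joda pattern MM/dd/yyyy'T'HH:mm:ss:
# %m/%d/%Y for the date, a literal T, then %H:%M:%S for the time
parsed = datetime.strptime("10/09/2019T22:25:25", "%m/%d/%YT%H:%M:%S")
print(parsed)  # 2019-10-09 22:25:25
```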

You can create the schema and table by running the following command:

docker exec -it pinot-controller-datetime bin/pinot-admin.sh AddTable   \
-tableConfigFile /config/table.json \
-schemaFile /config/schema.json -exec

Ingesting Data into Kafka

We can ingest a message into Kafka by running the following command:

printf '{"timestamp1": "2019-10-09 22:25:25", "timestamp2": "1570656325000", "timestamp3": "10/09/2019T22:25:25"}\n' |
docker exec -i kafka-datetime /opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic dates

Querying

Once that's completed, navigate to localhost:9000/#/query and click on the dates table or copy/paste the following query:

select * 
from dates
limit 10

You will see the following output:

Date1                 | Date2                 | Date3                 | Date4
2019-10-09 22:25:25.0 | 2019-10-09 21:25:25.0 | 2019-10-09 22:25:25.0 | 2019-10-09 22:25:25.0
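Notice that Date2 is one hour behind the other columns: the epoch millis value is rendered in UTC, while the "2019-10-09 22:25:25" string was presumably written in a UTC+1 time zone. We can verify the UTC rendering of the epoch value:

```python
from datetime import datetime, timezone

# 1570656325000 ms since epoch, rendered in UTC: one hour behind the
# "2019-10-09 22:25:25" string stored verbatim in the other columns
utc_value = datetime.fromtimestamp(1570656325000 / 1000, tz=timezone.utc)
print(utc_value.strftime("%Y-%m-%d %H:%M:%S"))  # 2019-10-09 21:25:25
```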