Importing CSV files with columns containing spaces
Pinot can transform data at ingestion time. In this recipe, we'll learn how to use a transformation to change the name of a column. We will ingest a CSV file in which one column name contains a space, and use a transformation function to map that field to a Pinot column whose name has no space while the data is being ingested.
Pinot Version | 1.0.0 |
Code | startreedata/pinot-recipes/csv-files-spaces-column-names |
Prerequisites
To follow the code examples in this guide, you must install Docker locally and download recipes.
Navigate to recipe
- If you haven't already, download recipes.
- In the terminal, navigate to this recipe's directory:
cd pinot-recipes/recipes/csv-files-spaces-column-names
Launch Pinot Cluster
Launch a Pinot Cluster:
docker-compose up
This command runs a single instance each of the Pinot Controller, Pinot Server, Pinot Broker, and ZooKeeper.
You can find and examine the docker-compose.yml file on GitHub.
Dataset
We're going to import the following CSV file, in which the Case Number column heading contains a space:
ID | Case Number |
---|---|
10224738 | HY411648 |
10224739 | HY411615 |
11646166 | JC213529 |
10224740 | HY411595 |
data/import.csv
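Before writing a schema, it can help to check which headers would need renaming. The following is a minimal sketch using Python's standard csv module. The function name headers_with_spaces and the inline sample are illustrative and not part of the recipe, and the sample assumes the file is comma-delimited.

```python
import csv
import io


def headers_with_spaces(csv_text: str) -> list[str]:
    """Return the names in the first CSV row that contain a space."""
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader, [])  # empty input yields an empty header
    return [name for name in header if " " in name]


# Inline sample mirroring the first rows of data/import.csv
# (assumption: the file is comma-delimited).
sample = "ID,Case Number\n10224738,HY411648\n10224739,HY411615\n"
print(headers_with_spaces(sample))  # ['Case Number']
```

To check the real file instead, pass the text of data/import.csv to the function.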
Pinot Schema and Table
Next, we create a Pinot Schema and Table.
A common pattern when creating a schema is to create columns that map directly to the names of the fields in our data source. We can't do that in this case since column names can't contain spaces, so instead we'll use the following:
{
  "schemaName": "crimes",
  "dimensionFieldSpecs": [
    {
      "name": "ID",
      "dataType": "INT"
    },
    {
      "name": "CaseNumber",
      "dataType": "STRING"
    }
  ]
}
config/schema.json
We'll also have the following table config:
{
  "tableName": "crimes",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "replication": 1
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP"
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    },
    "transformConfigs": [
      {"columnName": "CaseNumber", "transformFunction": "\"Case Number\"" }
    ]
  },
  "metadata": {}
}
config/table.json
The entry under ingestionConfig.transformConfigs makes sure that data in the Case Number field in the data source is ingested into the CaseNumber column of the table. To learn more about writing these functions, see the ingestion transformation documentation.
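For files with many such headers, the transformConfigs entries could be generated rather than written by hand. The sketch below is one possible approach, not part of the recipe: build_transform_configs is a hypothetical helper, and it assumes that removing spaces is the desired renaming rule and that each source field name is wrapped in double quotes, as in the table config above.

```python
import json


def build_transform_configs(headers: list[str]) -> list[dict]:
    """Build one transformConfigs entry per header that contains a space."""
    configs = []
    for header in headers:
        if " " not in header:
            continue  # the field can map directly to a column; no transform needed
        configs.append({
            # Assumed renaming rule: drop the spaces from the header
            "columnName": header.replace(" ", ""),
            # Double-quote the source field name, mirroring the recipe's config
            "transformFunction": f'"{header}"',
        })
    return configs


print(json.dumps(build_transform_configs(["ID", "Case Number"])))
# [{"columnName": "CaseNumber", "transformFunction": "\"Case Number\""}]
```

Note that dropping spaces can produce collisions (for example, if a file had both Case Number and CaseNumber headers), so the generated names are worth reviewing before they go into the schema.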
Create the table and schema by running the following command:
docker run \
--network csv \
-v $PWD/config:/config \
apachepinot/pinot:1.0.0 AddTable \
-schemaFile /config/schema.json \
-tableConfigFile /config/table.json \
-controllerHost "pinot-controller-csv" \
-exec
You should see a message similar to the following if everything is working correctly:
2021/11/25 12:02:04.606 INFO [AddTableCommand] [main] Executing command: AddTable -tableConfigFile /config/table.json -schemaFile /config/schema.json -controllerProtocol http -controllerHost 192.168.144.3 -controllerPort 9000 -user null -password [hidden] -exec
2021/11/25 12:02:05.084 INFO [AddTableCommand] [main] {"status":"Table crimes_OFFLINE succesfully added"}
Ingestion Job
Next, we import the CSV file into Pinot. We'll do this with the following ingestion spec:
executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: '/data'
includeFileNamePattern: 'glob:**/import.csv'
outputDirURI: '/opt/pinot/data/crimes/'
overwriteOutput: true
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'crimes'
pinotClusterSpecs:
  - controllerURI: 'http://pinot-controller-csv:9000'
pushJobSpec:
  pushAttempts: 2
  pushRetryIntervalMillis: 1000
config/job-spec.yml
Run the import with the following command:
docker run \
--network csv \
-v $PWD/config:/config \
-v $PWD/data:/data \
apachepinot/pinot:1.0.0 LaunchDataIngestionJob \
-jobSpecFile /config/job-spec.yml
Querying
Once that's completed, navigate to localhost:9000/#/query and click on the crimes table or copy/paste the following query:
select *
from crimes
limit 10
You will see the following output:
CaseNumber | ID |
---|---|
HY411648 | 10224738 |
HY411615 | 10224739 |
JC213529 | 11646166 |
HY411595 | 10224740 |
Query Results
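The same query can also be run from a script. The following is a rough sketch using only Python's standard library. It assumes the controller's SQL endpoint is reachable at http://localhost:9000/sql (the host, port, and path depend on your setup and are assumptions here) and that the response carries a resultTable with dataSchema.columnNames and rows. Both function names are illustrative.

```python
import json
import urllib.request


def run_query(sql: str, url: str = "http://localhost:9000/sql") -> dict:
    """POST a SQL query as JSON and return the decoded JSON response.

    The default URL is an assumption; adjust it to match your cluster.
    """
    request = urllib.request.Request(
        url,
        data=json.dumps({"sql": sql}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


def rows_as_dicts(response: dict) -> list[dict]:
    """Pair each result row with the column names from the response schema."""
    table = response["resultTable"]
    columns = table["dataSchema"]["columnNames"]
    return [dict(zip(columns, row)) for row in table["rows"]]
```

With the cluster running, rows_as_dicts(run_query("select * from crimes limit 10")) should return one dictionary per row, keyed by CaseNumber and ID.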