NYPD Complaints
This dataset contains all valid felony, misdemeanor, and violation crime complaints reported to the New York City Police Department (NYPD). You can learn more about it on the NYPD Complaint Data Current (Year To Date) page.
Prerequisites
To follow the code examples in this guide, you'll need to install Docker locally and download the recipes.
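If you don't have the recipes yet, one way to get them is to clone the repository from GitHub. This sketch assumes the recipes live in the startreedata/pinot-recipes repository, which matches the directory layout used below:
git clone https://github.com/startreedata/pinot-recipes.git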
Navigate to recipe
- If you haven't already, download the recipes.
- In a terminal, navigate to the recipe by running the following command:
cd pinot-recipes/recipes/nypd-dataset
Download NYPD data
You can export a CSV file containing the data from https://data.cityofnewyork.us/Public-Safety/NYPD-Complaint-Data-Current-Year-To-Date-/5uac-w243.
Once you've downloaded the CSV file, put it in the data directory.
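Alternatively, you can download the file from the command line. This is a sketch that assumes the dataset's standard Socrata CSV export endpoint; adjust the URL if the export link on the page differs. Run it from the recipe directory so the file lands in data:
curl -o data/nypd-complaint-data.csv "https://data.cityofnewyork.us/api/views/5uac-w243/rows.csv?accessType=DOWNLOAD"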
Launch Pinot Cluster
You can spin up a Pinot Cluster by running the following command:
docker compose up
This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, and Zookeeper. You can find the docker-compose.yml file on GitHub.
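Before moving on, you can confirm that all the containers came up with a standard Docker Compose command:
docker compose ps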
Pinot Schema and Table
Now let's create a Pinot Schema and Table.
First, the schema:
{
"schemaName": "nypdComplaintData",
"dimensionFieldSpecs": [
{"name": "complntNum", "dataType": "STRING"},
{"name": "addrPctCd", "dataType": "INT"},
{"name": "boroNm", "dataType": "STRING"},
{"name": "crmAtptCptdCd", "dataType": "STRING"},
{"name": "hadevelopt", "dataType": "STRING"},
{"name": "housingPsa", "dataType": "STRING"},
{"name": "jurisdictionCode", "dataType": "INT"},
{"name": "jurisDesc", "dataType": "STRING"},
{"name": "kyCd", "dataType": "INT"},
{"name": "lawCatCd", "dataType": "STRING"},
{"name": "locOfOccurDesc", "dataType": "STRING"},
{"name": "ofnsDesc", "dataType": "STRING"},
{"name": "parksNm", "dataType": "STRING"},
{"name": "patrolBoro", "dataType": "STRING"},
{"name": "pdCd", "dataType": "INT"},
{"name": "pdDesc", "dataType": "STRING"},
{"name": "premTypDesc", "dataType": "STRING"},
{"name": "stationName", "dataType": "STRING"},
{"name": "suspAgeGroup", "dataType": "STRING"},
{"name": "suspRace", "dataType": "STRING"},
{"name": "suspSex", "dataType": "STRING"},
{"name": "transitDistrict", "dataType": "INT"},
{"name": "vicAgeGroup", "dataType": "STRING"},
{"name": "vicRace", "dataType": "STRING"},
{"name": "vicSex", "dataType": "STRING"},
{"name": "latLon", "dataType": "STRING"},
{"name": "newGeoreferencedColumn", "dataType": "STRING"},
{"name": "latitude", "dataType": "FLOAT"},
{"name": "longitude", "dataType": "FLOAT"},
{"name": "xCoordCd", "dataType": "FLOAT"},
{"name": "yCoordCd", "dataType": "FLOAT"}
],
"metricFieldSpecs": [
],
"dateTimeFieldSpecs": [
{"name": "complntFrDtTm", "dataType": "TIMESTAMP", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" },
{"name": "complntToDtTm", "dataType": "TIMESTAMP", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" },
{"name": "rptDt", "dataType": "TIMESTAMP", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" }
]
}
config/schema.json
We'll also have the following table config:
{
"tableName": "nypdComplaintData",
"tableType": "OFFLINE",
"segmentsConfig": {
"replication": 1,
"schemaName": "nypdComplaintData",
"timeColumnName": "complntFrDtTm"
},
"tenants": {},
"tableIndexConfig": {},
"ingestionConfig": {
"continueOnError": "true",
"rowTimeValueCheck": "true",
"transformConfigs": [
{"columnName": "complntFrDtTm", "transformFunction": "FromDateTime(concat(CMPLNT_FR_DT, replace(CMPLNT_FR_TM, '(null)', '00:00:00'), ' '), 'MM/dd/yyyy HH:mm:ss')" },
{"columnName": "complntToDtTm", "transformFunction": "FromDateTime(concat(CMPLNT_TO_DT, replace(CMPLNT_TO_TM, '(null)', '00:00:00'), ' '), 'MM/dd/yyyy HH:mm:ss')" },
{"columnName": "rptDt", "transformFunction": "FromDateTime(RPT_DT, 'MM/dd/yyyy')" },
{"columnName": "addrPctCd", "transformFunction": "ADDR_PCT_CD" },
{"columnName": "complntNum", "transformFunction": "CMPLNT_NUM"},
{"columnName": "boroNm", "transformFunction": "BORO_NM"},
{"columnName": "crmAtptCptdCd", "transformFunction": "CRM_ATPT_CPTD_CD"},
{"columnName": "hadevelopt", "transformFunction": "HADEVELOPT"},
{"columnName": "housingPsa", "transformFunction": "HOUSING_PSA"},
{"columnName": "jurisdictionCode", "transformFunction": "JURISDICTION_CODE"},
{"columnName": "jurisDesc", "transformFunction": "JURIS_DESC"},
{"columnName": "kyCd", "transformFunction": "KY_CD"},
{"columnName": "lawCatCd", "transformFunction": "LAW_CAT_CD"},
{"columnName": "locOfOccurDesc", "transformFunction": "LOC_OF_OCCUR_DESC"},
{"columnName": "ofnsDesc", "transformFunction": "OFNS_DESC"},
{"columnName": "parksNm", "transformFunction": "PARKS_NM"},
{"columnName": "patrolBoro", "transformFunction": "PATROL_BORO"},
{"columnName": "pdCd", "transformFunction": "PD_CD"},
{"columnName": "pdDesc", "transformFunction": "PD_DESC"},
{"columnName": "premTypDesc", "transformFunction": "PREM_TYP_DESC"},
{"columnName": "stationName", "transformFunction": "STATION_NAME"},
{"columnName": "suspAgeGroup", "transformFunction": "SUSP_AGE_GROUP"},
{"columnName": "suspRace", "transformFunction": "SUSP_RACE"},
{"columnName": "suspSex", "transformFunction": "SUSP_SEX"},
{"columnName": "transitDistrict", "transformFunction": "TRANSIT_DISTRICT"},
{"columnName": "vicAgeGroup", "transformFunction": "VIC_AGE_GROUP"},
{"columnName": "vicRace", "transformFunction": "VIC_RACE"},
{"columnName": "vicSex", "transformFunction": "VIC_SEX"},
{"columnName": "xCoordCd", "transformFunction": "X_COORD_CD"},
{"columnName": "yCoordCd", "transformFunction": "Y_COORD_CD"},
{"columnName": "latitude", "transformFunction": "Latitude"},
{"columnName": "longitude", "transformFunction": "Longitude"},
{"columnName": "latLon", "transformFunction": "\"Lat_Lon\""},
{"columnName": "newGeoreferencedColumn", "transformFunction": "\"New Georeferenced Column\""}
],
"filterConfig": {
}
},
"metadata": {}
}
config/table.json
The field names in the CSV don't fit the camelCase style that we use for Pinot columns, so we need to define a series of transformConfigs to handle that mapping. The FromDateTime transforms also parse the CSV's date and time strings into the TIMESTAMP columns, with replace swapping the literal '(null)' time values for '00:00:00' so that the concatenated values parse cleanly.
You can create the table and schema by running the following command:
docker run \
--network nypd \
-v $PWD/config:/config \
apachepinot/pinot:1.0.0 AddTable \
-schemaFile /config/schema.json \
-tableConfigFile /config/table.json \
-controllerHost "pinot-controller-nypd" \
-exec
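To check that the table was created, you can query the controller's REST API. This assumes the controller's port 9000 is mapped to localhost in the Docker Compose file:
curl http://localhost:9000/tables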
Ingesting CSV files
We're going to use Pinot's SQL insert syntax to import the CSV files.
Navigate to the Pinot UI (usually at http://localhost:9000) and click on Query Console.
You can then run the following query to run the ingestion job:
SET taskName = 'nypd-dataload';
SET input.fs.className = 'org.apache.pinot.spi.filesystem.LocalPinotFS';
SET includeFileNamePattern='glob:**/*.csv';
INSERT INTO nypdComplaintData
FROM FILE 'file:///data/';
Wait a few minutes for the job to complete. You can check on the status by navigating to the Minion Task Manager page in the Pinot UI.
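Once the task has completed, a quick sanity check is to count the ingested rows:
select count(*)
from nypdComplaintData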
Querying the dataset
Once the data is loaded, we can run some queries. For example, the following query returns the boroughs with the most complaints:
select boroNm, count(*)
from nypdComplaintData
group by boroNm
order by count(*) DESC
limit 10
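As another example, this query (using the ofnsDesc column defined in the schema above) shows the most common offense types:
select ofnsDesc, count(*)
from nypdComplaintData
group by ofnsDesc
order by count(*) DESC
limit 10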