Try StarTree Cloud: 30-day free trial
NYPD Complaint Data

NYPD Complaints

This dataset contains complaints including valid felony, misdemeanor, and violation crimes reported to the New York City Police Department. You can learn more about the dataset at NYPD Complaint Data Current (Year To Date) (opens in a new tab).

Prerequisites

To follow the code examples in this guide, you must install Docker (opens in a new tab) locally and download recipes.

Navigate to recipe

  1. If you haven't already, download recipes.
  2. In terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/nypd-dataset

Download NYPD data

You can export a CSV file containing the data from https://data.cityofnewyork.us/Public-Safety/NYPD-Complaint-Data-Current-Year-To-Date-/5uac-w243 (opens in a new tab). Once you've got the CSV file, put it in the data directory.

Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

 
docker compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, and Zookeeper. You can find the docker-compose.yml (opens in a new tab) file on GitHub.

Pinot Schema and Table

Now let's create a Pinot Schema and Table.

First, the schema:

{
  "schemaName": "nypdComplaintData",
  "dimensionFieldSpecs": [
    {"name": "complntNum", "dataType": "STRING"},
    {"name": "addrPctCd", "dataType": "INT"},
    {"name": "boroNm", "dataType": "STRING"},
    {"name": "crmAtptCptdCd", "dataType": "STRING"},
    {"name": "hadevelopt", "dataType": "STRING"},
    {"name": "housingPsa", "dataType": "STRING"},
    {"name": "jurisdictionCode", "dataType": "INT"},
    {"name": "jurisDesc", "dataType": "STRING"},
    {"name": "kyCd", "dataType": "INT"},
    {"name": "lawCatCd", "dataType": "STRING"},
    {"name": "locOfOccurDesc", "dataType": "STRING"},
    {"name": "ofnsDesc", "dataType": "STRING"},
    {"name": "parksNm", "dataType": "STRING"},
    {"name": "patrolBoro", "dataType": "STRING"},
    {"name": "pdCd", "dataType": "INT"},
    {"name": "pdDesc", "dataType": "STRING"},
    {"name": "premTypDesc", "dataType": "STRING"},
    {"name": "stationName", "dataType": "STRING"},
    {"name": "suspAgeGroup", "dataType": "STRING"},
    {"name": "suspRace", "dataType": "STRING"},
    {"name": "suspSex", "dataType": "STRING"},
    {"name": "transitDistrict", "dataType": "INT"},
    {"name": "vicAgeGroup", "dataType": "STRING"},
    {"name": "vicRace", "dataType": "STRING"},
    {"name": "vicSex", "dataType": "STRING"},
    {"name": "latLon", "dataType": "STRING"},
    {"name": "newGeoreferencedColumn", "dataType": "STRING"},
    {"name": "latitude", "dataType": "FLOAT"},
    {"name": "longitude", "dataType": "FLOAT"},
    {"name": "xCoordCd", "dataType": "FLOAT"},
    {"name": "yCoordCd", "dataType": "FLOAT"}
  ],
  "metricFieldSpecs": [
 
  ],
  "dateTimeFieldSpecs": [
    {"name": "complntFrDtTm", "dataType": "TIMESTAMP", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" },
    {"name": "complntToDtTm", "dataType": "TIMESTAMP", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" },
    {"name": "rptDt", "dataType": "TIMESTAMP", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" }
  ]
}

config/schema.json

We'll also have the following table config:

{
    "tableName": "nypdComplaintData",
    "tableType": "OFFLINE",
    "segmentsConfig": {
      "replication": 1,
      "schemaName": "nypdComplaintData",
      "timeColumnName": "complntFrDtTm"
    },
    "tenants": {},
    "tableIndexConfig": {},
    "ingestionConfig": {
        "continueOnError": "true",
        "rowTimeValueCheck": "true",
        "transformConfigs": [
          {"columnName": "complntFrDtTm", "transformFunction": "FromDateTime(concat(CMPLNT_FR_DT, replace(CMPLNT_FR_TM, '(null)', '00:00:00'), ' '), 'MM/dd/yyyy HH:mm:ss')" },
          {"columnName": "complntToDtTm", "transformFunction": "FromDateTime(concat(CMPLNT_TO_DT, replace(CMPLNT_TO_TM, '(null)', '00:00:00'), ' '), 'MM/dd/yyyy HH:mm:ss')" },
          {"columnName": "rptDt", "transformFunction": "FromDateTime(RPT_DT, 'MM/dd/yyyy')" },
          {"columnName": "addrPctCd", "transformFunction": "ADDR_PCT_CD" },
          {"columnName": "complntNum", "transformFunction": "CMPLNT_NUM"},
          {"columnName": "boroNm", "transformFunction": "BORO_NM"},
          {"columnName": "crmAtptCptdCd", "transformFunction": "CRM_ATPT_CPTD_CD"},
          {"columnName": "hadevelopt", "transformFunction": "HADEVELOPT"},
          {"columnName": "housingPsa", "transformFunction": "HOUSING_PSA"},
          {"columnName": "jurisdictionCode", "transformFunction": "JURISDICTION_CODE"},
          {"columnName": "jurisDesc", "transformFunction": "JURIS_DESC"},
          {"columnName": "kyCd", "transformFunction": "KY_CD"},
          {"columnName": "lawCatCd", "transformFunction": "LAW_CAT_CD"},
          {"columnName": "locOfOccurDesc", "transformFunction": "LOC_OF_OCCUR_DESC"},
          {"columnName": "ofnsDesc", "transformFunction": "OFNS_DESC"},
          {"columnName": "parksNm", "transformFunction": "PARKS_NM"},
          {"columnName": "patrolBoro", "transformFunction": "PATROL_BORO"},
          {"columnName": "pdCd", "transformFunction": "PD_CD"},
          {"columnName": "pdDesc", "transformFunction": "PD_DESC"},
          {"columnName": "premTypDesc", "transformFunction": "PREM_TYP_DESC"},
          {"columnName": "stationName", "transformFunction": "STATION_NAME"},
          {"columnName": "suspAgeGroup", "transformFunction": "SUSP_AGE_GROUP"},
          {"columnName": "suspRace", "transformFunction": "SUSP_RACE"},
          {"columnName": "suspSex", "transformFunction": "SUSP_SEX"},
          {"columnName": "transitDistrict", "transformFunction": "TRANSIT_DISTRICT"},
          {"columnName": "vicAgeGroup", "transformFunction": "VIC_AGE_GROUP"},
          {"columnName": "vicRace", "transformFunction": "VIC_RACE"},
          {"columnName": "vicSex", "transformFunction": "VIC_SEX"},
          {"columnName": "xCoordCd", "transformFunction": "X_COORD_CD"},
          {"columnName": "yCoordCd", "transformFunction": "Y_COORD_CD"},
          {"columnName": "latitude", "transformFunction": "Latitude"},
          {"columnName": "longitude", "transformFunction": "Longitude"},
          {"columnName": "latLon", "transformFunction": "\"Lat_Lon\""},
          {"columnName": "newGeoreferencedColumn", "transformFunction": "\"New Georeferenced Column\""}
        
        ],
        "filterConfig": {
            
          }
      },
    "metadata": {}
  }   

config/table.json

The field names in the CSV don't fit the camel case style that we use for fields, so we need to define a series of transformConfigs to handle that mapping.

You can create the table and schema by running the following command:`

docker run \
   --network nypd \
   -v $PWD/config:/config \
   apachepinot/pinot:1.0.0 AddTable \
     -schemaFile /config/schema.json \
     -tableConfigFile /config/table.json \
     -controllerHost "pinot-controller-nypd" \
    -exec

Ingesting CSV files

We're going to use Pinot's SQL insert syntax to import the CSV files. Navigate to the Pinot UI (opens in a new tab) and click on Query Console You can then run the following query to run the ingestion job:

SET taskName = 'nypd-dataload';
SET input.fs.className = 'org.apache.pinot.spi.filesystem.LocalPinotFS';
SET includeFileNamePattern='glob:**/*.csv';
INSERT INTO nypdComplaintData 
FROM FILE 'file:///data/';

Wait a few minutes for the job to complete. You can check on the status by navigating to the Minion Task Manager (opens in a new tab) page.

Querying the dataset

Once the data is loaded, we can run some queries:

select boroNm, count(*)
from nypdComplaintData 
group by boroNm
order by count(*) DESC
limit 10