Overview
This tutorial follows the [ClickHouse tutorial] but runs all its queries via
pg_clickhouse.
Start ClickHouse
First, create a ClickHouse database if you don't already have one. A quick way
to start is with the Docker image:
docker run -d --network host --name clickhouse -p 8123:8123 -p 9000:9000 --ulimit nofile=262144:262144 clickhouse
docker exec -it clickhouse clickhouse-client
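Because the container starts in the background, the server may need a few seconds before it accepts connections. The following is a minimal sketch of a retry helper for that case; `retry` is a hypothetical name, not part of Docker or ClickHouse, and the one-second pause is an arbitrary choice.

```shell
# retry ATTEMPTS COMMAND [ARGS...]
# Run COMMAND until it succeeds, sleeping one second after each failed
# attempt. Returns 0 on the first success, 1 once ATTEMPTS is exhausted.
# (Hypothetical helper, not part of Docker or ClickHouse.)
retry() {
    attempts=$1
    shift
    i=0
    while [ "$i" -lt "$attempts" ]; do
        if "$@" >/dev/null 2>&1; then
            return 0
        fi
        i=$((i + 1))
        sleep 1
    done
    return 1
}
```

For example, `retry 30 curl -sf http://localhost:8123/ping` polls the ping endpoint of the ClickHouse HTTP interface, on the HTTP port used above, for roughly 30 seconds before giving up.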
Create a Table
Let's borrow from the [ClickHouse tutorial] to create a simple database with the New York
City taxi dataset:
CREATE DATABASE taxi;
CREATE TABLE taxi.trips
(
trip_id UInt32,
vendor_id Enum8(
'1' = 1, '2' = 2, '3' = 3, '4' = 4,
'CMT' = 5, 'VTS' = 6, 'DDS' = 7, 'B02512' = 10,
'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14,
'' = 15
),
pickup_date Date,
pickup_datetime DateTime,
dropoff_date Date,
dropoff_datetime DateTime,
store_and_fwd_flag UInt8,
rate_code_id UInt8,
pickup_longitude Float64,
pickup_latitude Float64,
dropoff_longitude Float64,
dropoff_latitude Float64,
passenger_count UInt8,
trip_distance Float64,
fare_amount Decimal(10, 2),
extra Decimal(10, 2),
mta_tax Decimal(10, 2),
tip_amount Decimal(10, 2),
tolls_amount Decimal(10, 2),
ehail_fee Decimal(10, 2),
improvement_surcharge Decimal(10, 2),
total_amount Decimal(10, 2),
payment_type Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4),
trip_type UInt8,
pickup FixedString(25),
dropoff FixedString(25),
cab_type Enum8('yellow' = 1, 'green' = 2, 'uber' = 3),
pickup_nyct2010_gid Int8,
pickup_ctlabel Float32,
pickup_borocode Int8,
pickup_ct2010 String,
pickup_boroct2010 String,
pickup_cdeligibil String,
pickup_ntacode FixedString(4),
pickup_ntaname String,
pickup_puma UInt16,
dropoff_nyct2010_gid UInt8,
dropoff_ctlabel Float32,
dropoff_borocode UInt8,
dropoff_ct2010 String,
dropoff_boroct2010 String,
dropoff_cdeligibil String,
dropoff_ntacode FixedString(4),
dropoff_ntaname String,
dropoff_puma UInt16
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(pickup_date)
ORDER BY pickup_datetime;
Add the Data Set
And then import the data:
INSERT INTO taxi.trips
SELECT * FROM s3(
'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_{1..2}.gz',
'TabSeparatedWithNames', "
trip_id UInt32,
vendor_id Enum8(
'1' = 1, '2' = 2, '3' = 3, '4' = 4,
'CMT' = 5, 'VTS' = 6, 'DDS' = 7, 'B02512' = 10,
'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14,
'' = 15
),
pickup_date Date,
pickup_datetime DateTime,
dropoff_date Date,
dropoff_datetime DateTime,
store_and_fwd_flag UInt8,
rate_code_id UInt8,
pickup_longitude Float64,
pickup_latitude Float64,
dropoff_longitude Float64,
dropoff_latitude Float64,
passenger_count UInt8,
trip_distance Float64,
fare_amount Decimal(10, 2),
extra Decimal(10, 2),
mta_tax Decimal(10, 2),
tip_amount Decimal(10, 2),
tolls_amount Decimal(10, 2),
ehail_fee Decimal(10, 2),
improvement_surcharge Decimal(10, 2),
total_amount Decimal(10, 2),
payment_type Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4),
trip_type UInt8,
pickup FixedString(25),
dropoff FixedString(25),
cab_type Enum8('yellow' = 1, 'green' = 2, 'uber' = 3),
pickup_nyct2010_gid Int8,
pickup_ctlabel Float32,
pickup_borocode Int8,
pickup_ct2010 String,
pickup_boroct2010 String,
pickup_cdeligibil String,
pickup_ntacode FixedString(4),
pickup_ntaname String,
pickup_puma UInt16,
dropoff_nyct2010_gid UInt8,
dropoff_ctlabel Float32,
dropoff_borocode UInt8,
dropoff_ct2010 String,
dropoff_boroct2010 String,
dropoff_cdeligibil String,
dropoff_ntacode FixedString(4),
dropoff_ntaname String,
dropoff_puma UInt16
") SETTINGS input_format_try_infer_datetimes = 0
Make sure we can query it, then quit the client:
SELECT count() FROM taxi.trips;
quit
Install pg_clickhouse
Build and install pg_clickhouse from PGXN or GitHub. Or spin up a
Docker container using the pg_clickhouse image, which simply adds
pg_clickhouse to the Docker Postgres image:
docker run --network host --name pg_clickhouse -e POSTGRES_PASSWORD=my_pass \
-d ghcr.io/clickhouse/pg_clickhouse:18 -U postgres
Connect pg_clickhouse
Now connect to Postgres and create the pg_clickhouse extension:
CREATE EXTENSION pg_clickhouse;
Create a foreign server using the host name, port, and database for your
ClickHouse database.
CREATE SERVER taxi_srv FOREIGN DATA WRAPPER clickhouse_fdw
OPTIONS(driver 'binary', host 'localhost', dbname 'taxi');
Here we've elected to use the binary driver, which uses the ClickHouse binary
protocol. You can also use the "http" driver, which uses the HTTP interface.
Next, map a PostgreSQL user to a ClickHouse user. The simplest way to do so
is to map the current PostgreSQL user to a remote user for the foreign
server:
CREATE USER MAPPING FOR CURRENT_USER SERVER taxi_srv
OPTIONS (user 'default');
You can also specify a password option.
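As a sketch of the password option, the helper below prints a CREATE USER MAPPING statement that includes one, doubling any single quotes so the value stays a valid SQL string literal. The function name `user_mapping_sql` is hypothetical; only the `user` and `password` option names come from the text above.

```shell
# user_mapping_sql SERVER USER PASSWORD
# Print a CREATE USER MAPPING statement that carries a password option.
# (Hypothetical helper. SERVER and USER are assumed to be trusted values
# and are inserted as-is; only PASSWORD is escaped.)
user_mapping_sql() {
    # Double embedded single quotes so the value is a valid SQL literal.
    password=$(printf '%s' "$3" | sed "s/'/''/g")
    printf "CREATE USER MAPPING FOR CURRENT_USER SERVER %s\n" "$1"
    printf "    OPTIONS (user '%s', password '%s');\n" "$2" "$password"
}
```

Piping the output into psql, for example `user_mapping_sql taxi_srv default "$CH_PASSWORD" | psql` with whatever connection options you normally use, would create the mapping in place of the statement above. `CH_PASSWORD` is an assumed environment variable that keeps the password out of the script.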
Now add the taxi table. The simplest way is to import all of the tables from
the remote ClickHouse database into a Postgres schema:
CREATE SCHEMA taxi;
IMPORT FOREIGN SCHEMA taxi FROM SERVER taxi_srv INTO taxi;
The table should now be imported. In psql, use \det+ to see it:
taxi=# \det+ taxi.*
List of foreign tables
Schema | Table | Server | FDW options | Description
--------+-------+----------+-----------------------------------------------------------+-------------
taxi | trips | taxi_srv | (database 'taxi', table_name 'trips', engine 'MergeTree') | [null]
(1 row)
Success! Use \d to show all the columns:
taxi=# \d taxi.trips
Foreign table "taxi.trips"
Column | Type | Collation | Nullable | Default | FDW options
-----------------------+-----------------------------+-----------+----------+---------+-------------
trip_id | bigint | | not null | |
vendor_id | text | | not null | |
pickup_date | date | | not null | |
pickup_datetime | timestamp without time zone | | not null | |
dropoff_date | date | | not null | |
dropoff_datetime | timestamp without time zone | | not null | |
store_and_fwd_flag | smallint | | not null | |
rate_code_id | smallint | | not null | |
pickup_longitude | double precision | | not null | |
pickup_latitude | double precision | | not null | |
dropoff_longitude | double precision | | not null | |
dropoff_latitude | double precision | | not null | |
passenger_count | smallint | | not null | |
trip_distance | double precision | | not null | |
fare_amount | numeric(10,2) | | not null | |
extra | numeric(10,2) | | not null | |
mta_tax | numeric(10,2) | | not null | |
tip_amount | numeric(10,2) | | not null | |
tolls_amount | numeric(10,2) | | not null | |
ehail_fee | numeric(10,2) | | not null | |
improvement_surcharge | numeric(10,2) | | not null | |
total_amount | numeric(10,2) | | not null | |
payment_type | text | | not null | |
trip_type | smallint | | not null | |
pickup | character varying(25) | | not null | |
dropoff | character varying(25) | | not null | |
cab_type | text | | not null | |
pickup_nyct2010_gid | smallint | | not null | |
pickup_ctlabel | real | | not null | |
pickup_borocode | smallint | | not null | |
pickup_ct2010 | text | | not null | |
pickup_boroct2010 | text | | not null | |
pickup_cdeligibil | text | | not null | |
pickup_ntacode | character varying(4) | | not null | |
pickup_ntaname | text | | not null | |
pickup_puma | integer | | not null | |
dropoff_nyct2010_gid | smallint | | not null | |
dropoff_ctlabel | real | | not null | |
dropoff_borocode | smallint | | not null | |
dropoff_ct2010 | text | | not null | |
dropoff_boroct2010 | text | | not null | |
dropoff_cdeligibil | text | | not null | |
dropoff_ntacode | character varying(4) | | not null | |
dropoff_ntaname | text | | not null | |
dropoff_puma | integer | | not null | |
Server: taxi_srv
FDW options: (database 'taxi', table_name 'trips', engine 'MergeTree')
Now query the table:
SELECT count(*) FROM taxi.trips;
count
---------
1999657
(1 row)
Note how quickly the query executed. pg_clickhouse pushes down the entire
query, including the COUNT() aggregate, so it runs on ClickHouse and only
returns the single row to Postgres. Use EXPLAIN to see it:
EXPLAIN select count(*) from taxi.trips;
QUERY PLAN
-------------------------------------------------
Foreign Scan (cost=1.00..-0.90 rows=1 width=8)
Relations: Aggregate on (trips)
(2 rows)
Note that "Foreign Scan" appears at the root of the plan, meaning that the
entire query was pushed down to ClickHouse.
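The same check can be scripted. The sketch below assumes the plan arrives on standard input in psql's unaligned, tuples-only format (`psql -At`), so that the first line is the root plan node; `is_pushed_down` is a hypothetical helper name.

```shell
# is_pushed_down: read an EXPLAIN plan on stdin and succeed only when the
# root (first) plan node is a Foreign Scan, meaning the whole query was
# handed to the foreign server. (Hypothetical helper.)
is_pushed_down() {
    head -n 1 | grep -q '^[[:space:]]*Foreign Scan'
}
```

A possible use, assuming the database is named taxi as in the psql prompts shown in this tutorial: `psql -At -d taxi -c 'EXPLAIN SELECT count(*) FROM taxi.trips' | is_pushed_down && echo "fully pushed down"`. A plan whose root is a local node with a Foreign Scan underneath it fails the check, which signals that only part of the query ran on ClickHouse.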
Analyze the Data
Run some queries to analyze the data. Explore the following examples or try
your own SQL query.
- Calculate the average tip amount:
taxi=# \timing
Timing is on.
taxi=# SELECT round(avg(tip_amount), 2) FROM taxi.trips;
round
-------
1.68
(1 row)
Time: 9.438 ms
- Calculate the average cost based on the number of passengers:
taxi=# SELECT
passenger_count,
avg(total_amount)::NUMERIC(10, 2) AS average_total_amount
FROM taxi.trips
GROUP BY passenger_count;
passenger_count | average_total_amount
-----------------+----------------------
0 | 22.68
1 | 15.96
2 | 17.14
3 | 16.75
4 | 17.32
5 | 16.34
6 | 16.03
7 | 59.79
8 | 36.40
9 | 9.79
(10 rows)
Time: 27.266 ms
- Calculate the daily number of pickups per neighborhood:
taxi=# SELECT
pickup_date,
pickup_ntaname,
SUM(1) AS number_of_trips
FROM taxi.trips
GROUP BY pickup_date, pickup_ntaname
ORDER BY pickup_date ASC LIMIT 10;
pickup_date | pickup_ntaname | number_of_trips
-------------+--------------------------------+-----------------
2015-07-01 | Williamsburg | 1
2015-07-01 | park-cemetery-etc-Queens | 6
2015-07-01 | Maspeth | 1
2015-07-01 | Stuyvesant Town-Cooper Village | 44
2015-07-01 | Rego Park | 1
2015-07-01 | Greenpoint | 7
2015-07-01 | Highbridge | 1
2015-07-01 | Briarwood-Jamaica Hills | 3
2015-07-01 | Airport | 550
2015-07-01 | East Harlem North | 32
(10 rows)
Time: 30.978 ms
- Calculate the length of each trip in minutes, then group the results by
trip length:
taxi=# SELECT
avg(tip_amount) AS avg_tip,
avg(fare_amount) AS avg_fare,
avg(passenger_count) AS avg_passenger,
count(*) AS count,
round((date_part('epoch', dropoff_datetime) - date_part('epoch', pickup_datetime)) / 60) as trip_minutes
FROM taxi.trips
WHERE round((date_part('epoch', dropoff_datetime) - date_part('epoch', pickup_datetime)) / 60) > 0
GROUP BY trip_minutes
ORDER BY trip_minutes DESC
LIMIT 5;
avg_tip | avg_fare | avg_passenger | count | trip_minutes
-------------------+------------------+------------------+-------+--------------
1.96 | 8 | 1 | 1 | 27512
0 | 12 | 2 | 1 | 27500
0.562727272727273 | 17.4545454545455 | 2.45454545454545 | 11 | 1440
0.716564885496183 | 14.2786259541985 | 1.94656488549618 | 131 | 1439
1.00945205479452 | 12.8787671232877 | 1.98630136986301 | 146 | 1438
(5 rows)
Time: 45.477 ms
- Show the number of pickups in each neighborhood broken down by hour of the day:
taxi=# SELECT
pickup_ntaname,
date_part('hour', pickup_datetime) as pickup_hour,
SUM(1) AS pickups
FROM taxi.trips
WHERE pickup_ntaname != ''
GROUP BY pickup_ntaname, pickup_hour
ORDER BY pickup_ntaname, date_part('hour', pickup_datetime)
LIMIT 5;
pickup_ntaname | pickup_hour | pickups
----------------+-------------+---------
Airport | 0 | 3509
Airport | 1 | 1184
Airport | 2 | 401
Airport | 3 | 152
Airport | 4 | 213
(5 rows)
Time: 36.895 ms
- Retrieve rides to LaGuardia or JFK airports:
taxi=# SELECT
pickup_datetime,
dropoff_datetime,
total_amount,
pickup_nyct2010_gid,
dropoff_nyct2010_gid,
CASE
WHEN dropoff_nyct2010_gid = 138 THEN 'LGA'
WHEN dropoff_nyct2010_gid = 132 THEN 'JFK'
END AS airport_code,
EXTRACT(YEAR FROM pickup_datetime) AS year,
EXTRACT(DAY FROM pickup_datetime) AS day,
EXTRACT(HOUR FROM pickup_datetime) AS hour
FROM taxi.trips
WHERE dropoff_nyct2010_gid IN (132, 138)
ORDER BY pickup_datetime
LIMIT 5;
pickup_datetime | dropoff_datetime | total_amount | pickup_nyct2010_gid | dropoff_nyct2010_gid | airport_code | year | day | hour
---------------------+---------------------+--------------+---------------------+----------------------+--------------+------+-----+------
2015-07-01 00:04:14 | 2015-07-01 00:15:29 | 13.30 | -34 | 132 | JFK | 2015 | 1 | 0
2015-07-01 00:09:42 | 2015-07-01 00:12:55 | 6.80 | 50 | 138 | LGA | 2015 | 1 | 0
2015-07-01 00:23:04 | 2015-07-01 00:24:39 | 4.80 | -125 | 132 | JFK | 2015 | 1 | 0
2015-07-01 00:27:51 | 2015-07-01 00:39:02 | 14.72 | -101 | 138 | LGA | 2015 | 1 | 0
2015-07-01 00:32:03 | 2015-07-01 00:55:39 | 39.34 | 48 | 138 | LGA | 2015 | 1 | 0
(5 rows)
Time: 17.450 ms
Create a Dictionary
Create a dictionary associated with a table in your ClickHouse service. The
table and dictionary are based on a CSV file that contains a row for each
neighborhood in New York City.
The neighborhoods are mapped to the names of the five New York City boroughs
(Bronx, Brooklyn, Manhattan, Queens and Staten Island), as well as Newark
Airport (EWR).
Here's an excerpt from the CSV file you're using in table format. The
LocationID column in the file maps to the pickup_nyct2010_gid and
dropoff_nyct2010_gid columns in your trips table:
| LocationID | Borough | Zone | service_zone |
|---|---|---|---|
| 1 | EWR | Newark Airport | EWR |
| 2 | Queens | Jamaica Bay | Boro Zone |
| 3 | Bronx | Allerton/Pelham Gardens | Boro Zone |
| 4 | Manhattan | Alphabet City | Yellow Zone |
| 5 | Staten Island | Arden Heights | Boro Zone |
- Still in Postgres, use the clickhouse_raw_query function to create a
ClickHouse dictionary named taxi_zone_dictionary and populate the
dictionary from the CSV file in S3:
SELECT clickhouse_raw_query($$
CREATE DICTIONARY taxi.taxi_zone_dictionary (
LocationID Int64 DEFAULT 0,
Borough String,
Zone String,
service_zone String
)
PRIMARY KEY LocationID
SOURCE(HTTP(URL 'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/taxi_zone_lookup.csv' FORMAT 'CSVWithNames'))
LIFETIME(MIN 0 MAX 0)
LAYOUT(HASHED_ARRAY())
$$, 'host=localhost dbname=taxi');
Note
Setting LIFETIME to 0 disables automatic updates to avoid unnecessary
traffic to our S3 bucket. In other cases, you might configure it
differently. For details, see Refreshing dictionary data using
LIFETIME.
- Now import it:
IMPORT FOREIGN SCHEMA taxi LIMIT TO (taxi_zone_dictionary)
FROM SERVER taxi_srv INTO taxi;
- Confirm we can query it:
taxi=# SELECT * FROM taxi.taxi_zone_dictionary limit 3;
LocationID | Borough | Zone | service_zone
------------+-----------+-----------------------------------------------+--------------
77 | Brooklyn | East New York/Pennsylvania Avenue | Boro Zone
106 | Brooklyn | Gowanus | Boro Zone
103 | Manhattan | Governor's Island/Ellis Island/Liberty Island | Yellow Zone
(3 rows)
- Excellent. Now use the dictGet function to retrieve a borough's name in a
query. This query sums up the number of taxi rides per borough that end at
either the LaGuardia or JFK airport:
taxi=# SELECT
count(1) AS total,
COALESCE(NULLIF(dictGet(
'taxi.taxi_zone_dictionary', 'Borough',
toUInt64(pickup_nyct2010_gid)
), ''), 'Unknown') AS borough_name
FROM taxi.trips
WHERE dropoff_nyct2010_gid = 132 OR dropoff_nyct2010_gid = 138
GROUP BY borough_name
ORDER BY total DESC;
total | borough_name
-------+---------------
23683 | Unknown
7053 | Manhattan
6828 | Brooklyn
4458 | Queens
2670 | Bronx
554 | Staten Island
53 | EWR
(7 rows)
Time: 66.245 ms
This query sums up the number of taxi rides per borough that end at either
the LaGuardia or JFK airport. Notice there are quite a few trips where the
pickup neighborhood is unknown.
Write some queries that join the taxi_zone_dictionary with your trips
table.
- Start with a simple JOIN that behaves much like the airport query above:
taxi=# SELECT
count(1) AS total,
"Borough"
FROM taxi.trips
JOIN taxi.taxi_zone_dictionary
ON trips.pickup_nyct2010_gid = toUInt64(taxi.taxi_zone_dictionary."LocationID")
WHERE pickup_nyct2010_gid > 0
AND dropoff_nyct2010_gid IN (132, 138)
GROUP BY "Borough"
ORDER BY total DESC;
total | Borough
-------+---------------
7053 | Manhattan
6828 | Brooklyn
4458 | Queens
2670 | Bronx
554 | Staten Island
53 | EWR
(6 rows)
Time: 48.449 ms
Note
The output of this JOIN query matches that of the dictGet query above,
except that the Unknown values are not included. Behind the scenes,
ClickHouse is actually calling the dictGet function for the
taxi_zone_dictionary dictionary, but the JOIN syntax is more familiar for
SQL developers.
EXPLAIN shows that the join and the aggregate are both pushed down to
ClickHouse:
taxi=# explain SELECT
count(1) AS total,
"Borough"
FROM taxi.trips
JOIN taxi.taxi_zone_dictionary
ON trips.pickup_nyct2010_gid = toUInt64(taxi.taxi_zone_dictionary."LocationID")
WHERE pickup_nyct2010_gid > 0
AND dropoff_nyct2010_gid IN (132, 138)
GROUP BY "Borough"
ORDER BY total DESC;
QUERY PLAN
-----------------------------------------------------------------------
Foreign Scan (cost=1.00..5.10 rows=1000 width=40)
Relations: Aggregate on ((trips) INNER JOIN (taxi_zone_dictionary))
(2 rows)
Time: 2.012 ms
- This query returns rows for the 1000 trips with the highest tip
amount, then performs an inner join of each row with the dictionary:
taxi=# SELECT *
FROM taxi.trips
JOIN taxi.taxi_zone_dictionary
ON trips.dropoff_nyct2010_gid = taxi.taxi_zone_dictionary."LocationID"
WHERE tip_amount > 0
ORDER BY tip_amount DESC
LIMIT 1000;
Note
Generally, we avoid using SELECT * in PostgreSQL and ClickHouse. You
should only retrieve the columns you actually need.