pg_clickhouse 教程
概览
本教程延续 [ClickHouse tutorial] 的内容,但所有查询均通过 pg_clickhouse 执行。
启动 ClickHouse
首先,如果你还没有 ClickHouse 数据库,请先创建一个。一个快速开始的方式是使用 Docker 镜像:
docker run -d --network host --name clickhouse -p 8123:8123 -p9000:9000 --ulimit nofile=262144:262144 clickhouse
docker exec -it clickhouse clickhouse-client
创建表
让我们参考 [ClickHouse 教程] 中的示例,使用纽约市出租车数据集创建一个简单的数据库:
CREATE DATABASE taxi;
CREATE TABLE taxi.trips
(
trip_id UInt32,
vendor_id Enum8(
'1' = 1, '2' = 2, '3' = 3, '4' = 4,
'CMT' = 5, 'VTS' = 6, 'DDS' = 7, 'B02512' = 10,
'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14,
'' = 15
),
pickup_date Date,
pickup_datetime DateTime,
dropoff_date Date,
dropoff_datetime DateTime,
store_and_fwd_flag UInt8,
rate_code_id UInt8,
pickup_longitude Float64,
pickup_latitude Float64,
dropoff_longitude Float64,
dropoff_latitude Float64,
passenger_count UInt8,
trip_distance Float64,
fare_amount Decimal(10, 2),
extra Decimal(10, 2),
mta_tax Decimal(10, 2),
tip_amount Decimal(10, 2),
tolls_amount Decimal(10, 2),
ehail_fee Decimal(10, 2),
improvement_surcharge Decimal(10, 2),
total_amount Decimal(10, 2),
payment_type Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4),
trip_type UInt8,
pickup FixedString(25),
dropoff FixedString(25),
cab_type Enum8('yellow' = 1, 'green' = 2, 'uber' = 3),
pickup_nyct2010_gid Int8,
pickup_ctlabel Float32,
pickup_borocode Int8,
pickup_ct2010 String,
pickup_boroct2010 String,
pickup_cdeligibil String,
pickup_ntacode FixedString(4),
pickup_ntaname String,
pickup_puma UInt16,
dropoff_nyct2010_gid UInt8,
dropoff_ctlabel Float32,
dropoff_borocode UInt8,
dropoff_ct2010 String,
dropoff_boroct2010 String,
dropoff_cdeligibil String,
dropoff_ntacode FixedString(4),
dropoff_ntaname String,
dropoff_puma UInt16
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(pickup_date)
ORDER BY pickup_datetime;
添加数据集
接下来导入数据:
INSERT INTO taxi.trips
SELECT * FROM s3(
'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_{1..2}.gz',
'TabSeparatedWithNames', "
trip_id UInt32,
vendor_id Enum8(
'1' = 1, '2' = 2, '3' = 3, '4' = 4,
'CMT' = 5, 'VTS' = 6, 'DDS' = 7, 'B02512' = 10,
'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14,
'' = 15
),
pickup_date Date,
pickup_datetime DateTime,
dropoff_date Date,
dropoff_datetime DateTime,
store_and_fwd_flag UInt8,
rate_code_id UInt8,
pickup_longitude Float64,
pickup_latitude Float64,
dropoff_longitude Float64,
dropoff_latitude Float64,
passenger_count UInt8,
trip_distance Float64,
fare_amount Decimal(10, 2),
extra Decimal(10, 2),
mta_tax Decimal(10, 2),
tip_amount Decimal(10, 2),
tolls_amount Decimal(10, 2),
ehail_fee Decimal(10, 2),
improvement_surcharge Decimal(10, 2),
total_amount Decimal(10, 2),
payment_type Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4),
trip_type UInt8,
pickup FixedString(25),
dropoff FixedString(25),
cab_type Enum8('yellow' = 1, 'green' = 2, 'uber' = 3),
pickup_nyct2010_gid Int8,
pickup_ctlabel Float32,
pickup_borocode Int8,
pickup_ct2010 String,
pickup_boroct2010 String,
pickup_cdeligibil String,
pickup_ntacode FixedString(4),
pickup_ntaname String,
pickup_puma UInt16,
dropoff_nyct2010_gid UInt8,
dropoff_ctlabel Float32,
dropoff_borocode UInt8,
dropoff_ct2010 String,
dropoff_boroct2010 String,
dropoff_cdeligibil String,
dropoff_ntacode FixedString(4),
dropoff_ntaname String,
dropoff_puma UInt16
") SETTINGS input_format_try_infer_datetimes = 0
先确认我们可以查询它,然后退出客户端:
SELECT count() FROM taxi.trips;
quit
安装 pg_clickhouse
从 PGXN 或 GitHub 构建并安装 pg_clickhouse。或者使用 [pg_clickhouse image] 启动一个 Docker 容器,该镜像只是将 pg_clickhouse 添加到 Docker 的 Postgres image 中:
docker run --network host --name pg_clickhouse -e POSTGRES_PASSWORD=my_pass \
-d ghcr.io/clickhouse/pg_clickhouse:18 -U postgres
连接 pg_clickhouse
现在连接到 Postgres,并创建 pg_clickhouse:
CREATE EXTENSION pg_clickhouse;
使用 ClickHouse 数据库的主机名、端口和数据库名称创建一个外部服务器。
CREATE SERVER taxi_srv FOREIGN DATA WRAPPER clickhouse_fdw
OPTIONS(driver 'binary', host 'localhost', dbname 'taxi');
在此我们选择使用二进制驱动,它使用 ClickHouse 的二进制协议。您也可以使用 "http" 驱动,它使用 HTTP 接口。
接下来,将 PostgreSQL 用户映射到 ClickHouse 用户。最简单的方法就是将当前 PostgreSQL 用户映射到外部服务器上的远程用户:
CREATE USER MAPPING FOR CURRENT_USER SERVER taxi_srv
OPTIONS (user 'default');
你还可以指定一个 password 选项。
现在,要添加 taxi 表,只需将远程 ClickHouse 数据库中的所有表导入到一个 Postgres 模式中:
CREATE SCHEMA taxi;
IMPORT FOREIGN SCHEMA taxi FROM SERVER taxi_srv INTO taxi;
现在,表应该已经导入完成。在 psql 中使用 \det+ 来查看:
taxi=# \det+ taxi.*
List of foreign tables
Schema | Table | Server | FDW options | Description
--------+-------+----------+-----------------------------------------------------------+-------------
taxi | trips | taxi_srv | (database 'taxi', table_name 'trips', engine 'MergeTree') | [null]
(1 row)
成功!使用 \d 查看所有列:
taxi=# \d taxi.trips
Foreign table "taxi.trips"
Column | Type | Collation | Nullable | Default | FDW options
-----------------------+-----------------------------+-----------+----------+---------+-------------
trip_id | bigint | | not null | |
vendor_id | text | | not null | |
pickup_date | date | | not null | |
pickup_datetime | timestamp without time zone | | not null | |
dropoff_date | date | | not null | |
dropoff_datetime | timestamp without time zone | | not null | |
store_and_fwd_flag | smallint | | not null | |
rate_code_id | smallint | | not null | |
pickup_longitude | double precision | | not null | |
pickup_latitude | double precision | | not null | |
dropoff_longitude | double precision | | not null | |
dropoff_latitude | double precision | | not null | |
passenger_count | smallint | | not null | |
trip_distance | double precision | | not null | |
fare_amount | numeric(10,2) | | not null | |
extra | numeric(10,2) | | not null | |
mta_tax | numeric(10,2) | | not null | |
tip_amount | numeric(10,2) | | not null | |
tolls_amount | numeric(10,2) | | not null | |
ehail_fee | numeric(10,2) | | not null | |
improvement_surcharge | numeric(10,2) | | not null | |
total_amount | numeric(10,2) | | not null | |
payment_type | text | | not null | |
trip_type | smallint | | not null | |
pickup | character varying(25) | | not null | |
dropoff | character varying(25) | | not null | |
cab_type | text | | not null | |
pickup_nyct2010_gid | smallint | | not null | |
pickup_ctlabel | real | | not null | |
pickup_borocode | smallint | | not null | |
pickup_ct2010 | text | | not null | |
pickup_boroct2010 | text | | not null | |
pickup_cdeligibil | text | | not null | |
pickup_ntacode | character varying(4) | | not null | |
pickup_ntaname | text | | not null | |
pickup_puma | integer | | not null | |
dropoff_nyct2010_gid | smallint | | not null | |
dropoff_ctlabel | real | | not null | |
dropoff_borocode | smallint | | not null | |
dropoff_ct2010 | text | | not null | |
dropoff_boroct2010 | text | | not null | |
dropoff_cdeligibil | text | | not null | |
dropoff_ntacode | character varying(4) | | not null | |
dropoff_ntaname | text | | not null | |
dropoff_puma | integer | | not null | |
Server: taxi_srv
FDW options: (database 'taxi', table_name 'trips', engine 'MergeTree')
现在查询该表:
SELECT count(*) FROM taxi.trips;
count
---------
1999657
(1 row)
注意查询执行得有多快。pg_clickhouse 会下推整个
查询,包括 COUNT() 聚合,这样查询就在 ClickHouse 上运行,只
向 Postgres 返回单行数据。使用 EXPLAIN 查看执行计划:
EXPLAIN select count(*) from taxi.trips;
QUERY PLAN
-------------------------------------------------
Foreign Scan (cost=1.00..-0.90 rows=1 width=8)
Relations: Aggregate on (trips)
(2 rows)
请注意,在执行计划的根节点中出现了 "Foreign Scan",这意味着整个查询都被下推到 ClickHouse 执行。
分析数据
运行一些查询来分析数据。可以参考以下示例,或尝试编写自己的 SQL 查询。
-
计算平均小费金额:
taxi=# \timing Timing is on. taxi=# SELECT round(avg(tip_amount), 2) FROM taxi.trips; round ------- 1.68 (1 row) Time: 9.438 ms -
根据乘客人数计算人均费用:
taxi=# SELECT passenger_count, avg(total_amount)::NUMERIC(10, 2) AS average_total_amount FROM taxi.trips GROUP BY passenger_count; passenger_count | average_total_amount -----------------+---------------------- 0 | 22.68 1 | 15.96 2 | 17.14 3 | 16.75 4 | 17.32 5 | 16.34 6 | 16.03 7 | 59.79 8 | 36.40 9 | 9.79 (10 rows) Time: 27.266 ms -
计算各社区每日上车次数:
taxi=# SELECT pickup_date, pickup_ntaname, SUM(1) AS number_of_trips FROM taxi.trips GROUP BY pickup_date, pickup_ntaname ORDER BY pickup_date ASC LIMIT 10; pickup_date | pickup_ntaname | number_of_trips -------------+--------------------------------+----------------- 2015-07-01 | Williamsburg | 1 2015-07-01 | park-cemetery-etc-Queens | 6 2015-07-01 | Maspeth | 1 2015-07-01 | Stuyvesant Town-Cooper Village | 44 2015-07-01 | Rego Park | 1 2015-07-01 | Greenpoint | 7 2015-07-01 | Highbridge | 1 2015-07-01 | Briarwood-Jamaica Hills | 3 2015-07-01 | Airport | 550 2015-07-01 | East Harlem North | 32 (10 rows) Time: 30.978 ms -
计算每次行程的时长(分钟),然后按行程时长对结果进行分组:
taxi=# SELECT avg(tip_amount) AS avg_tip, avg(fare_amount) AS avg_fare, avg(passenger_count) AS avg_passenger, count(*) AS count, round((date_part('epoch', dropoff_datetime) - date_part('epoch', pickup_datetime)) / 60) as trip_minutes FROM taxi.trips WHERE round((date_part('epoch', dropoff_datetime) - date_part('epoch', pickup_datetime)) / 60) > 0 GROUP BY trip_minutes ORDER BY trip_minutes DESC LIMIT 5; avg_tip | avg_fare | avg_passenger | count | trip_minutes -------------------+------------------+------------------+-------+-------------- 1.96 | 8 | 1 | 1 | 27512 0 | 12 | 2 | 1 | 27500 0.562727272727273 | 17.4545454545455 | 2.45454545454545 | 11 | 1440 0.716564885496183 | 14.2786259541985 | 1.94656488549618 | 131 | 1439 1.00945205479452 | 12.8787671232877 | 1.98630136986301 | 146 | 1438 (5 rows) Time: 45.477 ms -
按一天中的小时,展示各个街区的接载次数:
taxi=# SELECT pickup_ntaname, date_part('hour', pickup_datetime) as pickup_hour, SUM(1) AS pickups FROM taxi.trips WHERE pickup_ntaname != '' GROUP BY pickup_ntaname, pickup_hour ORDER BY pickup_ntaname, date_part('hour', pickup_datetime) LIMIT 5; pickup_ntaname | pickup_hour | pickups ----------------+-------------+--------- Airport | 0 | 3509 Airport | 1 | 1184 Airport | 2 | 401 Airport | 3 | 152 Airport | 4 | 213 (5 rows) Time: 36.895 ms -
查询前往 LaGuardia 或 JFK 机场的行程:
taxi=# SELECT pickup_datetime, dropoff_datetime, total_amount, pickup_nyct2010_gid, dropoff_nyct2010_gid, CASE WHEN dropoff_nyct2010_gid = 138 THEN 'LGA' WHEN dropoff_nyct2010_gid = 132 THEN 'JFK' END AS airport_code, EXTRACT(YEAR FROM pickup_datetime) AS year, EXTRACT(DAY FROM pickup_datetime) AS day, EXTRACT(HOUR FROM pickup_datetime) AS hour FROM taxi.trips WHERE dropoff_nyct2010_gid IN (132, 138) ORDER BY pickup_datetime LIMIT 5; pickup_datetime | dropoff_datetime | total_amount | pickup_nyct2010_gid | dropoff_nyct2010_gid | airport_code | year | day | hour ---------------------+---------------------+--------------+---------------------+----------------------+--------------+------+-----+------ 2015-07-01 00:04:14 | 2015-07-01 00:15:29 | 13.30 | -34 | 132 | JFK | 2015 | 1 | 0 2015-07-01 00:09:42 | 2015-07-01 00:12:55 | 6.80 | 50 | 138 | LGA | 2015 | 1 | 0 2015-07-01 00:23:04 | 2015-07-01 00:24:39 | 4.80 | -125 | 132 | JFK | 2015 | 1 | 0 2015-07-01 00:27:51 | 2015-07-01 00:39:02 | 14.72 | -101 | 138 | LGA | 2015 | 1 | 0 2015-07-01 00:32:03 | 2015-07-01 00:55:39 | 39.34 | 48 | 138 | LGA | 2015 | 1 | 0 (5 rows) Time: 17.450 ms
创建字典
在你的 ClickHouse 服务中创建一个与表关联的字典。该表和字典都基于一个 CSV 文件,该文件为纽约市的每个社区提供一行数据。
这些社区会被映射到纽约市五个行政区(Bronx、Brooklyn、Manhattan、Queens 和 Staten Island)的名称,以及 Newark Airport(EWR)。
下面是你正在使用的 CSV 文件的节选,以表格形式展示。文件中的 LocationID 列映射到 trips 表中的 pickup_nyct2010_gid 和
dropoff_nyct2010_gid 列:
| LocationID | Borough | Zone | service_zone |
|---|---|---|---|
| 1 | EWR | Newark Airport | EWR |
| 2 | Queens | Jamaica Bay | Boro Zone |
| 3 | Bronx | Allerton/Pelham Gardens | Boro Zone |
| 4 | Manhattan | Alphabet City | Yellow Zone |
| 5 | Staten Island | Arden Heights | Boro Zone |
-
仍然在 Postgres 中,使用
clickhouse_raw_query函数创建一个名为taxi_zone_dictionary的 ClickHouse dictionary,并从 S3 中的 CSV 文件填充该字典:SELECT clickhouse_raw_query($$ CREATE DICTIONARY taxi.taxi_zone_dictionary ( LocationID Int64 DEFAULT 0, Borough String, zone String, service_zone String ) PRIMARY KEY LocationID SOURCE(HTTP(URL 'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/taxi_zone_lookup.csv' FORMAT 'CSVWithNames')) LIFETIME(MIN 0 MAX 0) LAYOUT(HASHED_ARRAY()) $$, 'host=localhost dbname=taxi');注意将
LIFETIME设置为 0 会禁用自动更新,以避免对我们的 S3 存储桶产生不必要的流量。在其他场景中,你可能会采用不同的配置。详情参见 Refreshing dictionary data using LIFETIME。- 现在导入它:
IMPORT FOREIGN SCHEMA taxi LIMIT TO (taxi_zone_dictionary) FROM SERVER taxi_srv INTO taxi;- 确认我们可以对它进行查询:
taxi=# SELECT * FROM taxi.taxi_zone_dictionary limit 3; LocationID | Borough | Zone | service_zone ------------+-----------+-----------------------------------------------+-------------- 77 | Brooklyn | East New York/Pennsylvania Avenue | Boro Zone 106 | Brooklyn | Gowanus | Boro Zone 103 | Manhattan | Governor's Island/Ellis Island/Liberty Island | Yellow Zone (3 rows)- 很好。现在使用
dictGet函数在查询中获取行政区名称。该查询会汇总所有在 LaGuardia 或 JFK 机场结束的出租车行程,并按行政区统计数量:
taxi=# SELECT count(1) AS total, COALESCE(NULLIF(dictGet( 'taxi.taxi_zone_dictionary', 'Borough', toUInt64(pickup_nyct2010_gid) ), ''), 'Unknown') AS borough_name FROM taxi.trips WHERE dropoff_nyct2010_gid = 132 OR dropoff_nyct2010_gid = 138 GROUP BY borough_name ORDER BY total DESC; total | borough_name -------+--------------- 23683 | Unknown 7053 | Manhattan 6828 | Brooklyn 4458 | Queens 2670 | Bronx 554 | Staten Island 53 | EWR (7 rows) Time: 66.245 ms此查询汇总了所有在 LaGuardia 或 JFK 机场结束的出租车行程,并按行政区统计数量。请注意,有相当多行程的上车社区是未知的。
执行一次 JOIN 操作
编写一些将 taxi_zone_dictionary 与你的 trips 表进行 JOIN 的查询。
-
从一个与上面机场查询类似的简单
JOIN开始:taxi=# SELECT count(1) AS total, "Borough" FROM taxi.trips JOIN taxi.taxi_zone_dictionary ON trips.pickup_nyct2010_gid = toUInt64(taxi.taxi_zone_dictionary."LocationID") WHERE pickup_nyct2010_gid > 0 AND dropoff_nyct2010_gid IN (132, 138) GROUP BY "Borough" ORDER BY total DESC; total | borough_name -------+--------------- 7053 | Manhattan 6828 | Brooklyn 4458 | Queens 2670 | Bronx 554 | Staten Island 53 | EWR (6 rows) Time: 48.449 ms注意请注意,上述
JOIN查询的输出与前面的dictGet查询相同(只是未包含Unknown值)。在底层,ClickHouse 实际上为taxi_zone_dictionary字典调用了dictGet函数,但JOIN语法对 SQL 开发者来说更加常见。taxi=# explain SELECT count(1) AS total, "Borough" FROM taxi.trips JOIN taxi.taxi_zone_dictionary ON trips.pickup_nyct2010_gid = toUInt64(taxi.taxi_zone_dictionary."LocationID") WHERE pickup_nyct2010_gid > 0 AND dropoff_nyct2010_gid IN (132, 138) GROUP BY "Borough" ORDER BY total DESC; QUERY PLAN ----------------------------------------------------------------------- Foreign Scan (cost=1.00..5.10 rows=1000 width=40) Relations: Aggregate on ((trips) INNER JOIN (taxi_zone_dictionary)) (2 rows) Time: 2.012 ms -
此查询返回小费金额最高的 1000 次出行对应的行,然后对每一行与字典执行一次内连接(inner join):
taxi=# SELECT * FROM taxi.trips JOIN taxi.taxi_zone_dictionary ON trips.dropoff_nyct2010_gid = taxi.taxi_zone_dictionary."LocationID" WHERE tip_amount > 0 ORDER BY tip_amount DESC LIMIT 1000;
通常,我们会在 PostgreSQL 和 ClickHouse 中避免使用 SELECT *。你应当只获取实际需要的列。