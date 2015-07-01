跳到主要内容
pg_clickhouse 教程

概览

本教程延续 [ClickHouse tutorial] 的内容，但所有查询均通过 pg_clickhouse 执行。

启动 ClickHouse

首先，如果你还没有 ClickHouse 数据库，请先创建一个。一个快速开始的方式是使用 Docker 镜像：

docker run -d --network host --name clickhouse -p 8123:8123 -p9000:9000 --ulimit nofile=262144:262144 clickhouse
docker exec -it clickhouse clickhouse-client

创建表

让我们参考 [ClickHouse 教程] 中的示例，使用纽约市出租车数据集创建一个简单的数据库：

CREATE DATABASE taxi;
CREATE TABLE taxi.trips
(
    trip_id UInt32,
    vendor_id Enum8(
        '1'      =  1, '2'      =  2, '3'      =  3, '4'      =  4,
        'CMT'    =  5, 'VTS'    =  6, 'DDS'    =  7, 'B02512' = 10,
        'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14,
        ''       = 15
    ),
    pickup_date Date,
    pickup_datetime DateTime,
    dropoff_date Date,
    dropoff_datetime DateTime,
    store_and_fwd_flag UInt8,
    rate_code_id UInt8,
    pickup_longitude Float64,
    pickup_latitude Float64,
    dropoff_longitude Float64,
    dropoff_latitude Float64,
    passenger_count UInt8,
    trip_distance Float64,
    fare_amount Decimal(10, 2),
    extra Decimal(10, 2),
    mta_tax Decimal(10, 2),
    tip_amount Decimal(10, 2),
    tolls_amount Decimal(10, 2),
    ehail_fee Decimal(10, 2),
    improvement_surcharge Decimal(10, 2),
    total_amount Decimal(10, 2),
    payment_type Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4),
    trip_type UInt8,
    pickup FixedString(25),
    dropoff FixedString(25),
    cab_type Enum8('yellow' = 1, 'green' = 2, 'uber' = 3),
    pickup_nyct2010_gid Int8,
    pickup_ctlabel Float32,
    pickup_borocode Int8,
    pickup_ct2010 String,
    pickup_boroct2010 String,
    pickup_cdeligibil String,
    pickup_ntacode FixedString(4),
    pickup_ntaname String,
    pickup_puma UInt16,
    dropoff_nyct2010_gid UInt8,
    dropoff_ctlabel Float32,
    dropoff_borocode UInt8,
    dropoff_ct2010 String,
    dropoff_boroct2010 String,
    dropoff_cdeligibil String,
    dropoff_ntacode FixedString(4),
    dropoff_ntaname String,
    dropoff_puma UInt16
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(pickup_date)
ORDER BY pickup_datetime;

添加数据集

接下来导入数据：

INSERT INTO taxi.trips
SELECT * FROM s3(
    'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_{1..2}.gz',
    'TabSeparatedWithNames', "
    trip_id UInt32,
    vendor_id Enum8(
        '1'      =  1, '2'      =  2, '3'      =  3, '4'      =  4,
        'CMT'    =  5, 'VTS'    =  6, 'DDS'    =  7, 'B02512' = 10,
        'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14,
        ''       = 15
    ),
    pickup_date Date,
    pickup_datetime DateTime,
    dropoff_date Date,
    dropoff_datetime DateTime,
    store_and_fwd_flag UInt8,
    rate_code_id UInt8,
    pickup_longitude Float64,
    pickup_latitude Float64,
    dropoff_longitude Float64,
    dropoff_latitude Float64,
    passenger_count UInt8,
    trip_distance Float64,
    fare_amount Decimal(10, 2),
    extra Decimal(10, 2),
    mta_tax Decimal(10, 2),
    tip_amount Decimal(10, 2),
    tolls_amount Decimal(10, 2),
    ehail_fee Decimal(10, 2),
    improvement_surcharge Decimal(10, 2),
    total_amount Decimal(10, 2),
    payment_type Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4),
    trip_type UInt8,
    pickup FixedString(25),
    dropoff FixedString(25),
    cab_type Enum8('yellow' = 1, 'green' = 2, 'uber' = 3),
    pickup_nyct2010_gid Int8,
    pickup_ctlabel Float32,
    pickup_borocode Int8,
    pickup_ct2010 String,
    pickup_boroct2010 String,
    pickup_cdeligibil String,
    pickup_ntacode FixedString(4),
    pickup_ntaname String,
    pickup_puma UInt16,
    dropoff_nyct2010_gid UInt8,
    dropoff_ctlabel Float32,
    dropoff_borocode UInt8,
    dropoff_ct2010 String,
    dropoff_boroct2010 String,
    dropoff_cdeligibil String,
    dropoff_ntacode FixedString(4),
    dropoff_ntaname String,
    dropoff_puma UInt16
") SETTINGS input_format_try_infer_datetimes = 0

先确认我们可以查询它，然后退出客户端：

SELECT count() FROM taxi.trips;
quit

安装 pg_clickhouse

PGXNGitHub 构建并安装 pg_clickhouse。或者使用 [pg_clickhouse image] 启动一个 Docker 容器，该镜像只是将 pg_clickhouse 添加到 Docker 的 Postgres image 中：

docker run --network host --name pg_clickhouse -e POSTGRES_PASSWORD=my_pass \
       -d ghcr.io/clickhouse/pg_clickhouse:18 -U postgres

连接 pg_clickhouse

现在连接到 Postgres，并创建 pg_clickhouse：

CREATE EXTENSION pg_clickhouse;

使用 ClickHouse 数据库的主机名、端口和数据库名称创建一个外部服务器。

CREATE SERVER taxi_srv FOREIGN DATA WRAPPER clickhouse_fdw
       OPTIONS(driver 'binary', host 'localhost', dbname 'taxi');

在此我们选择使用二进制驱动，它使用 ClickHouse 的二进制协议。您也可以使用 "http" 驱动，它使用 HTTP 接口。

接下来，将 PostgreSQL 用户映射到 ClickHouse 用户。最简单的方法就是将当前 PostgreSQL 用户映射到外部服务器上的远程用户：

CREATE USER MAPPING FOR CURRENT_USER SERVER taxi_srv
       OPTIONS (user 'default');

你还可以指定一个 password 选项。

现在，要添加 taxi 表，只需将远程 ClickHouse 数据库中的所有表导入到一个 Postgres 模式中：

CREATE SCHEMA taxi;
IMPORT FOREIGN SCHEMA taxi FROM SERVER taxi_srv INTO taxi;

现在，表应该已经导入完成。在 psql 中使用 \det+ 来查看：

taxi=# \det+ taxi.*
                                       List of foreign tables
 Schema | Table |  Server  |                        FDW options                        | Description 
--------+-------+----------+-----------------------------------------------------------+-------------
 taxi   | trips | taxi_srv | (database 'taxi', table_name 'trips', engine 'MergeTree') | [null]
(1 row)

成功！使用 \d 查看所有列：

taxi=# \d taxi.trips
                                     Foreign table "taxi.trips"
        Column         |            Type             | Collation | Nullable | Default | FDW options 
-----------------------+-----------------------------+-----------+----------+---------+-------------
 trip_id               | bigint                      |           | not null |         | 
 vendor_id             | text                        |           | not null |         | 
 pickup_date           | date                        |           | not null |         | 
 pickup_datetime       | timestamp without time zone |           | not null |         | 
 dropoff_date          | date                        |           | not null |         | 
 dropoff_datetime      | timestamp without time zone |           | not null |         | 
 store_and_fwd_flag    | smallint                    |           | not null |         | 
 rate_code_id          | smallint                    |           | not null |         | 
 pickup_longitude      | double precision            |           | not null |         | 
 pickup_latitude       | double precision            |           | not null |         | 
 dropoff_longitude     | double precision            |           | not null |         | 
 dropoff_latitude      | double precision            |           | not null |         | 
 passenger_count       | smallint                    |           | not null |         | 
 trip_distance         | double precision            |           | not null |         | 
 fare_amount           | numeric(10,2)               |           | not null |         | 
 extra                 | numeric(10,2)               |           | not null |         | 
 mta_tax               | numeric(10,2)               |           | not null |         | 
 tip_amount            | numeric(10,2)               |           | not null |         | 
 tolls_amount          | numeric(10,2)               |           | not null |         | 
 ehail_fee             | numeric(10,2)               |           | not null |         | 
 improvement_surcharge | numeric(10,2)               |           | not null |         | 
 total_amount          | numeric(10,2)               |           | not null |         | 
 payment_type          | text                        |           | not null |         | 
 trip_type             | smallint                    |           | not null |         | 
 pickup                | character varying(25)       |           | not null |         | 
 dropoff               | character varying(25)       |           | not null |         | 
 cab_type              | text                        |           | not null |         | 
 pickup_nyct2010_gid   | smallint                    |           | not null |         | 
 pickup_ctlabel        | real                        |           | not null |         | 
 pickup_borocode       | smallint                    |           | not null |         | 
 pickup_ct2010         | text                        |           | not null |         | 
 pickup_boroct2010     | text                        |           | not null |         | 
 pickup_cdeligibil     | text                        |           | not null |         | 
 pickup_ntacode        | character varying(4)        |           | not null |         | 
 pickup_ntaname        | text                        |           | not null |         | 
 pickup_puma           | integer                     |           | not null |         | 
 dropoff_nyct2010_gid  | smallint                    |           | not null |         | 
 dropoff_ctlabel       | real                        |           | not null |         | 
 dropoff_borocode      | smallint                    |           | not null |         | 
 dropoff_ct2010        | text                        |           | not null |         | 
 dropoff_boroct2010    | text                        |           | not null |         | 
 dropoff_cdeligibil    | text                        |           | not null |         | 
 dropoff_ntacode       | character varying(4)        |           | not null |         | 
 dropoff_ntaname       | text                        |           | not null |         | 
 dropoff_puma          | integer                     |           | not null |         | 
Server: taxi_srv
FDW options: (database 'taxi', table_name 'trips', engine 'MergeTree')

现在查询该表：

 SELECT count(*) FROM taxi.trips;
   count  
 ---------
  1999657
 (1 row)

注意查询执行得有多快。pg_clickhouse 会下推整个 查询，包括 COUNT() 聚合，这样查询就在 ClickHouse 上运行，只 向 Postgres 返回单行数据。使用 EXPLAIN 查看执行计划：

 EXPLAIN select count(*) from taxi.trips;
                    QUERY PLAN                    
 -------------------------------------------------
  Foreign Scan  (cost=1.00..-0.90 rows=1 width=8)
    Relations: Aggregate on (trips)
 (2 rows)

请注意，在执行计划的根节点中出现了 "Foreign Scan"，这意味着整个查询都被下推到 ClickHouse 执行。

分析数据

运行一些查询来分析数据。可以参考以下示例，或尝试编写自己的 SQL 查询。

  • 计算平均小费金额：

    taxi=# \timing
Timing is on.
taxi=# SELECT round(avg(tip_amount), 2) FROM taxi.trips;
 round 
-------
  1.68
(1 row)

Time: 9.438 ms

  • 根据乘客人数计算人均费用：

    taxi=# SELECT
        passenger_count,
        avg(total_amount)::NUMERIC(10, 2) AS average_total_amount
    FROM taxi.trips
    GROUP BY passenger_count;
 passenger_count | average_total_amount 
-----------------+----------------------
               0 |                22.68
               1 |                15.96
               2 |                17.14
               3 |                16.75
               4 |                17.32
               5 |                16.34
               6 |                16.03
               7 |                59.79
               8 |                36.40
               9 |                 9.79
(10 rows)

Time: 27.266 ms

  • 计算各社区每日上车次数：

    taxi=# SELECT
    pickup_date,
    pickup_ntaname,
    SUM(1) AS number_of_trips
FROM taxi.trips
GROUP BY pickup_date, pickup_ntaname
ORDER BY pickup_date ASC LIMIT 10;
 pickup_date |         pickup_ntaname         | number_of_trips 
-------------+--------------------------------+-----------------
 2015-07-01  | Williamsburg                   |               1
 2015-07-01  | park-cemetery-etc-Queens       |               6
 2015-07-01  | Maspeth                        |               1
 2015-07-01  | Stuyvesant Town-Cooper Village |              44
 2015-07-01  | Rego Park                      |               1
 2015-07-01  | Greenpoint                     |               7
 2015-07-01  | Highbridge                     |               1
 2015-07-01  | Briarwood-Jamaica Hills        |               3
 2015-07-01  | Airport                        |             550
 2015-07-01  | East Harlem North              |              32
(10 rows)

Time: 30.978 ms

  • 计算每次行程的时长（分钟），然后按行程时长对结果进行分组：

    taxi=# SELECT
    avg(tip_amount) AS avg_tip,
    avg(fare_amount) AS avg_fare,
    avg(passenger_count) AS avg_passenger,
    count(*) AS count,
    round((date_part('epoch', dropoff_datetime) - date_part('epoch', pickup_datetime)) / 60) as trip_minutes
FROM taxi.trips
WHERE round((date_part('epoch', dropoff_datetime) - date_part('epoch', pickup_datetime)) / 60) > 0
GROUP BY trip_minutes
ORDER BY trip_minutes DESC
LIMIT 5;
      avg_tip      |     avg_fare     |  avg_passenger   | count | trip_minutes 
-------------------+------------------+------------------+-------+--------------
              1.96 |                8 |                1 |     1 |        27512
                 0 |               12 |                2 |     1 |        27500
 0.562727272727273 | 17.4545454545455 | 2.45454545454545 |    11 |         1440
 0.716564885496183 | 14.2786259541985 | 1.94656488549618 |   131 |         1439
  1.00945205479452 | 12.8787671232877 | 1.98630136986301 |   146 |         1438
(5 rows)

Time: 45.477 ms

  • 按一天中的小时，展示各个街区的接载次数：

    taxi=# SELECT
    pickup_ntaname,
    date_part('hour', pickup_datetime) as pickup_hour,
    SUM(1) AS pickups
FROM taxi.trips
WHERE pickup_ntaname != ''
GROUP BY pickup_ntaname, pickup_hour
ORDER BY pickup_ntaname, date_part('hour', pickup_datetime)
LIMIT 5;
 pickup_ntaname | pickup_hour | pickups 
----------------+-------------+---------
 Airport        |           0 |    3509
 Airport        |           1 |    1184
 Airport        |           2 |     401
 Airport        |           3 |     152
 Airport        |           4 |     213
(5 rows)

Time: 36.895 ms

  • 查询前往 LaGuardia 或 JFK 机场的行程：

    taxi=# SELECT
    pickup_datetime,
    dropoff_datetime,
    total_amount,
    pickup_nyct2010_gid,
    dropoff_nyct2010_gid,
    CASE
        WHEN dropoff_nyct2010_gid = 138 THEN 'LGA'
        WHEN dropoff_nyct2010_gid = 132 THEN 'JFK'
    END AS airport_code,
    EXTRACT(YEAR FROM pickup_datetime) AS year,
    EXTRACT(DAY FROM pickup_datetime) AS day,
    EXTRACT(HOUR FROM pickup_datetime) AS hour
FROM taxi.trips
WHERE dropoff_nyct2010_gid IN (132, 138)
ORDER BY pickup_datetime
LIMIT 5;
   pickup_datetime   |  dropoff_datetime   | total_amount | pickup_nyct2010_gid | dropoff_nyct2010_gid | airport_code | year | day | hour 
---------------------+---------------------+--------------+---------------------+----------------------+--------------+------+-----+------
 2015-07-01 00:04:14 | 2015-07-01 00:15:29 |        13.30 |                 -34 |                  132 | JFK          | 2015 |   1 |    0
 2015-07-01 00:09:42 | 2015-07-01 00:12:55 |         6.80 |                  50 |                  138 | LGA          | 2015 |   1 |    0
 2015-07-01 00:23:04 | 2015-07-01 00:24:39 |         4.80 |                -125 |                  132 | JFK          | 2015 |   1 |    0
 2015-07-01 00:27:51 | 2015-07-01 00:39:02 |        14.72 |                -101 |                  138 | LGA          | 2015 |   1 |    0
 2015-07-01 00:32:03 | 2015-07-01 00:55:39 |        39.34 |                  48 |                  138 | LGA          | 2015 |   1 |    0
(5 rows)

Time: 17.450 ms

创建字典

在你的 ClickHouse 服务中创建一个与表关联的字典。该表和字典都基于一个 CSV 文件，该文件为纽约市的每个社区提供一行数据。

这些社区会被映射到纽约市五个行政区（Bronx、Brooklyn、Manhattan、Queens 和 Staten Island）的名称，以及 Newark Airport（EWR）。

下面是你正在使用的 CSV 文件的节选，以表格形式展示。文件中的 LocationID 列映射到 trips 表中的 pickup_nyct2010_giddropoff_nyct2010_gid 列：

LocationIDBoroughZoneservice_zone
1EWRNewark AirportEWR
2QueensJamaica BayBoro Zone
3BronxAllerton/Pelham GardensBoro Zone
4ManhattanAlphabet CityYellow Zone
5Staten IslandArden HeightsBoro Zone

  1. 仍然在 Postgres 中，使用 clickhouse_raw_query 函数创建一个名为 taxi_zone_dictionary 的 ClickHouse dictionary，并从 S3 中的 CSV 文件填充该字典：

    SELECT clickhouse_raw_query($$
    CREATE DICTIONARY taxi.taxi_zone_dictionary (
        LocationID Int64 DEFAULT 0,
        Borough String,
        zone String,
        service_zone String
    )
    PRIMARY KEY LocationID
    SOURCE(HTTP(URL 'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/taxi_zone_lookup.csv' FORMAT 'CSVWithNames'))
    LIFETIME(MIN 0 MAX 0)
    LAYOUT(HASHED_ARRAY())
$$, 'host=localhost dbname=taxi');
    注意

    LIFETIME 设置为 0 会禁用自动更新，以避免对我们的 S3 存储桶产生不必要的流量。在其他场景中，你可能会采用不同的配置。详情参见 Refreshing dictionary data using LIFETIME

    1. 现在导入它：
    IMPORT FOREIGN SCHEMA taxi LIMIT TO (taxi_zone_dictionary)
FROM SERVER taxi_srv INTO taxi;
    1. 确认我们可以对它进行查询：
    taxi=# SELECT * FROM taxi.taxi_zone_dictionary limit 3;
 LocationID |  Borough  |                     Zone                      | service_zone 
------------+-----------+-----------------------------------------------+--------------
         77 | Brooklyn  | East New York/Pennsylvania Avenue             | Boro Zone
        106 | Brooklyn  | Gowanus                                       | Boro Zone
        103 | Manhattan | Governor's Island/Ellis Island/Liberty Island | Yellow Zone
(3 rows)
    1. 很好。现在使用 dictGet 函数在查询中获取行政区名称。该查询会汇总所有在 LaGuardia 或 JFK 机场结束的出租车行程，并按行政区统计数量：
    taxi=# SELECT
        count(1) AS total,
        COALESCE(NULLIF(dictGet(
            'taxi.taxi_zone_dictionary', 'Borough',
            toUInt64(pickup_nyct2010_gid)
        ), ''), 'Unknown') AS borough_name
    FROM taxi.trips
    WHERE dropoff_nyct2010_gid = 132 OR dropoff_nyct2010_gid = 138
    GROUP BY borough_name
    ORDER BY total DESC;
 total | borough_name  
-------+---------------
 23683 | Unknown
  7053 | Manhattan
  6828 | Brooklyn
  4458 | Queens
  2670 | Bronx
   554 | Staten Island
    53 | EWR
(7 rows)

Time: 66.245 ms

    此查询汇总了所有在 LaGuardia 或 JFK 机场结束的出租车行程，并按行政区统计数量。请注意，有相当多行程的上车社区是未知的。

执行一次 JOIN 操作

编写一些将 taxi_zone_dictionary 与你的 trips 表进行 JOIN 的查询。

  1. 从一个与上面机场查询类似的简单 JOIN 开始：

    taxi=# SELECT
    count(1) AS total,
    "Borough"
FROM taxi.trips
JOIN taxi.taxi_zone_dictionary
  ON trips.pickup_nyct2010_gid = toUInt64(taxi.taxi_zone_dictionary."LocationID")
WHERE pickup_nyct2010_gid > 0
  AND dropoff_nyct2010_gid IN (132, 138)
GROUP BY "Borough"
ORDER BY total DESC;
 total | borough_name  
-------+---------------
  7053 | Manhattan
  6828 | Brooklyn
  4458 | Queens
  2670 | Bronx
   554 | Staten Island
    53 | EWR
(6 rows)

Time: 48.449 ms
    注意

    请注意，上述 JOIN 查询的输出与前面的 dictGet 查询相同（只是未包含 Unknown 值）。在底层，ClickHouse 实际上为 taxi_zone_dictionary 字典调用了 dictGet 函数，但 JOIN 语法对 SQL 开发者来说更加常见。

    taxi=# explain SELECT
        count(1) AS total,
        "Borough"
    FROM taxi.trips
    JOIN taxi.taxi_zone_dictionary
      ON trips.pickup_nyct2010_gid = toUInt64(taxi.taxi_zone_dictionary."LocationID")
    WHERE pickup_nyct2010_gid > 0
      AND dropoff_nyct2010_gid IN (132, 138)
    GROUP BY "Borough"
    ORDER BY total DESC;
                              QUERY PLAN                               
-----------------------------------------------------------------------
 Foreign Scan  (cost=1.00..5.10 rows=1000 width=40)
   Relations: Aggregate on ((trips) INNER JOIN (taxi_zone_dictionary))
(2 rows)
Time: 2.012 ms

  2. 此查询返回小费金额最高的 1000 次出行对应的行，然后对每一行与字典执行一次内连接（inner join）：

    taxi=# SELECT *
FROM taxi.trips
JOIN taxi.taxi_zone_dictionary
    ON trips.dropoff_nyct2010_gid = taxi.taxi_zone_dictionary."LocationID"
WHERE tip_amount > 0
ORDER BY tip_amount DESC
LIMIT 1000;
注意

通常，我们会在 PostgreSQL 和 ClickHouse 中避免使用 SELECT *。你应当只获取实际需要的列。