跳转到主内容
跳转到主内容

PostgreSQL 表引擎

PostgreSQL 引擎允许对存储在远程 PostgreSQL 服务器上的数据执行 SELECTINSERT 查询。

注意

当前仅支持 PostgreSQL 12 及以上版本的表引擎。

提示

了解我们的 Managed Postgres 服务。其采用与计算节点物理同机架的 NVMe 存储,相比使用 EBS 等网络附加存储的替代方案,对于受磁盘限制的工作负载可提供最高 10 倍的性能提升,并允许你通过 ClickPipes 中的 Postgres CDC(变更数据捕获)连接器将 Postgres 数据复制到 ClickHouse。

创建数据表

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 type1 [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
    name2 type2 [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
    ...
) ENGINE = PostgreSQL({host:port, database, table, user, password[, schema, [, on_conflict]] | named_collection[, option=value [,..]]})

参阅 CREATE TABLE 查询的详细说明。

表结构可以与原始 PostgreSQL 表结构不同:

  • 列名应当与原始 PostgreSQL 表中的列名相同,但可以只使用其中部分列,并按任意顺序排列。
  • 列类型可以与原始 PostgreSQL 表中的类型不同。ClickHouse 会尝试将值转换为 ClickHouse 的数据类型。
  • external_table_functions_use_nulls 设置定义了如何处理 Nullable 列。默认值:1。当为 0 时,表函数不会创建 Nullable 列,而是插入默认值来替代 null。这同样适用于数组中的 NULL 值。

引擎参数

  • host:port — PostgreSQL 服务器地址。
  • database — 远程数据库名称。
  • table — 远程表名称。
  • user — PostgreSQL 用户。
  • password — 用户密码。
  • schema — 非默认表的 schema。可选。
  • on_conflict — 冲突解决策略。例如:ON CONFLICT DO NOTHING。可选。注意:添加此选项会降低插入效率。

Named collections(自 21.11 版本起可用)推荐在生产环境中使用。示例如下:

<named_collections>
    <postgres_creds>
        <host>localhost</host>
        <port>5432</port>
        <user>postgres</user>
        <password>****</password>
        <schema>schema1</schema>
    </postgres_creds>
</named_collections>

可以通过键值对参数来覆盖某些参数:

SELECT * FROM postgresql(postgres_creds, table='table1');

实现细节

PostgreSQL 端的 SELECT 查询在只读 PostgreSQL 事务中以 COPY (SELECT ...) TO STDOUT 的形式运行,每个 SELECT 查询结束后都会提交事务。

诸如 =, !=, >, >=, <, <=IN 这类简单的 WHERE 子句在 PostgreSQL 服务器上执行。

所有连接、聚合、排序、IN [ array ] 条件以及 LIMIT 采样约束,都会在对 PostgreSQL 的查询完成之后,仅在 ClickHouse 中执行。

PostgreSQL 端的 INSERT 查询在 PostgreSQL 事务中以 COPY "table_name" (field1, field2, ... fieldN) FROM STDIN 的形式运行,并在每条 INSERT 语句之后自动提交。

PostgreSQL 的 Array 类型会被转换为 ClickHouse 的数组。

注意

请注意:在 PostgreSQL 中,以 type_name[] 创建的数组数据,在同一列的不同行中,可能包含维度不同的多维数组。但在 ClickHouse 中,同一列的所有行中的多维数组维度必须相同。

支持多个副本,副本之间需要用 | 分隔。例如:

CREATE TABLE test_replicas (id UInt32, name String) ENGINE = PostgreSQL(`postgres{2|3|4}:5432`, 'clickhouse', 'test_replicas', 'postgres', 'mysecretpassword');

PostgreSQL 字典源支持副本优先级。映射中的数字越大,优先级越低。最高优先级为 0

在下面的示例中,副本 example01-1 具有最高优先级:

<postgresql>
    <port>5432</port>
    <user>clickhouse</user>
    <password>qwerty</password>
    <replica>
        <host>example01-1</host>
        <priority>1</priority>
    </replica>
    <replica>
        <host>example01-2</host>
        <priority>2</priority>
    </replica>
    <db>db_name</db>
    <table>table_name</table>
    <where>id=10</where>
    <invalidate_query>SQL_QUERY</invalidate_query>
</postgresql>
</source>

使用示例

PostgreSQL 中的表

postgres=# CREATE TABLE "public"."test" (
"int_id" SERIAL,
"int_nullable" INT NULL DEFAULT NULL,
"float" FLOAT NOT NULL,
"str" VARCHAR(100) NOT NULL DEFAULT '',
"float_nullable" FLOAT NULL DEFAULT NULL,
PRIMARY KEY (int_id));

CREATE TABLE

postgres=# INSERT INTO test (int_id, str, "float") VALUES (1,'test',2);
INSERT 0 1

postgresql> SELECT * FROM test;
  int_id | int_nullable | float | str  | float_nullable
 --------+--------------+-------+------+----------------
       1 |              |     2 | test |
 (1 row)

在 ClickHouse 中创建表并连接到上文创建的 PostgreSQL 表

此示例使用 PostgreSQL 表引擎,将 ClickHouse 表连接到 PostgreSQL 表,并在 PostgreSQL 数据库上同时执行 SELECT 和 INSERT 查询:

CREATE TABLE default.postgresql_table
(
    `float_nullable` Nullable(Float32),
    `str` String,
    `int_id` Int32
)
ENGINE = PostgreSQL('localhost:5432', 'public', 'test', 'postgres_user', 'postgres_password');

使用 SELECT 查询将 PostgreSQL 表中的初始数据插入到 ClickHouse 表中

postgresql table function 会将数据从 PostgreSQL 复制到 ClickHouse,通常用于在 ClickHouse 而非 PostgreSQL 中执行查询或分析,从而提升数据的查询性能,也可用于将数据从 PostgreSQL 迁移到 ClickHouse。由于我们将数据从 PostgreSQL 复制到 ClickHouse,因此会在 ClickHouse 中使用 MergeTree 表引擎,并将其命名为 postgresql_copy:

CREATE TABLE default.postgresql_copy
(
    `float_nullable` Nullable(Float32),
    `str` String,
    `int_id` Int32
)
ENGINE = MergeTree
ORDER BY (int_id);
INSERT INTO default.postgresql_copy
SELECT * FROM postgresql('localhost:5432', 'public', 'test', 'postgres_user', 'postgres_password');

将 PostgreSQL 表中的增量数据插入到 ClickHouse 表中

如果在初始插入之后,随后需要在 PostgreSQL 表和 ClickHouse 表之间执行持续同步,可以在 ClickHouse 中使用 WHERE 子句,根据时间戳或唯一序列 ID,仅插入新增到 PostgreSQL 的数据。

这需要跟踪之前已插入的最大 ID 或时间戳,例如:

SELECT max(`int_id`) AS maxIntID FROM default.postgresql_copy;

然后从 PostgreSQL 表中插入大于该最大值的记录

INSERT INTO default.postgresql_copy
SELECT * FROM postgresql('localhost:5432', 'public', 'test', 'postges_user', 'postgres_password');
WHERE int_id > maxIntID;

从生成的 ClickHouse 表中查询数据

SELECT * FROM postgresql_copy WHERE str IN ('test');
┌─float_nullable─┬─str──┬─int_id─┐
│           ᴺᵁᴸᴸ │ test │      1 │
└────────────────┴──────┴────────┘

使用非默认模式

postgres=# CREATE SCHEMA "nice.schema";

postgres=# CREATE TABLE "nice.schema"."nice.table" (a integer);

postgres=# INSERT INTO "nice.schema"."nice.table" SELECT i FROM generate_series(0, 99) as t(i)
CREATE TABLE pg_table_schema_with_dots (a UInt32)
        ENGINE PostgreSQL('localhost:5432', 'clickhouse', 'nice.table', 'postgrsql_user', 'password', 'nice.schema');

另请参阅