跳到主要内容
跳到主要内容

迁移数据

本文是从 PostgreSQL 迁移到 ClickHouse 指南的第一部分。通过一个实际示例,演示如何采用实时复制(CDC)方案高效完成迁移。文中涉及的许多概念同样适用于从 PostgreSQL 到 ClickHouse 的手动批量数据传输。

数据集

作为一个用于展示从 Postgres 迁移到 ClickHouse 的典型过程的示例数据集,我们使用了 此处 文档化的 Stack Overflow 数据集。该数据集包含从 2008 年到 2024 年 4 月期间在 Stack Overflow 上产生的每一条 postvoteusercommentbadge。该数据在 PostgreSQL 中的模式(schema)如下所示:

PostgreSQL Stack Overflow 模式

用于在 PostgreSQL 中创建这些表的 DDL 命令可在 此处 获取。

该模式虽然不一定是最优的,但利用了多个常用的 PostgreSQL 特性,包括主键、外键、分区以及索引。

我们将把这些概念逐一迁移到它们在 ClickHouse 中的对应实现。

对于希望将该数据集导入 PostgreSQL 实例以测试迁移步骤的用户,我们提供了包含 DDL 的 pg_dump 格式数据供下载,后续的数据加载命令如下所示:

# 用户
wget https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/pdump/2024/users.sql.gz
gzip -d users.sql.gz
psql < users.sql

posts 表

wget https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/pdump/2024/posts.sql.gz gzip -d posts.sql.gz psql < posts.sql

posthistory

wget https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/pdump/2024/posthistory.sql.gz gzip -d posthistory.sql.gz psql < posthistory.sql

评论

wget https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/pdump/2024/comments.sql.gz gzip -d comments.sql.gz psql < comments.sql

votes 表

wget https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/pdump/2024/votes.sql.gz gzip -d votes.sql.gz psql < votes.sql

badges 徽章

wget https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/pdump/2024/badges.sql.gz gzip -d badges.sql.gz psql < badges.sql

postlinks

wget https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/pdump/2024/postlinks.sql.gz gzip -d postlinks.sql.gz psql < postlinks.sql


虽然对 ClickHouse 而言此数据集规模较小,但对 Postgres 来说却相当庞大。上述内容为涵盖 2024 年前三个月的数据子集。

> 虽然我们的示例结果使用完整数据集来展示 Postgres 和 ClickHouse 之间的性能差异,但下文记录的所有步骤在功能上与使用较小子集完全相同。如需将完整数据集加载到 Postgres,请参阅[此处](https://pastila.nl/?00d47a08/1c5224c0b61beb480539f15ac375619d#XNj5vX3a7ZjkdiX7In8wqA==)。由于上述模式施加的外键约束,PostgreSQL 的完整数据集仅包含满足引用完整性的行。如有需要,可直接将无此类约束的 [Parquet 版本](/getting-started/example-datasets/stackoverflow)轻松加载到 ClickHouse 中。

迁移数据

实时复制(CDC)

请参阅此指南,为 PostgreSQL 配置 ClickPipes。该指南涵盖了多种不同类型的 Postgres 源实例。

使用基于 CDC 的方式(通过 ClickPipes 或 PeerDB),PostgreSQL 数据库中的每个表都会自动复制到 ClickHouse 中。

为了在接近实时的情况下处理更新和删除操作,ClickPipes 会将 Postgres 表映射为使用 ReplacingMergeTree 引擎的 ClickHouse 表,该引擎专门用于在 ClickHouse 中处理更新和删除。你可以在这里了解通过 ClickPipes 将数据复制到 ClickHouse 的更多信息。需要特别注意的是,使用 CDC 进行复制时,在同步更新或删除操作时会在 ClickHouse 中产生重复行。请参考这些处理方法,通过使用 FINAL 修饰符在 ClickHouse 中消除这些重复数据。

下面来看一下使用 ClickPipes 在 ClickHouse 中创建 users 表的方式。

CREATE TABLE users
(
    `id` Int32,
    `reputation` String,
    `creationdate` DateTime64(6),
    `displayname` String,
    `lastaccessdate` DateTime64(6),
    `aboutme` String,
    `views` Int32,
    `upvotes` Int32,
    `downvotes` Int32,
    `websiteurl` String,
    `location` String,
    `accountid` Int32,
    `_peerdb_synced_at` DateTime64(9) DEFAULT now64(),
    `_peerdb_is_deleted` Int8,
    `_peerdb_version` Int64
)
ENGINE = ReplacingMergeTree(_peerdb_version)
PRIMARY KEY id
ORDER BY id;

完成设置后,ClickPipes 会开始将 PostgreSQL 中的所有数据迁移到 ClickHouse。根据网络状况和部署规模,对于 Stack Overflow 数据集,这通常只需要几分钟。

手动批量加载与定期更新

采用手动方式时,可以通过以下方法完成数据集的初始批量加载:

  • 表函数(Table functions) - 在 ClickHouse 中使用 Postgres 表函数,从 Postgres 中执行 SELECT 并将结果 INSERT 到 ClickHouse 表中。适用于数百 GB 级别数据集的批量加载。
  • 导出(Exports) - 导出为诸如 CSV 或 SQL 脚本文件等中间格式。然后可以通过客户端使用 INSERT FROM INFILE 子句,或借助对象存储及其相关函数(例如 s3、gcs)将这些文件加载到 ClickHouse 中。

当从 PostgreSQL 手动加载数据时,需要先在 ClickHouse 中创建表。请参考这份同样使用 Stack Overflow 数据集、用于在 ClickHouse 中优化表结构的 数据建模文档

PostgreSQL 与 ClickHouse 之间的数据类型可能不同。要为每个表列确定对应的数据类型,可以将 DESCRIBE 命令与 Postgres 表函数 结合使用。以下命令用于查看 PostgreSQL 中 posts 表的结构,请根据你的环境进行修改:

DESCRIBE TABLE postgresql('<主机>:<端口>', 'postgres', 'posts', '<用户名>', '<密码>')
SETTINGS describe_compact_output = 1

关于 PostgreSQL 与 ClickHouse 之间数据类型映射的概览,请参阅附录文档

针对该 schema 优化数据类型的步骤,与从其他数据源(例如 S3 上的 Parquet)加载数据时完全相同。按照使用 Parquet 的替代指南中描述的流程操作,将得到如下 schema:

CREATE TABLE stackoverflow.posts
(
   `Id` Int32,
   `PostTypeId` Enum('Question' = 1, 'Answer' = 2, 'Wiki' = 3, 'TagWikiExcerpt' = 4, 'TagWiki' = 5, 'ModeratorNomination' = 6, 'WikiPlaceholder' = 7, 'PrivilegeWiki' = 8),
   `AcceptedAnswerId` UInt32,
   `CreationDate` DateTime,
   `Score` Int32,
   `ViewCount` UInt32,
   `Body` String,
   `OwnerUserId` Int32,
   `OwnerDisplayName` String,
   `LastEditorUserId` Int32,
   `LastEditorDisplayName` String,
   `LastEditDate` DateTime,
   `LastActivityDate` DateTime,
   `Title` String,
   `Tags` String,
   `AnswerCount` UInt16,
   `CommentCount` UInt8,
   `FavoriteCount` UInt8,
   `ContentLicense`LowCardinality(String),
   `ParentId` String,
   `CommunityOwnedDate` DateTime,
   `ClosedDate` DateTime
)
ENGINE = MergeTree
ORDER BY tuple()
COMMENT '已优化类型'

我们可以使用一个简单的 INSERT INTO SELECT 语句来填充数据,从 PostgreSQL 读取数据并插入到 ClickHouse 中:

INSERT INTO stackoverflow.posts SELECT * FROM postgresql('<host>:<port>', 'postgres', 'posts', '<username>', '<password>')
0 rows in set. Elapsed: 146.471 sec. Processed 59.82 million rows, 83.82 GB (408.40 thousand rows/s., 572.25 MB/s.)

增量加载同样可以被调度。如果 Postgres 表只接收插入,并且存在递增的 id 或时间戳,用户可以使用上述表函数方法来加载增量数据,即在 SELECT 中应用 WHERE 子句。若可以保证更新只会修改同一列,此方法也可用于支持更新。然而,若要支持删除,则需要对整张表进行完全重载;随着表规模的增大,这可能会变得难以实现。

我们使用 CreationDate 演示一次初始加载和一次增量加载(我们假设如果行被更新,此字段会随之更新)。

-- 初始加载
INSERT INTO stackoverflow.posts SELECT * FROM postgresql('<host>', 'postgres', 'posts', 'postgres', '<password')

INSERT INTO stackoverflow.posts SELECT * FROM postgresql('<host>', 'postgres', 'posts', 'postgres', '<password') WHERE CreationDate > ( SELECT (max(CreationDate) FROM stackoverflow.posts)

ClickHouse 会将简单的 WHERE 子句(例如 =, !=, >, >=, <, <= 和 IN)下推至 PostgreSQL 服务器。通过确保在用于标识变更集的列上建立索引,可以让增量加载更加高效。

在使用查询复制时,一种检测 UPDATE 操作的可行方法是使用 XMIN 系统列(事务 ID)作为水位线——该列的变化表明发生了变更,因此可以将其应用到目标表。采用此方法的用户应注意,XMIN 值可能会发生回绕,并且比较操作需要对整张表进行扫描,从而使变更跟踪变得更加复杂。

点击这里查看第 2 部分