跳到主要内容
跳到主要内容

复制

该引擎基于 Atomic 引擎。它通过将 DDL 日志写入 ZooKeeper,并在给定数据库的所有副本上执行,支持元数据的复制。

一个 ClickHouse 服务器可以同时运行和更新多个复制数据库。但是,不能有同一个复制数据库的多个副本。

创建数据库

CREATE DATABASE testdb ENGINE = Replicated('zoo_path', 'shard_name', 'replica_name') [SETTINGS ...]

引擎参数

  • zoo_path — ZooKeeper 路径。同一个 ZooKeeper 路径对应同一个数据库。
  • shard_name — 分片名称。数据库副本通过 shard_name 分组为分片。
  • replica_name — 副本名称。对同一分片的所有副本,副本名称必须不同。

对于 ReplicatedMergeTree 表,如果未提供参数,则使用默认参数:/clickhouse/tables/{uuid}/{shard}{replica}。这些可以在服务器设置中的 default_replica_pathdefault_replica_name 中更改。宏 {uuid} 被展开为表的 uuid,{shard}{replica} 被展开为来自服务器配置的值,而不是来自数据库引擎参数。但是将来,将能够使用 Replicated 数据库的 shard_namereplica_name

特性和建议

Replicated 数据库的 DDL 查询工作方式与 ON CLUSTER 查询类似,但有一些细微的差别。

首先,DDL 请求尝试在发起者(最初从用户接收请求的主机)上执行。如果请求未被满足,用户会立即收到错误,其他主机不会尝试满足该请求。如果请求在发起者上成功完成,则所有其他主机会自动重试,直到它们完成该请求。发起者会尝试等待查询在其他主机上完成(最长不超过 distributed_ddl_task_timeout),并返回一个包含每个主机查询执行状态的表。

发生错误时的行为由 distributed_ddl_output_mode 设置进行调节,对于 Replicated 数据库,最好将其设置为 null_status_on_timeout — 即如果某些主机未能在 distributed_ddl_task_timeout 时间内执行请求,则不抛出异常,而是在表中显示它们的 NULL 状态。

system.clusters 系统表包含一个名称与复制数据库相同的集群,该集群由数据库的所有副本组成。此集群在创建/删除副本时会自动更新,可以用于 Distributed 表。

当创建数据库的新副本时,该副本会自行创建表。如果副本长时间不可用并且落后于复制日志——它会检查其本地元数据与 ZooKeeper 中的当前元数据, 将额外的包含数据的表移动到一个单独的非复制数据库(以免意外删除任何多余的内容),创建缺失的表,如果表名已被重命名,更新表名。数据是在 ReplicatedMergeTree 级别复制的,即如果表未被复制,则数据将不被复制(数据库仅对元数据负责)。

ALTER TABLE FREEZE|ATTACH|FETCH|DROP|DROP DETACHED|DETACH PARTITION|PART 查询被允许但不被复制。数据库引擎仅将分区/部分添加/提取/移除到当前副本中。然而,如果表本身使用了复制表引擎,则在使用 ATTACH 后数据会被复制。

如果您只需要配置一个集群而不维护表的复制,请参考 Cluster Discovery 功能。

使用示例

创建一个包含三个主机的集群:

node1 :) CREATE DATABASE r ENGINE=Replicated('some/path/r','shard1','replica1');
node2 :) CREATE DATABASE r ENGINE=Replicated('some/path/r','shard1','other_replica');
node3 :) CREATE DATABASE r ENGINE=Replicated('some/path/r','other_shard','{replica}');

运行 DDL 查询:

CREATE TABLE r.rmt (n UInt64) ENGINE=ReplicatedMergeTree ORDER BY n;
┌─────hosts────────────┬──status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ shard1|replica1      │    0    │       │          2          │        0         │
│ shard1|other_replica │    0    │       │          1          │        0         │
│ other_shard|r1       │    0    │       │          0          │        0         │
└──────────────────────┴─────────┴───────┴─────────────────────┴──────────────────┘

显示系统表:

SELECT cluster, shard_num, replica_num, host_name, host_address, port, is_local
FROM system.clusters WHERE cluster='r';
┌─cluster─┬─shard_num─┬─replica_num─┬─host_name─┬─host_address─┬─port─┬─is_local─┐
│ r       │     1     │      1      │   node3   │  127.0.0.1   │ 9002 │     0    │
│ r       │     2     │      1      │   node2   │  127.0.0.1   │ 9001 │     0    │
│ r       │     2     │      2      │   node1   │  127.0.0.1   │ 9000 │     1    │
└─────────┴───────────┴─────────────┴───────────┴──────────────┴──────┴──────────┘

创建一个分布式表并插入数据:

node2 :) CREATE TABLE r.d (n UInt64) ENGINE=Distributed('r','r','rmt', n % 2);
node3 :) INSERT INTO r.d SELECT * FROM numbers(10);
node1 :) SELECT materialize(hostName()) AS host, groupArray(n) FROM r.d GROUP BY host;
┌─hosts─┬─groupArray(n)─┐
│ node3 │  [1,3,5,7,9]  │
│ node2 │  [0,2,4,6,8]  │
└───────┴───────────────┘

在另一个主机上添加副本:

node4 :) CREATE DATABASE r ENGINE=Replicated('some/path/r','other_shard','r2');

集群配置将如下所示:

┌─cluster─┬─shard_num─┬─replica_num─┬─host_name─┬─host_address─┬─port─┬─is_local─┐
│ r       │     1     │      1      │   node3   │  127.0.0.1   │ 9002 │     0    │
│ r       │     1     │      2      │   node4   │  127.0.0.1   │ 9003 │     0    │
│ r       │     2     │      1      │   node2   │  127.0.0.1   │ 9001 │     0    │
│ r       │     2     │      2      │   node1   │  127.0.0.1   │ 9000 │     1    │
└─────────┴───────────┴─────────────┴───────────┴──────────────┴──────┴──────────┘

该分布式表也将从新主机接收数据:

node2 :) SELECT materialize(hostName()) AS host, groupArray(n) FROM r.d GROUP BY host;
┌─hosts─┬─groupArray(n)─┐
│ node2 │  [1,3,5,7,9]  │
│ node4 │  [0,2,4,6,8]  │
└───────┴───────────────┘