跳转到主内容
跳转到主内容

AvroConfluent

输入输出别名

描述

Apache Avro 是一种面向行的序列化格式,使用二进制编码以实现高效的数据处理。AvroConfluent 格式支持使用 Confluent Schema Registry (或 API 兼容服务) 读取和写入采用 Avro 编码的消息。

每条消息都使用 Confluent 传输格式:一个魔数字节 (0x00) ,后跟 4 字节的大端 schema ID,再后跟 Avro 二进制数据。在读取时,ClickHouse 会通过查询 Schema Registry 解析 schema ID。在写入时,ClickHouse 会注册根据输出列派生的 schema,并将生成的 ID 预加到每一行之前。schema 会被缓存,以获得最佳性能。

数据类型映射

下表展示了 Apache Avro 格式支持的所有数据类型,以及它们在 INSERTSELECT 查询中对应的 ClickHouse 数据类型

Avro 数据类型 INSERTClickHouse 数据类型Avro 数据类型 SELECT
boolean, int, long, float, doubleInt(8\16\32), UInt(8\16\32)int
boolean, int, long, float, doubleInt64, UInt64long
boolean, int, long, float, doubleFloat32float
boolean, int, long, float, doubleFloat64double
bytes, string, fixed, enumStringbytesstring *
bytes, string, fixedFixedString(N)fixed(N)
enumEnum(8\16)enum
array(T)Array(T)array(T)
map(V, K)Map(V, K)map(string, K)
union(null, T), union(T, null)Nullable(T)union(null, T)
union(T1, T2, …) **Variant(T1, T2, …)union(T1, T2, …) **
nullNullable(Nothing)null
int (date) ***Date, Date32int (date) ***
long (timestamp-millis) ***DateTime64(3)long (timestamp-millis) ***
long (timestamp-micros) ***DateTime64(6)long (timestamp-micros) ***
bytes (decimal) ***DateTime64(N)bytes (decimal) ***
intIPv4int
fixed(16)IPv6fixed(16)
bytes (decimal) ***Decimal(P, S)bytes (decimal) ***
string (uuid) ***UUIDstring (uuid) ***
fixed(16)Int128/UInt128fixed(16)
fixed(32)Int256/UInt256fixed(32)
recordTuplerecord

* 默认值为 bytes,此行为由设置 output_format_avro_string_column_pattern 控制

** Variant 类型 会隐式接受 null 作为字段值,因此,例如 Avro 的 union(T1, T2, null) 会被转换为 Variant(T1, T2)。 因此,当从 ClickHouse 生成 Avro 时,我们必须始终在 Avro 的 union 类型集合中包含 null 类型,因为在模式推断期间我们无法得知是否有任何值实际为 null

*** Avro 逻辑类型

不支持的 Avro 逻辑数据类型:

  • time-millis
  • time-micros
  • duration

格式设置

SettingDescriptionDefault
input_format_avro_allow_missing_fields指定在模式中找不到字段时,是否使用默认值而不是抛出错误。0
input_format_avro_null_as_default指定在向非空列插入 null 值时,是否使用默认值而不是抛出错误。0
format_avro_schema_registry_urlConfluent Schema Registry 的 URL。对于基本身份验证,可以在 URL 中直接包含经过 URL 编码的凭据。
format_avro_schema_registry_connection_timeoutSchema Registry HTTP 客户端的连接超时时间 (秒) (用于模式拉取和注册) 。必须大于 0 且小于 600 (10 分钟) 。1
format_avro_schema_registry_send_timeoutSchema Registry HTTP 客户端的发送超时时间 (秒) 。必须大于 0 且小于 600 (10 分钟) 。1
format_avro_schema_registry_receive_timeoutSchema Registry HTTP 客户端的接收超时时间 (秒) 。必须大于 0 且小于 600 (10 分钟) 。1
output_format_avro_confluent_subject用于输出:schema 在 Schema Registry 中注册时使用的 subject 名称。写入时必需。
output_format_avro_string_column_pattern用于输出:要序列化为 Avro string 的 String 列的正则表达式 (默认为 bytes) 。

示例

从 Kafka 读取

要使用 Kafka 表引擎 读取使用 Avro 编码的 topic,请通过 format_avro_schema_registry_url 设置指定 schema registry 的 URL。

CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url';

SELECT * FROM topic1_stream;

写入 Kafka

要将 AvroConfluent 消息写入 Kafka topic,请同时设置 Schema Registry 的 URL 和 subject 名称。首次写入时,schema 会自动注册到 registry 中。

CREATE TABLE topic1_sink
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url',
output_format_avro_confluent_subject = 'topic1-value';

INSERT INTO topic1_sink VALUES ('hello', 'world');

使用基本身份验证

如果 schema registry 需要基本身份验证(例如使用 Confluent Cloud 时),可以在 format_avro_schema_registry_url 设置中提供经过 URL 编码的凭证。

CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';

故障排查

要监控摄取进度并调试 Kafka 消费者的错误,可以查询 system.kafka_consumers 系统表。如果您的部署有多个副本(例如 ClickHouse Cloud),则必须使用 clusterAllReplicas 表函数。

SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;

如果遇到 schema 解析相关问题,可以使用 kafkacat 搭配 clickhouse-local 进行排查:

$ kafkacat -b kafka-broker  -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local   --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String"  -q 'select *  from table'
1 a
2 b
3 c