跳到主要内容
跳到主要内容

AvroConfluent

输入输出别名

描述

Apache Avro 是一种面向行的序列化格式,使用二进制编码以实现高效的数据处理。 AvroConfluent 格式支持解码使用 Confluent Schema Registry(或 API 兼容服务)序列化的单对象、Avro 编码的 Kafka 消息。

每个 Avro 消息嵌入一个模式 ID,ClickHouse 通过查询配置的模式注册表自动解析。解析后,模式会被缓存以优化性能。

数据类型映射

以下表格展示了 Apache Avro 格式所支持的所有数据类型,以及它们在 INSERTSELECT 查询中对应的 ClickHouse 数据类型

Avro 数据类型 INSERTClickHouse 数据类型Avro 数据类型 SELECT
boolean, int, long, float, doubleInt(8\16\32), UInt(8\16\32)int
boolean, int, long, float, doubleInt64, UInt64long
boolean, int, long, float, doubleFloat32float
boolean, int, long, float, doubleFloat64double
bytes, string, fixed, enumStringbytesstring *
bytes, string, fixedFixedString(N)fixed(N)
enumEnum(8\16)enum
array(T)Array(T)array(T)
map(V, K)Map(V, K)map(string, K)
union(null, T), union(T, null)Nullable(T)union(null, T)
union(T1, T2, …) **Variant(T1, T2, …)union(T1, T2, …) **
nullNullable(Nothing)null
int (date) ***Date, Date32int (date) ***
long (timestamp-millis) ***DateTime64(3)long (timestamp-millis) ***
long (timestamp-micros) ***DateTime64(6)long (timestamp-micros) ***
bytes (decimal) ***DateTime64(N)bytes (decimal) ***
intIPv4int
fixed(16)IPv6fixed(16)
bytes (decimal) ***Decimal(P, S)bytes (decimal) ***
string (uuid) ***UUIDstring (uuid) ***
fixed(16)Int128/UInt128fixed(16)
fixed(32)Int256/UInt256fixed(32)
recordTuplerecord

* bytes 是默认设置,由 output_format_avro_string_column_pattern 控制

** Variant 类型 隐式接受 null 作为字段值,因此例如 Avro 的 union(T1, T2, null) 将被转换为 Variant(T1, T2)。因此,当从 ClickHouse 生成 Avro 时,我们必须始终将 null 类型包含到 Avro union 类型集中,因为在模式推断期间,我们不知道任何值是否实际上为 null

*** Avro 逻辑类型

不支持的 Avro 逻辑数据类型:

  • time-millis
  • time-micros
  • duration

格式设置

设置描述默认
input_format_avro_allow_missing_fields当模式中未找到字段时,是否使用默认值而不是抛出错误。0
input_format_avro_null_as_default当向非可空列插入 null 值时,是否使用默认值而不是抛出错误。0
format_avro_schema_registry_urlConfluent Schema Registry 的 URL。对于基本身份验证,URL 编码的凭据可以直接包含在 URL 路径中。

示例

使用模式注册表

要使用 Kafka 表引擎 读取 Avro 编码的 Kafka 主题,请使用 format_avro_schema_registry_url 设置提供模式注册表的 URL。

CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url';

SELECT * FROM topic1_stream;

使用基本身份验证

如果您的模式注册表需要基本身份验证(例如,如果您使用的是 Confluent Cloud),您可以在 format_avro_schema_registry_url 设置中提供 URL 编码的凭据。

CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';

故障排除

要监控数据摄取进度并调试 Kafka 消费者的错误,您可以查询 system.kafka_consumers 系统表。如果您的部署有多个副本(例如,ClickHouse Cloud),您必须使用 clusterAllReplicas 表函数。

SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;

如果遇到模式解析问题,可以使用 kafkacatclickhouse-local 进行故障排除:

$ kafkacat -b kafka-broker  -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local   --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String"  -q 'select *  from table'
1 a
2 b
3 c