Перейти к основному содержанию
Перейти к основному содержанию

AvroConfluent

ВводВыводПсевдоним

Описание

Apache Avro — это строчно-ориентированный формат сериализации, который использует двоичное кодирование для эффективной обработки данных. Формат AvroConfluent поддерживает декодирование отдельных объектов — сообщений Kafka, закодированных в Avro и сериализованных с использованием Confluent Schema Registry (или API-совместимых сервисов).

Каждое Avro-сообщение содержит идентификатор схемы, по которому ClickHouse автоматически получает схему, обращаясь к настроенному реестру схем. После этого схемы кэшируются для оптимальной производительности.

Сопоставление типов данных

В таблице ниже приведены все типы данных, поддерживаемые форматом Apache Avro, и их соответствующие типы данных ClickHouse в запросах INSERT и SELECT.

Тип данных Avro INSERTТип данных ClickHouseТип данных Avro SELECT
boolean, int, long, float, doubleInt(8\16\32), UInt(8\16\32)int
boolean, int, long, float, doubleInt64, UInt64long
boolean, int, long, float, doubleFloat32float
boolean, int, long, float, doubleFloat64double
bytes, string, fixed, enumStringbytes или string *
bytes, string, fixedFixedString(N)fixed(N)
enumEnum(8\16)enum
array(T)Array(T)array(T)
map(V, K)Map(V, K)map(string, K)
union(null, T), union(T, null)Nullable(T)union(null, T)
union(T1, T2, …) **Variant(T1, T2, …)union(T1, T2, …) **
nullNullable(Nothing)null
int (date) ***Date, Date32int (date) ***
long (timestamp-millis) ***DateTime64(3)long (timestamp-millis) ***
long (timestamp-micros) ***DateTime64(6)long (timestamp-micros) ***
bytes (decimal) ***DateTime64(N)bytes (decimal) ***
intIPv4int
fixed(16)IPv6fixed(16)
bytes (decimal) ***Decimal(P, S)bytes (decimal) ***
string (uuid) ***UUIDstring (uuid) ***
fixed(16)Int128/UInt128fixed(16)
fixed(32)Int256/UInt256fixed(32)
recordTuplerecord

* По умолчанию используется bytes; поведение определяется настройкой output_format_avro_string_column_pattern

** Тип Variant неявно допускает null в качестве значения поля, поэтому, например, Avro union(T1, T2, null) будет преобразован в Variant(T1, T2). В результате при формировании Avro из ClickHouse мы всегда должны включать тип null в набор типов Avro union, так как при выводе схемы мы не знаем, является ли какое-либо значение фактически null.

*** Логические типы Avro

Неподдерживаемые логические типы данных Avro:

  • time-millis
  • time-micros
  • duration

Настройки формата

ПараметрОписаниеЗначение по умолчанию
input_format_avro_allow_missing_fieldsИспользовать ли значение по умолчанию вместо возникновения ошибки, когда поле не найдено в схеме.0
input_format_avro_null_as_defaultИспользовать ли значение по умолчанию вместо возникновения ошибки при вставке значения null в столбец, не допускающий значения null.0
format_avro_schema_registry_urlURL Confluent Schema Registry. Для базовой аутентификации URL-кодированные учетные данные могут быть включены непосредственно в путь URL.

Примеры

Использование реестра схем

Чтобы прочитать Kafka-топик, закодированный в Avro, с помощью движка таблиц Kafka, используйте настройку format_avro_schema_registry_url для указания URL-адреса реестра схем.

CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url';

SELECT * FROM topic1_stream;

Использование базовой аутентификации

Если для вашего реестра схем требуется базовая аутентификация (например, при использовании Confluent Cloud), вы можете указать URL-кодированные учетные данные в настройке format_avro_schema_registry_url.

CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';

Диагностика неполадок

Чтобы отслеживать ход ингестии и отлаживать ошибки потребителя Kafka, вы можете выполнить запрос к системной таблице system.kafka_consumers. Если в вашем развертывании несколько реплик (например, ClickHouse Cloud), необходимо использовать табличную функцию clusterAllReplicas.

SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;

Если вы столкнулись с проблемами с разрешением схемы, вы можете использовать kafkacat вместе с clickhouse-local для диагностики:

$ kafkacat -b kafka-broker  -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local   --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String"  -q 'select *  from table'
1 a
2 b
3 c