メインコンテンツまでスキップ
メインコンテンツまでスキップ

AvroConfluent

入力出力エイリアス

説明

Apache Avro は、効率的なデータ処理のためにバイナリエンコーディングを使用する行指向シリアル化フォーマットです。 AvroConfluent フォーマットは、Confluent Schema Registry(またはAPI互換性のあるサービス)を使用してシリアル化された単一オブジェクトのAvroエンコードされたKafkaメッセージのデコードをサポートします。

各Avroメッセージには ClickHouse が自動的に構成されたスキーマレジストリをクエリして解決するスキーマIDが埋め込まれています。一度解決されると、スキーマは最適なパフォーマンスのためにキャッシュされます。

データ型マッピング

以下の表は、Apache Avro形式がサポートするすべてのデータ型と、それに対応するClickHouseのdata typesにおけるINSERTおよびSELECTクエリの対応表です。

Avroデータ型 INSERTClickHouseデータ型Avroデータ型 SELECT
boolean, int, long, float, doubleInt(8\16\32), UInt(8\16\32)int
boolean, int, long, float, doubleInt64, UInt64long
boolean, int, long, float, doubleFloat32float
boolean, int, long, float, doubleFloat64double
bytes, string, fixed, enumStringbytesまたはstring *
bytes, string, fixedFixedString(N)fixed(N)
enumEnum(8\16)enum
array(T)Array(T)array(T)
map(V, K)Map(V, K)map(string, K)
union(null, T), union(T, null)Nullable(T)union(null, T)
union(T1, T2, …) **Variant(T1, T2, …)union(T1, T2, …) **
nullNullable(Nothing)null
int (date) ***Date, Date32int (date) ***
long (timestamp-millis) ***DateTime64(3)long (timestamp-millis) ***
long (timestamp-micros) ***DateTime64(6)long (timestamp-micros) ***
bytes (decimal) ***DateTime64(N)bytes (decimal) ***
intIPv4int
fixed(16)IPv6fixed(16)
bytes (decimal) ***Decimal(P, S)bytes (decimal) ***
string (uuid) ***UUIDstring (uuid) ***
fixed(16)Int128/UInt128fixed(16)
fixed(32)Int256/UInt256fixed(32)
recordTuplerecord

* bytesはデフォルトであり、output_format_avro_string_column_patternを設定することで管理されます。

** Variantタイプは、フィールド値としてnullを暗黙的に受け入れるため、例えばAvroのunion(T1, T2, null)Variant(T1, T2)に変換されます。その結果、ClickHouseからAvroを生成する際には、スキーマ推論中に実際にどの値がnullであるか分からないため、常にAvroのunion型集合にnullタイプを含める必要があります。

*** Avro論理タイプ

サポートされていないAvro論理データ型:

  • time-millis
  • time-micros
  • duration

フォーマット設定

設定説明デフォルト
input_format_avro_allow_missing_fieldsスキーマにフィールドが見つからない場合、エラーをスローする代わりにデフォルト値を使用するかどうか。0
input_format_avro_null_as_default非Nullableカラムに null 値を挿入する場合、エラーをスローする代わりにデフォルト値を使用するかどうか。0
format_avro_schema_registry_urlConfluent Schema Registry のURL。基本認証の場合、URLパスにURLエンコードされた資格情報を直接含めることができます。

スキーマレジストリの使用

Kafkaテーブルエンジンを使用してAvroエンコードされたKafkaトピックを読み取るには、format_avro_schema_registry_url 設定を使用してスキーマレジストリのURLを提供します。

CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url';

SELECT * FROM topic1_stream;

基本認証の使用

スキーマレジストリが基本認証を必要とする場合(例:Confluent Cloudを使用している場合)、 format_avro_schema_registry_url 設定にURLエンコードされた資格情報を提供できます。

CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';

トラブルシューティング

Kafkaコンシューマーの取り込み進行状況を監視し、エラーをデバッグするには、system.kafka_consumers システムテーブルをクエリできます。デプロイメントに複数のレプリカがある場合(例:ClickHouse Cloud)、clusterAllReplicas テーブル関数を使用する必要があります。

SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;

スキーマ解決の問題が発生した場合は、kafkacatclickhouse-localを使用してトラブルシューティングできます:

$ kafkacat -b kafka-broker  -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local   --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String"  -q 'select *  from table'
1 a
2 b
3 c