メインコンテンツへスキップ
メインコンテンツへスキップ

AvroConfluent

入力出力エイリアス

説明

Apache Avro は、効率的なデータ処理のためにバイナリエンコードを使用する行指向のシリアル化フォーマットです。AvroConfluent フォーマットは、Confluent スキーマレジストリ (またはその API 互換サービス) を使用して、Avro でエンコードされたメッセージの読み取りと書き込みをサポートします。

各メッセージは Confluent のワイヤ形式を使用します。つまり、マジックバイト (0x00) の後に 4 バイトのビッグエンディアンのスキーマ ID が続き、その後に Avro のバイナリデータが続きます。読み取り時には、ClickHouse はレジストリに対してクエリを実行してスキーマ ID を解決します。書き込み時には、ClickHouse は出力カラムから導出したスキーマを登録し、その結果得られた ID を各行の先頭に付加します。スキーマは、パフォーマンスを最適化するためにキャッシュされます。

データ型のマッピング

次の表は、Apache Avro 形式でサポートされているすべてのデータ型と、INSERT クエリおよび SELECT クエリにおける対応する ClickHouse のデータ型を示しています。

Avro データ型 INSERTClickHouse データ型Avro データ型 SELECT
boolean, int, long, float, doubleInt(8\16\32), UInt(8\16\32)int
boolean, int, long, float, doubleInt64, UInt64long
boolean, int, long, float, doubleFloat32float
boolean, int, long, float, doubleFloat64double
bytes, string, fixed, enumStringbytes または string *
bytes, string, fixedFixedString(N)fixed(N)
enumEnum(8\16)enum
array(T)Array(T)array(T)
map(V, K)Map(V, K)map(string, K)
union(null, T), union(T, null)Nullable(T)union(null, T)
union(T1, T2, …) **Variant(T1, T2, …)union(T1, T2, …) **
nullNullable(Nothing)null
int (date) ***Date, Date32int (date) ***
long (timestamp-millis) ***DateTime64(3)long (timestamp-millis) ***
long (timestamp-micros) ***DateTime64(6)long (timestamp-micros) ***
bytes (decimal) ***DateTime64(N)bytes (decimal) ***
intIPv4int
fixed(16)IPv6fixed(16)
bytes (decimal) ***Decimal(P, S)bytes (decimal) ***
string (uuid) ***UUIDstring (uuid) ***
fixed(16)Int128/UInt128fixed(16)
fixed(32)Int256/UInt256fixed(32)
recordTuplerecord

* bytes がデフォルトであり、この挙動は設定 output_format_avro_string_column_pattern によって制御されます

** Variant type はフィールド値として暗黙的に null を受け入れるため、たとえば Avro の union(T1, T2, null)Variant(T1, T2) に変換されます。 その結果、ClickHouse から Avro を生成する際には、スキーマ推論中に任意の値が実際に null かどうかを判断できないため、常に Avro の union 型の集合に null 型を含める必要があります。

*** Avro logical types

サポートされていない Avro の論理データ型:

  • time-millis
  • time-micros
  • duration

フォーマット設定

SettingDescriptionDefault
input_format_avro_allow_missing_fieldsスキーマ内にフィールドが見つからない場合にエラーを発生させる代わりに、デフォルト値を使用するかどうか。0
input_format_avro_null_as_defaultNULL を許容しないカラムに null 値を挿入する際にエラーを発生させる代わりに、デフォルト値を使用するかどうか。0
format_avro_schema_registry_urlConfluent スキーマレジストリ の URL。Basic 認証を利用する場合は、URL エンコードした認証情報を URL のパス部分に直接含めることができます。
format_avro_schema_registry_connection_timeoutスキーマレジストリ HTTP クライアントの接続タイムアウト (秒) 。スキーマの取得と登録の両方で使用されます。0 より大きく、600 (10 分) 未満である必要があります。1
format_avro_schema_registry_send_timeoutスキーマレジストリ HTTP クライアントの送信タイムアウト (秒) 。0 より大きく、600 (10 分) 未満である必要があります。1
format_avro_schema_registry_receive_timeoutスキーマレジストリ HTTP クライアントの受信タイムアウト (秒) 。0 より大きく、600 (10 分) 未満である必要があります。1
output_format_avro_confluent_subject出力用: スキーマレジストリでスキーマを登録する subject 名。書き込み時に必須です。
output_format_avro_string_column_pattern出力用: Avro string としてシリアライズする String カラムの正規表現 (デフォルトは bytes) 。

Kafka からの読み取り

Kafka table engine を使用して Avro でエンコードされた Kafka トピックを読み取るには、format_avro_schema_registry_url 設定を使用してスキーマレジストリの URL を指定します。

CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url';

SELECT * FROM topic1_stream;

Kafka への書き込み

AvroConfluent メッセージを Kafka トピックに書き込むには、スキーマレジストリの URL とサブジェクト名の両方を設定します。最初の書き込み時に、スキーマは自動的にレジストリに登録されます。

CREATE TABLE topic1_sink
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url',
output_format_avro_confluent_subject = 'topic1-value';

INSERT INTO topic1_sink VALUES ('hello', 'world');

Basic 認証の使用

スキーマレジストリが Basic 認証を必要とする場合(例:Confluent Cloud を使用している場合)、format_avro_schema_registry_url 設定に URL エンコード済みの認証情報を指定できます。

CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';

トラブルシューティング

インジェスト処理の進行状況を監視し、Kafka コンシューマーで発生するエラーをデバッグするには、system.kafka_consumers システムテーブルに対してクエリを実行できます。デプロイメントに複数のレプリカがある場合(例:ClickHouse Cloud)、clusterAllReplicas テーブル関数を使用する必要があります。

SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;

スキーマの解決で問題が発生した場合は、kafkacatclickhouse-local を使用してトラブルシューティングできます。

$ kafkacat -b kafka-broker  -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local   --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String"  -q 'select *  from table'
1 a
2 b
3 c