AvroConfluent
| 入力 | 出力 | エイリアス |
|---|---|---|
| ✔ | ✔ |
説明
Apache Avro は、効率的なデータ処理のためにバイナリエンコードを使用する行指向のシリアル化フォーマットです。AvroConfluent フォーマットは、Confluent スキーマレジストリ (またはその API 互換サービス) を使用して、Avro でエンコードされたメッセージの読み取りと書き込みをサポートします。
各メッセージは Confluent のワイヤ形式を使用します。つまり、マジックバイト (0x00) の後に 4 バイトのビッグエンディアンのスキーマ ID が続き、その後に Avro のバイナリデータが続きます。読み取り時には、ClickHouse はレジストリに対してクエリを実行してスキーマ ID を解決します。書き込み時には、ClickHouse は出力カラムから導出したスキーマを登録し、その結果得られた ID を各行の先頭に付加します。スキーマは、パフォーマンスを最適化するためにキャッシュされます。
データ型のマッピング
次の表は、Apache Avro 形式でサポートされているすべてのデータ型と、INSERT クエリおよび SELECT クエリにおける対応する ClickHouse のデータ型を示しています。
Avro データ型 INSERT | ClickHouse データ型 | Avro データ型 SELECT |
|---|---|---|
boolean, int, long, float, double | Int(8\16\32), UInt(8\16\32) | int |
boolean, int, long, float, double | Int64, UInt64 | long |
boolean, int, long, float, double | Float32 | float |
boolean, int, long, float, double | Float64 | double |
bytes, string, fixed, enum | String | bytes または string * |
bytes, string, fixed | FixedString(N) | fixed(N) |
enum | Enum(8\16) | enum |
array(T) | Array(T) | array(T) |
map(V, K) | Map(V, K) | map(string, K) |
union(null, T), union(T, null) | Nullable(T) | union(null, T) |
union(T1, T2, …) ** | Variant(T1, T2, …) | union(T1, T2, …) ** |
null | Nullable(Nothing) | null |
int (date) *** | Date, Date32 | int (date) *** |
long (timestamp-millis) *** | DateTime64(3) | long (timestamp-millis) *** |
long (timestamp-micros) *** | DateTime64(6) | long (timestamp-micros) *** |
bytes (decimal) *** | DateTime64(N) | bytes (decimal) *** |
int | IPv4 | int |
fixed(16) | IPv6 | fixed(16) |
bytes (decimal) *** | Decimal(P, S) | bytes (decimal) *** |
string (uuid) *** | UUID | string (uuid) *** |
fixed(16) | Int128/UInt128 | fixed(16) |
fixed(32) | Int256/UInt256 | fixed(32) |
record | Tuple | record |
* bytes がデフォルトであり、この挙動は設定 output_format_avro_string_column_pattern によって制御されます
** Variant type はフィールド値として暗黙的に null を受け入れるため、たとえば Avro の union(T1, T2, null) は Variant(T1, T2) に変換されます。
その結果、ClickHouse から Avro を生成する際には、スキーマ推論中に任意の値が実際に null かどうかを判断できないため、常に Avro の union 型の集合に null 型を含める必要があります。
サポートされていない Avro の論理データ型:
time-millistime-microsduration
フォーマット設定
| Setting | Description | Default |
|---|---|---|
input_format_avro_allow_missing_fields | スキーマ内にフィールドが見つからない場合にエラーを発生させる代わりに、デフォルト値を使用するかどうか。 | 0 |
input_format_avro_null_as_default | NULL を許容しないカラムに null 値を挿入する際にエラーを発生させる代わりに、デフォルト値を使用するかどうか。 | 0 |
format_avro_schema_registry_url | Confluent スキーマレジストリ の URL。Basic 認証を利用する場合は、URL エンコードした認証情報を URL のパス部分に直接含めることができます。 | |
format_avro_schema_registry_connection_timeout | スキーマレジストリ HTTP クライアントの接続タイムアウト (秒) 。スキーマの取得と登録の両方で使用されます。0 より大きく、600 (10 分) 未満である必要があります。 | 1 |
format_avro_schema_registry_send_timeout | スキーマレジストリ HTTP クライアントの送信タイムアウト (秒) 。0 より大きく、600 (10 分) 未満である必要があります。 | 1 |
format_avro_schema_registry_receive_timeout | スキーマレジストリ HTTP クライアントの受信タイムアウト (秒) 。0 より大きく、600 (10 分) 未満である必要があります。 | 1 |
output_format_avro_confluent_subject | 出力用: スキーマレジストリでスキーマを登録する subject 名。書き込み時に必須です。 | |
output_format_avro_string_column_pattern | 出力用: Avro string としてシリアライズする String カラムの正規表現 (デフォルトは bytes) 。 |
例
Kafka からの読み取り
Kafka table engine を使用して Avro でエンコードされた Kafka トピックを読み取るには、format_avro_schema_registry_url 設定を使用してスキーマレジストリの URL を指定します。
Kafka への書き込み
AvroConfluent メッセージを Kafka トピックに書き込むには、スキーマレジストリの URL とサブジェクト名の両方を設定します。最初の書き込み時に、スキーマは自動的にレジストリに登録されます。
Basic 認証の使用
スキーマレジストリが Basic 認証を必要とする場合(例:Confluent Cloud を使用している場合)、format_avro_schema_registry_url 設定に URL エンコード済みの認証情報を指定できます。
トラブルシューティング
インジェスト処理の進行状況を監視し、Kafka コンシューマーで発生するエラーをデバッグするには、system.kafka_consumers システムテーブルに対してクエリを実行できます。デプロイメントに複数のレプリカがある場合(例:ClickHouse Cloud)、clusterAllReplicas テーブル関数を使用する必要があります。
スキーマの解決で問題が発生した場合は、kafkacat と clickhouse-local を使用してトラブルシューティングできます。