メインコンテンツまでスキップ
メインコンテンツまでスキップ

Apache BeamとClickHouseの統合

Apache Beam は、バッチ処理とストリーム(継続的)データ処理パイプラインの両方を定義して実行できるオープンソースの統一プログラミングモデルです。Apache Beamの柔軟性は、ETL(抽出、変換、ロード)操作から複雑なイベント処理やリアルタイム分析まで、幅広いデータ処理シナリオをサポートできることにあります。この統合は、基盤となる挿入レイヤーのためにClickHouseの公式 JDBCコネクタ を利用します。

統合パッケージ

Apache BeamとClickHouseを統合するために必要な統合パッケージは、Apache Beam I/O Connectors の下で管理および開発されています。これには、多くの人気のあるデータストレージシステムやデータベースの統合バンドルが含まれます。 org.apache.beam.sdk.io.clickhouse.ClickHouseIO 実装は、Apache Beamレポジトリ内にあります。

Apache Beam ClickHouseパッケージのセットアップ

パッケージのインストール

次の依存関係をパッケージ管理フレームワークに追加します:

推奨Beamバージョン

ClickHouseIOコネクタは、Apache Beamバージョン2.59.0からの使用が推奨されます。それ以前のバージョンではコネクタの機能が完全にはサポートされない可能性があります。

アーティファクトは、公式Mavenリポジトリで見つけることができます。

コード例

以下の例は、input.csvというCSVファイルをPCollectionとして読み込み、定義されたスキーマを使用してRowオブジェクトに変換し、ClickHouseIOを使ってローカルのClickHouseインスタンスに挿入します:

サポートされているデータ型

ClickHouseApache Beamサポートされていますかノート
TableSchema.TypeName.FLOAT32Schema.TypeName#FLOAT
TableSchema.TypeName.FLOAT64Schema.TypeName#DOUBLE
TableSchema.TypeName.INT8Schema.TypeName#BYTE
TableSchema.TypeName.INT16Schema.TypeName#INT16
TableSchema.TypeName.INT32Schema.TypeName#INT32
TableSchema.TypeName.INT64Schema.TypeName#INT64
TableSchema.TypeName.STRINGSchema.TypeName#STRING
TableSchema.TypeName.UINT8Schema.TypeName#INT16
TableSchema.TypeName.UINT16Schema.TypeName#INT32
TableSchema.TypeName.UINT32Schema.TypeName#INT64
TableSchema.TypeName.UINT64Schema.TypeName#INT64
TableSchema.TypeName.DATESchema.TypeName#DATETIME
TableSchema.TypeName.DATETIMESchema.TypeName#DATETIME
TableSchema.TypeName.ARRAYSchema.TypeName#ARRAY
TableSchema.TypeName.ENUM8Schema.TypeName#STRING
TableSchema.TypeName.ENUM16Schema.TypeName#STRING
TableSchema.TypeName.BOOLSchema.TypeName#BOOLEAN
TableSchema.TypeName.TUPLESchema.TypeName#ROW
TableSchema.TypeName.FIXEDSTRINGFixedBytesFixedBytesは、org.apache.beam.sdk.schemas.logicaltypes内の固定長バイト配列を表すLogicalTypeです。
Schema.TypeName#DECIMAL
Schema.TypeName#MAP

ClickHouseIO.Writeパラメータ

以下のセッタ関数を使用してClickHouseIO.Writeの設定を調整できます:

パラメータセッタ関数引数の型デフォルト値説明
withMaxInsertBlockSize(long maxInsertBlockSize)1000000挿入する行のブロックの最大サイズ。
withMaxRetries(int maxRetries)5失敗した挿入の最大リトライ回数。
withMaxCumulativeBackoff(Duration maxBackoff)Duration.standardDays(1000)リトライの最大累積バックオフ期間。
withInitialBackoff(Duration initialBackoff)Duration.standardSeconds(5)最初のリトライ前の初期バックオフ期間。
withInsertDistributedSync(Boolean sync)truetrueの場合、分散テーブルの挿入操作を同期します。
withInsertQuorum(Long quorum)null挿入操作を確認するために必要なレプリカの数。
withInsertDeduplicate(Boolean deduplicate)truetrueの場合、挿入操作の重複排除が有効になります。
withTableSchema(TableSchema schema)null対象のClickHouseテーブルのスキーマ。

制限事項

コネクタを使用する際には、以下の制限事項を考慮してください:

  • 現時点で、Sink操作のみがサポートされています。コネクタはSource操作をサポートしていません。
  • ClickHouseは、ReplicatedMergeTreeReplicatedMergeTree上に構築されたDistributedテーブルに挿入する際に重複排除を行います。レプリケーションがない場合、通常のMergeTreeに挿入すると、挿入が失敗してから再試行が成功した場合に重複が発生する可能性があります。ただし、各ブロックは原子的に挿入され、ブロックサイズはClickHouseIO.Write.withMaxInsertBlockSize(long)を使用して設定できます。重複排除は、挿入したブロックのチェックサムを使用して達成されます。重複排除の詳細については、重複排除および挿入重複排除設定をご覧ください。
  • コネクタはDDLステートメントを実行しないため、挿入前にターゲットテーブルが存在する必要があります。