Amazon Kinesis を ClickHouse Cloud と統合する
前提条件
事前に ClickPipes intro に目を通し、IAM credentials または IAM Role を設定しておいてください。ClickHouse Cloud で利用するロールの設定方法については、Kinesis Role-Based Access guide を参照してください。
最初の ClickPipe を作成する
- 使用している ClickHouse Cloud サービスの SQL Console にアクセスします。

- 左側メニューで
Data Sourcesボタンを選択し、「ClickPipe をセットアップ」をクリックします。

- データソースを選択します。

- ClickPipe の名前、説明(任意)、IAM ロールまたは認証情報、その他の接続詳細を入力してフォームを完成させます。

- Kinesis Stream と開始オフセットを選択します。UI には、選択したソース(Kafka トピックなど)からのサンプルドキュメントが表示されます。ClickPipe のパフォーマンスと安定性を向上させるために、Kinesis ストリームで Enhanced Fan-out を有効にすることもできます(Enhanced Fan-out の詳細はこちらを参照してください)。

- 次のステップでは、新しい ClickHouse テーブルにデータを取り込むか、既存のテーブルを再利用するかを選択できます。画面の指示に従って、テーブル名、スキーマ、および設定を変更します。上部のサンプルテーブルで、変更内容をリアルタイムにプレビューできます。

また、提供されているコントロールを使用して詳細設定をカスタマイズすることもできます。

- もしくは、既存の ClickHouse テーブルにデータを取り込むように設定することもできます。その場合、UI では、ソースのフィールドを、選択した宛先テーブル内の ClickHouse のフィールドにマッピングできます。

- 最後に、内部 ClickPipes ユーザーの権限を構成できます。
権限: ClickPipes は、宛先テーブルにデータを書き込むための専用ユーザーを作成します。この内部ユーザーに対して、カスタムロールまたはあらかじめ定義されたロールのいずれかを使用してロールを選択できます。
Full access: クラスタへのフルアクセス権を持ちます。宛先テーブルでマテリアライズドビューや Dictionary を使用する場合に便利です。Only destination table: 宛先テーブルに対するINSERT権限のみを持ちます。

- 「セットアップを完了」をクリックすると、システムは ClickPipe を登録し、サマリー テーブルに一覧表示されるようになります。


サマリー テーブルでは、ClickHouse 内のソースまたは宛先テーブルからサンプルデータを表示するためのコントロールが提供されます。

また、ClickPipe を削除したり、取り込みジョブのサマリーを表示したりするためのコントロールも提供されます。

- おめでとうございます! これで最初の ClickPipe のセットアップが完了しました。ストリーミング ClickPipe の場合は、リモートデータソースからリアルタイムでデータを継続的に取り込み続けます。そうでない場合は、バッチを取り込んだあとに完了します。
サポートされているデータ形式
サポートされている形式は以下のとおりです:
サポートされているデータ型
標準データ型のサポート
ClickPipes で現在サポートされている ClickHouse のデータ型は次のとおりです。
- 基本数値型 - [U]Int8/16/32/64、Float32/64、および BFloat16
- 大きな整数型 - [U]Int128/256
- Decimal 型
- Boolean
- String
- FixedString
- Date, Date32
- DateTime, DateTime64(UTC タイムゾーンのみ)
- Enum8/Enum16
- UUID
- IPv4
- IPv6
- すべての ClickHouse の LowCardinality 型
- 上記のいずれかの型(Nullable を含む)をキーおよび値に使用する Map 型
- 上記のいずれかの型(Nullable を含む、1 段階のネストのみ)を要素に使用する Tuple 型および Array 型
- SimpleAggregateFunction 型(AggregatingMergeTree または SummingMergeTree を宛先とする場合)
Variant 型のサポート
ソースデータストリーム内の任意の JSON フィールドに対して、Variant(String, Int64, DateTime) のような Variant 型を手動で指定できます。
ClickPipes が使用する正しい Variant のサブタイプを判定する仕組み上、Variant 定義内で使用できる整数型または日時型は 1 種類のみです。
たとえば、Variant(Int64, UInt32) はサポートされません。
JSON 型のサポート
常に JSON オブジェクトである JSON フィールドは、JSON 型の宛先カラムに割り当てることができます。固定パスやスキップされたパスを含め、目的の JSON 型に 宛先カラムを手動で変更する必要があります。
Kinesis 仮想カラム
Kinesis ストリームでは、次の仮想カラムがサポートされています。新しい宛先テーブルを作成する際、Add Column ボタンを使用して仮想カラムを追加できます。
| Name | Description | Recommended Data Type |
|---|---|---|
| _key | Kinesis パーティションキー | String |
| _timestamp | Kinesis おおよその到着タイムスタンプ(ミリ秒精度) | DateTime64(3) |
| _stream | Kinesis ストリーム名 | String |
| _sequence_number | Kinesis シーケンス番号 | String |
| _raw_message | Kinesis メッセージ全体 | String |
_raw_message フィールドは、Kinesis JSON レコード全体のみが必要な場合(ClickHouse の JsonExtract* 関数を使用して下流のマテリアライズドビューを作成する場合など)に使用できます。そのようなパイプでは、「非仮想」カラムをすべて削除することで ClickPipes のパフォーマンスが向上する場合があります。
制限事項
- DEFAULT はサポートされていません。
パフォーマンス
バッチ処理
ClickPipes はデータを ClickHouse にバッチ単位で挿入します。これは、データベース内に過度に多くのパーツが作成されてクラスターのパフォーマンス問題につながることを防ぐためです。
バッチは、次のいずれかの条件を満たしたときに挿入されます:
- バッチサイズが最大サイズ(レプリカメモリ 1GB あたり 100,000 行または 32MB)に達した場合
- バッチが開かれている時間が最大値(5 秒)に達した場合
レイテンシ
レイテンシ(Kinesis メッセージがストリームに送信されてから、そのメッセージが ClickHouse で利用可能になるまでの時間として定義)は、複数の要因(Kinesis のレイテンシ、ネットワークレイテンシ、メッセージサイズ/フォーマットなど)に依存します。上記セクションで説明した バッチ処理 もレイテンシに影響します。期待できるレイテンシを把握するために、必ずお客様のユースケースでテストすることを推奨します。
低レイテンシの要件がある場合は、お問い合わせください。
スケーリング
ClickPipes for Kinesis は、水平方向および垂直方向の両方にスケールするように設計されています。デフォルトでは、1 つのコンシューマを持つコンシューマグループを作成します。これは ClickPipe 作成時、またはそれ以降いつでも Settings -> Advanced Settings -> Scaling から設定できます。
ClickPipes は、アベイラビリティゾーンに分散したアーキテクチャにより高可用性を提供します。 そのためには、少なくとも 2 つのコンシューマへのスケーリングが必要です。
実行中のコンシューマ数にかかわらず、フォールトトレランスは設計上備わっています。 コンシューマまたはその基盤インフラストラクチャに障害が発生した場合でも、 ClickPipe はコンシューマを自動的に再起動し、メッセージ処理を継続します。
認証
Amazon Kinesis ストリームにアクセスするには、IAM 認証情報または IAM ロールを使用できます。IAM ロールの設定方法の詳細については、ClickHouse Cloud で利用できるロールの設定方法を説明したこのガイドを参照してください。