メインコンテンツへスキップ
メインコンテンツへスキップ

Amazon MSK と ClickHouse の連携

注意: 動画で示されているポリシーは、クイックスタート用の許可範囲が広い設定であり、短時間での導入のみを意図したものです。以下の最小権限の IAM ガイダンスを参照してください。

前提条件

次のことを前提とします:

ClickHouse 公式 Kafka コネクタと Amazon MSK の連携

接続情報を確認する

HTTP(S) で ClickHouse に接続するには、次の情報が必要です。

Parameter(s)Description
HOST and PORT通常、TLS を使用する場合のポートは 8443、TLS を使用しない場合のポートは 8123 です。
DATABASE NAME既定で default という名前のデータベースが用意されています。接続したいデータベースの名前を使用してください。
USERNAME and PASSWORD既定のユーザー名は default です。用途に応じて適切なユーザー名を使用してください。

ClickHouse Cloud サービスに関する詳細情報は、ClickHouse Cloud コンソールで確認できます。 サービスを選択し、Connect をクリックします。

ClickHouse Cloud サービスの Connect ボタン

HTTPS を選択します。接続情報は、サンプルの curl コマンド内に表示されます。

ClickHouse Cloud HTTPS 接続詳細

セルフマネージドの ClickHouse を使用している場合、接続情報は ClickHouse 管理者によって設定されます。

手順

  1. ClickHouse Connector Sink に目を通しておく。
  2. MSK インスタンスを作成する
  3. IAM ロールを作成して割り当てる
  4. ClickHouse Connect Sink の リリースページ から jar ファイルをダウンロードする。
  5. ダウンロードした jar ファイルを、Amazon MSK コンソールの カスタムプラグインページ にインストールする。
  6. コネクタがパブリックな ClickHouse インスタンスと通信する場合は、インターネットアクセスを有効化する
  7. 設定に、トピック名、ClickHouse インスタンスのホスト名、およびパスワードを指定する。
connector.class=com.clickhouse.kafka.connect.ClickHouseSinkConnector
tasks.max=1
topics=<topic_name>
ssl=true
security.protocol=SSL
hostname=<hostname>
database=<database_name>
password=<password>
ssl.truststore.location=/tmp/kafka.client.truststore.jks
port=8443
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
exactlyOnce=true
username=default
schemas.enable=false

推奨される IAM 権限(最小特権)

セットアップに必要な最小限の権限だけを使用してください。まずは以下のベースラインを基準にし、実際に利用する場合にのみオプションのサービスを追加します。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "MSKClusterAccess",
      "Effect": "Allow",
      "Action": [
        "kafka:DescribeCluster",
        "kafka:GetBootstrapBrokers",
        "kafka:DescribeClusterV2",
        "kafka:ListClusters",
        "kafka:ListClustersV2"
      ],
      "Resource": "*"
    },
    {
      "Sid": "KafkaAuthorization",
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:Connect",
        "kafka-cluster:DescribeCluster",
        "kafka-cluster:DescribeGroup",
        "kafka-cluster:DescribeTopic",
        "kafka-cluster:ReadData"
      ],
      "Resource": "*"
    },
    {
      "Sid": "OptionalGlueSchemaRegistry",
      "Effect": "Allow",
      "Action": [
        "glue:GetSchema*",
        "glue:ListSchemas",
        "glue:ListSchemaVersions"
      ],
      "Resource": "*"
    },
    {
      "Sid": "OptionalSecretsManager",
      "Effect": "Allow",
      "Action": [
        "secretsmanager:GetSecretValue"
      ],
      "Resource": [
        "arn:aws:secretsmanager:<region>:<account-id>:secret:<your-secret-name>*"
      ]
    },
    {
      "Sid": "OptionalS3Read",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject"
      ],
      "Resource": "arn:aws:s3:::<your-bucket>/<optional-prefix>/*"
    }
  ]
}
  • AWS Glue Schema Registry を使用する場合にのみ、Glue ブロックを使用してください。
  • Secrets Manager から認証情報やトラストストアを取得する場合にのみ、Secrets Manager ブロックを使用してください。ARN のスコープを適切に絞ってください。
  • S3 からアーティファクト(例: truststore)を読み込む場合にのみ、S3 ブロックを使用してください。対象のバケット/プレフィックスにスコープを絞ってください。

あわせて参照してください: Kafka ベストプラクティス – IAM.

パフォーマンスチューニング

パフォーマンスを向上させる一つの方法は、worker の設定に次の項目を追加し、Kafka から取得するバッチサイズとレコード数を調整することです。

consumer.max.poll.records=[NUMBER OF RECORDS]
consumer.max.partition.fetch.bytes=[NUMBER OF RECORDS * RECORD SIZE IN BYTES]

使用する値の具体的な設定は、必要なレコード数やレコードサイズによって異なります。たとえば、デフォルト値は次のとおりです。

consumer.max.poll.records=500
consumer.max.partition.fetch.bytes=1048576

実装面およびその他の考慮事項の詳細については、Kafka および Amazon MSK の公式ドキュメントを参照してください。

MSK Connect のネットワーキングに関する注意事項

MSK Connect から ClickHouse に接続できるようにするには、MSK クラスターをプライベートサブネット内に配置し、インターネットアクセス用にプライベート NAT ゲートウェイを接続することを推奨します。設定手順は以下のとおりです。パブリックサブネットもサポートされていますが、ENI に Elastic IP アドレスを継続的に割り当てる必要があるため推奨されません。詳細は AWS のドキュメントを参照してください

  1. プライベートサブネットを作成する: VPC 内に新しいサブネットを作成し、それをプライベートサブネットとして指定します。このサブネットはインターネットへ直接アクセスできないようにします。
  2. NAT ゲートウェイを作成する: VPC のパブリックサブネット内に NAT ゲートウェイを作成します。NAT ゲートウェイにより、プライベートサブネット内のインスタンスがインターネットや他の AWS サービスへ接続できる一方で、インターネット側からそれらのインスタンスへの接続開始は防止されます。
  3. ルートテーブルを更新する: インターネット向けトラフィックを NAT ゲートウェイに転送するルートを追加します。
  4. セキュリティグループおよびネットワーク ACL の設定を確認する: 関連するトラフィックを許可するように セキュリティグループ および ネットワーク ACL (Access Control Lists) を設定します。
    1. MSK Connect ワーカー ENI から、TLS ポート(一般的には 9094)の MSK ブローカーへのトラフィック。
    2. MSK Connect ワーカー ENI から ClickHouse エンドポイントへのトラフィック: 9440(ネイティブ TLS)または 8443(HTTPS)。
    3. ブローカーのセキュリティグループで、MSK Connect ワーカーのセキュリティグループからのインバウンドを許可します。
    4. セルフホストの ClickHouse の場合は、サーバーで設定しているポート(デフォルトでは HTTP 用に 8123)を開放します。
  5. セキュリティグループを MSK にアタッチする: これらのセキュリティグループが MSK クラスターおよび MSK Connect ワーカーにアタッチされていることを確認します。
  6. ClickHouse Cloud への接続:
    1. パブリックエンドポイント + IP 許可リスト方式: プライベートサブネットから NAT 経由での送信(アウトバウンド)トラフィックが必要です。
    2. 利用可能な場合のプライベート接続(例: VPC ピアリング / PrivateLink / VPN)。VPC の DNS ホスト名/名前解決が有効化されており、DNS がプライベートエンドポイントを解決できることを確認します。
  7. 接続検証(簡易チェックリスト):
    1. コネクターの実行環境から MSK のブートストラップ DNS を解決し、ブローカーポートへ TLS で接続できること。
    2. ClickHouse の 9440 ポート(または HTTPS 用の 8443)へ TLS 接続を確立できること。
    3. AWS のサービス(Glue / Secrets Manager)を使用する場合、それらのエンドポイントへの送信(アウトバウンド)トラフィックが許可されていること。