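All types of ClickPipes can be created, updated, and deleted programmatically through the ClickHouse Cloud OpenAPI. This page covers authentication and the available ClickPipes endpoints, and provides an example curl request for each ClickPipe type.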
Authentication
The ClickHouse Cloud API uses HTTP Basic authentication. You need an API key (key ID + key secret) with the appropriate permissions on the target service. To create an API key, see Managing API keys.
Before running any of the examples below, set the following environment variables:
export KEY_ID="<your_key_id>"
export KEY_SECRET="<your_key_secret>"
export ORG_ID="<your_organization_id>"
export SERVICE_ID="<your_service_id>"
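To make the HTTP Basic scheme concrete, the following sketch builds the header that `curl -u "$KEY_ID:$KEY_SECRET"` sends on your behalf. The function name `basic_auth_header` is a hypothetical helper introduced here for illustration; it is not part of the ClickHouse Cloud API or of curl.

```shell
# Build the HTTP Basic Authorization header from a key ID and key secret.
# basic_auth_header is a hypothetical helper name used only in this sketch.
basic_auth_header() {
  # Base64-encode "id:secret"; tr removes any line wrapping added by base64.
  printf 'Authorization: Basic %s\n' \
    "$(printf '%s:%s' "$1" "$2" | base64 | tr -d '\n')"
}

# Example with dummy credentials (not real keys):
basic_auth_header "foo" "bar"
# prints: Authorization: Basic Zm9vOmJhcg==

# The same header can be passed explicitly instead of using -u (not run here):
#   curl -H "$(basic_auth_header "$KEY_ID" "$KEY_SECRET")" \
#     "https://api.clickhouse.cloud/v1/organizations/$ORG_ID/services/$SERVICE_ID/clickpipes"
```

Because base64 is an encoding and not encryption, the header exposes the key secret to anyone who can read it, so treat it like the secret itself.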
Base URL
https://api.clickhouse.cloud/v1
All ClickPipes endpoints are scoped to a ClickHouse Cloud service:
| Method | Path | Description |
|---|---|---|
| GET | /organizations/{organizationId}/services/{serviceId}/clickpipes | List all ClickPipes |
| POST | /organizations/{organizationId}/services/{serviceId}/clickpipes | Create a ClickPipe |
| GET | /organizations/{organizationId}/services/{serviceId}/clickpipes/{clickPipeId} | Get a ClickPipe |
| PATCH | /organizations/{organizationId}/services/{serviceId}/clickpipes/{clickPipeId} | Update a ClickPipe |
| DELETE | /organizations/{organizationId}/services/{serviceId}/clickpipes/{clickPipeId} | Delete a ClickPipe |
| GET | /organizations/{organizationId}/services/{serviceId}/clickpipes/{clickPipeId}/settings | Get ClickPipe settings |
| PUT | /organizations/{organizationId}/services/{serviceId}/clickpipes/{clickPipeId}/settings | Update ClickPipe settings |
| PATCH | /organizations/{organizationId}/services/{serviceId}/clickpipes/{clickPipeId}/scaling | Update ClickPipe scaling |
| PATCH | /organizations/{organizationId}/services/{serviceId}/clickpipes/{clickPipeId}/state | Update ClickPipe state (start/stop) |
For CDC ClickPipes (Postgres, MySQL, MongoDB), additional organization-level endpoints are available for scaling the shared CDC infrastructure:
| Method | Path | Description |
|---|---|---|
| GET | /organizations/{organizationId}/clickpipes/cdcScaling | Get CDC ClickPipes scaling |
| PATCH | /organizations/{organizationId}/clickpipes/cdcScaling | Update CDC ClickPipes scaling |
For the complete request and response schema of each endpoint, see the Swagger UI.
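Since every path in the tables above shares the same prefix, a small helper can assemble the URLs and reduce copy-paste mistakes. This is a minimal sketch: `API_BASE`, `clickpipes_url`, and `cdc_scaling_url` are hypothetical names introduced here, and the paths are taken directly from the tables.

```shell
# Base URL as documented on this page.
API_BASE="https://api.clickhouse.cloud/v1"

# clickpipes_url ORG_ID SERVICE_ID [CLICKPIPE_ID [SUBRESOURCE]]
# Prints the service-scoped ClickPipes URL. SUBRESOURCE is one of the
# suffixes from the table above: settings, scaling, or state.
clickpipes_url() {
  if [ -n "${4:-}" ] && [ -z "${3:-}" ]; then
    echo "error: a sub-resource requires a ClickPipe ID" >&2
    return 1
  fi
  url="$API_BASE/organizations/$1/services/$2/clickpipes"
  if [ -n "${3:-}" ]; then url="$url/$3"; fi
  if [ -n "${4:-}" ]; then url="$url/$4"; fi
  printf '%s\n' "$url"
}

# cdc_scaling_url ORG_ID
# Prints the organization-level CDC scaling URL.
cdc_scaling_url() {
  printf '%s\n' "$API_BASE/organizations/$1/clickpipes/cdcScaling"
}

# Example with placeholder IDs:
clickpipes_url "my-org" "my-service" "my-pipe" "state"
# prints: https://api.clickhouse.cloud/v1/organizations/my-org/services/my-service/clickpipes/my-pipe/state
```

A call such as `curl -u "$KEY_ID:$KEY_SECRET" "$(clickpipes_url "$ORG_ID" "$SERVICE_ID")"` would then be equivalent to spelling out the full list URL.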
List ClickPipes
curl -u "$KEY_ID:$KEY_SECRET" \
  "https://api.clickhouse.cloud/v1/organizations/$ORG_ID/services/$SERVICE_ID/clickpipes"
Get a ClickPipe
CLICKPIPE_ID="<your_clickpipe_id>"
curl -u "$KEY_ID:$KEY_SECRET" \
  "https://api.clickhouse.cloud/v1/organizations/$ORG_ID/services/$SERVICE_ID/clickpipes/$CLICKPIPE_ID"
Stop or start a ClickPipe
# Stop
curl -u "$KEY_ID:$KEY_SECRET" \
  -X PATCH \
  -H "Content-Type: application/json" \
  -d '{"action": "stop"}' \
  "https://api.clickhouse.cloud/v1/organizations/$ORG_ID/services/$SERVICE_ID/clickpipes/$CLICKPIPE_ID/state"
# Start
curl -u "$KEY_ID:$KEY_SECRET" \
  -X PATCH \
  -H "Content-Type: application/json" \
  -d '{"action": "start"}' \
  "https://api.clickhouse.cloud/v1/organizations/$ORG_ID/services/$SERVICE_ID/clickpipes/$CLICKPIPE_ID/state"
Delete a ClickPipe
curl -u "$KEY_ID:$KEY_SECRET" \
  -X DELETE \
  "https://api.clickhouse.cloud/v1/organizations/$ORG_ID/services/$SERVICE_ID/clickpipes/$CLICKPIPE_ID"
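Create ClickPipes
The request body for POST /clickpipes varies by source type. The following examples show the structure for each supported ClickPipe type. For the authoritative JSON schema, see the Swagger UI.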
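The examples in this section pass the request body inside single quotes, so the shell does not expand variables within it and credentials end up hard-coded in the command. One way to keep secrets out of the command line is to generate the body with a here-document. The sketch below mirrors the Kafka example that follows; `kafka_payload`, `KAFKA_USER`, and `KAFKA_PASSWORD` are hypothetical names introduced here. It assumes the values contain no double quotes or backslashes, because it does no JSON escaping.

```shell
# kafka_payload
# Prints a Kafka ClickPipe request body, reading credentials from the
# hypothetical KAFKA_USER and KAFKA_PASSWORD environment variables.
# No JSON escaping is done: values must not contain " or \ characters.
kafka_payload() {
  cat <<EOF
{
  "name": "My Kafka ClickPipe",
  "source": {
    "kafka": {
      "type": "confluent",
      "format": "JSONEachRow",
      "brokers": "broker.example.com:9092",
      "topics": "my_topic",
      "consumerGroup": "clickpipes-consumer-group",
      "authentication": "PLAIN",
      "credentials": {
        "username": "$KAFKA_USER",
        "password": "$KAFKA_PASSWORD"
      }
    }
  },
  "destination": {
    "table": "my_table",
    "managedTable": true,
    "tableDefinition": {
      "engine": { "type": "MergeTree" }
    },
    "columns": [
      { "name": "id", "type": "UInt64" },
      { "name": "message", "type": "String" },
      { "name": "timestamp", "type": "DateTime" }
    ]
  }
}
EOF
}

# Send the generated body on standard input (not run here):
#   kafka_payload | curl -u "$KEY_ID:$KEY_SECRET" -X POST \
#     -H "Content-Type: application/json" \
#     --data-binary @- \
#     "https://api.clickhouse.cloud/v1/organizations/$ORG_ID/services/$SERVICE_ID/clickpipes"
```

Reading the body from standard input with `@-` also keeps the password out of the process list, where command-line arguments are visible to other users on the same machine.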
Kafka
Supported Kafka-compatible sources: kafka, confluent, msk, azureeventhub, redpanda, warpstream.
curl -u "$KEY_ID:$KEY_SECRET" \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "name": "My Kafka ClickPipe",
    "source": {
      "kafka": {
        "type": "confluent",
        "format": "JSONEachRow",
        "brokers": "broker.example.com:9092",
        "topics": "my_topic",
        "consumerGroup": "clickpipes-consumer-group",
        "authentication": "PLAIN",
        "credentials": {
          "username": "my_user",
          "password": "my_password"
        }
      }
    },
    "destination": {
      "table": "my_table",
      "managedTable": true,
      "tableDefinition": {
        "engine": { "type": "MergeTree" }
      },
      "columns": [
        { "name": "id", "type": "UInt64" },
        { "name": "message", "type": "String" },
        { "name": "timestamp", "type": "DateTime" }
      ]
    }
  }' \
  "https://api.clickhouse.cloud/v1/organizations/$ORG_ID/services/$SERVICE_ID/clickpipes"
Amazon Kinesis
curl -u "$KEY_ID:$KEY_SECRET" \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "name": "My Kinesis ClickPipe",
    "source": {
      "kinesis": {
        "format": "JSONEachRow",
        "streamName": "my-stream",
        "region": "us-east-1",
        "iteratorType": "TRIM_HORIZON",
        "authentication": "IAM_USER",
        "accessKey": {
          "accessKeyId": "AKIAIOSFODNN7EXAMPLE",
          "secretKey": "<secret_key>"
        }
      }
    },
    "destination": {
      "table": "my_table",
      "managedTable": true,
      "tableDefinition": {
        "engine": { "type": "MergeTree" }
      },
      "columns": [
        { "name": "id", "type": "UInt64" },
        { "name": "message", "type": "String" }
      ]
    }
  }' \
  "https://api.clickhouse.cloud/v1/organizations/$ORG_ID/services/$SERVICE_ID/clickpipes"
Amazon S3
curl -u "$KEY_ID:$KEY_SECRET" \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "name": "My S3 ClickPipe",
    "source": {
      "objectStorage": {
        "type": "s3",
        "url": "https://my-bucket.s3.amazonaws.com/data/*.json",
        "format": "JSONEachRow",
        "authentication": "IAM_USER",
        "accessKey": {
          "accessKeyId": "AKIAIOSFODNN7EXAMPLE",
          "secretKey": "<secret_key>"
        }
      }
    },
    "destination": {
      "table": "my_table",
      "managedTable": true,
      "tableDefinition": {
        "engine": { "type": "MergeTree" }
      },
      "columns": [
        { "name": "id", "type": "UInt64" },
        { "name": "message", "type": "String" }
      ]
    }
  }' \
  "https://api.clickhouse.cloud/v1/organizations/$ORG_ID/services/$SERVICE_ID/clickpipes"
Google Cloud Storage
serviceAccountKey must be the base64-encoded contents of the GCP service account JSON key file.
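One way to produce that value is to pipe the key file through `base64` and strip the line breaks that some implementations insert. The helper name `encode_key_file` and the file name `service-account.json` below are assumptions for illustration. The same encoding is described for `serviceAccountFile` in the BigQuery section.

```shell
# encode_key_file FILE
# Prints the contents of FILE as a single line of base64.
# encode_key_file is a hypothetical helper name used only in this sketch.
encode_key_file() {
  # tr removes the newlines that base64 may add when wrapping long output.
  base64 < "$1" | tr -d '\n'
}

# Typical use, assuming the key was downloaded as service-account.json:
#   SERVICE_ACCOUNT_KEY="$(encode_key_file service-account.json)"
```

The resulting string can then be substituted for the `<base64_encoded_service_account_json>` placeholder in the request body.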
curl -u "$KEY_ID:$KEY_SECRET" \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "name": "My GCS ClickPipe",
    "source": {
      "objectStorage": {
        "type": "gcs",
        "url": "gs://my-bucket/data/*.json",
        "format": "JSONEachRow",
        "authentication": "SERVICE_ACCOUNT",
        "serviceAccountKey": "<base64_encoded_service_account_json>"
      }
    },
    "destination": {
      "table": "my_table",
      "managedTable": true,
      "tableDefinition": {
        "engine": { "type": "MergeTree" }
      },
      "columns": [
        { "name": "id", "type": "UInt64" },
        { "name": "message", "type": "String" }
      ]
    }
  }' \
  "https://api.clickhouse.cloud/v1/organizations/$ORG_ID/services/$SERVICE_ID/clickpipes"
Azure Blob Storage
curl -u "$KEY_ID:$KEY_SECRET" \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "name": "My Azure Blob ClickPipe",
    "source": {
      "objectStorage": {
        "type": "azureblobstorage",
        "azureContainerName": "my-container",
        "path": "data/*.json",
        "format": "JSONEachRow",
        "authentication": "CONNECTION_STRING",
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=myaccount;AccountKey=mykey;EndpointSuffix=core.windows.net"
      }
    },
    "destination": {
      "table": "my_table",
      "managedTable": true,
      "tableDefinition": {
        "engine": { "type": "MergeTree" }
      },
      "columns": [
        { "name": "id", "type": "UInt64" },
        { "name": "message", "type": "String" }
      ]
    }
  }' \
  "https://api.clickhouse.cloud/v1/organizations/$ORG_ID/services/$SERVICE_ID/clickpipes"
Postgres CDC
curl -u "$KEY_ID:$KEY_SECRET" \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "name": "My Postgres CDC ClickPipe",
    "source": {
      "postgres": {
        "host": "postgres.example.com",
        "port": 5432,
        "database": "mydb",
        "credentials": {
          "username": "postgres_user",
          "password": "<password>"
        },
        "settings": {
          "replicationMode": "cdc"
        },
        "tableMappings": [
          {
            "sourceSchemaName": "public",
            "sourceTable": "users",
            "targetTable": "public_users"
          }
        ]
      }
    },
    "destination": {
      "database": "default"
    }
  }' \
  "https://api.clickhouse.cloud/v1/organizations/$ORG_ID/services/$SERVICE_ID/clickpipes"
MySQL CDC
curl -u "$KEY_ID:$KEY_SECRET" \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "name": "My MySQL CDC ClickPipe",
    "source": {
      "mysql": {
        "host": "mysql.example.com",
        "port": 3306,
        "credentials": {
          "username": "mysql_user",
          "password": "<password>"
        },
        "settings": {
          "replicationMode": "cdc"
        },
        "tableMappings": [
          {
            "sourceSchemaName": "mydb",
            "sourceTable": "orders",
            "targetTable": "mydb_orders"
          }
        ]
      }
    },
    "destination": {
      "database": "default"
    }
  }' \
  "https://api.clickhouse.cloud/v1/organizations/$ORG_ID/services/$SERVICE_ID/clickpipes"
MongoDB CDC
curl -u "$KEY_ID:$KEY_SECRET" \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "name": "My MongoDB CDC ClickPipe",
    "source": {
      "mongodb": {
        "uri": "mongodb+srv://cluster0.example.mongodb.net",
        "readPreference": "secondaryPreferred",
        "credentials": {
          "username": "mongo_user",
          "password": "<password>"
        },
        "settings": {
          "replicationMode": "cdc"
        },
        "tableMappings": [
          {
            "sourceDatabaseName": "mydb",
            "sourceCollection": "users",
            "targetTable": "mydb_users"
          }
        ]
      }
    },
    "destination": {
      "database": "default"
    }
  }' \
  "https://api.clickhouse.cloud/v1/organizations/$ORG_ID/services/$SERVICE_ID/clickpipes"
BigQuery
serviceAccountFile must be the base64-encoded contents of the GCP service account JSON key file.
curl -u "$KEY_ID:$KEY_SECRET" \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "name": "My BigQuery ClickPipe",
    "source": {
      "bigquery": {
        "snapshotStagingPath": "gs://my-staging-bucket/staging/",
        "credentials": {
          "serviceAccountFile": "<base64_encoded_service_account_json>"
        },
        "settings": {
          "replicationMode": "snapshot"
        },
        "tableMappings": [
          {
            "sourceDatasetName": "my_dataset",
            "sourceTable": "my_table",
            "targetTable": "my_bigquery_table"
          }
        ]
      }
    },
    "destination": {
      "database": "default"
    }
  }' \
  "https://api.clickhouse.cloud/v1/organizations/$ORG_ID/services/$SERVICE_ID/clickpipes"