Configuring unordered mode for continuous ingestion

By default, the S3 ClickPipe assumes files are added to a bucket in lexicographical order. To ingest files that don't follow an implicit order, you can configure an S3 ClickPipe to use an Amazon SQS queue connected to the bucket, optionally with Amazon EventBridge as an event router. The ClickPipe then listens for ObjectCreated:* events and ingests new files regardless of their naming convention.

Note

Unordered mode is only supported for Amazon S3; it is not available for public buckets or S3-compatible services. It requires an Amazon SQS queue connected to the bucket, optionally with Amazon EventBridge as an event router.

How it works

In this mode, the S3 ClickPipe performs an initial load of all files in the selected path, and then listens for ObjectCreated:* events in the queue that match that path. Any message for a previously seen file, for a file outside the path, or for an event of a different type is ignored. Files are ingested once the configured max insert bytes or max file count threshold is reached, or after a configurable interval (30 seconds by default). It is not possible to start ingestion from a specific file or point in time: ClickPipes always loads all files in the selected path.
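For reference, an ObjectCreated:* event delivered through EventBridge has roughly the following shape (abridged; the bucket name, key, and size shown are illustrative). The object key is what gets matched against the path configured for the ClickPipe:

{
  "source": "aws.s3",
  "detail-type": "Object Created",
  "detail": {
    "bucket": {
      "name": "my-bucket"
    },
    "object": {
      "key": "data/2024/file-abc.parquet",
      "size": 1048576
    },
    "reason": "PutObject"
  }
}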

Various types of failures can occur when ingesting data, which can result in partial inserts or duplicate data. Object Storage ClickPipes are resilient to insert failures and provide exactly-once semantics using temporary staging tables. Data is first inserted into a staging table; if something goes wrong, the staging table is truncated and the insert is retried from a clean state. Only once an insert completes successfully are the partitions moved to the target table.

Create an Amazon SQS queue

1. In the AWS Console, navigate to Simple Queue Service > Create queue. Use the defaults to create a new standard queue.

Tip

We strongly recommend configuring a dead-letter queue (DLQ) for the SQS queue, so failed messages are easier to debug and retry. With a DLQ configured, a failed message is returned to the queue and retried up to the number of times set by the maxReceiveCount parameter of the source queue's redrive policy before being moved to the DLQ.
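As a sketch, the redrive policy set on the source queue (via its RedrivePolicy attribute) could look like the following, where the DLQ ARN and retry count are illustrative:

{
  "deadLetterTargetArn": "arn:aws:sqs:us-east-1:123456789012:clickpipes-dlq",
  "maxReceiveCount": "5"
}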

2. Connect your S3 bucket to the SQS queue, either with a direct S3 event notification or through Amazon EventBridge. EventBridge, used in the steps below, is recommended for most use cases: it supports fan-out and more flexible event filtering, and is not subject to the S3 restriction of one notification rule per event type, per prefix.

a. In the S3 bucket properties, navigate to Event notifications > Amazon EventBridge and enable sending notifications to EventBridge. Click Save changes.

b. In the AWS Console, navigate to Amazon EventBridge > Rules > Create rule. Name the rule (e.g. S3ObjectCreated), choose the default event bus, and click Next. On the Build event pattern step, select AWS events or EventBridge partner events as the event source, then enter the following event pattern manually, replacing <bucket-name> with your bucket name:

{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": {
      "name": ["<bucket-name>"]
    }
  }
}

Optionally, add an object.key condition to the pattern to filter by prefix or suffix. If you do, make sure it matches the path set for the ClickPipe.
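For example, to match only objects under a hypothetical data/ prefix, the pattern could use EventBridge content filtering on object.key like this:

{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": {
      "name": ["<bucket-name>"]
    },
    "object": {
      "key": [{ "prefix": "data/" }]
    }
  }
}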

c. On the Select target(s) step, choose AWS service as the target type and select SQS queue. Pick the queue created in the previous step. Leave Use execution role (recommended) checked to let EventBridge auto-create the required IAM role, then click Next and complete the wizard.

d. Edit the SQS queue access policy to allow EventBridge to send messages to it. Replace <sqs-queue-arn> and <eventbridge-rule-arn> with the appropriate values:

{
  "Version": "2012-10-17",
  "Id": "example-ID",
  "Statement": [
    {
      "Sid": "AllowEventBridgeToSendMessage",
      "Effect": "Allow",
      "Principal": {
        "Service": "events.amazonaws.com"
      },
      "Action": "SQS:SendMessage",
      "Resource": "<sqs-queue-arn>",
      "Condition": {
        "ArnLike": {
          "aws:SourceArn": "<eventbridge-rule-arn>"
        }
      }
    }
  ]
}

Configure an IAM role

1. In the ClickHouse Cloud console, navigate to Settings > Network security information and copy the IAM role ARN for your service.

2. In the AWS Console, navigate to IAM > Roles > Create role. Choose Custom trust policy and paste in the following, replacing <ch-cloud-arn> with the IAM role ARN copied in the previous step:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowAssumeRole",
      "Effect": "Allow",
      "Principal": {
        "AWS": "<ch-cloud-arn>"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

3. Create an inline policy for the IAM role with the required permissions to read objects from S3 and manage messages in the SQS queue. Replace <bucket-arn> and <sqs-queue-arn> with the appropriate values:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "S3BucketMetadataAccess",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketLocation",
        "s3:ListBucket"
      ],
      "Resource": "<bucket-arn>"
    },
    {
      "Sid": "AllowGetListObjects",
      "Effect": "Allow",
      "Action": [
        "s3:Get*",
        "s3:List*"
      ],
      "Resource": "<bucket-arn>/*"
    },
    {
      "Sid": "SQSNotificationsAccess",
      "Effect": "Allow",
      "Action": [
        "sqs:DeleteMessage",
        "sqs:ListQueues",
        "sqs:ReceiveMessage",
        "sqs:GetQueueAttributes"
      ],
      "Resource": "<sqs-queue-arn>"
    }
  ]
}

Create a ClickPipe with unordered mode

1. In the ClickHouse Cloud console, navigate to Data Sources > Create ClickPipe and select Amazon S3. Enter the details to connect to your S3 bucket. Under Authentication method, choose IAM role and provide the ARN of the role you created in the previous step.

2. Under Incoming data, toggle on Continuous ingestion. Select Any order as the ingestion mode and provide the SQS queue URL for the queue connected to your bucket.

3. Under Parse information, define a Sorting key for the target table. Make any necessary adjustments to the mapped schema, then configure a role for the ClickPipes database user.

4. Review the configuration and click Create ClickPipe. ClickPipes will perform an initial scan of your bucket to load all existing files that match the specified path, and will then begin processing files as new ObjectCreated:* events arrive in the queue.