跳到主要内容
跳到主要内容

s3 表函数

提供一个类似表的接口,用于在 Amazon S3Google Cloud Storage 中读取/写入文件。此表函数类似于 hdfs 函数,但提供了特定于 S3 的功能。

如果集群中有多个副本,可以使用 s3Cluster 函数 来并行执行插入操作。

INSERT INTO...SELECT 中使用 s3 table function 时,数据以流式方式读取和插入。内存中只会保留少量数据块,同时这些数据块会持续从 S3 中读取并推送到目标表中。

语法

s3(url [, NOSIGN | access_key_id, secret_access_key, [session_token]] [,format] [,structure] [,compression_method],[,headers], [,partition_strategy], [,partition_columns_in_data_file])
s3(named_collection[, option=value [,..]])
GCS

S3 表函数通过使用 GCS XML API 和 HMAC 密钥与 Google Cloud Storage 集成。有关端点和 HMAC 的更多详细信息,请参阅 Google interoperability docs

对于 GCS,请在出现 access_key_idsecret_access_key 的位置替换为你的 HMAC key 和 HMAC secret。

参数

s3 表函数支持以下普通参数:

ParameterDescription
url带有文件路径的 Bucket URL。只读模式下支持以下通配符:***?{abc,def}{N..M},其中 NM 为数字,'abc''def' 为字符串。更多信息参见这里
NOSIGN如果在凭证位置提供此关键字,则所有请求都不会被签名。
access_key_id and secret_access_key指定与给定端点一起使用的凭证密钥。可选。
session_token与给定密钥一起使用的会话令牌。传递密钥时可选。
format文件的格式
structure表结构。格式为 'column1_name column1_type, column2_name column2_type, ...'
compression_method可选参数。支持的值:nonegzipgzbrotlibrxzLZMAzstdzst。默认情况下,将根据文件扩展名自动检测压缩方法。
headers可选参数。允许在 S3 请求中传递 HTTP 头。以 headers(key=value) 格式传递,例如:headers('x-amz-request-payer' = 'requester')
partition_strategy可选参数。支持的值:WILDCARDHIVEWILDCARD 要求路径中包含 {_partition_id},该值将被分区键替换。HIVE 不允许使用通配符,假定路径为表根目录,并生成 Hive 风格的分区目录,以 Snowflake ID 作为文件名、以文件格式作为扩展名。默认值为 WILDCARD
partition_columns_in_data_file可选参数。仅在 HIVE 分区策略下使用。用于告知 ClickHouse 是否需要在数据文件中写入分区列。默认值为 false
storage_class_name可选参数。支持的值:STANDARDINTELLIGENT_TIERING。允许指定 AWS S3 Intelligent Tiering。默认值为 STANDARD
GCS

GCS URL 使用如下格式,因为 Google XML API 的端点与 JSON API 不同:

  https://storage.googleapis.com/<bucket>/<folder>/<filename(s)>

而不是 https://storage.cloud.google.com

参数也可以通过命名集合传递。在这种情况下,urlaccess_key_idsecret_access_keyformatstructurecompression_method 的行为相同,同时还支持一些额外参数:

ArgumentDescription
filename如果指定,则会追加到 URL 末尾。
use_environment_credentials默认启用,允许通过环境变量 AWS_CONTAINER_CREDENTIALS_RELATIVE_URIAWS_CONTAINER_CREDENTIALS_FULL_URIAWS_CONTAINER_AUTHORIZATION_TOKENAWS_EC2_METADATA_DISABLED 传递额外参数。
no_sign_request默认禁用。
expiration_window_seconds默认值为 120。

返回值

一个具有指定结构的表,用于在指定文件中读写数据。

示例

从由 S3 文件 https://datasets-documentation.s3.eu-west-3.amazonaws.com/aapl_stock.csv 创建的表中选取前 5 行:

SELECT *
FROM s3(
   'https://datasets-documentation.s3.eu-west-3.amazonaws.com/aapl_stock.csv',
   'CSVWithNames'
)
LIMIT 5;
┌───────日期─┬────开盘─┬────最高─┬─────最低─┬───收盘─┬───成交量─┬─未平仓合约─┐
│ 1984-09-07 │ 0.42388 │ 0.42902 │ 0.41874 │ 0.42388 │ 23220030 │       0 │
│ 1984-09-10 │ 0.42388 │ 0.42516 │ 0.41366 │ 0.42134 │ 18022532 │       0 │
│ 1984-09-11 │ 0.42516 │ 0.43668 │ 0.42516 │ 0.42902 │ 42498199 │       0 │
│ 1984-09-12 │ 0.42902 │ 0.43157 │ 0.41618 │ 0.41618 │ 37125801 │       0 │
│ 1984-09-13 │ 0.43927 │ 0.44052 │ 0.43927 │ 0.43927 │ 57822062 │       0 │
└────────────┴─────────┴─────────┴─────────┴─────────┴──────────┴─────────┘
注意

ClickHouse 使用文件扩展名来确定数据格式。比如,我们本可以在不带 CSVWithNames 的情况下运行之前的命令:

SELECT *
FROM s3(
   'https://datasets-documentation.s3.eu-west-3.amazonaws.com/aapl_stock.csv'
)
LIMIT 5;

ClickHouse 也可以自动识别文件的压缩方式。比如,如果文件使用 .csv.gz 扩展名进行压缩,ClickHouse 会自动解压该文件。

注意

名称类似 *.parquet.snappy*.parquet.zstd 的 Parquet 文件可能会让 ClickHouse 产生混淆,并导致 TOO_LARGE_COMPRESSED_BLOCKZSTD_DECODER_FAILED 错误。 这是因为 ClickHouse 会尝试将整个文件当作 Snappy 或 ZSTD 编码的数据来读取,而实际上 Parquet 是在行组和列级别进行压缩的。

Parquet 元数据已经指定了每一列的压缩方式,因此文件扩展名是多余的。 此时只需使用 compression_method = 'none' 即可:

SELECT *
FROM s3(
  'https://<my-bucket>.s3.<my-region>.amazonaws.com/path/to/my-data.parquet.snappy',
  compression_format = 'none'
);

使用方法

假设在 S3 上有若干文件,其 URI 如下:

统计文件名以数字 1 到 3 结尾的这些文件中的行数:

SELECT count(*)
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/my-test-bucket-768/{some,another}_prefix/some_file_{1..3}.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32')
┌─count()─┐
│      18 │
└─────────┘

计算这两个目录下所有文件的行总数:

SELECT count(*)
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/my-test-bucket-768/{some,another}_prefix/*', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32')
┌─count()─┐
│      24 │
└─────────┘
提示

如果你的文件列表中包含带前导零的数字范围,请对每一位数字分别使用花括号的写法,或者使用 ?

统计名为 file-000.csvfile-001.csv、...、file-999.csv 这些文件中的总行数:

SELECT count(*)
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/my-test-bucket-768/big_prefix/file-{000..999}.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32');
┌─count()─┐
│      12 │
└─────────┘

将数据写入文件 test-data.csv.gz

INSERT INTO FUNCTION s3('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
VALUES ('test-data', 1), ('test-data-2', 2);

从现有表中将数据导出到文件 test-data.csv.gz

INSERT INTO FUNCTION s3('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
SELECT name, value FROM existing_table;

Glob 模式 ** 可用于递归遍历目录。如下示例所示,它会递归获取 my-test-bucket-768 目录中的所有文件:

SELECT * FROM s3('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/**', 'CSV', 'name String, value UInt32', 'gzip');

以下语句会递归地从 my-test-bucket 目录下任意子文件夹中的所有 test-data.csv.gz 文件获取数据:

SELECT * FROM s3('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/**/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip');

注意:可以在服务器配置文件中指定自定义 URL 映射规则。例如:

SELECT * FROM s3('s3://clickhouse-public-datasets/my-test-bucket-768/**/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip');

URL 's3://clickhouse-public-datasets/my-test-bucket-768/**/test-data.csv.gz' 会被替换为 'http://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/**/test-data.csv.gz'

可以在 config.xml 中添加自定义映射规则(custom mapper):

<url_scheme_mappers>
   <s3>
      <to>https://{bucket}.s3.amazonaws.com</to>
   </s3>
   <gs>
      <to>https://{bucket}.storage.googleapis.com</to>
   </gs>
   <oss>
      <to>https://{bucket}.oss.aliyuncs.com</to>
   </oss>
</url_scheme_mappers>

在生产环境中,建议使用命名集合。示例如下:


CREATE NAMED COLLECTION creds AS
        access_key_id = '***',
        secret_access_key = '***';
SELECT count(*)
FROM s3(creds, url='https://s3-object-url.csv')

分区写入

分区策略

仅适用于 INSERT 语句。

WILDCARD(默认):将文件路径中的 {_partition_id} 通配符替换为实际的分区键。

HIVE 为读写实现 Hive 风格分区。它生成的文件采用以下格式:<prefix>/<key1=val1/key2=val2...>/<snowflakeid>.<toLower(file_format)>

HIVE 分区策略示例

INSERT INTO FUNCTION s3(s3_conn, filename='t_03363_function', format=Parquet, partition_strategy='hive') PARTITION BY (year, country) SELECT 2020 as year, 'Russia' as country, 1 as id;
SELECT _path, * FROM s3(s3_conn, filename='t_03363_function/**.parquet');

   ┌─_path──────────────────────────────────────────────────────────────────────┬─id─┬─country─┬─year─┐
1. │ test/t_03363_function/year=2020/country=Russia/7351295896279887872.parquet │  1 │ Russia  │ 2020 │
   └────────────────────────────────────────────────────────────────────────────┴────┴─────────┴──────┘

WILDCARD 分区策略示例

  1. 在键中使用分区 ID 作为一部分会创建独立的文件:
INSERT INTO TABLE FUNCTION
    s3('http://bucket.amazonaws.com/my_bucket/file_{_partition_id}.csv', 'CSV', 'a String, b UInt32, c UInt32')
    PARTITION BY a VALUES ('x', 2, 3), ('x', 4, 5), ('y', 11, 12), ('y', 13, 14), ('z', 21, 22), ('z', 23, 24);

因此,数据会被写入三个文件:file_x.csvfile_y.csvfile_z.csv

  1. 在 bucket 名称中使用分区 ID 会在不同的 bucket 中生成文件:
INSERT INTO TABLE FUNCTION
    s3('http://bucket.amazonaws.com/my_bucket_{_partition_id}/file.csv', 'CSV', 'a UInt32, b UInt32, c UInt32')
    PARTITION BY a VALUES (1, 2, 3), (1, 4, 5), (10, 11, 12), (10, 13, 14), (20, 21, 22), (20, 23, 24);

因此,数据会被写入到不同 bucket 下的三个文件:my_bucket_1/file.csvmy_bucket_10/file.csvmy_bucket_20/file.csv

访问公共 bucket

ClickHouse 会尝试从多种不同的来源获取凭证。 有时,在访问某些公共 bucket 时,这可能会引发问题,导致客户端返回 403 错误代码。 可以通过使用 NOSIGN 关键字来避免此问题,它会强制客户端忽略所有凭证,并且不对请求进行签名。

SELECT *
FROM s3(
   'https://datasets-documentation.s3.eu-west-3.amazonaws.com/aapl_stock.csv',
   NOSIGN,
   'CSVWithNames'
)
LIMIT 5;

使用 S3 凭证(ClickHouse Cloud)

对于非公开的 bucket,用户可以向该函数传入 aws_access_key_idaws_secret_access_key。例如:

SELECT count() FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/mta/*.tsv', '<KEY>', '<SECRET>','TSVWithNames')

这适用于一次性访问,或凭证可以轻松轮换的场景。然而,不建议将其作为面向频繁访问或高敏感凭证场景的长期解决方案。在这种情况下,我们建议用户使用基于角色的访问控制。

ClickHouse Cloud 中关于 S3 的基于角色访问控制有专门文档说明,参见此处

配置完成后,可以通过 extra_credentials 参数将 roleARN 传递给 s3 函数。例如:

SELECT count() FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/mta/*.tsv','CSVWithNames',extra_credentials(role_arn = 'arn:aws:iam::111111111111:role/ClickHouseAccessRole-001'))

更多示例请参见此处

使用归档文件

假设我们在 S3 上有几个归档文件,其 URI 如下:

可以使用 :: 从这些归档文件中提取数据。通配模式既可以用于 URL 部分,也可以用于 :: 之后的部分(用于匹配归档文件内部的文件名)。

SELECT *
FROM s3(
   'https://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-2018-01-1{0..2}.csv.zip :: *.csv'
);
注意

ClickHouse 支持三种归档格式: ZIP TAR 7Z 虽然可以从任何受支持的存储位置访问 ZIP 和 TAR 归档文件,但 7Z 归档文件只能从安装了 ClickHouse 的本地文件系统读取。

插入数据

请注意,行只能插入到新文件中。不会进行合并循环或文件拆分操作。一旦文件写入完成,后续的插入操作将失败。更多详情请参见此处

虚拟列

  • _path — 文件路径。类型:LowCardinality(String)。对于归档文件,以如下格式显示路径:"{path_to_archive}::{path_to_file_inside_archive}"
  • _file — 文件名。类型:LowCardinality(String)。对于归档文件,显示归档内文件的名称。
  • _size — 文件大小(字节数)。类型:Nullable(UInt64)。如果文件大小未知,该值为 NULL。对于归档文件,显示归档内文件未压缩时的文件大小。
  • _time — 文件的最后修改时间。类型:Nullable(DateTime)。如果时间未知,该值为 NULL

use_hive_partitioning setting

这是一个提示配置,用于指示 ClickHouse 在读取时解析采用 Hive 风格分区的文件。它对写入没有任何影响。若要实现读写对称,请使用 partition_strategy 参数。

use_hive_partitioning 设置为 1 时,ClickHouse 会在路径中检测 Hive 风格的分区(/name=value/),并允许在查询中将分区列作为虚拟列使用。这些虚拟列的名称与分区路径中的名称相同,但会以下划线 _ 开头。

示例

SELECT * FROM s3('s3://data/path/date=*/country=*/code=*/*.parquet') WHERE date > '2020-01-01' AND country = 'Netherlands' AND code = 42;

访问 requester-pays 存储桶

要访问 requester-pays(请求者付费)存储桶,所有请求中都必须携带请求头 x-amz-request-payer = requester。可以通过向 S3 函数传递参数 headers('x-amz-request-payer' = 'requester') 来实现。例如:

SELECT
    count() AS num_rows,
    uniqExact(_file) AS num_files
FROM s3('https://coiled-datasets-rp.s3.us-east-1.amazonaws.com/1trc/measurements-100*.parquet', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', headers('x-amz-request-payer' = 'requester'))

┌───num_rows─┬─num_files─┐
│ 1110000000 │       111 │
└────────────┴───────────┘

返回 1 行。用时:3.089 秒。已处理 10.9 亿行,0.00 B(每秒 3.5355 亿行,0.00 B/秒)。
峰值内存使用量:192.27 KiB。

存储设置

嵌套 Avro 模式

当读取包含嵌套记录且不同文件之间嵌套结构不一致的 Avro 文件时(例如,某些文件在嵌套对象中多了一个字段),ClickHouse 可能会返回如下错误:

The number of leaves in record doesn't match the number of elements in tuple...

这是因为 ClickHouse 要求所有嵌套记录结构都符合相同的模式(schema)。
要处理这种情况,可以:

  • 使用 schema_inference_mode='union' 来合并不同的嵌套记录模式,或
  • 手动对齐嵌套结构并启用
    use_structure_from_insertion_table_in_table_functions=1
性能说明

在非常大的 S3 数据集上,schema_inference_mode='union' 可能会花费更长时间,因为它必须扫描每个文件来推断模式。

示例

INSERT INTO data_stage
SELECT
    id,
    data
FROM s3('https://bucket-name/*.avro', 'Avro')
SETTINGS schema_inference_mode='union';