跳到主要内容
跳到主要内容

s3 表函数

提供了一个类表接口,以从 Amazon S3Google Cloud Storage 选择/插入文件。这个表函数类似于 hdfs 函数,但提供特定于 S3 的功能。

如果您的集群中有多个副本,可以使用 s3Cluster 函数 来并行插入。

使用 s3 表函数INSERT INTO...SELECT 时,数据以流式方式读取和插入。在从 S3 持续读取块并推送到目标表时,仅有少量数据块驻留在内存中。

语法

s3(url [, NOSIGN | access_key_id, secret_access_key, [session_token]] [,format] [,structure] [,compression_method],[,headers], [,partition_strategy], [,partition_columns_in_data_file])
s3(named_collection[, option=value [,..]])
GCS

S3 表函数通过使用 GCS XML API 和 HMAC 密钥与 Google Cloud Storage 集成。有关端点和 HMAC 的更多详细信息,请参见 Google 互操作性文档

对于 GCS,在 access_key_idsecret_access_key 的地方替换为您的 HMAC 密钥和 HMAC 密码。

参数

s3 表函数支持以下普通参数:

参数描述
url包含文件路径的桶 url。在只读模式下支持以下通配符:***?{abc,def}{N..M},其中 NM 是数字,'abc''def' 是字符串。有关更多信息,请见 这里
NOSIGN如果在凭据位置提供此关键字,则所有请求将不会被签名。
access_key_idsecret_access_key指定用于给定端点的凭据的密钥。可选。
session_token与给定密钥一起使用的会话令牌。传递密钥时可选。
format文件的 格式
structure表的结构。格式为 'column1_name column1_type, column2_name column2_type, ...'
compression_method参数为可选。支持的值:nonegzipgzbrotlibrxzLZMAzstdzst。默认情况下,它将根据文件扩展名自动检测压缩方法。
headers参数为可选。允许在 S3 请求中传递头部。以 headers(key=value) 格式传递,例如 headers('x-amz-request-payer' = 'requester')
partition_strategy参数为可选。支持的值:WILDCARDHIVEWILDCARD 需要路径中有一个 {_partition_id},该值将替换为分区密钥。HIVE 不允许通配符,假设路径是表根,并生成带有 Snowflake ID 作为文件名和文件格式作为扩展名的 Hive 风格分区目录。默认为 WILDCARD
partition_columns_in_data_file参数为可选。仅与 HIVE 分区策略一起使用。告诉 ClickHouse 是否期望在数据文件中写入分区列。默认为 false
storage_class_name参数为可选。支持的值:STANDARDINTELLIGENT_TIERING。允许指定 AWS S3 Intelligent Tiering。默认为 STANDARD
GCS

GCS url 的格式为因为 Google XML API 的端点不同于 JSON API:

https://storage.googleapis.com/<bucket>/<folder>/<filename(s)>

而不是 ~~https://storage.cloud.google.com~~。

还可以使用 命名集合 传递参数。在这种情况下,urlaccess_key_idsecret_access_keyformatstructurecompression_method 的工作方式相同,并且支持一些额外的参数:

参数描述
filename如果指定,将附加到 url。
use_environment_credentials默认启用,允许使用环境变量 AWS_CONTAINER_CREDENTIALS_RELATIVE_URIAWS_CONTAINER_CREDENTIALS_FULL_URIAWS_CONTAINER_AUTHORIZATION_TOKENAWS_EC2_METADATA_DISABLED 传递额外的参数。
no_sign_request默认禁用。
expiration_window_seconds默认值为 120。

返回值

一个具有指定结构的表,用于在指定文件中读取或写入数据。

示例

从 S3 文件 https://datasets-documentation.s3.eu-west-3.amazonaws.com/aapl_stock.csv 中选择前 5 行:

SELECT *
FROM s3(
   'https://datasets-documentation.s3.eu-west-3.amazonaws.com/aapl_stock.csv',
   'CSVWithNames'
)
LIMIT 5;
┌───────Date─┬────Open─┬────High─┬─────Low─┬───Close─┬───Volume─┬─OpenInt─┐
│ 1984-09-07 │ 0.42388 │ 0.42902 │ 0.41874 │ 0.42388 │ 23220030 │       0 │
│ 1984-09-10 │ 0.42388 │ 0.42516 │ 0.41366 │ 0.42134 │ 18022532 │       0 │
│ 1984-09-11 │ 0.42516 │ 0.43668 │ 0.42516 │ 0.42902 │ 42498199 │       0 │
│ 1984-09-12 │ 0.42902 │ 0.43157 │ 0.41618 │ 0.41618 │ 37125801 │       0 │
│ 1984-09-13 │ 0.43927 │ 0.44052 │ 0.43927 │ 0.43927 │ 57822062 │       0 │
└────────────┴─────────┴─────────┴─────────┴─────────┴──────────┴─────────┘
备注

ClickHouse 使用文件名扩展名来确定数据的格式。例如,我们可以在没有 CSVWithNames 的情况下运行之前的命令:

SELECT *
FROM s3(
   'https://datasets-documentation.s3.eu-west-3.amazonaws.com/aapl_stock.csv'
)
LIMIT 5;

ClickHouse 还可以确定文件的压缩方法。例如,如果文件以 .csv.gz 扩展名进行了压缩,ClickHouse 会自动解压缩文件。

用法

假设我们在 S3 上有几个具有以下 URI 的文件:

计算文件中以数字 1 到 3 结尾的行数:

SELECT count(*)
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/my-test-bucket-768/{some,another}_prefix/some_file_{1..3}.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32')
┌─count()─┐
│      18 │
└─────────┘

计算这两个目录中所有文件的总行数:

SELECT count(*)
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/my-test-bucket-768/{some,another}_prefix/*', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32')
┌─count()─┐
│      24 │
└─────────┘
提示

如果您的文件列表包含带有前导零的数字范围,请使用每个数字单独用大括号包裹的构造,或使用 ?

计算名为 file-000.csvfile-001.csv、...、file-999.csv 的文件的总行数:

SELECT count(*)
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/my-test-bucket-768/big_prefix/file-{000..999}.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32');
┌─count()─┐
│      12 │
└─────────┘

插入数据到文件 test-data.csv.gz

INSERT INTO FUNCTION s3('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
VALUES ('test-data', 1), ('test-data-2', 2);

从现有表插入数据到文件 test-data.csv.gz

INSERT INTO FUNCTION s3('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
SELECT name, value FROM existing_table;

Glob ** 可用于递归目录遍历。考虑以下示例,它将递归地从 my-test-bucket-768 目录中获取所有文件:

SELECT * FROM s3('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/**', 'CSV', 'name String, value UInt32', 'gzip');

下面从 my-test-bucket 目录中的任何文件夹中递归获取所有 test-data.csv.gz 文件:

SELECT * FROM s3('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/**/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip');

注意。可以在服务器配置文件中指定自定义 URL 映射器。示例:

SELECT * FROM s3('s3://clickhouse-public-datasets/my-test-bucket-768/**/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip');

URL 's3://clickhouse-public-datasets/my-test-bucket-768/**/test-data.csv.gz' 将被替换为 'http://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/**/test-data.csv.gz'

自定义映射器可以添加到 config.xml 中:

<url_scheme_mappers>
   <s3>
      <to>https://{bucket}.s3.amazonaws.com</to>
   </s3>
   <gs>
      <to>https://{bucket}.storage.googleapis.com</to>
   </gs>
   <oss>
      <to>https://{bucket}.oss.aliyuncs.com</to>
   </oss>
</url_scheme_mappers>

对于生产用例,建议使用 命名集合。以下是示例:


CREATE NAMED COLLECTION creds AS
        access_key_id = '***',
        secret_access_key = '***';
SELECT count(*)
FROM s3(creds, url='https://s3-object-url.csv')

分区写入

分区策略

仅支持 INSERT 查询。

WILDCARD(默认):在文件路径中将 {_partition_id} 通配符替换为实际的分区密钥。

HIVE 为读取和写入实现 Hive 风格分区。它使用以下格式生成文件:<prefix>/<key1=val1/key2=val2...>/<snowflakeid>.<toLower(file_format)>

HIVE 分区策略示例

INSERT INTO FUNCTION s3(s3_conn, filename='t_03363_function', format=Parquet, partition_strategy='hive') PARTITION BY (year, country) SELECT 2020 as year, 'Russia' as country, 1 as id;
SELECT _path, * FROM s3(s3_conn, filename='t_03363_function/**.parquet');

   ┌─_path──────────────────────────────────────────────────────────────────────┬─id─┬─country─┬─year─┐
1. │ test/t_03363_function/year=2020/country=Russia/7351295896279887872.parquet │  1 │ Russia  │ 2020 │
   └────────────────────────────────────────────────────────────────────────────┴────┴─────────┴──────┘

WILDCARD 分区策略示例

  1. 在键中使用分区 ID 创建单独的文件:
INSERT INTO TABLE FUNCTION
    s3('http://bucket.amazonaws.com/my_bucket/file_{_partition_id}.csv', 'CSV', 'a String, b UInt32, c UInt32')
    PARTITION BY a VALUES ('x', 2, 3), ('x', 4, 5), ('y', 11, 12), ('y', 13, 14), ('z', 21, 22), ('z', 23, 24);

结果数据写入三个文件:file_x.csvfile_y.csvfile_z.csv

  1. 在桶名称中使用分区 ID 创建不同桶中的文件:
INSERT INTO TABLE FUNCTION
    s3('http://bucket.amazonaws.com/my_bucket_{_partition_id}/file.csv', 'CSV', 'a UInt32, b UInt32, c UInt32')
    PARTITION BY a VALUES (1, 2, 3), (1, 4, 5), (10, 11, 12), (10, 13, 14), (20, 21, 22), (20, 23, 24);

结果数据写入三个不同桶中的文件:my_bucket_1/file.csvmy_bucket_10/file.csvmy_bucket_20/file.csv

访问公共桶

ClickHouse 会尝试从许多不同类型的源获取凭据。有时,当访问一些公共桶时,会导致客户端返回 403 错误代码的问题。通过使用 NOSIGN 关键字可以避免此问题,强制客户端忽略所有凭据,并且不签名请求。

SELECT *
FROM s3(
   'https://datasets-documentation.s3.eu-west-3.amazonaws.com/aapl_stock.csv',
   NOSIGN,
   'CSVWithNames'
)
LIMIT 5;

使用 S3 凭证(ClickHouse Cloud)

对于非公共桶,用户可以将 aws_access_key_idaws_secret_access_key 传递给函数。例如:

SELECT count() FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/mta/*.tsv', '<KEY>', '<SECRET>','TSVWithNames')

这适用于一次性访问或凭据可以轻松轮换的情况。然而,这不建议作为重复访问或凭据敏感的长期解决方案。在这种情况下,我们建议用户依赖基于角色的访问。

ClickHouse Cloud 的 S3 角色访问文档 在这里

配置后,可以通过 extra_credentials 参数将 roleARN 传递给 s3 函数。例如:

SELECT count() FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/mta/*.tsv','CSVWithNames',extra_credentials(role_arn = 'arn:aws:iam::111111111111:role/ClickHouseAccessRole-001'))

进一步的示例可以在 这里 找到。

处理归档

假设我们在 S3 上有几个归档文件,具有以下 URI:

可以使用::从这些归档中提取数据。通配符可以在 url 部分和::(负责归档内文件名的部分)后面部分使用。

SELECT *
FROM s3(
   'https://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-2018-01-1{0..2}.csv.zip :: *.csv'
);
备注

ClickHouse 支持三种归档格式: ZIP TAR 7Z ZIP 和 TAR 归档可以从任何支持的存储位置访问,而 7Z 归档只能从安装 ClickHouse 的本地文件系统读取。

插入数据

注意,行只能插入到新文件中。没有合并周期或文件分割操作。一旦文件被写入,后续插入将失败。有关更多详细信息,请参见 这里

虚拟列

  • _path — 文件路径。类型:LowCardinality(String)。在归档的情况下,以格式:"{path_to_archive}::{path_to_file_inside_archive}" 显示。
  • _file — 文件名。类型:LowCardinality(String)。在归档的情况下,显示归档内的文件名。
  • _size — 文件大小(以字节为单位)。类型:Nullable(UInt64)。如果文件大小未知,则值为 NULL。在归档的情况下,显示归档内文件的未压缩大小。
  • _time — 文件的最后修改时间。类型:Nullable(DateTime)。如果时间未知,则值为 NULL

use_hive_partitioning 设置

这是 ClickHouse 在读取时解析 Hive 风格分区文件的提示。对写入没有影响。对于对称读取和写入,请使用 partition_strategy 参数。

当设置 use_hive_partitioning 为 1 时,ClickHouse 将在路径中检测 Hive 风格分区(/name=value/),并允许在查询中将分区列用作虚拟列。这些虚拟列将具有与分区路径中相同的名称,但以 _ 开头。

示例

SELECT * FROM s3('s3://data/path/date=*/country=*/code=*/*.parquet') WHERE date > '2020-01-01' AND country = 'Netherlands' AND code = 42;

访问请求者付费桶

要访问请求者付费桶,必须在任何请求中传递头部 x-amz-request-payer = requester。这可以通过将参数 headers('x-amz-request-payer' = 'requester') 传递给 s3 函数来实现。例如:

SELECT
    count() AS num_rows,
    uniqExact(_file) AS num_files
FROM s3('https://coiled-datasets-rp.s3.us-east-1.amazonaws.com/1trc/measurements-100*.parquet', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', headers('x-amz-request-payer' = 'requester'))

┌───num_rows─┬─num_files─┐
│ 1110000000 │       111 │
└────────────┴───────────┘

1 row in set. Elapsed: 3.089 sec. Processed 1.09 billion rows, 0.00 B (353.55 million rows/s., 0.00 B/s.)
Peak memory usage: 192.27 KiB.

存储设置