用于通过其协议与数据库服务器通信的 Java 客户端库。当前实现仅支持 HTTP 接口。
该库提供自己的 API 用于向服务器发送请求,并提供用于处理不同二进制数据格式 (RowBinary* 和 Native*) 的工具。
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>client-v2</artifactId>
<version>0.9.7</version>
</dependency>
// https://mvnrepository.com/artifact/com.clickhouse/client-v2
implementation("com.clickhouse:client-v2:0.9.7")
// https://mvnrepository.com/artifact/com.clickhouse/client-v2
implementation 'com.clickhouse:client-v2:0.9.7'
初始化
Client 对象由 com.clickhouse.client.api.Client.Builder#build() 初始化。每个客户端都有自己的上下文,客户端之间不共享对象。
Builder 提供了便于配置的方法。
示例:
Client client = new Client.Builder()
.addEndpoint("https://clickhouse-cloud-instance:8443/")
.setUsername(user)
.setPassword(password)
.build();
Client 实现了 AutoCloseable 接口,不再需要时应将其关闭。
身份验证
身份验证在初始化阶段为每个客户端单独配置。支持三种身份验证方式:基于密码、基于访问令牌、基于 SSL 客户端证书。
使用密码进行身份验证时,需要通过调用 setUsername(String) 和 setPassword(String) 方法来设置用户名和密码:
Client client = new Client.Builder()
.addEndpoint("https://clickhouse-cloud-instance:8443/")
.setUsername(user)
.setPassword(password)
.build();
使用访问令牌进行身份验证时,需要调用 setAccessToken(String) 来设置访问令牌:
Client client = new Client.Builder()
.addEndpoint("https://clickhouse-cloud-instance:8443/")
.setAccessToken(userAccessToken)
.build();
通过 SSL 客户端证书进行身份验证时,需要设置用户名、启用 SSL 身份验证,并设置客户端证书和客户端密钥,可分别通过调用 setUsername(String)、useSSLAuthentication(boolean)、setClientCertificate(String) 和 setClientKey(String) 来完成:
Client client = new Client.Builder()
.useSSLAuthentication(true)
.setUsername("some_user")
.setClientCertificate("some_user.crt")
.setClientKey("some_user.key")
注意
SSL 身份验证在生产环境中可能比较难以排查,因为许多来自 SSL 库的错误信息不够详细,无法提供足够的上下文。例如,如果客户端证书与私钥不匹配,服务器会立即终止连接 (对于 HTTP 来说,这发生在连接建立阶段,此时客户端尚未发送任何 HTTP 请求,因此也就不会收到任何响应) 。
请使用 openssl 等工具验证证书和密钥:
- 校验密钥完整性:
openssl rsa -in [key-file.key] -check -noout
- 检查客户端证书中的 CN 是否与该 USER 匹配:
- 从用户证书中获取 CN 值:
openssl x509 -noout -subject -in [user.cert]
- 确认在数据库中已设置相同的值:
select name, auth_type, auth_params from system.users where auth_type = 'ssl_certificate' (查询会输出 auth_params 字段,其内容类似于 {"common_names":["some_user"]})
所有设置均由实例方法 (亦称配置方法) 定义,从而使每个值的作用域和上下文一目了然。
主要配置参数在单一作用域 (客户端或操作) 中定义,互不覆盖。
客户端配置在创建时定义。请参阅 com.clickhouse.client.api.Client.Builder。
客户端配置
| Method | Arguments | Description | Default | Key |
|---|
addEndpoint(String endpoint) | endpoint - URL 格式的服务器地址 | 将服务器端点添加到可用服务器列表。目前仅支持一个端点。 | none | none |
addEndpoint(Protocol protocol, String host, int port, boolean secure) | protocol - 连接协议
host - IP 或主机名
secure - 使用 HTTPS | 将服务器端点添加到可用服务器列表。目前仅支持一个端点。 | none | none |
enableConnectionPool(boolean enable) | enable - 启用/禁用标志 | 设置是否启用连接池 | true | connection_pool_enabled |
setMaxConnections(int maxConnections) | maxConnections - 连接数量 | 设置客户端对每个服务器端点最多可以打开的连接数。 | 10 | max_open_connections |
setConnectionTTL(long timeout, ChronoUnit unit) | timeout - 超时值
unit - 时间单位 | 设置连接的生存时间 (TTL),超过该时间后连接将被视为不再活动 | -1 | connection_ttl |
setKeepAliveTimeout(long timeout, ChronoUnit unit) | timeout - 超时值
unit - 时间单位 | 设置 HTTP 连接的 Keep-Alive 超时时间。设置为 0 以禁用 Keep-Alive。 | - | http_keep_alive_timeout |
setConnectionReuseStrategy(ConnectionReuseStrategy strategy) | strategy - LIFO 或 FIFO | 选择连接池应使用的连接复用策略 | FIFO | connection_reuse_strategy |
setDefaultDatabase(String database) | database - 数据库名称 | 设置默认数据库。 | default | database |
| Method | Arguments | Description | Default | Key |
|---|
setUsername(String username) | username - 用于身份验证的用户名 | 为将在后续配置中选定的身份验证方式设置用户名 | default | user |
setPassword(String password) | password - 机密值 | 为密码认证设置机密值,并实际将密码认证设为所使用的身份验证方式 | - | password |
setAccessToken(String accessToken) | accessToken - 访问令牌字符串 | 设置访问令牌,以通过对应的认证方式进行身份验证 | - | access_token |
useSSLAuthentication(boolean useSSLAuthentication) | useSSLAuthentication - 启用 SSL 认证的标志 | 将 SSL 客户端证书设置为一种认证方式。 | - | ssl_authentication |
useHTTPBasicAuth(boolean useBasicAuth) | useBasicAuth - 启用/禁用的标志 | 设置在用户密码认证中是否应使用基本 HTTP 认证。可解决包含特殊字符的密码所导致的问题。 | true | http_use_basic_auth |
useBearerTokenAuth(String bearerToken) | bearerToken - 编码后的 Bearer 令牌 | 指定是否使用 Bearer 认证以及要使用的令牌。令牌将按原样发送。 | - | bearer_token |
| Method | Arguments | Description | Default | Key |
|---|
setConnectTimeout(long timeout, ChronoUnit unit) | timeout - 超时时间值
unit - 时间单位 | 设置所有出站连接在建立连接时的超时时间。 | - | connection_timeout |
setConnectionRequestTimeout(long timeout, ChronoUnit unit) | timeout - 超时时间值
unit - 时间单位 | 设置连接请求超时时间。仅在从连接池获取连接时生效。 | 10000 | connection_request_timeout |
setSocketTimeout(long timeout, ChronoUnit unit) | timeout - 超时时间值
unit - 时间单位 | 设置用于读写操作的 socket 超时时间 | 0 | socket_timeout |
setExecutionTimeout(long timeout, ChronoUnit timeUnit) | timeout - 超时时间值
timeUnit - 时间单位 | 设置查询的最大执行超时时间 | 0 | max_execution_time |
retryOnFailures(ClientFaultCause ...causes) | causes - ClientFaultCause 的枚举常量 | 设置可恢复/可重试的故障类型。 | NoHttpResponse ConnectTimeout ConnectionRequestTimeout | client_retry_on_failures |
setMaxRetries(int maxRetries) | maxRetries - 重试次数 | 设置针对由 retryOnFailures 定义的故障的最大重试次数。 | 3 | retry |
| Method | Arguments | Description | Default | Key |
|---|
setSocketRcvbuf(long size) | size - 以字节为单位的大小 | 设置 TCP socket 接收缓冲区。该缓冲区位于 JVM 内存之外 (堆外内存) 。 | 8196 | socket_rcvbuf |
setSocketSndbuf(long size) | size - 以字节为单位的大小 | 设置 TCP socket 发送缓冲区。该缓冲区位于 JVM 内存之外 (堆外内存) 。 | 8196 | socket_sndbuf |
setSocketKeepAlive(boolean value) | value - 启用/禁用的标志 | 为每个 TCP socket 设置 SO_KEEPALIVE 选项。启用 TCP Keep-Alive 机制,用于检查连接是否仍然存活。 | - | socket_keepalive |
setSocketTcpNodelay(boolean value) | value - 启用/禁用的标志 | 为每个 TCP socket 设置 SO_NODELAY 选项。该 TCP 选项会使 socket 尽可能立即发送数据。 | - | socket_tcp_nodelay |
setSocketLinger(int secondsToWait) | secondsToWait - 秒数 | 为客户端创建的每个 TCP socket 设置延迟关闭时间 (linger time) 。 | - | socket_linger |
| Method | Arguments | Description | Default | Key |
|---|
compressServerResponse(boolean enabled) | enabled - 启用/禁用标志 | 设置服务器是否应对其响应进行压缩。 | true | compress |
compressClientRequest(boolean enabled) | enabled - 启用/禁用标志 | 设置客户端是否应对其请求进行压缩。 | false | decompress |
useHttpCompression(boolean enabled) | enabled - 启用/禁用标志 | 设置在相应选项启用时,客户端与服务器通信时是否应使用 HTTP 压缩。 | - | - |
appCompressedData(boolean enabled) | enabled - 启用/禁用标志 | 告知客户端压缩将由应用程序负责处理。 | false | app_compressed_data |
setLZ4UncompressedBufferSize(int size) | size - 大小 (字节) | 设置用于接收数据流未压缩部分的缓冲区大小。 | 65536 | compression.lz4.uncompressed_buffer_size |
disableNativeCompression | disable - 禁用标志 | 禁用原生压缩。如果设为 true,则会禁用原生压缩。 | false | disable_native_compression |
| Method | Arguments | Description | Default | Key |
|---|
setSSLTrustStore(String path) | path - 本地系统上的文件路径 | 配置客户端是否使用 SSL truststore 来对服务器主机进行验证。 | - | trust_store |
setSSLTrustStorePassword(String password) | password - 机密值 | 设置用于解锁由 setSSLTrustStore 指定的 SSL truststore 的密码 | - | key_store_password |
setSSLTrustStoreType(String type) | type - truststore 类型名称 | 设置由 setSSLTrustStore 指定的 truststore 类型。 | - | key_store_type |
setRootCertificate(String path) | path - 本地系统上的文件路径 | 配置客户端是否使用指定的根 (CA) 证书来验证服务器主机。 | - | sslrootcert |
setClientCertificate(String path) | path - 本地系统上的文件路径 | 设置在发起 SSL 连接以及进行 SSL 认证时要使用的客户端证书路径。 | - | sslcert |
setClientKey(String path) | path - 本地系统上的文件路径 | 设置用于与服务器进行 SSL 加密通信的客户端私钥。 | - | ssl_key |
sslSocketSNI(String sni) | sni - 服务器名称字符串 | 设置在 SSL/TLS 连接中用于 SNI (Server Name Indication,服务器名称指示) 的服务器名称。 | - | ssl_socket_sni |
| 方法 | 参数 | 描述 | 默认值 | 键 |
|---|
addProxy(ProxyType type, String host, int port) | type - 代理类型
host - 代理主机名或 IP
port - 代理端口 | 设置用于与服务器通信的代理。 | - | proxy_type, proxy_host, proxy_port |
setProxyCredentials(String user, String pass) | user - 代理用户名
pass - 密码 | 设置用于代理认证的用户凭据。 | - | proxy_user, proxy_password |
| Method | Arguments | Description | Default | Key |
|---|
setHttpCookiesEnabled(boolean enabled) | enabled - 用于启用/禁用的标志位 | 设置是否应记住 HTTP cookie 并将其发送回服务器。 | - | - |
httpHeader(String key, String value) | key - HTTP 头键
value - 字符串值 | 为单个 HTTP 头设置值。之前的值将被覆盖。 | none | none |
httpHeader(String key, Collection values) | key - HTTP 头键
values - 字符串值列表 | 为单个 HTTP 头设置多个值。之前的值将被覆盖。 | none | none |
httpHeaders(Map headers) | headers - 包含 HTTP 头的映射 | 一次性设置多个 HTTP 头的值。 | none | none |
useHttpFormDataForQuery(boolean enable) | enable - 启用/禁用标志 | 设置是否应将查询参数作为 HTTP 表单数据放在请求体中,而不是放在 URL 中。仅在启用服务器端压缩时生效。如果启用了客户端级压缩,对于带参数的查询请求将会禁用该压缩,因为每个参数都会作为 multipart 内容发送。 | false | client.http.use_form_request_for_query |
| 方法 | 参数 | 描述 | 默认值 | 键 |
|---|
serverSetting(String name, String value) | name - 设置名称
value - 设置值 | 设置随每个查询一起发送到服务器的设置。单次操作的设置可以覆盖它。设置列表 | none | none |
serverSetting(String name, Collection values) | name - 设置名称
values - 设置值 | 设置随每个查询一起发送到服务器的、包含多个值的设置,例如 roles | none | none |
setOption("custom_settings_prefix", value) | value - 前缀字符串 | 设置传递给服务器的自定义服务器设置的前缀,应与服务器配置保持一致。参见 ClickHouse 文档。 | custom_ | custom_settings_prefix |
| Method | Arguments | Description | Default | Key |
|---|
useServerTimeZone(boolean useServerTimeZone) | useServerTimeZone - 启用/禁用开关 | 设置在解码 DateTime 和 Date 列值时客户端是否应使用服务器时区。 | true | use_server_time_zone |
useTimeZone(String timeZone) | timeZone - 有效的 Java 时区 ID | 设置在解码 DateTime 和 Date 列值时是否应使用指定的时区。将覆盖服务器时区。 | - | use_time_zone |
setServerTimeZone(String timeZone) | timeZone - 有效的 Java 时区 ID | 设置服务器端时区。默认使用 UTC 时区。 | UTC | server_time_zone |
| Method | Arguments | Description | Default | Key |
|---|
setOption(String key, String value) | key - 配置选项键
value - 选项值 | 设置客户端选项的原始值。适用于从属性配置文件读取配置的场景。 | - | - |
useAsyncRequests(boolean async) | async - 启用/禁用标志 | 设置客户端是否应在单独线程中执行请求。默认禁用,因为应用程序更清楚如何组织多线程任务。 | false | async |
setSharedOperationExecutor(ExecutorService executorService) | executorService - ExecutorService 实例 | 为操作任务设置 ExecutorService。 | none | none |
setQueryIdGenerator(Supplier<String> supplier) | supplier - 用于生成查询 ID 的 Supplier<String> | 设置自定义查询 ID 生成器,当操作设置 (InsertSettings、QuerySettings) 中未指定查询 ID 时将使用该生成器。 | - | - |
setClientNetworkBufferSize(int size) | size - 以字节为单位的大小 | 设置应用程序内存空间中缓冲区的大小,该缓冲区用于在 socket 与应用程序之间复制数据。 | 300000 | client_network_buffer_size |
allowBinaryReaderToReuseBuffers(boolean reuse) | reuse - 启用/禁用标志 | 如果启用,读取器将使用预分配的缓冲区执行数值转码。可降低数值数据的 GC 压力。 | - | - |
columnToMethodMatchingStrategy(ColumnToMethodMatchingStrategy strategy) | strategy - 匹配策略实现 | 设置在注册 DTO 时用于匹配 DTO 类字段和数据库列的自定义策略。 | none | none |
setClientName(String clientName) | clientName - 应用程序名称字符串 | 设置关于调用方应用程序的附加信息。该信息将作为 User-Agent 头传递。 | - | client_name |
registerClientMetrics(Object registry, String name) | registry - Micrometer registry 实例
name - 指标组名称 | 向 Micrometer (https://micrometer.io/) registry 实例注册指标采集器。 | - | - |
setServerVersion(String version) | version - 服务器版本字符串 | 设置服务器版本以避免版本自动检测。 | - | server_version |
typeHintMapping(Map typeHintMapping) | typeHintMapping - 类型提示映射 | 为 ClickHouse 类型设置类型提示映射。例如,使多维数组可以以 Java 容器的形式呈现。 | - | type_hint_mapping |
客户端标识
查询日志中有两个字段用于标识发起请求的应用程序:client_name 和 http_user_agent。原生 TCP 协议使用 client_name 来标识应用程序。HTTP 协议使用 http_user_agent 来标识应用程序。Client builder 提供了 setClientName 方法,可为两种协议设置正确的值。
http_user_agent 字段按照 User-Agent 请求头的通用格式设置:application-name[/version] [(operating-system; architecture; ...)]。
这组值会在每一层重复出现:应用程序、客户端库、HTTP 客户端库。通过 setClientName 方法设置的内容在列表中排在最前面。
例如:
client.setClientName("my-app-01/1.0");
将得到如下 http_user_agent 值:
my-app-01/1.0 clickhouse-java-v2/0.9.6-SNAPSHOT (Linux; jvm:17.0.17) Apache-HttpClient/5.4.4
应用程序可以设置自定义 HTTP 请求头 User-Agent 来标识自身。但 clickhouse-java-v2/0.9.6-SNAPSHOT 部分将被追加到该请求头的末尾。
操作标识
查询日志还包含两个字段 query_id 和 log_comment,可用于标识某个操作并为查询日志添加额外信息。
query_id 是操作的唯一标识符。应用程序可以通过调用 QuerySettings 类的 setQueryId 方法来设置它。
QuerySettings querySettings = new QuerySettings();
querySettings.setQueryId("some-query-id");
log_comment 是可以添加到查询日志的注释。可以在应用程序中通过调用 QuerySettings 类的 logComment 方法来设置。
QuerySettings querySettings = new QuerySettings();
querySettings.logComment("some-comment");
服务器设置
服务器端设置可以在创建客户端时于客户端级别配置一次 (参见 Builder 的 serverSetting 方法) ,也可以在操作级别配置 (参见操作设置类的 serverSetting 方法) 。
try (Client client = new Client.Builder().addEndpoint(Protocol.HTTP, "localhost", mockServer.port(), false)
.setUsername("default")
.setPassword(ClickHouseServerForTest.getPassword())
.compressClientRequest(true)
// Client level
.serverSetting("max_threads", "10")
.serverSetting("async_insert", "1")
.serverSetting("roles", Arrays.asList("role1", "role2"))
.build()) {
// Operation level
QuerySettings querySettings = new QuerySettings();
querySettings.serverSetting("session_timezone", "Europe/Zurich");
...
}
⚠️ 当通过 setOption 方法设置选项时 (无论是在 Client.Builder 还是在操作设置类中) ,服务器设置名称应以 clickhouse_setting_ 为前缀。在这种情况下,com.clickhouse.client.api.ClientConfigProperties#serverSetting() 可能会很有用。
可以在客户端级别为所有操作设置自定义 HTTP 请求头,也可以在操作级别仅为单个操作设置。
QuerySettings settings = new QuerySettings()
.httpHeader(HttpHeaders.REFERER, clientReferer)
.setQueryId(qId);
当通过 setOption 方法设置选项时 (无论是 Client.Builder 还是操作设置类) ,自定义头名称应以 http_header_ 为前缀。此情况下,方法 com.clickhouse.client.api.ClientConfigProperties#httpHeader() 可能会很有用。
常用术语定义
支持的格式 的枚举类型,包含 ClickHouse 支持的所有格式。
raw - 用户需要自行对原始数据进行转码
full - 客户端可以自行对数据进行转码,并可接收原始数据流
- - ClickHouse 不支持对该格式执行此操作
此客户端版本支持:
插入 API
接受以指定格式编码的字节输入流 (InputStream) 形式的数据。假定 data 已按 format 进行编码。
签名
CompletableFuture<InsertResponse> insert(String tableName, InputStream data, ClickHouseFormat format, InsertSettings settings)
CompletableFuture<InsertResponse> insert(String tableName, InputStream data, ClickHouseFormat format)
参数
tableName - 目标表的名称。
data - 已编码数据的输入流。
format - 数据编码所使用的格式。
settings - 请求设置。
返回值
InsertResponse 类型的 Future,包含操作结果以及服务器端指标等附加信息。
示例
try (InputStream dataStream = getDataStream()) {
try (InsertResponse response = client.insert(TABLE_NAME, dataStream, ClickHouseFormat.JSONEachRow,
insertSettings).get(3, TimeUnit.SECONDS)) {
log.info("Insert finished: {} rows written", response.getMetrics().getMetric(ServerMetrics.NUM_ROWS_WRITTEN).getLong());
} catch (Exception e) {
log.error("Failed to write JSONEachRow data", e);
throw new RuntimeException(e);
}
}
insert(String tableName, List<?> data, InsertSettings settings)
向数据库发送写入请求。对象列表会被转换为高效的内部格式,然后发送到服务器。列表项所属的类应事先使用 register(Class, TableSchema) 方法进行注册。
方法签名
client.insert(String tableName, List<?> data, InsertSettings settings)
client.insert(String tableName, List<?> data)
参数
tableName - 目标表名称。
data - DTO (数据传输对象) 对象集合。
settings - 请求设置。
返回值
返回 InsertResponse 类型的 Future,包含操作结果以及服务器端指标等附加信息。
示例
// Important step (done once) - register class to pre-compile object serializer according to the table schema.
client.register(ArticleViewEvent.class, client.getTableSchema(TABLE_NAME));
List<ArticleViewEvent> events = loadBatch();
try (InsertResponse response = client.insert(TABLE_NAME, events).get()) {
// handle response, then it will be closed and connection that served request will be released.
}
InsertSettings
插入操作的配置选项。
配置方法
| 方法 | 说明 |
|---|
setQueryId(String queryId) | 设置将要分配给该操作的查询 ID。默认值:null。 |
setDeduplicationToken(String token) | 设置去重令牌。该令牌将发送到服务器,并可用于标识查询。默认值:null。 |
setInputStreamCopyBufferSize(int size) | 复制缓冲区的大小。在写入操作期间,该缓冲区用于将用户提供的输入流中的数据复制到输出流。默认值:8196。 |
serverSetting(String name, String value) | 为某次操作设置单个服务器端配置项。 |
serverSetting(String name, Collection values) | 为某个操作设置可包含多个值的单个服务器配置项。集合中的元素应为 String 值。 |
setDBRoles(Collection dbRoles) | 设置在执行操作前要生效的数据库角色。集合中的元素应为 String 值。 |
setOption(String option, Object value) | 以原始格式设置配置选项。这不是服务器端设置。 |
InsertResponse
用于保存插入操作结果的响应对象。仅当客户端从服务器收到响应时才可用。
注意
应尽快关闭该对象以释放连接,因为在完全读取完之前,该连接无法复用。
| 方法 | 说明 |
|---|
OperationMetrics getMetrics() | 返回一个包含该操作各项指标的对象。 |
String getQueryId() | 返回为该操作分配的查询 ID,该 ID 由应用程序 (通过操作设置) 指定,或由服务器生成。 |
查询 API
query(String sqlQuery)
按原样发送 sqlQuery。响应格式由查询设置指定。QueryResponse 将持有对响应流的引用,该响应流应由能够处理该格式的读取器来消费。
方法签名
CompletableFuture<QueryResponse> query(String sqlQuery, QuerySettings settings)
CompletableFuture<QueryResponse> query(String sqlQuery)
参数
sqlQuery - 单条 SQL 语句。该查询按原样发送到服务器。
settings - 请求相关设置。
返回值
类型为 QueryResponse 的 Future —— 结果数据集以及服务器端指标等附加信息。使用完数据集后应关闭 Response 对象。
示例
final String sql = "select * from " + TABLE_NAME + " where title <> '' limit 10";
// Default format is RowBinaryWithNamesAndTypesFormatReader so reader have all information about columns
try (QueryResponse response = client.query(sql).get(3, TimeUnit.SECONDS);) {
// Create a reader to access the data in a convenient way
ClickHouseBinaryFormatReader reader = client.newBinaryFormatReader(response);
while (reader.hasNext()) {
reader.next(); // Read the next record from stream and parse it
// get values
double id = reader.getDouble("id");
String title = reader.getString("title");
String url = reader.getString("url");
// collecting data
}
} catch (Exception e) {
log.error("Failed to read data", e);
}
// put business logic outside of the reading block to release http connection asap.
query(String sqlQuery, Map<String, Object> queryParams, QuerySettings settings)
按原样发送 sqlQuery,并同时发送查询参数,以便服务器能够编译该 SQL 表达式。
签名
CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Object> queryParams, QuerySettings settings)
参数
sqlQuery - 带有占位符 {} 的 SQL 表达式。
queryParams - 变量到值的映射,用于在服务器端补全 SQL 表达式。
settings - 请求设置。
返回值
QueryResponse 类型的 Future —— 包含结果数据集以及如服务器端指标等附加信息。使用完数据集后,应关闭 Response 对象。
示例
// define parameters. They will be sent to the server along with the request.
Map<String, Object> queryParams = new HashMap<>();
queryParams.put("param1", 2);
try (QueryResponse response =
client.query("SELECT * FROM " + table + " WHERE col1 >= {param1:UInt32}", queryParams, new QuerySettings()).get()) {
// Create a reader to access the data in a convenient way
ClickHouseBinaryFormatReader reader = client.newBinaryFormatReader(response);
while (reader.hasNext()) {
reader.next(); // Read the next record from stream and parse it
// reading data
}
} catch (Exception e) {
log.error("Failed to read data", e);
}
queryAll(String sqlQuery)
以 RowBinaryWithNamesAndTypes 格式查询数据。结果以集合形式返回。读取性能与使用 reader 相同,但需要更多内存来保存整个数据集。
签名
List<GenericRecord> queryAll(String sqlQuery)
参数
sqlQuery - 用于从服务器查询数据的 SQL 表达式。
返回值
完整数据集由 GenericRecord 对象列表表示,这些对象以按行的方式提供对结果数据的访问。
示例
try {
log.info("Reading whole table and process record by record");
final String sql = "select * from " + TABLE_NAME + " where title <> ''";
// Read whole result set and process it record by record
client.queryAll(sql).forEach(row -> {
double id = row.getDouble("id");
String title = row.getString("title");
String url = row.getString("url");
log.info("id: {}, title: {}, url: {}", id, title, url);
});
} catch (Exception e) {
log.error("Failed to read data", e);
}
QuerySettings
查询操作的配置选项。
配置方法
| 方法 | 说明 |
|---|
setQueryId(String queryId) | 设置将分配给该操作的查询 ID。 |
setFormat(ClickHouseFormat format) | 设置响应格式。完整列表请参见 RowBinaryWithNamesAndTypes。 |
setMaxExecutionTime(Integer maxExecutionTime) | 设置服务器端该操作的最大执行时间,不影响读取超时。 |
waitEndOfQuery(Boolean waitEndOfQuery) | 请求服务器在查询完成后再发送响应。 |
setUseServerTimeZone(Boolean useServerTimeZone) | 服务端时区 (参见客户端配置) 将用于解析操作结果中的日期/时间类型值。默认值为 false。 |
setUseTimeZone(String timeZone) | 请求服务器在时间转换时使用 timeZone。详见 session_timezone。 |
serverSetting(String name, String value) | 为某个操作设置独立的服务器端配置。 |
serverSetting(String name, Collection values) | 为某个操作设置可包含多个值的单个服务器端设置。集合中的项应为 String 类型的值。 |
setDBRoles(Collection dbRoles) | 设置在执行操作前要生效的数据库角色。集合中的元素应为 String 值。 |
setOption(String option, Object value) | 以原始格式设置配置选项。这不是服务器端设置。 |
QueryResponse
用于保存查询执行结果的响应对象。仅在客户端已从服务器收到响应时才可用。
注意
应尽快关闭此对象以释放连接,因为在完全读取完上一个响应的全部数据之前,该连接无法被复用。
| 方法 | 说明 |
|---|
ClickHouseFormat getFormat() | 返回响应数据的编码格式。 |
InputStream getInputStream() | 返回按指定格式编码的未压缩数据字节流。 |
OperationMetrics getMetrics() | 返回包含该操作指标的对象。 |
String getQueryId() | 返回为该操作分配的查询 ID,该 ID 可由应用程序 (通过操作设置) 或服务器指定。 |
TimeZone getTimeZone() | 返回在处理响应中的 Date/DateTime 类型时应使用的时区。 |
- 示例代码可在代码仓库中获取
- 参考 Spring Service 的实现
通用 API
getTableSchema(String table)
获取 table 的表结构。
方法签名
TableSchema getTableSchema(String table)
TableSchema getTableSchema(String table, String database)
参数
table - 要获取其 schema 数据的表名。
database - 定义目标表的数据库。
返回值
返回一个 TableSchema 对象,其中包含该表的列列表。
getTableSchemaFromQuery(String sql)
从 SQL 语句中获取表结构。
签名
TableSchema getTableSchemaFromQuery(String sql)
参数
sql - “SELECT” SQL 语句,其 schema 将被返回。
返回值
返回一个列与 sql 表达式匹配的 TableSchema 对象。
TableSchema
register(Class<?> clazz, TableSchema schema)
为 Java 类编译一个序列化/反序列化层,以便在基于 schema 进行数据读写时使用。该方法将为 getter/setter 方法对及其对应的列创建序列化器和反序列化器。
列匹配是通过从方法名中提取列名来完成的。例如,getFirstName 将对应列 first_name 或 firstname。
签名
void register(Class<?> clazz, TableSchema schema)
参数
clazz - 表示用于读取/写入数据的 POJO 的 Class。
schema - 用于与 POJO 属性进行匹配的数据 schema。
示例
client.register(ArticleViewEvent.class, client.getTableSchema(TABLE_NAME));
使用示例
完整示例代码存放在代码仓库的 'example` 目录 中:
读取数据
读取数据通常有两种方式:
query() 方法返回底层的 QueryResponse 对象,该对象包含用于承载数据的 InputStream。通常与 ClickHouseBinaryFormatReader 搭配使用以进行流式读取,但
也可以与任意其他自定义读取器实现配合使用。QueryResponse 还提供对结果集元数据和指标的访问能力。
queryAll() 方法与 GenericRecord 搭配使用,可方便按行访问数据。在这种情况下,会将整个结果集加载到内存中。
queryRecords() 方法返回 com.clickhouse.client.api.query.Records —— 一个用于迭代 GenericRecord 对象的迭代器。该方法采用流式处理方式
(不会将数据加载到内存中) ,并通过 GenericRecord 来访问数据。
注意: 流式处理方式要求快速读取,否则由于数据直接从网络流读取,可能导致服务器写入超时。
读取数组
ClickHouseBinaryFormatReader 方法
getList(...) - 将任意 Array(...) 读取为 List<T>。适合作为类型灵活读取时的默认选择。支持嵌套数组。
getByteArray(...), getShortArray(...), getIntArray(...), getLongArray(...), getFloatArray(...), getDoubleArray(...), getBooleanArray(...) - 最适用于由与 Java 原始类型兼容的值构成的一维数组。
getStringArray(...) - 用于读取 Array(String) (以及以名称表示的枚举值) 。
getObjectArray(...) - 针对任意 Array(...) 元素类型 (包括嵌套数组) 的通用方法。用于读取包含 Nullable 值或嵌套数组的数组。
所有方法都提供基于索引和基于名称的重载。索引从 1 开始计数。基于索引的重载会直接访问对应的列。
基于名称的方法每次调用时都需要执行一次索引查找。
try (QueryResponse response = client.query("SELECT * FROM my_table").get()) {
ClickHouseBinaryFormatReader reader = client.newBinaryFormatReader(response);
while (reader.next() != null) {
Object[] uint64 = reader.getObjectArray("uint64_arr"); // Array(UInt64) -> BigInteger[]
Object[] arr2d = reader.getObjectArray("arr2d"); // Array(Array(Int64)) -> Object[]
// nested arrays are returned as nested Object[]:
Object[] firstInner = (Object[]) arr2d[0];
Long firstValue = (Long) firstInner[0];
}
}
GenericRecord 方法
getList(...) - 将任意 Array(...) 读取为 List<T>。适合作为灵活类型读取的默认选项。支持嵌套数组。
getByteArray(...), getShortArray(...), getIntArray(...), getLongArray(...), getFloatArray(...), getDoubleArray(...), getBooleanArray(...) - 最适用于由与 Java 原始类型兼容的值构成的一维数组。
getStringArray(...) - 用于 Array(String) 类型 (以及以名称表示的枚举值) 。
getObjectArray(...) - 适用于任意 Array(...) 元素类型 (包括嵌套数组) 的通用方法。用于读取包含 Nullable 值和嵌套数组的数组。
所有方法都提供基于索引和基于名称的重载。索引从 1 开始。基于索引的重载会直接访问列。
基于名称的方法每次都需要进行索引查找。
try (QueryResponse response = client.query("SELECT * FROM my_table").get()) {
List<GenericRecord> rows = client.queryAll(
"SELECT int_arr, arr2d_nullable FROM test_arrays ORDER BY id");
for (GenericRecord row : rows) {
Object[] intArr = row.getObjectArray("int_arr"); // Array(Int32) -> Integer[]
Object[] arr2d = row.getObjectArray("arr2d_nullable"); // Array(Array(Nullable(Int32)))
Object[] inner = (Object[]) arr2d[0];
Object maybeNull = inner[1]; // may be null
}
}
## Migration Guide
Old client (V1) was using `com.clickhouse.client.ClickHouseClient#builder` as start point. The new client (V2) uses similar pattern with `com.clickhouse.client.api.Client.Builder`. Main
differences are:
- no service loader is used to grab implementation. The `com.clickhouse.client.api.Client` is facade class for all kinds of implementation in the future.
- a fewer sources of configuration: one is provided to the builder and one is with operation settings (`QuerySettings`, `InsertSettings`). Previous version had configuration per node and was loading
env. variables in some cases.
### Configuration Parameters Match
There are 3 enum classes related to configuration in V1:
- `com.clickhouse.client.config.ClickHouseDefaults` - configuration parameters that supposed to be set in most use cases. Like `USER` and `PASSWORD`.
- `com.clickhouse.client.config.ClickHouseClientOption` - configuration parameters specific for the client. Like `HEALTH_CHECK_INTERVAL`.
- `com.clickhouse.client.http.config.ClickHouseHttpOption` - configuration parameters specific for HTTP interface. Like `RECEIVE_QUERY_PROGRESS`.
They were designed to group parameters and provide clear separation. However in some cases it lead to a confusion (is there a difference between `com.clickhouse.client.config.ClickHouseDefaults#ASYNC` and
`com.clickhouse.client.config.ClickHouseClientOption#ASYNC`). The new V2 client uses `com.clickhouse.client.api.Client.Builder` as single dictionary of all possible client configuration options.There is
`com.clickhouse.client.api.ClientConfigProperties` where all configuration parameter names are listed.
Table below shows what old options are supported in the new client and their new meaning.
**Legend:** ✔ = supported, ✗ = dropped
<Tabs groupId="v1-migration">
<TabItem value="connection-auth" label="Connection & Auth">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseDefaults#HOST` | `Client.Builder#addEndpoint` | |
| `ClickHouseDefaults#PROTOCOL` | ✗ | Only HTTP supported in V2 |
| `ClickHouseDefaults#DATABASE`<br/>`ClickHouseClientOption#DATABASE` | `Client.Builder#setDefaultDatabase` | |
| `ClickHouseDefaults#USER` | `Client.Builder#setUsername` | |
| `ClickHouseDefaults#PASSWORD` | `Client.Builder#setPassword` | |
| `ClickHouseClientOption#CONNECTION_TIMEOUT` | `Client.Builder#setConnectTimeout` | |
| `ClickHouseClientOption#CONNECTION_TTL` | `Client.Builder#setConnectionTTL` | |
| `ClickHouseHttpOption#MAX_OPEN_CONNECTIONS` | `Client.Builder#setMaxConnections` | |
| `ClickHouseHttpOption#KEEP_ALIVE`<br/>`ClickHouseHttpOption#KEEP_ALIVE_TIMEOUT` | `Client.Builder#setKeepAliveTimeout` | |
| `ClickHouseHttpOption#CONNECTION_REUSE_STRATEGY` | `Client.Builder#setConnectionReuseStrategy` | |
| `ClickHouseHttpOption#USE_BASIC_AUTHENTICATION` | `Client.Builder#useHTTPBasicAuth` | |
</TabItem>
<TabItem value="ssl" label="SSL & Security">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseDefaults#SSL_CERTIFICATE_TYPE` | ✗ | |
| `ClickHouseDefaults#SSL_KEY_ALGORITHM` | ✗ | |
| `ClickHouseDefaults#SSL_PROTOCOL` | ✗ | |
| `ClickHouseClientOption#SSL` | ✗ | See `Client.Builder#addEndpoint` |
| `ClickHouseClientOption#SSL_MODE` | ✗ | |
| `ClickHouseClientOption#SSL_ROOT_CERTIFICATE` | `Client.Builder#setRootCertificate` | SSL Auth should be enabled by `useSSLAuthentication` |
| `ClickHouseClientOption#SSL_CERTIFICATE` | `Client.Builder#setClientCertificate` | |
| `ClickHouseClientOption#SSL_KEY` | `Client.Builder#setClientKey` | |
| `ClickHouseClientOption#KEY_STORE_TYPE` | `Client.Builder#setSSLTrustStoreType` | |
| `ClickHouseClientOption#TRUST_STORE` | `Client.Builder#setSSLTrustStore` | |
| `ClickHouseClientOption#KEY_STORE_PASSWORD` | `Client.Builder#setSSLTrustStorePassword` | |
| `ClickHouseClientOption#SSL_SOCKET_SNI` | `Client.Builder#sslSocketSNI` | |
| `ClickHouseClientOption#CUSTOM_SOCKET_FACTORY` | ✗ | |
| `ClickHouseClientOption#CUSTOM_SOCKET_FACTORY_OPTIONS` | ✗ | See `Client.Builder#sslSocketSNI` to set SNI |
</TabItem>
<TabItem value="socket" label="Socket Options">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseClientOption#SOCKET_TIMEOUT` | `Client.Builder#setSocketTimeout` | |
| `ClickHouseClientOption#SOCKET_REUSEADDR` | `Client.Builder#setSocketReuseAddress` | |
| `ClickHouseClientOption#SOCKET_KEEPALIVE` | `Client.Builder#setSocketKeepAlive` | |
| `ClickHouseClientOption#SOCKET_LINGER` | `Client.Builder#setSocketLinger` | |
| `ClickHouseClientOption#SOCKET_IP_TOS` | ✗ | |
| `ClickHouseClientOption#SOCKET_TCP_NODELAY` | `Client.Builder#setSocketTcpNodelay` | |
| `ClickHouseClientOption#SOCKET_RCVBUF` | `Client.Builder#setSocketRcvbuf` | |
| `ClickHouseClientOption#SOCKET_SNDBUF` | `Client.Builder#setSocketSndbuf` | |
</TabItem>
<TabItem value="compression" label="Compression">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseClientOption#COMPRESS` | `Client.Builder#compressServerResponse` | See also `useHttpCompression` |
| `ClickHouseClientOption#DECOMPRESS` | `Client.Builder#compressClientRequest` | See also `useHttpCompression` |
| `ClickHouseClientOption#COMPRESS_ALGORITHM` | ✗ | `LZ4` for non-http. Http uses `Accept-Encoding` |
| `ClickHouseClientOption#DECOMPRESS_ALGORITHM` | ✗ | `LZ4` for non-http. Http uses `Content-Encoding` |
| `ClickHouseClientOption#COMPRESS_LEVEL` | ✗ | |
| `ClickHouseClientOption#DECOMPRESS_LEVEL` | ✗ | |
</TabItem>
<TabItem value="proxy" label="Proxy">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseClientOption#PROXY_TYPE` | `Client.Builder#addProxy` | |
| `ClickHouseClientOption#PROXY_HOST` | `Client.Builder#addProxy` | |
| `ClickHouseClientOption#PROXY_PORT` | `Client.Builder#addProxy` | |
| `ClickHouseClientOption#PROXY_USERNAME` | `Client.Builder#setProxyCredentials` | |
| `ClickHouseClientOption#PROXY_PASSWORD` | `Client.Builder#setProxyCredentials` | |
</TabItem>
<TabItem value="timeouts-retry" label="Timeouts & Retry">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseClientOption#MAX_EXECUTION_TIME` | `Client.Builder#setExecutionTimeout` | |
| `ClickHouseClientOption#RETRY` | `Client.Builder#setMaxRetries` | See also `retryOnFailures` |
| `ClickHouseHttpOption#AHC_RETRY_ON_FAILURE` | `Client.Builder#retryOnFailures` | |
| `ClickHouseClientOption#FAILOVER` | ✗ | |
| `ClickHouseClientOption#REPEAT_ON_SESSION_LOCK` | ✗ | |
| `ClickHouseClientOption#SESSION_ID` | ✗ | |
| `ClickHouseClientOption#SESSION_CHECK` | ✗ | |
| `ClickHouseClientOption#SESSION_TIMEOUT` | ✗ | |
</TabItem>
<TabItem value="timezone" label="Timezone">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseDefaults#SERVER_TIME_ZONE`<br/>`ClickHouseClientOption#SERVER_TIME_ZONE` | `Client.Builder#setServerTimeZone` | |
| `ClickHouseClientOption#USE_SERVER_TIME_ZONE` | `Client.Builder#useServerTimeZone` | |
| `ClickHouseClientOption#USE_SERVER_TIME_ZONE_FOR_DATES` | | |
| `ClickHouseClientOption#USE_TIME_ZONE` | `Client.Builder#useTimeZone` | |
</TabItem>
<TabItem value="buffers" label="Buffers & Performance">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseClientOption#BUFFER_SIZE` | `Client.Builder#setClientNetworkBufferSize` | |
| `ClickHouseClientOption#BUFFER_QUEUE_VARIATION` | ✗ | |
| `ClickHouseClientOption#READ_BUFFER_SIZE` | ✗ | |
| `ClickHouseClientOption#WRITE_BUFFER_SIZE` | ✗ | |
| `ClickHouseClientOption#REQUEST_CHUNK_SIZE` | ✗ | |
| `ClickHouseClientOption#REQUEST_BUFFERING` | ✗ | |
| `ClickHouseClientOption#RESPONSE_BUFFERING` | ✗ | |
| `ClickHouseClientOption#MAX_BUFFER_SIZE` | ✗ | |
| `ClickHouseClientOption#MAX_QUEUED_BUFFERS` | ✗ | |
| `ClickHouseClientOption#MAX_QUEUED_REQUESTS` | ✗ | |
| `ClickHouseClientOption#REUSE_VALUE_WRAPPER` | ✗ | |
</TabItem>
<TabItem value="threading" label="Threading & Async">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseDefaults#ASYNC`<br/>`ClickHouseClientOption#ASYNC` | `Client.Builder#useAsyncRequests` | |
| `ClickHouseDefaults#MAX_SCHEDULER_THREADS` | ✗ | see `setSharedOperationExecutor` |
| `ClickHouseDefaults#MAX_THREADS` | ✗ | see `setSharedOperationExecutor` |
| `ClickHouseDefaults#THREAD_KEEPALIVE_TIMEOUT` | see `setSharedOperationExecutor` | |
| `ClickHouseClientOption#MAX_THREADS_PER_CLIENT` | ✗ | |
| `ClickHouseClientOption#MAX_CORE_THREAD_TTL` | ✗ | |
</TabItem>
<TabItem value="http-headers" label="HTTP & Headers">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseHttpOption#CUSTOM_HEADERS` | `Client.Builder#httpHeaders` | |
| `ClickHouseHttpOption#CUSTOM_PARAMS` | ✗ | See `Client.Builder#serverSetting` |
| `ClickHouseClientOption#CLIENT_NAME` | `Client.Builder#setClientName` | |
| `ClickHouseHttpOption#CONNECTION_PROVIDER` | ✗ | |
| `ClickHouseHttpOption#DEFAULT_RESPONSE` | ✗ | |
| `ClickHouseHttpOption#SEND_HTTP_CLIENT_ID` | ✗ | |
| `ClickHouseHttpOption#AHC_VALIDATE_AFTER_INACTIVITY` | ✗ | Always enabled when Apache Http Client is used |
</TabItem>
<TabItem value="format-query" label="Format & Query">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseDefaults#FORMAT`<br/>`ClickHouseClientOption#FORMAT` | ✗ | Moved to operation settings (`QuerySettings` and `InsertSettings`) |
| `ClickHouseClientOption#QUERY_ID` | ✗ | See `QuerySettings` and `InsertSettings` |
| `ClickHouseClientOption#LOG_LEADING_COMMENT` | ✗ | See `QuerySettings#logComment` and `InsertSettings#logComment` |
| `ClickHouseClientOption#MAX_RESULT_ROWS` | ✗ | Is server side setting |
| `ClickHouseClientOption#RESULT_OVERFLOW_MODE` | ✗ | Is server side setting |
| `ClickHouseHttpOption#RECEIVE_QUERY_PROGRESS` | ✗ | Server side setting |
| `ClickHouseHttpOption#WAIT_END_OF_QUERY` | ✗ | Server side setting |
| `ClickHouseHttpOption#REMEMBER_LAST_SET_ROLES` | `Client#setDBRoles` | Runtime config now. See also `QuerySettings#setDBRoles` and `InsertSettings#setDBRoles` |
</TabItem>
<TabItem value="node-discovery" label="Node Discovery & Load Balancing">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseClientOption#AUTO_DISCOVERY` | ✗ | |
| `ClickHouseClientOption#LOAD_BALANCING_POLICY` | ✗ | |
| `ClickHouseClientOption#LOAD_BALANCING_TAGS` | ✗ | |
| `ClickHouseClientOption#HEALTH_CHECK_INTERVAL` | ✗ | |
| `ClickHouseClientOption#HEALTH_CHECK_METHOD` | ✗ | |
| `ClickHouseClientOption#NODE_DISCOVERY_INTERVAL` | ✗ | |
| `ClickHouseClientOption#NODE_DISCOVERY_LIMIT` | ✗ | |
| `ClickHouseClientOption#NODE_CHECK_INTERVAL` | ✗ | |
| `ClickHouseClientOption#NODE_GROUP_SIZE` | ✗ | |
| `ClickHouseClientOption#CHECK_ALL_NODES` | ✗ | |
</TabItem>
<TabItem value="misc" label="Miscellaneous">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseDefaults#AUTO_SESSION` | ✗ | Session support will be reviewed |
| `ClickHouseDefaults#BUFFERING` | ✗ | |
| `ClickHouseDefaults#MAX_REQUESTS` | ✗ | |
| `ClickHouseDefaults#ROUNDING_MODE` | | |
| `ClickHouseDefaults#SERVER_VERSION`<br/>`ClickHouseClientOption#SERVER_VERSION` | `Client.Builder#setServerVersion` | |
| `ClickHouseDefaults#SRV_RESOLVE` | ✗ | |
| `ClickHouseClientOption#CUSTOM_SETTINGS` | | |
| `ClickHouseClientOption#PRODUCT_NAME` | ✗ | Use client name |
| `ClickHouseClientOption#RENAME_RESPONSE_COLUMN` | ✗ | |
| `ClickHouseClientOption#SERVER_REVISION` | ✗ | |
| `ClickHouseClientOption#TRANSACTION_TIMEOUT` | ✗ | |
| `ClickHouseClientOption#WIDEN_UNSIGNED_TYPES` | ✗ | |
| `ClickHouseClientOption#USE_BINARY_STRING` | ✗ | |
| `ClickHouseClientOption#USE_BLOCKING_QUEUE` | ✗ | |
| `ClickHouseClientOption#USE_COMPILATION` | ✗ | |
| `ClickHouseClientOption#USE_OBJECTS_IN_ARRAYS` | ✗ | |
| `ClickHouseClientOption#MAX_MAPPER_CACHE` | ✗ | |
| `ClickHouseClientOption#MEASURE_REQUEST_TIME` | ✗ | |
</TabItem>
</Tabs>
### General Differences
- Client V2 uses less proprietary classes to increase portability. For example, V2 works with any implementation of `java.io.InputStream` for
writing data to a server.
- Client V2 `async` settings is `off` by default. It means no extra threads and more application control over client. This setting should be `off` for majority of use cases. Enabling `async` will create a separate thread for a request. It only make sense when using application controlled
executor (see `com.clickhouse.client.api.Client.Builder#setSharedOperationExecutor`)
### Writing Data
- use any implementation of `java.io.InputStream`. V1 `com.clickhouse.data.ClickHouseInputStream` is supported but NOT recommended.
- once end of input stream is detected it handled accordingly. Previously output stream of a request should be closed.
__V1 Insert TSV formatted data.__
```java
InputStream inData = getInData();
ClickHouseRequest.Mutation request = client.read(server)
.write()
.table(tableName)
.format(ClickHouseFormat.TSV);
ClickHouseConfig config = request.getConfig();
CompletableFuture<ClickHouseResponse> future;
try (ClickHousePipedOutputStream requestBody = ClickHouseDataStreamFactory.getInstance()
.createPipedOutputStream(config)) {
// start the worker thread which transfer data from the input into ClickHouse
future = request.data(requestBody.getInputStream()).execute();
// Copy data from inData stream to requestBody stream
// We need to close the stream before getting a response
requestBody.close();
try (ClickHouseResponse response = future.get()) {
ClickHouseResponseSummary summary = response.getSummary();
Assert.assertEquals(summary.getWrittenRows(), numRows, "Num of written rows");
}
}
V2 以 TSV 格式插入数据。
InputStream inData = getInData();
InsertSettings settings = new InsertSettings().setInputStreamCopyBufferSize(8198 * 2); // set copy buffer size
try (InsertResponse response = client.insert(tableName, inData, ClickHouseFormat.TSV, settings).get(30, TimeUnit.SECONDS)) {
// Insert is complete at this point
} catch (Exception e) {
// Handle exception
}
- 只需调用一个方法,无需额外创建请求对象。
- 当所有数据复制完成后,请求体流会自动关闭。
- new low-level API is available
com.clickhouse.client.api.Client#insert(java.lang.String, java.util.List<java.lang.String>, com.clickhouse.client.api.DataStreamWriter, com.clickhouse.data.ClickHouseFormat, com.clickhouse.client.api.insert.InsertSettings)。com.clickhouse.client.api.DataStreamWriter 用于实现自定义数据写入逻辑,例如从队列读取数据。
读取数据
- 默认情况下,数据以
RowBinaryWithNamesAndTypes 格式读取。当需要进行数据绑定时,目前仅支持此格式。
- 可以使用
List<GenericRecord> com.clickhouse.client.api.Client#queryAll(java.lang.String) 方法将数据读取为一组记录。该方法会将数据读入内存并自动释放连接,无需额外处理。GenericRecord 提供访问结果数据的接口,并实现了一些类型转换。
Collection<GenericRecord> records = client.queryAll("SELECT * FROM table");
for (GenericRecord record : records) {
int rowId = record.getInteger("rowID");
String name = record.getString("name");
LocalDateTime ts = record.getLocalDateTime("ts");
}