用于通过协议与数据库服务器通信的 Java 客户端库。当前实现仅支持 HTTP 接口。
该库提供自己的 API 用于向服务器发送请求。该库还提供用于处理不同二进制数据格式(RowBinary* & Native*)的工具。
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>client-v2</artifactId>
<version>0.9.6</version>
</dependency>
// https://mvnrepository.com/artifact/com.clickhouse/client-v2
implementation("com.clickhouse:client-v2:0.9.6")
// https://mvnrepository.com/artifact/com.clickhouse/client-v2
implementation 'com.clickhouse:client-v2:0.9.6'
初始化
Client 对象由 com.clickhouse.client.api.Client.Builder#build() 初始化。每个客户端都有自己的上下文,客户端之间不共享对象。
Builder 提供了便于配置的方法。
示例:
Client client = new Client.Builder()
.addEndpoint("https://clickhouse-cloud-instance:8443/")
.setUsername(user)
.setPassword(password)
.build();
Client 实现了 AutoCloseable 接口,不再需要时应将其关闭。
身份验证
身份验证在初始化阶段针对每个客户端进行配置。支持三种身份验证方式:基于密码、访问令牌以及 SSL 客户端证书。
使用密码进行身份验证需要通过调用 setUsername(String) 和 setPassword(String) 方法来设置用户名和密码:
Client client = new Client.Builder()
.addEndpoint("https://clickhouse-cloud-instance:8443/")
.setUsername(user)
.setPassword(password)
.build();
使用访问令牌进行身份验证时,需要调用 setAccessToken(String) 来设置访问令牌:
Client client = new Client.Builder()
.addEndpoint("https://clickhouse-cloud-instance:8443/")
.setAccessToken(userAccessToken)
.build();
通过 SSL 客户端证书进行身份验证需要设置用户名、启用 SSL 身份验证、设置客户端证书和客户端密钥,方法是分别调用 setUsername(String)、useSSLAuthentication(boolean)、setClientCertificate(String) 和 setClientKey(String):
Client client = new Client.Builder()
.useSSLAuthentication(true)
.setUsername("some_user")
.setClientCertificate("some_user.crt")
.setClientKey("some_user.key")
注意
SSL 身份验证在生产环境中可能难以排查,因为许多来自 SSL 库的错误信息不足以提供详细信息。例如,如果客户端证书与私钥不匹配,服务器会立即终止连接(对于 HTTP 来说,这发生在连接初始化阶段,此时尚未发送任何 HTTP 请求,因此不会有任何响应)。
请使用诸如 openssl 之类的工具来验证证书和密钥:
- 检查私钥完整性:
openssl rsa -in [key-file.key] -check -noout
- 检查客户端证书中的 CN 是否与相应的 USER 匹配:
- 从用户证书中获取 CN -
openssl x509 -noout -subject -in [user.cert]
- 确认数据库中已设置相同的值:
select name, auth_type, auth_params from system.users where auth_type = 'ssl_certificate'(查询会输出 auth_params 字段,其内容类似 {"common_names":["some_user"]})
所有设置均通过实例方法(即配置方法)定义,使每个值的作用域和上下文清晰明确。
主要配置参数在单一作用域中定义(客户端或操作),且不会相互覆盖。
配置在创建客户端时定义。请参阅 com.clickhouse.client.api.Client.Builder。
客户端配置
| Method | Arguments | Description | Default | Key |
|---|
addEndpoint(String endpoint) | endpoint - URL 格式的服务器地址 | 将服务器端点添加到可用服务器列表。目前仅支持一个端点。 | none | none |
addEndpoint(Protocol protocol, String host, int port, boolean secure) | protocol - 连接协议
host - IP 或主机名
secure - 使用 HTTPS | 将服务器端点添加到可用服务器列表。目前仅支持一个端点。 | none | none |
enableConnectionPool(boolean enable) | enable - 启用/禁用标志 | 设置是否启用连接池 | true | connection_pool_enabled |
setMaxConnections(int maxConnections) | maxConnections - 连接数量 | 设置客户端对每个服务器端点最多可以打开的连接数。 | 10 | max_open_connections |
setConnectionTTL(long timeout, ChronoUnit unit) | timeout - 超时值
unit - 时间单位 | 设置连接的生存时间 (TTL),超过该时间后连接将被视为不再活动 | -1 | connection_ttl |
setKeepAliveTimeout(long timeout, ChronoUnit unit) | timeout - 超时值
unit - 时间单位 | 设置 HTTP 连接的 Keep-Alive 超时时间。设置为 0 以禁用 Keep-Alive。 | - | http_keep_alive_timeout |
setConnectionReuseStrategy(ConnectionReuseStrategy strategy) | strategy - LIFO 或 FIFO | 选择连接池应使用的连接复用策略 | FIFO | connection_reuse_strategy |
setDefaultDatabase(String database) | database - 数据库名称 | 设置默认数据库。 | default | database |
| Method | Arguments | Description | Default | Key |
|---|
setUsername(String username) | username - 用于认证的用户名 | 为将通过后续配置选择的认证方式设置用户名 | default | user |
setPassword(String password) | password - 机密值 | 为密码认证设置机密值,并实际选择密码认证作为认证方式 | - | password |
setAccessToken(String accessToken) | accessToken - 访问令牌字符串 | 设置访问令牌,以通过相应的认证方式进行身份验证 | - | access_token |
useSSLAuthentication(boolean useSSLAuthentication) | useSSLAuthentication - 启用 SSL 认证的标志 | 将 SSL 客户端证书设置为一种认证方式。 | - | ssl_authentication |
useHTTPBasicAuth(boolean useBasicAuth) | useBasicAuth - 启用/禁用的标志 | 设置在用户密码认证中是否应使用基本 HTTP 认证。可解决包含特殊字符的密码所导致的问题。 | true | http_use_basic_auth |
useBearerTokenAuth(String bearerToken) | bearerToken - 编码后的 Bearer 令牌 | 指定是否使用 Bearer 认证以及要使用的令牌。令牌将按原样发送。 | - | bearer_token |
| Method | Arguments | Description | Default | Key |
|---|
setConnectTimeout(long timeout, ChronoUnit unit) | timeout - 超时时间值
unit - 时间单位 | 设置所有出站连接在建立连接时的超时时间。 | - | connection_timeout |
setConnectionRequestTimeout(long timeout, ChronoUnit unit) | timeout - 超时时间值
unit - 时间单位 | 设置连接请求超时时间。仅在从连接池获取连接时生效。 | 10000 | connection_request_timeout |
setSocketTimeout(long timeout, ChronoUnit unit) | timeout - 超时时间值
unit - 时间单位 | 设置用于读写操作的 socket 超时时间 | 0 | socket_timeout |
setExecutionTimeout(long timeout, ChronoUnit timeUnit) | timeout - 超时时间值
timeUnit - 时间单位 | 设置查询的最大执行超时时间 | 0 | max_execution_time |
retryOnFailures(ClientFaultCause ...causes) | causes - ClientFaultCause 的枚举常量 | 设置可恢复/可重试的故障类型。 | NoHttpResponse ConnectTimeout ConnectionRequestTimeout | client_retry_on_failures |
setMaxRetries(int maxRetries) | maxRetries - 重试次数 | 设置针对由 retryOnFailures 定义的故障的最大重试次数。 | 3 | retry |
| Method | Arguments | Description | Default | Key |
|---|
setSocketRcvbuf(long size) | size - 以字节为单位的大小 | 设置 TCP socket 接收缓冲区。该缓冲区位于 JVM 内存之外(堆外内存)。 | 8196 | socket_rcvbuf |
setSocketSndbuf(long size) | size - 以字节为单位的大小 | 设置 TCP socket 发送缓冲区。该缓冲区位于 JVM 内存之外(堆外内存)。 | 8196 | socket_sndbuf |
setSocketKeepAlive(boolean value) | value - 启用/禁用的标志 | 为每个 TCP socket 设置 SO_KEEPALIVE 选项。TCP Keep Alive 启用用于检查连接是否存活的机制。 | - | socket_keepalive |
setSocketTcpNodelay(boolean value) | value - 启用/禁用的标志 | 为每个 TCP socket 设置 SO_NODELAY 选项。该 TCP 选项会使 socket 尽可能立即发送数据。 | - | socket_tcp_nodelay |
setSocketLinger(int secondsToWait) | secondsToWait - 秒数 | 为客户端创建的每个 TCP socket 设置延迟关闭时间(linger time)。 | - | socket_linger |
| Method | Arguments | Description | Default | Key |
|---|
compressServerResponse(boolean enabled) | enabled - 启用/禁用标志 | 设置服务器是否应对其响应进行压缩。 | true | compress |
compressClientRequest(boolean enabled) | enabled - 启用/禁用标志 | 设置客户端是否应对其请求进行压缩。 | false | decompress |
useHttpCompression(boolean enabled) | enabled - 启用/禁用标志 | 设置在相应选项启用时,客户端与服务器通信时是否应使用 HTTP 压缩。 | - | - |
appCompressedData(boolean enabled) | enabled - 启用/禁用标志 | 告知客户端压缩将由应用程序负责处理。 | false | app_compressed_data |
setLZ4UncompressedBufferSize(int size) | size - 大小(字节) | 设置用于接收数据流未压缩部分的缓冲区大小。 | 65536 | compression.lz4.uncompressed_buffer_size |
disableNativeCompression | disable - 禁用标志 | 禁用原生压缩。如果设为 true,则会禁用原生压缩。 | false | disable_native_compression |
| Method | Arguments | Description | Default | Key |
|---|
setSSLTrustStore(String path) | path - 本地系统上的文件路径 | 配置客户端是否使用 SSL truststore 来对服务器主机进行验证。 | - | trust_store |
setSSLTrustStorePassword(String password) | password - 机密值 | 设置用于解锁由 setSSLTrustStore 指定的 SSL truststore 的密码。 | - | key_store_password |
setSSLTrustStoreType(String type) | type - truststore 类型名称 | 设置由 setSSLTrustStore 指定的 truststore 类型。 | - | key_store_type |
setRootCertificate(String path) | path - 本地系统上的文件路径 | 配置客户端是否使用指定的根 (CA) 证书来验证服务器主机。 | - | sslrootcert |
setClientCertificate(String path) | path - 本地系统上的文件路径 | 设置在发起 SSL 连接以及进行 SSL 认证时要使用的客户端证书路径。 | - | sslcert |
setClientKey(String path) | path - 本地系统上的文件路径 | 设置用于与服务器进行 SSL 加密通信的客户端私钥。 | - | ssl_key |
sslSocketSNI(String sni) | sni - 服务器名称字符串 | 设置在 SSL/TLS 连接中用于 SNI(Server Name Indication,服务器名称指示)的服务器名称。 | - | ssl_socket_sni |
| 方法 | 参数 | 描述 | 默认值 | 键 |
|---|
addProxy(ProxyType type, String host, int port) | type - 代理类型
host - 代理主机名或 IP
port - 代理端口 | 设置用于与服务器通信的代理。 | - | proxy_type, proxy_host, proxy_port |
setProxyCredentials(String user, String pass) | user - 代理用户名
pass - 密码 | 设置用于通过代理进行身份验证的用户凭据。 | - | proxy_user, proxy_password |
| Method | Arguments | Description | Default | Key |
|---|
setHttpCookiesEnabled(boolean enabled) | enabled - 用于启用/禁用的标志位 | 设置是否应记住 HTTP cookie 并将其发送回服务器。 | - | - |
httpHeader(String key, String value) | key - HTTP 头键
value - 字符串值 | 为单个 HTTP 头设置值。之前的值将被覆盖。 | none | none |
httpHeader(String key, Collection values) | key - HTTP 头键
values - 字符串值列表 | 为单个 HTTP 头设置多个值。之前的值将被覆盖。 | none | none |
httpHeaders(Map headers) | headers - 包含 HTTP 头的映射 | 一次性设置多个 HTTP 头的值。 | none | none |
| Method | Arguments | Description | Default | Key |
|---|
serverSetting(String name, String value) | name - 设置名称
value - 设置值 | 设置随每个查询一起发送到服务器的设置。单次操作的设置可以覆盖它。设置列表 | none | none |
serverSetting(String name, Collection values) | name - 设置名称
values - 设置值 | 设置随每个查询一起发送到服务器的、包含多个值的设置,例如 roles | none | none |
| Method | Arguments | Description | Default | Key |
|---|
useServerTimeZone(boolean useServerTimeZone) | useServerTimeZone - 启用/禁用开关 | 设置在解码 DateTime 和 Date 列值时客户端是否应使用服务器时区。 | true | use_server_time_zone |
useTimeZone(String timeZone) | timeZone - 有效的 Java 时区 ID | 设置在解码 DateTime 和 Date 列值时是否应使用指定的时区。将覆盖服务器时区。 | - | use_time_zone |
setServerTimeZone(String timeZone) | timeZone - 有效的 Java 时区 ID | 设置服务器端时区。默认使用 UTC 时区。 | UTC | server_time_zone |
| Method | Arguments | Description | Default | Key |
|---|
setOption(String key, String value) | key - 配置选项键
value - 选项值 | 设置客户端选项的原始值。适用于从属性配置文件读取配置的场景。 | - | - |
useAsyncRequests(boolean async) | async - 启用/禁用标志 | 设置客户端是否应在单独线程中执行请求。默认禁用,因为应用程序更清楚如何组织多线程任务。 | false | async |
setSharedOperationExecutor(ExecutorService executorService) | executorService - ExecutorService 实例 | 为操作任务设置 ExecutorService。 | none | none |
setClientNetworkBufferSize(int size) | size - 以字节为单位的大小 | 设置应用程序内存空间中缓冲区的大小,该缓冲区用于在 socket 与应用程序之间复制数据。 | 300000 | client_network_buffer_size |
allowBinaryReaderToReuseBuffers(boolean reuse) | reuse - 启用/禁用标志 | 如果启用,读取器将使用预分配的缓冲区执行数值转码。可降低数值数据的 GC 压力。 | - | - |
columnToMethodMatchingStrategy(ColumnToMethodMatchingStrategy strategy) | strategy - 匹配策略实现 | 设置在注册 DTO 时用于匹配 DTO 类字段和数据库列的自定义策略。 | none | none |
setClientName(String clientName) | clientName - 应用程序名称字符串 | 设置关于调用方应用程序的附加信息。该信息将作为 User-Agent 头传递。 | - | client_name |
registerClientMetrics(Object registry, String name) | registry - Micrometer registry 实例
name - 指标组名称 | 向 Micrometer (https://micrometer.io/) registry 实例注册指标采集器。 | - | - |
setServerVersion(String version) | version - 服务器版本字符串 | 设置服务器版本以避免版本自动检测。 | - | server_version |
typeHintMapping(Map typeHintMapping) | typeHintMapping - 类型提示映射 | 为 ClickHouse 类型设置类型提示映射。例如,使多维数组可以以 Java 容器的形式呈现。 | - | type_hint_mapping |
服务器设置
服务器端设置可以在客户端创建时在客户端级别配置一次(参见 Builder 的 serverSetting 方法),也可以在操作级别进行配置(参见操作设置类的 serverSetting)。
try (Client client = new Client.Builder().addEndpoint(Protocol.HTTP, "localhost", mockServer.port(), false)
.setUsername("default")
.setPassword(ClickHouseServerForTest.getPassword())
.compressClientRequest(true)
// Client level
.serverSetting("max_threads", "10")
.serverSetting("async_insert", "1")
.serverSetting("roles", Arrays.asList("role1", "role2"))
.build()) {
// Operation level
QuerySettings querySettings = new QuerySettings();
querySettings.serverSetting("session_timezone", "Europe/Zurich");
...
}
⚠️ 当通过 setOption 方法设置选项时(无论是在 Client.Builder 还是在操作设置类中),对应的服务器设置名称都必须加上 clickhouse_setting_ 前缀。此时可以借助 com.clickhouse.client.api.ClientConfigProperties#serverSetting() 方法。
可以为所有操作(客户端级别)或单个操作(操作级别)设置自定义 HTTP 头。
QuerySettings settings = new QuerySettings()
.httpHeader(HttpHeaders.REFERER, clientReferer)
.setQueryId(qId);
当通过 setOption 方法设置选项时(无论是 Client.Builder 还是操作设置类),自定义请求头名称应以 http_header_ 为前缀。此时,可以使用 com.clickhouse.client.api.ClientConfigProperties#httpHeader() 方法。
常用术语定义
支持的格式 的枚举类型,包含 ClickHouse 支持的所有格式。
raw - 需要由用户自行对原始数据进行转码
full - 客户端可自行完成数据转码,并接受原始数据流
- - 表示 ClickHouse 不支持针对该格式执行此操作
此客户端版本支持:
插入 API
以指定格式的字节流(InputStream)接收数据。假定 data 按 format 进行编码。
签名
CompletableFuture<InsertResponse> insert(String tableName, InputStream data, ClickHouseFormat format, InsertSettings settings)
CompletableFuture<InsertResponse> insert(String tableName, InputStream data, ClickHouseFormat format)
参数
tableName - 目标表名。
data - 编码后数据的输入流。
format - 数据所采用的编码格式。
settings - 请求设置。
返回值
类型为 InsertResponse 的 Future —— 操作结果以及服务器端指标等附加信息。
示例
try (InputStream dataStream = getDataStream()) {
try (InsertResponse response = client.insert(TABLE_NAME, dataStream, ClickHouseFormat.JSONEachRow,
insertSettings).get(3, TimeUnit.SECONDS)) {
log.info("Insert finished: {} rows written", response.getMetrics().getMetric(ServerMetrics.NUM_ROWS_WRITTEN).getLong());
} catch (Exception e) {
log.error("Failed to write JSONEachRow data", e);
throw new RuntimeException(e);
}
}
insert(String tableName, List<?> data, InsertSettings settings)
向数据库发送写入请求。对象列表会被转换为高效的内部格式,然后发送到服务器。列表项所属的类应事先使用 register(Class, TableSchema) 方法进行注册。
签名
client.insert(String tableName, List<?> data, InsertSettings settings)
client.insert(String tableName, List<?> data)
参数
tableName - 目标表名称。
data - DTO(数据传输对象)集合。
settings - 请求设置。
返回值
返回 InsertResponse 类型的 Future,包含操作结果以及服务器端指标等附加信息。
示例
// Important step (done once) - register class to pre-compile object serializer according to the table schema.
client.register(ArticleViewEvent.class, client.getTableSchema(TABLE_NAME));
List<ArticleViewEvent> events = loadBatch();
try (InsertResponse response = client.insert(TABLE_NAME, events).get()) {
// handle response, then it will be closed and connection that served request will be released.
}
InsertSettings
用于插入操作的配置项。
配置方法
| 方法 | 描述 |
|---|
setQueryId(String queryId) | 设置要分配给该操作的查询 ID。默认值:null。 |
setDeduplicationToken(String token) | 设置去重令牌。该令牌将发送到服务器,并可用于标识此查询。默认值:null。 |
setInputStreamCopyBufferSize(int size) | 拷贝缓冲区大小。在写入操作期间,该缓冲区用于将用户提供的输入流中的数据复制到输出流。默认值:8196。 |
serverSetting(String name, String value) | 为单次操作设置单个服务器端配置项。 |
serverSetting(String name, Collection values) | 为某个操作设置可包含多个值的单个服务器配置项。集合中的元素应为 String 值。 |
setDBRoles(Collection dbRoles) | 设置在执行操作前要启用的数据库角色。集合中的元素应为 String 值。 |
setOption(String option, Object value) | 以原始格式设置配置选项。这不是服务器端设置。 |
InsertResponse
用于保存插入操作结果的响应对象。仅当客户端收到来自服务器的响应时才可用。
注意
应尽快关闭此对象以释放连接,因为在完全读取完前一个响应的所有数据之前,该连接无法被重用。
| 方法 | 说明 |
|---|
OperationMetrics getMetrics() | 返回一个包含该操作各项指标的对象。 |
String getQueryId() | 返回为该操作分配的查询 ID,该 ID 由应用程序(通过操作设置)指定或由服务器生成。 |
查询 API
query(String sqlQuery)
按原样发送 sqlQuery。响应格式由查询设置决定。QueryResponse 将保存对响应流的引用,该响应流应由支持该格式的读取器消费。
签名
CompletableFuture<QueryResponse> query(String sqlQuery, QuerySettings settings)
CompletableFuture<QueryResponse> query(String sqlQuery)
参数
sqlQuery - 单条 SQL 语句。查询会按原样发送到服务器。
settings - 请求设置。
返回值
QueryResponse 类型的 Future - 结果数据集及服务器端指标等附加信息。使用完数据集后应关闭 Response 对象。
示例
final String sql = "select * from " + TABLE_NAME + " where title <> '' limit 10";
// Default format is RowBinaryWithNamesAndTypesFormatReader so reader have all information about columns
try (QueryResponse response = client.query(sql).get(3, TimeUnit.SECONDS);) {
// Create a reader to access the data in a convenient way
ClickHouseBinaryFormatReader reader = client.newBinaryFormatReader(response);
while (reader.hasNext()) {
reader.next(); // Read the next record from stream and parse it
// get values
double id = reader.getDouble("id");
String title = reader.getString("title");
String url = reader.getString("url");
// collecting data
}
} catch (Exception e) {
log.error("Failed to read data", e);
}
// put business logic outside of the reading block to release http connection asap.
query(String sqlQuery, Map<String, Object> queryParams, QuerySettings settings)
按原样发送 sqlQuery。同时会发送查询参数,以便服务器能够编译该 SQL 表达式。
签名
CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Object> queryParams, QuerySettings settings)
参数
sqlQuery - 包含 {} 占位符的 SQL 表达式。
queryParams - 变量映射,用于在服务器端完成 SQL 表达式。
settings - 请求设置。
返回值
QueryResponse 类型的 Future——包含结果数据集以及诸如服务器端指标之类的附加信息。使用完数据集后应关闭 Response 对象。
示例
// define parameters. They will be sent to the server along with the request.
Map<String, Object> queryParams = new HashMap<>();
queryParams.put("param1", 2);
try (QueryResponse response =
client.query("SELECT * FROM " + table + " WHERE col1 >= {param1:UInt32}", queryParams, new QuerySettings()).get()) {
// Create a reader to access the data in a convenient way
ClickHouseBinaryFormatReader reader = client.newBinaryFormatReader(response);
while (reader.hasNext()) {
reader.next(); // Read the next record from stream and parse it
// reading data
}
} catch (Exception e) {
log.error("Failed to read data", e);
}
queryAll(String sqlQuery)
以 RowBinaryWithNamesAndTypes 格式查询数据。结果以集合返回。读取性能与使用 reader 相同,但需要更多内存来保存整个数据集。
签名
List<GenericRecord> queryAll(String sqlQuery)
参数
sqlQuery - 用于从服务器查询数据的 SQL 表达式。
返回值
完整数据集由 GenericRecord 对象列表表示,这些对象以行形式提供对结果数据的访问。
示例
try {
log.info("Reading whole table and process record by record");
final String sql = "select * from " + TABLE_NAME + " where title <> ''";
// Read whole result set and process it record by record
client.queryAll(sql).forEach(row -> {
double id = row.getDouble("id");
String title = row.getString("title");
String url = row.getString("url");
log.info("id: {}, title: {}, url: {}", id, title, url);
});
} catch (Exception e) {
log.error("Failed to read data", e);
}
QuerySettings
查询操作的配置选项。
配置方法
| 方法 | 说明 |
|---|
setQueryId(String queryId) | 为该操作设置要分配的查询 ID。 |
setFormat(ClickHouseFormat format) | 设置响应格式。完整列表请参见 RowBinaryWithNamesAndTypes。 |
setMaxExecutionTime(Integer maxExecutionTime) | 在服务器端设置该操作的最大执行时间。不会影响读取超时设置。 |
waitEndOfQuery(Boolean waitEndOfQuery) | 请求服务器在发送响应前等待查询完成。 |
setUseServerTimeZone(Boolean useServerTimeZone) | 将使用服务器时区(参见客户端配置)来解析操作结果中的日期/时间类型。默认值为 false。 |
setUseTimeZone(String timeZone) | 请求服务器在时间转换时使用 timeZone。参见 session_timezone。 |
serverSetting(String name, String value) | 为单次操作设置单个服务器配置项。 |
serverSetting(String name, Collection values) | 为某个操作设置包含多个值的单个服务器配置项。集合中的元素必须是 String 类型的值。 |
setDBRoles(Collection dbRoles) | 设置在执行操作前要应用的数据库角色。集合中的元素应为 String 值。 |
setOption(String option, Object value) | 以原始格式设置配置选项。这不是服务器级设置。 |
QueryResponse
用于保存查询执行结果的响应对象。仅当客户端从服务器收到响应时才可用。
注意
应尽快关闭此对象以释放连接,因为在完全读取完上一个响应的所有数据之前,连接无法被重用。
| 方法 | 说明 |
|---|
ClickHouseFormat getFormat() | 返回响应数据的编码格式。 |
InputStream getInputStream() | 返回按指定格式编码的未压缩数据字节流。 |
OperationMetrics getMetrics() | 返回一个包含该操作各项指标的对象。 |
String getQueryId() | 返回为该操作分配的查询 ID,该 ID 由应用程序(通过操作设置)指定或由服务器生成。 |
TimeZone getTimeZone() | 返回用于解析响应中 Date/DateTime 类型的时区。 |
通用 API
getTableSchema(String table)
获取 table 的表结构。
签名
TableSchema getTableSchema(String table)
TableSchema getTableSchema(String table, String database)
参数
table - 需要获取其 schema 数据的表名。
database - 目标表所在的数据库。
返回值
返回一个包含表的列列表的 TableSchema 对象。
getTableSchemaFromQuery(String sql)
从 SQL 语句中获取 schema 信息。
方法签名
TableSchema getTableSchemaFromQuery(String sql)
参数
sql - 用于返回其 schema 的 "SELECT" SQL 语句。
返回值
返回一个与 sql 表达式匹配列的 TableSchema 对象。
TableSchema
register(Class<?> clazz, TableSchema schema)
为 Java 类编译一个序列化/反序列化层,以便在基于 schema 进行数据读写时使用。该方法将为 getter/setter 方法对及其对应的列创建序列化器和反序列化器。
列的匹配是通过从方法名中提取列名来完成的。例如,getFirstName 将对应列 first_name 或 firstname。
方法签名
void register(Class<?> clazz, TableSchema schema)
参数
clazz - 表示用于读取/写入数据的 POJO 的 Class。
schema - 用于与 POJO 属性进行匹配的数据模式。
示例
client.register(ArticleViewEvent.class, client.getTableSchema(TABLE_NAME));
使用示例
完整示例代码存放在代码仓库的 'example` 文件夹 中:
读取数据
读取数据的常用方式有两种:
query() 方法会返回底层的 QueryResponse 对象,该对象包含承载数据的 InputStream。通常与 ClickHouseBinaryFormatReader 组合使用以进行流式读取,但也可以配合任何其他自定义 reader 实现使用。QueryResponse 还提供对结果集元数据和指标的访问能力。
queryAll() 方法并使用 GenericRecord 以便按行方便访问数据。在这种情况下,整个结果集会被一次性加载到内存中。
queryRecords() 方法返回 com.clickhouse.client.api.query.Records —— 一个用于迭代 GenericRecord 对象的迭代器。该方法采用流式处理方式(不会将全部数据加载到内存中),并利用 GenericRecord 来访问数据。
注意: 流式方法需要快速读取,否则可能导致服务器写入超时,因为数据是直接从网络流中读取的。
读取数组
ClickHouseBinaryFormatReader 方法
getList(...) - 将任意 Array(...) 读取为 List<T>。适合作为类型灵活读取的默认选项。支持嵌套数组。
getByteArray(...), getShortArray(...), getIntArray(...), getLongArray(...), getFloatArray(...), getDoubleArray(...), getBooleanArray(...) - 最适用于由与基本类型兼容的值构成的一维数组。
getStringArray(...) - 用于 Array(String)(以及以名称形式表示的枚举值)。
getObjectArray(...) - 适用于任意 Array(...) 元素类型(包括嵌套数组)的通用选项。用于读取包含 Nullable 值以及嵌套数组的数组。
所有方法都提供基于索引和基于名称的重载。索引从 1 开始计数。基于索引的重载会直接访问对应的列。
基于名称的方法每次调用都需要执行一次索引查找。
try (QueryResponse response = client.query("SELECT * FROM my_table").get()) {
ClickHouseBinaryFormatReader reader = client.newBinaryFormatReader(response);
while (reader.next() != null) {
Object[] uint64 = reader.getObjectArray("uint64_arr"); // Array(UInt64) -> BigInteger[]
Object[] arr2d = reader.getObjectArray("arr2d"); // Array(Array(Int64)) -> Object[]
// nested arrays are returned as nested Object[]:
Object[] firstInner = (Object[]) arr2d[0];
Long firstValue = (Long) firstInner[0];
}
}
GenericRecord 方法
getList(...) - 将任意 Array(...) 解析为 List<T>。适合作为灵活类型读取的默认选项,支持嵌套数组。
getByteArray(...), getShortArray(...), getIntArray(...), getLongArray(...), getFloatArray(...), getDoubleArray(...), getBooleanArray(...) - 最适用于由与原始类型兼容的值组成的一维数组。
getStringArray(...) - 用于 Array(String)(以及以名称表示的枚举类型值)。
getObjectArray(...) - 针对任意 Array(...) 元素类型(包括嵌套数组)的通用选项。用于读取包含 Nullable 值以及嵌套数组的数组。
对于所有方法,都提供基于索引和基于名称的重载。索引从 1 开始。基于索引的重载直接访问列。
基于名称的方法每次都需要进行索引查找。
try (QueryResponse response = client.query("SELECT * FROM my_table").get()) {
List<GenericRecord> rows = client.queryAll(
"SELECT int_arr, arr2d_nullable FROM test_arrays ORDER BY id");
for (GenericRecord row : rows) {
Object[] intArr = row.getObjectArray("int_arr"); // Array(Int32) -> Integer[]
Object[] arr2d = row.getObjectArray("arr2d_nullable"); // Array(Array(Nullable(Int32)))
Object[] inner = (Object[]) arr2d[0];
Object maybeNull = inner[1]; // may be null
}
}
## Migration Guide
Old client (V1) was using `com.clickhouse.client.ClickHouseClient#builder` as start point. The new client (V2) uses similar pattern with `com.clickhouse.client.api.Client.Builder`. Main
differences are:
- no service loader is used to grab implementation. The `com.clickhouse.client.api.Client` is facade class for all kinds of implementation in the future.
- a fewer sources of configuration: one is provided to the builder and one is with operation settings (`QuerySettings`, `InsertSettings`). Previous version had configuration per node and was loading
env. variables in some cases.
### Configuration Parameters Match
There are 3 enum classes related to configuration in V1:
- `com.clickhouse.client.config.ClickHouseDefaults` - configuration parameters that supposed to be set in most use cases. Like `USER` and `PASSWORD`.
- `com.clickhouse.client.config.ClickHouseClientOption` - configuration parameters specific for the client. Like `HEALTH_CHECK_INTERVAL`.
- `com.clickhouse.client.http.config.ClickHouseHttpOption` - configuration parameters specific for HTTP interface. Like `RECEIVE_QUERY_PROGRESS`.
They were designed to group parameters and provide clear separation. However in some cases it lead to a confusion (is there a difference between `com.clickhouse.client.config.ClickHouseDefaults#ASYNC` and
`com.clickhouse.client.config.ClickHouseClientOption#ASYNC`). The new V2 client uses `com.clickhouse.client.api.Client.Builder` as single dictionary of all possible client configuration options.There is
`com.clickhouse.client.api.ClientConfigProperties` where all configuration parameter names are listed.
Table below shows what old options are supported in the new client and their new meaning.
**Legend:** ✔ = supported, ✗ = dropped
<Tabs groupId="v1-migration">
<TabItem value="connection-auth" label="Connection & Auth">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseDefaults#HOST` | `Client.Builder#addEndpoint` | |
| `ClickHouseDefaults#PROTOCOL` | ✗ | Only HTTP supported in V2 |
| `ClickHouseDefaults#DATABASE`<br/>`ClickHouseClientOption#DATABASE` | `Client.Builder#setDefaultDatabase` | |
| `ClickHouseDefaults#USER` | `Client.Builder#setUsername` | |
| `ClickHouseDefaults#PASSWORD` | `Client.Builder#setPassword` | |
| `ClickHouseClientOption#CONNECTION_TIMEOUT` | `Client.Builder#setConnectTimeout` | |
| `ClickHouseClientOption#CONNECTION_TTL` | `Client.Builder#setConnectionTTL` | |
| `ClickHouseHttpOption#MAX_OPEN_CONNECTIONS` | `Client.Builder#setMaxConnections` | |
| `ClickHouseHttpOption#KEEP_ALIVE`<br/>`ClickHouseHttpOption#KEEP_ALIVE_TIMEOUT` | `Client.Builder#setKeepAliveTimeout` | |
| `ClickHouseHttpOption#CONNECTION_REUSE_STRATEGY` | `Client.Builder#setConnectionReuseStrategy` | |
| `ClickHouseHttpOption#USE_BASIC_AUTHENTICATION` | `Client.Builder#useHTTPBasicAuth` | |
</TabItem>
<TabItem value="ssl" label="SSL & Security">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseDefaults#SSL_CERTIFICATE_TYPE` | ✗ | |
| `ClickHouseDefaults#SSL_KEY_ALGORITHM` | ✗ | |
| `ClickHouseDefaults#SSL_PROTOCOL` | ✗ | |
| `ClickHouseClientOption#SSL` | ✗ | See `Client.Builder#addEndpoint` |
| `ClickHouseClientOption#SSL_MODE` | ✗ | |
| `ClickHouseClientOption#SSL_ROOT_CERTIFICATE` | `Client.Builder#setRootCertificate` | SSL Auth should be enabled by `useSSLAuthentication` |
| `ClickHouseClientOption#SSL_CERTIFICATE` | `Client.Builder#setClientCertificate` | |
| `ClickHouseClientOption#SSL_KEY` | `Client.Builder#setClientKey` | |
| `ClickHouseClientOption#KEY_STORE_TYPE` | `Client.Builder#setSSLTrustStoreType` | |
| `ClickHouseClientOption#TRUST_STORE` | `Client.Builder#setSSLTrustStore` | |
| `ClickHouseClientOption#KEY_STORE_PASSWORD` | `Client.Builder#setSSLTrustStorePassword` | |
| `ClickHouseClientOption#SSL_SOCKET_SNI` | `Client.Builder#sslSocketSNI` | |
| `ClickHouseClientOption#CUSTOM_SOCKET_FACTORY` | ✗ | |
| `ClickHouseClientOption#CUSTOM_SOCKET_FACTORY_OPTIONS` | ✗ | See `Client.Builder#sslSocketSNI` to set SNI |
</TabItem>
<TabItem value="socket" label="Socket Options">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseClientOption#SOCKET_TIMEOUT` | `Client.Builder#setSocketTimeout` | |
| `ClickHouseClientOption#SOCKET_REUSEADDR` | `Client.Builder#setSocketReuseAddress` | |
| `ClickHouseClientOption#SOCKET_KEEPALIVE` | `Client.Builder#setSocketKeepAlive` | |
| `ClickHouseClientOption#SOCKET_LINGER` | `Client.Builder#setSocketLinger` | |
| `ClickHouseClientOption#SOCKET_IP_TOS` | ✗ | |
| `ClickHouseClientOption#SOCKET_TCP_NODELAY` | `Client.Builder#setSocketTcpNodelay` | |
| `ClickHouseClientOption#SOCKET_RCVBUF` | `Client.Builder#setSocketRcvbuf` | |
| `ClickHouseClientOption#SOCKET_SNDBUF` | `Client.Builder#setSocketSndbuf` | |
</TabItem>
<TabItem value="compression" label="Compression">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseClientOption#COMPRESS` | `Client.Builder#compressServerResponse` | See also `useHttpCompression` |
| `ClickHouseClientOption#DECOMPRESS` | `Client.Builder#compressClientRequest` | See also `useHttpCompression` |
| `ClickHouseClientOption#COMPRESS_ALGORITHM` | ✗ | `LZ4` for non-http. Http uses `Accept-Encoding` |
| `ClickHouseClientOption#DECOMPRESS_ALGORITHM` | ✗ | `LZ4` for non-http. Http uses `Content-Encoding` |
| `ClickHouseClientOption#COMPRESS_LEVEL` | ✗ | |
| `ClickHouseClientOption#DECOMPRESS_LEVEL` | ✗ | |
</TabItem>
<TabItem value="proxy" label="Proxy">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseClientOption#PROXY_TYPE` | `Client.Builder#addProxy` | |
| `ClickHouseClientOption#PROXY_HOST` | `Client.Builder#addProxy` | |
| `ClickHouseClientOption#PROXY_PORT` | `Client.Builder#addProxy` | |
| `ClickHouseClientOption#PROXY_USERNAME` | `Client.Builder#setProxyCredentials` | |
| `ClickHouseClientOption#PROXY_PASSWORD` | `Client.Builder#setProxyCredentials` | |
</TabItem>
<TabItem value="timeouts-retry" label="Timeouts & Retry">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseClientOption#MAX_EXECUTION_TIME` | `Client.Builder#setExecutionTimeout` | |
| `ClickHouseClientOption#RETRY` | `Client.Builder#setMaxRetries` | See also `retryOnFailures` |
| `ClickHouseHttpOption#AHC_RETRY_ON_FAILURE` | `Client.Builder#retryOnFailures` | |
| `ClickHouseClientOption#FAILOVER` | ✗ | |
| `ClickHouseClientOption#REPEAT_ON_SESSION_LOCK` | ✗ | |
| `ClickHouseClientOption#SESSION_ID` | ✗ | |
| `ClickHouseClientOption#SESSION_CHECK` | ✗ | |
| `ClickHouseClientOption#SESSION_TIMEOUT` | ✗ | |
</TabItem>
<TabItem value="timezone" label="Timezone">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseDefaults#SERVER_TIME_ZONE`<br/>`ClickHouseClientOption#SERVER_TIME_ZONE` | `Client.Builder#setServerTimeZone` | |
| `ClickHouseClientOption#USE_SERVER_TIME_ZONE` | `Client.Builder#useServerTimeZone` | |
| `ClickHouseClientOption#USE_SERVER_TIME_ZONE_FOR_DATES` | | |
| `ClickHouseClientOption#USE_TIME_ZONE` | `Client.Builder#useTimeZone` | |
</TabItem>
<TabItem value="buffers" label="Buffers & Performance">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseClientOption#BUFFER_SIZE` | `Client.Builder#setClientNetworkBufferSize` | |
| `ClickHouseClientOption#BUFFER_QUEUE_VARIATION` | ✗ | |
| `ClickHouseClientOption#READ_BUFFER_SIZE` | ✗ | |
| `ClickHouseClientOption#WRITE_BUFFER_SIZE` | ✗ | |
| `ClickHouseClientOption#REQUEST_CHUNK_SIZE` | ✗ | |
| `ClickHouseClientOption#REQUEST_BUFFERING` | ✗ | |
| `ClickHouseClientOption#RESPONSE_BUFFERING` | ✗ | |
| `ClickHouseClientOption#MAX_BUFFER_SIZE` | ✗ | |
| `ClickHouseClientOption#MAX_QUEUED_BUFFERS` | ✗ | |
| `ClickHouseClientOption#MAX_QUEUED_REQUESTS` | ✗ | |
| `ClickHouseClientOption#REUSE_VALUE_WRAPPER` | ✗ | |
</TabItem>
<TabItem value="threading" label="Threading & Async">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseDefaults#ASYNC`<br/>`ClickHouseClientOption#ASYNC` | `Client.Builder#useAsyncRequests` | |
| `ClickHouseDefaults#MAX_SCHEDULER_THREADS` | ✗ | see `setSharedOperationExecutor` |
| `ClickHouseDefaults#MAX_THREADS` | ✗ | see `setSharedOperationExecutor` |
| `ClickHouseDefaults#THREAD_KEEPALIVE_TIMEOUT` | see `setSharedOperationExecutor` | |
| `ClickHouseClientOption#MAX_THREADS_PER_CLIENT` | ✗ | |
| `ClickHouseClientOption#MAX_CORE_THREAD_TTL` | ✗ | |
</TabItem>
<TabItem value="http-headers" label="HTTP & Headers">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseHttpOption#CUSTOM_HEADERS` | `Client.Builder#httpHeaders` | |
| `ClickHouseHttpOption#CUSTOM_PARAMS` | ✗ | See `Client.Builder#serverSetting` |
| `ClickHouseClientOption#CLIENT_NAME` | `Client.Builder#setClientName` | |
| `ClickHouseHttpOption#CONNECTION_PROVIDER` | ✗ | |
| `ClickHouseHttpOption#DEFAULT_RESPONSE` | ✗ | |
| `ClickHouseHttpOption#SEND_HTTP_CLIENT_ID` | ✗ | |
| `ClickHouseHttpOption#AHC_VALIDATE_AFTER_INACTIVITY` | ✗ | Always enabled when Apache Http Client is used |
</TabItem>
<TabItem value="format-query" label="Format & Query">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseDefaults#FORMAT`<br/>`ClickHouseClientOption#FORMAT` | ✗ | Moved to operation settings (`QuerySettings` and `InsertSettings`) |
| `ClickHouseClientOption#QUERY_ID` | ✗ | See `QuerySettings` and `InsertSettings` |
| `ClickHouseClientOption#LOG_LEADING_COMMENT` | ✗ | See `QuerySettings#logComment` and `InsertSettings#logComment` |
| `ClickHouseClientOption#MAX_RESULT_ROWS` | ✗ | Is server side setting |
| `ClickHouseClientOption#RESULT_OVERFLOW_MODE` | ✗ | Is server side setting |
| `ClickHouseHttpOption#RECEIVE_QUERY_PROGRESS` | ✗ | Server side setting |
| `ClickHouseHttpOption#WAIT_END_OF_QUERY` | ✗ | Server side setting |
| `ClickHouseHttpOption#REMEMBER_LAST_SET_ROLES` | `Client#setDBRoles` | Runtime config now. See also `QuerySettings#setDBRoles` and `InsertSettings#setDBRoles` |
</TabItem>
<TabItem value="node-discovery" label="Node Discovery & Load Balancing">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseClientOption#AUTO_DISCOVERY` | ✗ | |
| `ClickHouseClientOption#LOAD_BALANCING_POLICY` | ✗ | |
| `ClickHouseClientOption#LOAD_BALANCING_TAGS` | ✗ | |
| `ClickHouseClientOption#HEALTH_CHECK_INTERVAL` | ✗ | |
| `ClickHouseClientOption#HEALTH_CHECK_METHOD` | ✗ | |
| `ClickHouseClientOption#NODE_DISCOVERY_INTERVAL` | ✗ | |
| `ClickHouseClientOption#NODE_DISCOVERY_LIMIT` | ✗ | |
| `ClickHouseClientOption#NODE_CHECK_INTERVAL` | ✗ | |
| `ClickHouseClientOption#NODE_GROUP_SIZE` | ✗ | |
| `ClickHouseClientOption#CHECK_ALL_NODES` | ✗ | |
</TabItem>
<TabItem value="misc" label="Miscellaneous">
| V1 Configuration | V2 Builder Method | Comments |
|------------------|-------------------|----------|
| `ClickHouseDefaults#AUTO_SESSION` | ✗ | Session support will be reviewed |
| `ClickHouseDefaults#BUFFERING` | ✗ | |
| `ClickHouseDefaults#MAX_REQUESTS` | ✗ | |
| `ClickHouseDefaults#ROUNDING_MODE` | | |
| `ClickHouseDefaults#SERVER_VERSION`<br/>`ClickHouseClientOption#SERVER_VERSION` | `Client.Builder#setServerVersion` | |
| `ClickHouseDefaults#SRV_RESOLVE` | ✗ | |
| `ClickHouseClientOption#CUSTOM_SETTINGS` | | |
| `ClickHouseClientOption#PRODUCT_NAME` | ✗ | Use client name |
| `ClickHouseClientOption#RENAME_RESPONSE_COLUMN` | ✗ | |
| `ClickHouseClientOption#SERVER_REVISION` | ✗ | |
| `ClickHouseClientOption#TRANSACTION_TIMEOUT` | ✗ | |
| `ClickHouseClientOption#WIDEN_UNSIGNED_TYPES` | ✗ | |
| `ClickHouseClientOption#USE_BINARY_STRING` | ✗ | |
| `ClickHouseClientOption#USE_BLOCKING_QUEUE` | ✗ | |
| `ClickHouseClientOption#USE_COMPILATION` | ✗ | |
| `ClickHouseClientOption#USE_OBJECTS_IN_ARRAYS` | ✗ | |
| `ClickHouseClientOption#MAX_MAPPER_CACHE` | ✗ | |
| `ClickHouseClientOption#MEASURE_REQUEST_TIME` | ✗ | |
</TabItem>
</Tabs>
### General Differences
- Client V2 uses less proprietary classes to increase portability. For example, V2 works with any implementation of `java.io.InputStream` for
writing data to a server.
- Client V2 `async` settings is `off` by default. It means no extra threads and more application control over client. This setting should be `off` for majority of use cases. Enabling `async` will create a separate thread for a request. It only make sense when using application controlled
executor (see `com.clickhouse.client.api.Client.Builder#setSharedOperationExecutor`)
### Writing Data
- use any implementation of `java.io.InputStream`. V1 `com.clickhouse.data.ClickHouseInputStream` is supported but NOT recommended.
- once end of input stream is detected it handled accordingly. Previously output stream of a request should be closed.
__V1 Insert TSV formatted data.__
```java
InputStream inData = getInData();
ClickHouseRequest.Mutation request = client.read(server)
.write()
.table(tableName)
.format(ClickHouseFormat.TSV);
ClickHouseConfig config = request.getConfig();
CompletableFuture<ClickHouseResponse> future;
try (ClickHousePipedOutputStream requestBody = ClickHouseDataStreamFactory.getInstance()
.createPipedOutputStream(config)) {
// start the worker thread which transfer data from the input into ClickHouse
future = request.data(requestBody.getInputStream()).execute();
// Copy data from inData stream to requestBody stream
// We need to close the stream before getting a response
requestBody.close();
try (ClickHouseResponse response = future.get()) {
ClickHouseResponseSummary summary = response.getSummary();
Assert.assertEquals(summary.getWrittenRows(), numRows, "Num of written rows");
}
}
V2 插入 TSV 格式的数据。
InputStream inData = getInData();
InsertSettings settings = new InsertSettings().setInputStreamCopyBufferSize(8198 * 2); // set copy buffer size
try (InsertResponse response = client.insert(tableName, inData, ClickHouseFormat.TSV, settings).get(30, TimeUnit.SECONDS)) {
// Insert is complete at this point
} catch (Exception e) {
// Handle exception
}
- 只需调用一个方法,无需单独创建请求对象。
- 请求体数据流会在所有数据复制完成后自动关闭。
- 现已提供新的低级 API:
com.clickhouse.client.api.Client#insert(java.lang.String, java.util.List<java.lang.String>, com.clickhouse.client.api.DataStreamWriter, com.clickhouse.data.ClickHouseFormat, com.clickhouse.client.api.insert.InsertSettings)。com.clickhouse.client.api.DataStreamWriter 专为实现自定义数据写入逻辑而设计,例如从队列中读取数据。
读取数据
- 默认情况下,数据以
RowBinaryWithNamesAndTypes 格式读取。当需要进行数据绑定时,目前仅支持该格式。
- 可以使用
List<GenericRecord> com.clickhouse.client.api.Client#queryAll(java.lang.String) 方法将数据读取为一组记录。该方法会将数据读入内存并释放连接,无需额外处理。GenericRecord 提供对数据的访问能力,并实现了一些类型转换。
Collection<GenericRecord> records = client.queryAll("SELECT * FROM table");
for (GenericRecord record : records) {
int rowId = record.getInteger("rowID");
String name = record.getString("name");
LocalDateTime ts = record.getLocalDateTime("ts");
}