跳到主要内容
跳到主要内容

Java 客户端

用于通过其协议与数据库服务器通信的 Java 客户端库。当前实现仅支持 HTTP 接口。 该库提供了自身的 API 用于向服务器发送请求,还提供了一些工具,用于处理不同的二进制数据格式(RowBinary* 和 Native*)。

安装与配置


<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>client-v2</artifactId>
    <version>0.9.1</version>
</dependency>

初始化

com.clickhouse.client.api.Client.Builder#build() 会初始化 Client 对象。每个 Client 实例都有自己的上下文,彼此之间不共享任何对象。 Builder 提供了若干配置方法,便于进行便捷配置。

示例:

 Client client = new Client.Builder()
                .addEndpoint("https://clickhouse-cloud-instance:8443/")
                .setUsername(user)
                .setPassword(password)
                .build();

Client 实现了 AutoCloseable 接口,在不再需要时应关闭。

身份验证

在初始化阶段为每个客户端配置身份验证。当前支持三种身份验证方式:通过密码、访问令牌或 SSL 客户端证书。

通过密码进行身份验证时,需要调用 setUsername(String)setPassword(String) 来设置用户名和密码:

 Client client = new Client.Builder()
        .addEndpoint("https://clickhouse-cloud-instance:8443/")
        .setUsername(user)
        .setPassword(password)
        .build();

若要通过访问令牌进行身份验证,需要调用 setAccessToken(String) 来设置访问令牌:

 Client client = new Client.Builder()
        .addEndpoint("https://clickhouse-cloud-instance:8443/")
        .setAccessToken(userAccessToken)
        .build();

使用 SSL 客户端证书进行认证时,需要设置用户名、启用 SSL 认证,并分别通过调用 setUsername(String)useSSLAuthentication(boolean)setClientCertificate(String)setClientKey(String) 来设置客户端证书和客户端密钥:

Client client = new Client.Builder()
        .useSSLAuthentication(true)
        .setUsername("some_user")
        .setClientCertificate("some_user.crt")
        .setClientKey("some_user.key")
注意

在生产环境中,SSL 身份验证可能比较难排查,因为很多来自 SSL 库的错误信息都不够详细。比如,如果客户端证书和私钥不匹配,服务器会立即终止连接(对于 HTTP,这会发生在连接初始化阶段,此时尚未发送任何 HTTP 请求,因此也不会返回任何响应)。

请使用 openssl 等工具来验证证书和私钥:

  • 检查私钥完整性:openssl rsa -in [key-file.key] -check -noout
  • 检查客户端证书中是否包含与用户匹配的 CN:
    • 从用户证书中获取 CN:openssl x509 -noout -subject -in [user.cert]
    • 在数据库中验证是否设置了相同的值:select name, auth_type, auth_params from system.users where auth_type = 'ssl_certificate'(查询会输出 auth_params,内容类似 {"common_names":["some_user"]}

配置

所有设置都通过实例方法(也称为配置方法)进行定义,从而使每个值的作用域和上下文更加清晰明了。 主要配置参数分别在各自的作用域(客户端或操作)中定义,且不会互相覆盖。

配置在创建客户端时完成定义。参见 com.clickhouse.client.api.Client.Builder

客户端配置

配置方式参数描述
addEndpoint(String endpoint)* endpoint - 采用 URL 格式的服务器地址。将服务器端点添加到可用服务器列表。目前仅支持一个端点。

默认值:none
枚举:none
键:none
addEndpoint(Protocol protocol, String host, int port, boolean secure)- protocol - 连接协议 com.clickhouse.client.api.enums.Protocol#HTTP
- host - 服务器的 IP 或主机名。
- secure - 指定通信是否应使用该协议的安全版本(HTTPS)。
将服务器端点添加到可用服务器列表。目前仅支持配置一个端点。

默认值:none
枚举:none
键:none
setOption(String key, String value)* key - 客户端配置选项的字符串键。
- value - 该配置选项的字符串值
设置客户端选项的原始值。在从属性文件中读取配置时非常有用。
setUsername(String username)- username - 用户用于认证的用户名为后续配置所选的身份验证方法设置用户名

默认值:default
枚举:ClientConfigProperties.USER
键:user
setPassword(String password)* password - 用于密码验证的秘密值为密码验证设置密钥,并实质上将其选为验证方式

默认值:-
枚举:ClientConfigProperties.PASSWORD
键:password
setAccessToken(String accessToken)- accessToken - 访问令牌的字符串形式设置用于身份验证的访问令牌,并设置相应的身份验证方法

默认值:-
枚举:ClientConfigProperties.ACCESS_TOKEN
键:access_token
useSSLAuthentication(boolean useSSLAuthentication)* useSSLAuthentication - 用于指示是否应启用 SSL 身份验证的标志将 SSL 客户端证书设为身份验证方式。

默认值:-
枚举值:ClientConfigProperties.SSL_AUTH
键名:ssl_authentication
enableConnectionPool(boolean enable)- enable - 指示是否启用该选项的标志位设置是否启用连接池

默认值:true
枚举值:ClientConfigProperties.CONNECTION_POOL_ENABLED
键:connection_pool_enabled
setConnectTimeout(long timeout, ChronoUnit unit)* timeout - 以某种时间单位表示的超时时长。
- unit - timeout 的时间单位
为任何出站连接设置发起连接的超时时间。这会影响等待套接字完成连接建立的时长。

默认值:-
枚举:ClientConfigProperties.CONNECTION_TIMEOUT
键:connection_timeout
setConnectionRequestTimeout(long timeout, ChronoUnit unit)- timeout - 以某种时间单位表示的超时时长。
- unit - timeout 的时间单位
设置连接请求超时时间。此配置仅在从连接池获取连接时生效。

默认值:10000
枚举:ClientConfigProperties.CONNECTION_REQUEST_TIMEOUT
键:connection_request_timeout
setMaxConnections(int maxConnections)* maxConnections - 连接数设置客户端到每个服务器端点可打开的最大连接数。

默认值:10
枚举:ClientConfigProperties.HTTP_MAX_OPEN_CONNECTIONS
键:max_open_connections
setConnectionTTL(long timeout, ChronoUnit unit)- timeout - 超时时长,配合时间单位使用。
- unit - timeout 的时间单位
设置连接的 TTL,超过该值后连接将被视为不再活动

默认值: -1
枚举: ClientConfigProperties.CONNECTION_TTL
键: connection_ttl
setKeepAliveTimeout(long timeout, ChronoUnit unit)* timeout - 以某个时间单位表示的超时时长。
- unit - timeout 的时间单位
设置 HTTP 连接的 Keep-Alive 超时时间。可通过将该超时时间设置为零 0 来禁用 Keep-Alive。

默认值:-
枚举:ClientConfigProperties.HTTP_KEEP_ALIVE_TIMEOUT
键:http_keep_alive_timeout
setConnectionReuseStrategy(ConnectionReuseStrategy strategy)- strategy - 枚举常量 com.clickhouse.client.api.ConnectionReuseStrategy选择连接池应使用的策略:如果希望连接在返回到池中后立即被复用,则使用 LIFO;如果希望按照连接变为可用的顺序来使用(返回的连接不会被立即再次使用),则使用 FIFO

默认值:FIFO
枚举:ClientConfigProperties.CONNECTION_REUSE_STRATEGY
键:connection_reuse_strategy
setSocketTimeout(long timeout, ChronoUnit unit)`* timeout - 以某个时间单位表示的超时时间。
- unit - timeout 的时间单位。
设置用于读写操作的 socket 超时时间

默认值: 0
枚举值: ClientConfigProperties.SOCKET_OPERATION_TIMEOUT
键: socket_timeout
setSocketRcvbuf(long size)- size - 大小(字节)设置 TCP socket 接收缓冲区。该缓冲区不在 JVM 内存中分配。

默认值:8196
枚举:ClientConfigProperties.SOCKET_RCVBUF_OPT
键:socket_rcvbuf
setSocketSndbuf(long size)* size - 大小(以字节为单位)设置 TCP socket 接收缓冲区。该缓冲区不占用 JVM 内存。

默认值:8196
枚举:ClientConfigProperties.SOCKET_SNDBUF_OPT
键:socket_sndbuf
setSocketKeepAlive(boolean value)- value - 标志位,表示是否启用该选项。为客户端创建的每个 TCP 套接字设置 SO_KEEPALIVE 选项。TCP Keepalive 启用一种机制,用于检查连接的存活状态,并有助于检测被突然终止的连接。

默认值:-
枚举:ClientConfigProperties.SOCKET_KEEPALIVE_OPT
键:socket_keepalive
setSocketTcpNodelay(boolean value)* value - 指示是否启用该选项的标志。为客户端创建的每个 TCP 套接字设置 SO_NODELAY 选项。该 TCP 选项会使套接字尽可能快地发送数据。

默认值: -
枚举: ClientConfigProperties.SOCKET_TCP_NO_DELAY_OPT
键: socket_tcp_nodelay
setSocketLinger(int secondsToWait)- secondsToWait - 要等待的秒数。为客户端创建的每个 TCP 套接字设置 Linger(延迟关闭)时间。

默认值:-
枚举:ClientConfigProperties.SOCKET_LINGER_OPT
键:socket_linger
compressServerResponse(boolean enabled)* enabled - 用于指示是否应启用该选项的标志设置服务器是否压缩其响应内容。

默认值:true
枚举:ClientConfigProperties.COMPRESS_SERVER_RESPONSE
键:compress
compressClientRequest(boolean enabled)- enabled - 标志,用于指示是否启用该选项设置客户端是否应对其请求进行压缩。

默认值:false
枚举:ClientConfigProperties.COMPRESS_CLIENT_REQUEST
键:decompress
useHttpCompression(boolean enabled)* enabled - 表示是否启用该选项的标志设置在启用相应选项时客户端/服务器通信是否使用 HTTP 压缩
appCompressedData(boolean enabled)- enabled - 指示是否启用该选项的标志位通知客户端,压缩将由应用程序负责处理。

默认值:false
枚举值:ClientConfigProperties.APP_COMPRESSED_DATA
键:app_compressed_data
setLZ4UncompressedBufferSize(int size)* size - 大小(字节)设置用于接收数据流未压缩部分的缓冲区大小。若缓冲区大小设置过小,将会创建一个新的缓冲区,并在日志中记录相应的警告。

默认值:65536
枚举常量:ClientConfigProperties.COMPRESSION_LZ4_UNCOMPRESSED_BUF_SIZE
配置键:compression.lz4.uncompressed_buffer_size
disableNativeCompression- disable - 用于指示是否应禁用该选项的标志禁用原生压缩。如果设置为 true,则会禁用原生压缩。

默认值:false
枚举:ClientConfigProperties.DISABLE_NATIVE_COMPRESSION
键:disable_native_compression
setDefaultDatabase(String database)* database - 数据库的名称设置默认数据库。

默认值:default
枚举值:ClientConfigProperties.DATABASE
键:database
addProxy(ProxyType type, String host, int port)- type - 代理类型。
- host - 代理主机名或 IP 地址。
- port - 代理端口。
设置用于与服务器通信时使用的代理。如果代理要求身份验证,则必须配置代理。

默认值:-
枚举:ClientConfigProperties.PROXY_TYPE
键:proxy_type

默认值:-
枚举:ClientConfigProperties.PROXY_HOST
键:proxy_host

默认值:-
枚举:ClientConfigProperties.PROXY_PORT
键:proxy_port
setProxyCredentials(String user, String pass)* user - 代理用户名
- pass - 密码
设置用于代理认证的用户凭据。

默认值:-
枚举:ClientConfigProperties.PROXY_USER
键:proxy_user

默认值:-
枚举:ClientConfigProperties.PROXY_PASSWORD
键:proxy_password
setExecutionTimeout(long timeout, ChronoUnit timeUnit)- timeout - 按某个时间单位表示的超时时长。
- timeUnit - timeout 的时间单位
用于设置查询的最⼤执行超时时间

默认值:0
枚举:ClientConfigProperties.MAX_EXECUTION_TIME
键:max_execution_time
setHttpCookiesEnabled(boolean enabled)enabled - 指示是否启用该选项的标志设置是否在后续请求中保存并回传 HTTP cookie。
setSSLTrustStore(String path)path - 本地(客户端)系统中的文件路径设置客户端是否应使用 SSL 信任库来验证服务器主机。

默认值:-
枚举:ClientConfigProperties.SSL_TRUST_STORE
键:trust_store
setSSLTrustStorePassword(String password)password - 秘密值设置用于解锁由 setSSLTrustStore(String path) 指定的 SSL 信任库的密码

默认: -
枚举: ClientConfigProperties.SSL_KEY_STORE_PASSWORD
键: key_store_password
setSSLTrustStoreType(String type)type - 信任库类型名称设置由 setSSLTrustStore(String path) 指定的信任库类型。

默认值:-
枚举值:ClientConfigProperties.SSL_KEYSTORE_TYPE
键:key_store_type
setRootCertificate(String path)path - 本地(客户端)系统上的文件路径设置客户端是否应使用指定的根(CA)证书对服务器主机进行验证。

默认值:-
枚举:ClientConfigProperties.CA_CERTIFICATE
键名:sslrootcert
setClientCertificate(String path)path - 本地(客户端)系统中的文件路径设置在发起 SSL 连接以及进行 SSL 认证时使用的客户端证书路径。

默认值:-
枚举值:ClientConfigProperties.SSL_CERTIFICATE
键名:sslcert
setClientKey(String path)path - 本地(客户端)系统中的文件路径设置用于与服务器进行 SSL 加密通信的客户端私钥。

默认值:-
枚举:ClientConfigProperties.SSL_KEY
键:ssl_key
useServerTimeZone(boolean useServerTimeZone)useServerTimeZone - 表示是否启用该选项的标志指定在解码 DateTime 和 Date 列值时,客户端是否应使用服务器时区。启用后,应通过 setServerTimeZone(String timeZone) 设置服务器时区。

默认值:true
枚举:ClientConfigProperties.USE_SERVER_TIMEZONE
键:use_server_time_zone
useTimeZone(String timeZone)timeZone - Java 中有效时区 ID 的字符串(参见 java.time.ZoneId设置在解码 DateTime 和 Date 列值时,是否应使用指定的时区。会覆盖服务器的时区设置。

默认值:-
枚举:ClientConfigProperties.USE_TIMEZONE
键:use_time_zone
setServerTimeZone(String timeZone)timeZone - Java 中有效时区 ID 的字符串值(参见 java.time.ZoneId设置服务器端时区。默认使用 UTC 时区。

默认值:UTC
枚举:ClientConfigProperties.SERVER_TIMEZONE
键:server_time_zone
useAsyncRequests(boolean async)async - 指示是否应启用该选项的标志。设置是否在单独线程中执行客户端请求。默认禁用,因为应用程序更清楚如何组织多线程任务,而且在单独线程中运行任务并不会提升性能。

默认值:false
枚举:ClientConfigProperties.ASYNC_OPERATIONS
键:async
setSharedOperationExecutor(ExecutorService executorService)executorService - 执行器服务的实例。设置用于操作任务的 executor 服务。

默认值:none
枚举:none
键:none
setClientNetworkBufferSize(int size)* size - 以字节为单位的大小设置应用程序内存空间中缓冲区的大小,用于在套接字与应用程序之间来回复制数据。更大的缓冲区可以减少对 TCP 栈的系统调用次数,但会增加每个连接占用的内存。由于连接的生命周期通常较短,该缓冲区也会被垃圾回收(GC)。还需注意,分配大块连续内存可能会带来问题。

默认值: 300000
枚举: ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE
键: client_network_buffer_size
retryOnFailures(ClientFaultCause ...causes)- causes - com.clickhouse.client.api.ClientFaultCause 枚举中的常量设置可恢复/可重试的错误类型。

默认值:NoHttpResponse,ConnectTimeout,ConnectionRequestTimeout
枚举值:ClientConfigProperties.CLIENT_RETRY_ON_FAILURE
键:client_retry_on_failures
setMaxRetries(int maxRetries)* maxRetries - 重试次数为针对 retryOnFailures(ClientFaultCause ...causes) 定义的失败类型设置最大重试次数

默认值:3
枚举值:ClientConfigProperties.RETRY_ON_FAILURE
键名:retry
allowBinaryReaderToReuseBuffers(boolean reuse)- reuse - 指示是否应启用该选项的标志位大多數資料集都包含以小位元組序列編碼的數值資料。預設情況下,reader 會分配所需的緩衝區,將資料讀入其中,然後轉換為目標的 Number 類別。由於會分配並釋放大量小物件,這可能會造成顯著的 GC 壓力。啟用此選項後,reader 會使用預先分配的緩衝區來進行數值轉換。這是安全的,因為每個 reader 都有自己的一組緩衝區,且各個 reader 只會被單一執行緒使用。
httpHeader(String key, String value)* key - HTTP 头部键名。
- value - 该头部的字符串值。
为单个 HTTP 头部设置值。之前的值将被覆盖。

默认值:none
枚举:none
键:none
httpHeader(String key, Collection values)- key - HTTP 头部键名。
- values - 字符串值列表。
为单个 HTTP 头部设置值。之前的值会被覆盖。

默认值:none
枚举:none
键:none
httpHeaders(Map headers)* header - 包含 HTTP 头及其对应值的映射表。同时设置多个 HTTP 头部的值。

默认值: none
枚举: none
键: none
serverSetting(String name, String value)- name - 查询级别设置的名称。
- value - 此设置的字符串值。
设置随每个查询一起传递给服务器的设置。单个操作的设置可以覆盖它。设置列表

默认值:none
枚举值:none
键:none
serverSetting(String name, Collection values)* name - 查询级别设置的名称。
- values - 该设置的字符串值。
配置随每个查询一同传递给服务器的设置。单个操作的设置可以覆盖它。参见 设置列表。此方法适用于为可接受多个值的设置赋值,例如 roles

默认值: none
枚举: none
键: none
columnToMethodMatchingStrategy(ColumnToMethodMatchingStrategy strategy)- strategy - 用于列与字段匹配的策略实现设置在注册 DTO 时用于匹配 DTO 类字段和数据库列的自定义策略。

默认值: none
枚举: none
键: none
useHTTPBasicAuth(boolean useBasicAuth)* useBasicAuth - 用于指示是否应启用该选项的标志设置是否使用基本 HTTP 身份验证进行用户密码认证。默认启用。使用此类型的身份验证可以解决密码中包含特殊字符而无法通过 HTTP 头部传输所导致的问题。

默认值:true
枚举:ClientConfigProperties.HTTP_USE_BASIC_AUTH
键:http_use_basic_auth
setClientName(String clientName)- clientName - 表示应用程序名称的字符串设置关于调用方应用程序的附加信息。该字符串将作为客户端名称传递给服务器。对于 HTTP 协议,将作为 User-Agent 头部字段传递。

默认值:-
枚举:ClientConfigProperties.CLIENT_NAME
键:client_name
useBearerTokenAuth(String bearerToken)* bearerToken - 编码后的 Bearer 令牌指定是否使用 Bearer 认证以及要使用的 token。token 将按原样发送,因此在传递给此方法之前应先进行编码处理。

默认值:-
枚举:ClientConfigProperties.BEARERTOKEN_AUTH
键:bearer_token
registerClientMetrics(Object registry, String name)- registry - Micrometer registry 实例
- name - 指标组名称
在 Micrometer(https://micrometer.io/)的注册表实例中注册传感器。
setServerVersion(String version)* version - 服务器版本的字符串形式设置服务器版本以避免版本检测。

默认值:-
枚举值:ClientConfigProperties.SERVER_VERSION
键名:server_version
typeHintMapping(Map typeHintMapping)- typeHintMapping - 类型提示映射为 ClickHouse 类型设置类型提示映射。例如,可以让多维数组以 Java 容器类型呈现,而不是作为独立的 Array 对象。

默认值:-
枚举:ClientConfigProperties.TYPE_HINT_MAPPING
键:type_hint_mapping
sslSocketSNI(String sni)* sni - 服务器名称的字符串形式在 SSL/TLS 连接中设置用于 SNI(Server Name Indication,服务器名称指示)的服务器名称。

默认值:-
枚举值:ClientConfigProperties.SSL_SOCKET_SNI
键名:ssl_socket_sni

服务器设置

服务器端设置既可以在创建客户端时于客户端级别统一设置一次(参见 BuilderserverSetting 方法),也可以在操作级别进行设置(参见操作设置类中的 serverSetting 方法)。

 try (Client client = new Client.Builder().addEndpoint(Protocol.HTTP, "localhost", mockServer.port(), false)
        .setUsername("default")
        .setPassword(ClickHouseServerForTest.getPassword())
        .compressClientRequest(true)

        // 客户端级别
        .serverSetting("max_threads", "10")
        .serverSetting("async_insert", "1")
        .serverSetting("roles", Arrays.asList("role1", "role2"))

        .build()) {

	// 操作级别
	QuerySettings querySettings = new QuerySettings();
	querySettings.serverSetting("session_timezone", "Europe/Zurich");

	...
}

当通过 setOption 方法(无论是 Client.Builder 还是操作设置类)设置选项时,服务器端设置名称应添加前缀 clickhouse_setting_。在这种情况下,可以方便地使用 com.clickhouse.client.api.ClientConfigProperties#serverSetting()

自定义 HTTP 头部

可以在客户端级别为所有操作统一设置自定义 HTTP 头部,也可以在操作级别为单个操作单独设置自定义 HTTP 头部。


QuerySettings settings = new QuerySettings()
    .httpHeader(HttpHeaders.REFERER, clientReferer)
    .setQueryId(qId);

当通过 setOption 方法(无论是 Client.Builder 还是操作设置类)设置选项时,自定义请求头名称应添加 http_header_ 前缀。在这种情况下,可以方便地使用方法 com.clickhouse.client.api.ClientConfigProperties#httpHeader()

常用术语定义

ClickHouseFormat

支持的格式 的枚举类型。它包含 ClickHouse 支持的所有格式。

  • raw - 用户应对原始数据进行转码
  • full - 客户端可以自行对数据进行转码,并接受原始数据流
  • - - ClickHouse 不支持对此格式执行该操作

本客户端版本支持:

格式输入输出
TabSeparated原始原始数据
TabSeparatedRaw原始原始
TabSeparatedWithNames原始原始
TabSeparatedWithNamesAndTypes原始原始
TabSeparatedRawWithNames原始数据原始
TabSeparatedRawWithNamesAndTypes原始原始
Template原始原始
TemplateIgnoreSpaces原始*
CSV原始原始
CSVWithNames原始原始
CSVWithNamesAndTypes原始原始
CustomSeparated原始原始
CustomSeparatedWithNames原始原始
CustomSeparatedWithNamesAndTypes原始原始
SQLInsert-原始
Values 格式原始原始
Vertical*原始数据
JSON原始原始
JSONAsString原始-
JSONAsObject原始*
JSONStringsraw原始
JSONColumnsraw原始
JSONColumnsWithMetadata原始原始
JSONCompact原始原始
JSONCompactStrings-原始
JSONCompactColumns原始原始
JSONEachRow原始原始
PrettyJSONEachRow*原始
JSONEachRowWithProgress-原始
JSONStringsEachRow原始原始数据
JSONStringsEachRowWithProgress*原始
JSONCompactEachRow原始raw
JSONCompactEachRowWithNames原始原始
JSONCompactEachRowWithNamesAndTypes原始数据原始
JSONCompactStringsEachRow原始原始数据
JSONCompactStringsEachRowWithNames原始数据原始
JSONCompactStringsEachRowWithNamesAndTypes原始原始
JSONObjectEachRow原始原始
BSONEachRow原始raw
TSKV原始原始
Pretty 格式-原始
PrettyNoEscapes*原始
PrettyMonoBlock-原始
PrettyNoEscapesMonoBlock*原始
PrettyCompact-原始
PrettyCompactNoEscapes*原始
PrettyCompactMonoBlock-raw
PrettyCompactNoEscapesMonoBlock*原始
PrettySpace-原始
PrettySpaceNoEscapes*原始
PrettySpaceMonoBlock-原始
PrettySpaceNoEscapesMonoBlock*原始
Prometheus-原始
Protobuf原始原始数据
ProtobufSingle原始原始
ProtobufList原始数据原始
Avro原始原始
AvroConfluent原始*
Parquet原始原始
ParquetMetadataraw-
Arrow原始原始
ArrowStream原始原始
ORC原始数据原始
One原始*
Npy原始原始
RowBinary完整完整
RowBinaryWithNames完整完整
RowBinaryWithNamesAndTypes完整完整
RowBinaryWithDefaults完整-
Native完整原始
Null*原始
XML-raw
CapnProto原始原始
LineAsString原始数据原始
Regexp原始*
RawBLOB原始原始
MsgPack原始原始数据
MySQLDump原始-
DWARF原始*
Markdown-原始数据
表单raw*

Insert API

insert(String tableName, InputStream data, ClickHouseFormat format)

以指定格式编码的字节 InputStream 形式接收数据。data 预期应使用 format 进行编码。

函数签名

CompletableFuture<InsertResponse> insert(String tableName, InputStream data, ClickHouseFormat format, InsertSettings settings)
CompletableFuture<InsertResponse> insert(String tableName, InputStream data, ClickHouseFormat format)

参数

tableName - 目标表名。

data - 已编码数据的输入流。

format - 数据编码所使用的格式。

settings - 请求相关设置。

返回值

InsertResponse 类型的 Future —— 表示操作结果以及服务器端指标等附加信息。

示例

try (InputStream dataStream = getDataStream()) {
    try (InsertResponse response = client.insert(TABLE_NAME, dataStream, ClickHouseFormat.JSONEachRow,
            insertSettings).get(3, TimeUnit.SECONDS)) {

        log.info("插入完成:已写入 {} 行", response.getMetrics().getMetric(ServerMetrics.NUM_ROWS_WRITTEN).getLong());
    } catch (Exception e) {
        log.error("写入 JSONEachRow 数据失败", e);
        throw new RuntimeException(e);
    }
}

insert(String tableName, List<?> data, InsertSettings settings)

向数据库发送写入请求。对象列表会被转换为一种高效的格式,然后发送到服务器。列表元素的类应事先通过 register(Class, TableSchema) 方法进行注册。

方法签名

client.insert(String tableName, List<?> data, InsertSettings settings)
client.insert(String tableName, List<?> data)

参数

tableName - 目标表名称。

data - DTO(Data Transfer Object,数据传输对象)集合。

settings - 请求参数设置。

返回值

InsertResponse 类型的 Future —— 操作结果以及例如服务端指标在内的附加信息。

示例

// 重要步骤(仅执行一次) - 注册类以根据表结构预编译对象序列化器。 
client.register(ArticleViewEvent.class, client.getTableSchema(TABLE_NAME));

List<ArticleViewEvent> events = loadBatch();

try (InsertResponse response = client.insert(TABLE_NAME, events).get()) {
    // 处理响应,之后响应将被关闭,服务该请求的连接将被释放。 
}

insert(String tableName, DataStreamWriter writer, ClickHouseFormat format, InsertSettings settings)

Beta

此 API 方法允许传入一个 writer 对象,用于将数据直接编码到输出流中。数据将由客户端进行压缩。 在 InsertSettings 中有一个名为 appCompressedData 的配置选项,可用于关闭客户端压缩,由应用程序自行发送压缩后的数据流。 文档中的示例展示了此 API 设计时所针对的主要使用场景。

com.clickhouse.client.api.DataStreamWriter 是一个函数式接口,包含一个方法 onOutput,当输出流准备好写入数据时由客户端调用。该接口还有 另一个具有默认实现的方法 onRetry。当重试逻辑被触发时会调用此方法,主要用于在适用时重置数据源。

方法签名

CompletableFuture<InsertResponse> insert(String tableName,              // 目标表名称
                                         DataStreamWriter writer,       // 数据写入器实例
                                         ClickHouseFormat format,       // 写入器编码数据的格式
                                         InsertSettings settings)       // 操作设置

参数

tableName - 目标表名称。

writer - 数据写入器实例。

format - 写入器对数据进行编码所使用的数据格式。

settings - 请求设置。

返回值

InsertResponse 类型的 Future——包含操作结果以及服务器端指标等附加信息。

示例

使用 JSONEachRow 格式,将一组编码为字符串值的 JSON 对象写入:


final int EXECUTE_CMD_TIMEOUT = 10; // 秒
final String tableName = "events";
final String tableCreate = "CREATE TABLE \"" + tableName + "\" " +
        " (name String, " +
        "  v1 Float32, " +
        "  v2 Float32, " +
        "  attrs Nullable(String), " +
        "  corrected_time DateTime('UTC') DEFAULT now()," +
        "  special_attr Nullable(Int8) DEFAULT -1)" +
        "  Engine = MergeTree ORDER by ()";

client.execute("DROP TABLE IF EXISTS " + tableName).get(EXECUTE_CMD_TIMEOUT, TimeUnit.SECONDS);
client.execute(createTableSQL).get(EXECUTE_CMD_TIMEOUT, TimeUnit.SECONDS);

String correctedTime = Instant.now().atZone(ZoneId.of("UTC")).format(DataTypeUtils.DATETIME_FORMATTER);
String[] rows = new String[] {
        "{ \"name\": \"foo1\", \"v1\": 0.3, \"v2\": 0.6, \"attrs\": \"a=1,b=2,c=5\", \"corrected_time\": \"" + correctedTime + "\", \"special_attr\": 10}",
        "{ \"name\": \"foo1\", \"v1\": 0.3, \"v2\": 0.6, \"attrs\": \"a=1,b=2,c=5\", \"corrected_time\": \"" + correctedTime + "\"}",
        "{ \"name\": \"foo1\", \"v1\": 0.3, \"v2\": 0.6, \"attrs\": \"a=1,b=2,c=5\" }",
        "{ \"name\": \"foo1\", \"v1\": 0.3, \"v2\": 0.6 }",
};


try (InsertResponse response = client.insert(tableName, out -> {
    // 写入原始字节数据 
    for (String row : rows) {
        out.write(row.getBytes());
    }

}, ClickHouseFormat.JSONEachRow, new InsertSettings()).get()) {

    System.out.println("写入行数: " + response.getWrittenRows());
}

写入已压缩数据:

String tableName = "very_long_table_name_with_uuid_" + UUID.randomUUID().toString().replace('-', '_');
String tableCreate = "CREATE TABLE \"" + tableName + "\" " +
        " (name String, " +
        "  v1 Float32, " +
        "  v2 Float32, " +
        "  attrs Nullable(String), " +
        "  corrected_time DateTime('UTC') DEFAULT now()," +
        "  special_attr Nullable(Int8) DEFAULT -1)" +
        "  Engine = MergeTree ORDER by ()";

client.execute("DROP TABLE IF EXISTS " + tableName).get(EXECUTE_CMD_TIMEOUT, TimeUnit.SECONDS);
client.execute(createTableSQL).get(EXECUTE_CMD_TIMEOUT, TimeUnit.SECONDS);

String correctedTime = Instant.now().atZone(ZoneId.of("UTC")).format(DataTypeUtils.DATETIME_FORMATTER);
String[] data = new String[] {
        "{ \"name\": \"foo1\", \"v1\": 0.3, \"v2\": 0.6, \"attrs\": \"a=1,b=2,c=5\", \"corrected_time\": \"" + correctedTime + "\", \"special_attr\": 10}",
        "{ \"name\": \"foo1\", \"v1\": 0.3, \"v2\": 0.6, \"attrs\": \"a=1,b=2,c=5\", \"corrected_time\": \"" + correctedTime + "\"}",
        "{ \"name\": \"foo1\", \"v1\": 0.3, \"v2\": 0.6, \"attrs\": \"a=1,b=2,c=5\" }",
        "{ \"name\": \"foo1\", \"v1\": 0.3, \"v2\": 0.6 }",
};


// 此步骤仅用于演示。实际应用中数据应已经过压缩。 
byte[][] compressedData = new byte[data.length][];
for (int i = 0 ; i < data.length; i++) {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    GZIPOutputStream gz = new GZIPOutputStream(baos);
    gz.write(data[i].getBytes(StandardCharsets.UTF_8));
    gz.finish();
    compressedData[i] = baos.toByteArray();
}

InsertSettings insertSettings = new InsertSettings()
        .appCompressedData(true, "gzip"); // 定义压缩算法(通过 HTTP 头发送)

try (InsertResponse response = client.insert(tableName, out -> {
    // 写入数据 
    for (byte[] row : compressedData) {
        out.write(row);
    }
}, ClickHouseFormat.JSONEachRow, insertSettings).get()) {
    System.out.println("已写入行数:" + response.getWrittenRows());
}    

InsertSettings

插入操作的配置选项。

配置方法

方法说明
setQueryId(String queryId)设置将分配给此操作的查询 ID。默认值:null
setDeduplicationToken(String token)设置去重令牌。该令牌将发送到服务器,可用于标识查询。默认值:null
setInputStreamCopyBufferSize(int size)复制缓冲区大小。该缓冲区在写入操作期间用于将用户提供的输入流中的数据复制到输出流。默认值:8196
serverSetting(String name, String value)为某个操作设置单个服务器设置。
serverSetting(String name, Collection values)为某个操作设置具有多个值的服务器设置。集合中的元素应为 String 类型的值。
setDBRoles(Collection dbRoles)设置在执行操作之前要生效的数据库角色。集合中的元素应为 String 类型的值。
setOption(String option, Object value)以原始形式设置配置选项。这不是服务器设置。

InsertResponse

表示插入操作结果的响应对象。仅当客户端从服务器收到响应时可用。

注意

应尽快关闭此对象以释放连接,因为在前一个响应的所有数据完全读取完毕之前,该连接无法被复用。

MethodDescription
OperationMetrics getMetrics()返回包含该操作指标的对象。
String getQueryId()返回由应用程序(通过操作设置)或服务器为该操作分配的查询 ID。

查询 API

query(String sqlQuery)

按原样发送 sqlQuery。响应格式由查询设置决定。QueryResponse 将持有对响应流的引用,该响应流应由支持相应格式的读取器来消费。

函数签名

CompletableFuture<QueryResponse> query(String sqlQuery, QuerySettings settings)
CompletableFuture<QueryResponse> query(String sqlQuery)

参数

sqlQuery - 单条 SQL 语句。查询会按原样发送到服务器。

settings - 请求相关的设置。

返回值

类型为 QueryResponse 的 Future —— 结果数据集以及诸如服务器端指标等附加信息。在消费完数据集后应关闭 Response 对象。

示例

final String sql = "select * from " + TABLE_NAME + " where title <> '' limit 10";

// 默认格式为 RowBinaryWithNamesAndTypesFormatReader,因此读取器包含所有列信息
try (QueryResponse response = client.query(sql).get(3, TimeUnit.SECONDS);) {

    // 创建读取器以便访问数据
    ClickHouseBinaryFormatReader reader = client.newBinaryFormatReader(response);

    while (reader.hasNext()) {
        reader.next(); // 从流中读取并解析下一条记录

        // 获取值
        double id = reader.getDouble("id");
        String title = reader.getString("title");
        String url = reader.getString("url");

        // 收集数据 
    }
} catch (Exception e) {
    log.error("读取数据失败", e);
}

// 将业务逻辑置于读取块之外以尽快释放 HTTP 连接。  

query(String sqlQuery, Map<String, Object> queryParams, QuerySettings settings)

按原样发送 sqlQuery,并同时发送查询参数,使服务器能够编译该 SQL 表达式。

方法签名

CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Object> queryParams, QuerySettings settings)

参数

sqlQuery - 带有 {} 占位符的 SQL 表达式。

queryParams - 用于在服务器端补全该 SQL 表达式的变量映射。

settings - 请求设置。

返回值

QueryResponse 类型的 Future——包含结果数据集以及诸如服务器端指标等附加信息。使用完数据集后应关闭 Response 对象。

示例


// 定义参数。这些参数将随请求一起发送到服务器。   
Map<String, Object> queryParams = new HashMap<>();
queryParams.put("param1", 2);

try (QueryResponse response =
        client.query("SELECT * FROM " + table + " WHERE col1 >= {param1:UInt32}", queryParams, new QuerySettings()).get()) {

    // 创建读取器以便捷访问数据
    ClickHouseBinaryFormatReader reader = client.newBinaryFormatReader(response);

    while (reader.hasNext()) {
        reader.next(); // 从流中读取并解析下一条记录

        // 读取数据 
    }

} catch (Exception e) {
    log.error("读取数据失败", e);
}

queryAll(String sqlQuery)

RowBinaryWithNamesAndTypes 格式查询数据。返回结果为一个集合。读取性能与使用 reader 相同,但需要更多内存来存储整个数据集。

签名

List<GenericRecord> queryAll(String sqlQuery)

参数

sqlQuery - 用于从服务器查询数据的 SQL 表达式。

返回值

完整数据集,以 GenericRecord 对象列表的形式表示,这些对象以逐行访问的方式提供对结果数据的访问。

示例

try {
    log.info("读取整表并逐条处理记录");
    final String sql = "select * from " + TABLE_NAME + " where title <> ''";

    // 读取完整结果集并逐条处理
    client.queryAll(sql).forEach(row -> {
        double id = row.getDouble("id");
        String title = row.getString("title");
        String url = row.getString("url");

        log.info("id: {}, title: {}, url: {}", id, title, url);
    });
} catch (Exception e) {
    log.error("数据读取失败", e);
}

QuerySettings

查询操作的配置选项。

配置方法

MethodDescription
setQueryId(String queryId)设置将分配给该操作的查询 ID。
setFormat(ClickHouseFormat format)设置响应格式。完整列表参见 RowBinaryWithNamesAndTypes
setMaxExecutionTime(Integer maxExecutionTime)设置操作在服务器上的最长执行时间,不会影响读取超时时间。
waitEndOfQuery(Boolean waitEndOfQuery)请求服务器在发送响应前等待查询结束。
setUseServerTimeZone(Boolean useServerTimeZone)使用服务器时区(参见客户端配置)解析操作结果中的日期/时间类型。默认值为 false
setUseTimeZone(String timeZone)请求服务器在时间转换时使用 timeZone。参见 session_timezone
serverSetting(String name, String value)为某个操作设置单个服务器设置。
serverSetting(String name, Collection values)为某个操作设置带有多个值的服务器设置。集合中的条目应为 String 值。
setDBRoles(Collection dbRoles)设置在执行操作前要应用的 DB 角色。集合中的条目应为 String 值。
setOption(String option, Object value)以原始格式设置一个配置选项。这不是服务器设置。

QueryResponse

用于承载查询执行结果的响应对象。仅当客户端收到服务器响应时才可用。

注意

应尽快关闭此对象以释放连接,因为在前一个响应的全部数据被完全读取之前,该连接无法复用。

MethodDescription
ClickHouseFormat getFormat()返回响应中数据所采用的格式。
InputStream getInputStream()返回指定格式的未压缩数据字节流。
OperationMetrics getMetrics()返回包含操作指标的对象。
String getQueryId()返回为该操作分配的查询 ID(通过操作设置或由服务器分配)。
TimeZone getTimeZone()返回在处理响应中的 Date/DateTime 类型时应使用的时区。

示例

  • 示例代码见 repo
  • 请参考 Spring Service 的 实现

通用 API

getTableSchema(String table)

获取指定 table 的表结构。

方法签名

TableSchema getTableSchema(String table)
TableSchema getTableSchema(String table, String database)

参数

table - 需要获取其表结构数据的表名。

database - 目标表所在的数据库。

返回值

返回一个 TableSchema 对象,包含该表的列列表。

getTableSchemaFromQuery(String sql)

从 SQL 语句中获取表结构。

签名

TableSchema getTableSchemaFromQuery(String sql)

参数

sql - 需要返回其模式(schema)的 SELECT SQL 语句。

返回值

返回一个 TableSchema 对象,其中的列与该 sql 语句中的列相对应。

TableSchema

register(Class<?> clazz, TableSchema schema)

为指定的 Java Classschema 编译序列化/反序列化层,以用于写入和读取数据。该方法会基于 getter/setter 与对应列的配对创建 serializer 和 deserializer。 列的匹配是通过从方法名中提取列名来完成的。例如,getFirstName 会对应列 first_namefirstname

方法签名

void register(Class<?> clazz, TableSchema schema)

参数

clazz - 表示用于读写数据的 POJO 的类。

schema - 用于与 POJO 属性进行匹配的数据模式。

示例

client.register(ArticleViewEvent.class, client.getTableSchema(TABLE_NAME));

使用示例

完整的示例代码存储在仓库的 examples 文件夹中:

  • client-v2 - 主要示例集。
  • demo-service - 演示如何在 Spring Boot 应用中使用客户端的示例。
  • demo-kotlin-service - 演示如何在 Ktor(Kotlin)应用中使用客户端的示例。