跳转到主内容
跳转到主内容

Java 客户端

用于通过协议与数据库服务器通信的 Java 客户端库。当前实现仅支持 HTTP 接口。 该库提供了自己的 API 用于向服务器发送请求。该库还提供了用于处理不同二进制数据格式(RowBinary* & Native*)的工具。

设置


<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>client-v2</artifactId>
    <version>0.9.4</version>
</dependency>

初始化

Client 对象通过 com.clickhouse.client.api.Client.Builder#build() 进行初始化。每个客户端都有自己的上下文,客户端之间不共享对象。 Builder 提供了配置方法以便于进行设置。

示例:

 Client client = new Client.Builder()
                .addEndpoint("https://clickhouse-cloud-instance:8443/")
                .setUsername(user)
                .setPassword(password)
                .build();

Client 实现了 AutoCloseable 接口,不再需要时应将其关闭。

身份验证

身份验证在初始化阶段针对每个客户端进行配置。支持三种身份验证方式:密码、访问令牌、SSL 客户端证书。

使用密码进行身份验证需要通过调用 setUsername(String)setPassword(String) 方法来设置用户名和密码:

 Client client = new Client.Builder()
        .addEndpoint("https://clickhouse-cloud-instance:8443/")
        .setUsername(user)
        .setPassword(password)
        .build();

使用访问令牌进行身份验证需要通过调用 setAccessToken(String) 设置访问令牌:

 Client client = new Client.Builder()
        .addEndpoint("https://clickhouse-cloud-instance:8443/")
        .setAccessToken(userAccessToken)
        .build();

通过 SSL 客户端证书进行身份验证需要设置用户名、启用 SSL 身份验证、设置客户端证书和客户端密钥,方法是分别调用 setUsername(String)useSSLAuthentication(boolean)setClientCertificate(String)setClientKey(String):

Client client = new Client.Builder()
        .useSSLAuthentication(true)
        .setUsername("some_user")
        .setClientCertificate("some_user.crt")
        .setClientKey("some_user.key")
注意

SSL 身份验证在生产环境中可能难以排查,因为 SSL 库返回的许多错误信息不够详细。例如,如果客户端证书与密钥不匹配,服务器会立即终止连接(对于 HTTP 而言,这发生在连接初始化阶段,此时尚未发送任何 HTTP 请求,因此也不会返回任何响应)。

请使用 openssl 等工具验证证书和密钥:

  • 检查密钥完整性:openssl rsa -in [key-file.key] -check -noout
  • 检查客户端证书中的 CN 是否与某个 USER 匹配:
    • 从用户证书中获取 CN - openssl x509 -noout -subject -in [user.cert]
    • 确认数据库中已设置相同的值:select name, auth_type, auth_params from system.users where auth_type = 'ssl_certificate'(查询结果中的 auth_params 字段类似于 {"common_names":["some_user"]}

配置

所有设置均通过实例方法(即配置方法)定义,使每个值的作用域和上下文清晰明确。 主要配置参数在单一作用域(客户端或操作)中定义,互不覆盖。

配置在创建客户端时定义。请参阅 com.clickhouse.client.api.Client.Builder

客户端配置

MethodArgumentsDescriptionDefaultKey
addEndpoint(String endpoint)endpoint - URL 格式的服务器地址将服务器端点添加到可用服务器列表。目前仅支持一个端点。nonenone
addEndpoint(Protocol protocol, String host, int port, boolean secure)protocol - 连接协议
host - IP 或主机名
secure - 使用 HTTPS
将服务器端点添加到可用服务器列表。目前仅支持一个端点。nonenone
enableConnectionPool(boolean enable)enable - 启用/禁用标志设置是否启用连接池trueconnection_pool_enabled
setMaxConnections(int maxConnections)maxConnections - 连接数量设置客户端对每个服务器端点最多可以打开的连接数。10max_open_connections
setConnectionTTL(long timeout, ChronoUnit unit)timeout - 超时值
unit - 时间单位
设置连接的生存时间 (TTL),超过该时间后连接将被视为不再活动-1connection_ttl
setKeepAliveTimeout(long timeout, ChronoUnit unit)timeout - 超时值
unit - 时间单位
设置 HTTP 连接的 Keep-Alive 超时时间。设置为 0 以禁用 Keep-Alive。-http_keep_alive_timeout
setConnectionReuseStrategy(ConnectionReuseStrategy strategy)strategy - LIFOFIFO选择连接池应使用的连接复用策略FIFOconnection_reuse_strategy
setDefaultDatabase(String database)database - 数据库名称设置默认数据库defaultdatabase

服务器设置

服务器端设置可以在创建客户端时设置一次(参见 BuilderserverSetting 方法),也可以在操作级别设置(参见操作设置类的 serverSetting)。

 try (Client client = new Client.Builder().addEndpoint(Protocol.HTTP, "localhost", mockServer.port(), false)
        .setUsername("default")
        .setPassword(ClickHouseServerForTest.getPassword())
        .compressClientRequest(true)

        // Client level
        .serverSetting("max_threads", "10")
        .serverSetting("async_insert", "1")
        .serverSetting("roles", Arrays.asList("role1", "role2"))

        .build()) {

	// Operation level
	QuerySettings querySettings = new QuerySettings();
	querySettings.serverSetting("session_timezone", "Europe/Zurich");

	...
}

⚠️ 当通过 setOption 方法设置选项时(无论是 Client.Builder 还是操作设置类),服务器设置名称都应添加 clickhouse_setting_ 前缀。此时,com.clickhouse.client.api.ClientConfigProperties#serverSetting() 方法会很有用。

自定义 HTTP 请求头

可以为所有操作(客户端级别)或单个操作(操作级别)设置自定义 HTTP 头。


QuerySettings settings = new QuerySettings()
    .httpHeader(HttpHeaders.REFERER, clientReferer)
    .setQueryId(qId);

当通过 setOption 方法设置选项时(无论是 Client.Builder 还是操作设置类),自定义请求头名称应以 http_header_ 为前缀。此时,可以使用 com.clickhouse.client.api.ClientConfigProperties#httpHeader() 方法。

通用定义

ClickHouseFormat

支持的格式的枚举,包含 ClickHouse 支持的所有格式。

  • raw - 用户需要对原始数据进行转码
  • full - 客户端可以自行对数据进行转码,并能接收原始数据流
  • - - ClickHouse 不支持对该格式执行此操作

此客户端版本支持:

格式输入输出
TabSeparated原始raw
TabSeparatedRawrawraw
TabSeparatedWithNames原始raw
TabSeparatedWithNamesAndTypesrawraw
TabSeparatedRawWithNamesrawraw
TabSeparatedRawWithNamesAndTypesrawraw
Templaterawraw
TemplateIgnoreSpaces原始*
CSV原始原始
CSVWithNamesrawraw
CSVWithNamesAndTypes原始raw
CustomSeparatedraw原始
CustomSeparatedWithNames原始原始
CustomSeparatedWithNamesAndTypesrawraw
SQLInsert-raw
Valuesraw原始
Vertical*原始
JSONraw原始
JSONAsStringraw-
JSONAsObjectraw*
JSONStringsraw原始
JSONColumnsraw原样
JSONColumnsWithMetadata原始原始
JSONCompactraw原始
JSONCompactStrings-原始
JSONCompactColumnsraw原样
JSONEachRow原始原始
PrettyJSONEachRow*raw
JSONEachRowWithProgress-原始
JSONStringsEachRowraw原始
JSONStringsEachRowWithProgress*原始
JSONCompactEachRow原始raw
JSONCompactEachRowWithNames原始raw
JSONCompactEachRowWithNamesAndTypes原始原始
JSONCompactStringsEachRowrawraw
JSONCompactStringsEachRowWithNames原始原始
JSONCompactStringsEachRowWithNamesAndTypesrawraw
JSONObjectEachRowraw原始
BSONEachRow原始raw
TSKVraw原始
Pretty-原始
PrettyNoEscapes*原始格式
PrettyMonoBlock-raw
PrettyNoEscapesMonoBlock*raw
PrettyCompact-原始
PrettyCompactNoEscapes*原始
PrettyCompactMonoBlock-raw
PrettyCompactNoEscapesMonoBlock*原始
PrettySpace-原始
PrettySpaceNoEscapes*原始
PrettySpaceMonoBlock-原始
PrettySpaceNoEscapesMonoBlock*raw
Prometheus-raw
Protobuf原始原始
ProtobufSingle原始原始
ProtobufListraw原始
Avro原始原始
AvroConfluentraw*
Parquet原始数据原始数据
ParquetMetadata原始-
Arrow原始raw
ArrowStreamrawraw
ORC原始raw
One原始*
Npy原始原始
RowBinary完整full
RowBinaryWithNamesfull完全支持
RowBinaryWithNamesAndTypes完全完全支持
RowBinaryWithDefaults完整-
Native完整原始
Null*原始
XML-raw
CapnProtoraw原始
LineAsString原始原始
Regexp原始数据*
RawBLOBrawraw
MsgPack原始原始
MySQLDump原始-
DWARF原始*
Markdown-raw
Form原始*

插入 API

insert(String tableName, InputStream data, ClickHouseFormat format)

接受指定格式的字节 InputStream 数据。要求 data 使用 format 编码。

签名

CompletableFuture<InsertResponse> insert(String tableName, InputStream data, ClickHouseFormat format, InsertSettings settings)
CompletableFuture<InsertResponse> insert(String tableName, InputStream data, ClickHouseFormat format)

参数

tableName - 目标表名。

data - 已编码数据的输入流。

format - 数据编码所使用的格式。

settings - 请求设置。

返回值

InsertResponse 类型的 Future - 操作结果及服务器端指标等附加信息。

示例

try (InputStream dataStream = getDataStream()) {
    try (InsertResponse response = client.insert(TABLE_NAME, dataStream, ClickHouseFormat.JSONEachRow,
            insertSettings).get(3, TimeUnit.SECONDS)) {

        log.info("Insert finished: {} rows written", response.getMetrics().getMetric(ServerMetrics.NUM_ROWS_WRITTEN).getLong());
    } catch (Exception e) {
        log.error("Failed to write JSONEachRow data", e);
        throw new RuntimeException(e);
    }
}

insert(String tableName, List<?> data, InsertSettings settings)

向数据库发送写入请求。对象列表将被转换为高效格式,然后发送到服务器。列表项的类应事先使用 register(Class, TableSchema) 方法进行注册。

签名

client.insert(String tableName, List<?> data, InsertSettings settings)
client.insert(String tableName, List<?> data)

参数

tableName - 目标表名称。

data - DTO(数据传输对象)集合对象。

settings - 请求设置。

返回值

InsertResponse 类型的 Future - 包含操作结果以及服务器端指标等附加信息。

示例

// Important step (done once) - register class to pre-compile object serializer according to the table schema.
client.register(ArticleViewEvent.class, client.getTableSchema(TABLE_NAME));

List<ArticleViewEvent> events = loadBatch();

try (InsertResponse response = client.insert(TABLE_NAME, events).get()) {
    // handle response, then it will be closed and connection that served request will be released.
}

InsertSettings

插入操作的配置选项。

配置方式

方法说明
setQueryId(String queryId)设置要分配给该操作的查询 ID。默认值:null
setDeduplicationToken(String token)设置去重令牌。该令牌将被发送到服务器,可用于标识该查询。默认值:null
setInputStreamCopyBufferSize(int size)复制缓冲区大小。该缓冲区在写入操作期间用于将数据从用户提供的输入流复制到输出流。默认值:8196
serverSetting(String name, String value)为一次操作设置单个服务器参数。
serverSetting(String name, Collection values)为某个操作设置一个可包含多个值的服务器配置项。集合中的元素必须是 String 类型的值。
setDBRoles(Collection dbRoles)设置在执行操作前生效的 DB 角色。集合中的元素应为 String 类型的值。
setOption(String option, Object value)以原始格式设置配置选项。这不是服务器级设置。

InsertResponse

响应对象,保存插入操作的结果。仅在客户端收到服务器响应时可用。

注意

应尽快关闭此对象以释放连接,因为只有在完全读取上一个响应的所有数据后,连接才能被重用。

方法说明
OperationMetrics getMetrics()返回一个包含操作指标的对象。
String getQueryId()返回为该操作分配的查询 ID,该 ID 由应用程序(通过操作设置)或服务器指定。

查询 API

query(String sqlQuery)

按原样发送 sqlQuery。响应格式由查询设置决定。QueryResponse 将持有对响应流的引用,该响应流应由支持相应格式的读取器消费。

签名

CompletableFuture<QueryResponse> query(String sqlQuery, QuerySettings settings)
CompletableFuture<QueryResponse> query(String sqlQuery)

参数

sqlQuery - 单个 SQL 语句。该查询将按原样发送到服务器。

settings - 请求设置。

返回值

QueryResponse 类型的 Future - 包含结果数据集和附加信息(如服务器端指标)。使用完数据集后应关闭 Response 对象。

示例

final String sql = "select * from " + TABLE_NAME + " where title <> '' limit 10";

// Default format is RowBinaryWithNamesAndTypesFormatReader so reader have all information about columns
try (QueryResponse response = client.query(sql).get(3, TimeUnit.SECONDS);) {

    // Create a reader to access the data in a convenient way
    ClickHouseBinaryFormatReader reader = client.newBinaryFormatReader(response);

    while (reader.hasNext()) {
        reader.next(); // Read the next record from stream and parse it

        // get values
        double id = reader.getDouble("id");
        String title = reader.getString("title");
        String url = reader.getString("url");

        // collecting data
    }
} catch (Exception e) {
    log.error("Failed to read data", e);
}

// put business logic outside of the reading block to release http connection asap.

query(String sqlQuery, Map<String, Object> queryParams, QuerySettings settings)

按原样发送 sqlQuery。此外还会发送查询参数,以便服务器编译 SQL 表达式。

签名

CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Object> queryParams, QuerySettings settings)

参数

sqlQuery - 带有占位符 {} 的 SQL 表达式。

queryParams - 用于在服务器上完成 SQL 表达式的变量映射表。

settings - 请求设置。

返回值

QueryResponse 类型的 Future - 包含结果数据集和附加信息(如服务器端指标)。使用完数据集后应关闭 Response 对象。

示例


// define parameters. They will be sent to the server along with the request.
Map<String, Object> queryParams = new HashMap<>();
queryParams.put("param1", 2);

try (QueryResponse response =
        client.query("SELECT * FROM " + table + " WHERE col1 >= {param1:UInt32}", queryParams, new QuerySettings()).get()) {

    // Create a reader to access the data in a convenient way
    ClickHouseBinaryFormatReader reader = client.newBinaryFormatReader(response);

    while (reader.hasNext()) {
        reader.next(); // Read the next record from stream and parse it

        // reading data
    }

} catch (Exception e) {
    log.error("Failed to read data", e);
}

queryAll(String sqlQuery)

RowBinaryWithNamesAndTypes 格式查询数据。将结果作为集合返回。读取性能与 reader 相同,但需要更多内存来保存整个数据集。

签名

List<GenericRecord> queryAll(String sqlQuery)

参数

sqlQuery - 用于从服务器查询数据的 SQL 表达式。

返回值

完整数据集由 GenericRecord 对象列表表示,这些对象以行形式提供对结果数据的访问。

示例

try {
    log.info("Reading whole table and process record by record");
    final String sql = "select * from " + TABLE_NAME + " where title <> ''";

    // Read whole result set and process it record by record
    client.queryAll(sql).forEach(row -> {
        double id = row.getDouble("id");
        String title = row.getString("title");
        String url = row.getString("url");

        log.info("id: {}, title: {}, url: {}", id, title, url);
    });
} catch (Exception e) {
    log.error("Failed to read data", e);
}

QuerySettings

查询操作的配置选项。

配置方式

方法说明
setQueryId(String queryId)设置要分配给该操作的查询 ID。
setFormat(ClickHouseFormat format)设置响应格式。完整列表请参见 RowBinaryWithNamesAndTypes
setMaxExecutionTime(Integer maxExecutionTime)在服务器端设置操作的最大执行时间。不会影响读取超时设置。
waitEndOfQuery(Boolean waitEndOfQuery)请求服务器在发送响应前等待查询完成。
setUseServerTimeZone(Boolean useServerTimeZone)服务器时区(参见客户端配置)将用于解析操作结果中的日期/时间类型。默认值为 false
setUseTimeZone(String timeZone)请求服务器使用 timeZone 进行时间转换。参见 session_timezone
serverSetting(String name, String value)为一次操作设置单个服务器配置项。
serverSetting(String name, Collection values)为某个操作设置可包含多个值的单个服务器配置项。集合中的各项应为 String 值。
setDBRoles(Collection dbRoles)设置在执行操作前要应用的数据库角色。集合中的元素应为 String 值。
setOption(String option, Object value)以原始格式设置配置选项。这不是服务器级别的设置。

QueryResponse

用于保存查询执行结果的响应对象。仅在客户端从服务器收到响应时可用。

注意

应尽快关闭此对象以释放连接,因为只有在完全读取上一个响应的所有数据后,连接才能被重用。

方法说明
ClickHouseFormat getFormat()返回用于对响应数据进行编码的格式。
InputStream getInputStream()返回采用指定格式的未压缩数据字节流。
OperationMetrics getMetrics()返回一个包含该操作各项指标的对象。
String getQueryId()返回为该操作分配的查询 ID,该 ID 由应用程序(通过操作设置)或由服务器生成。
TimeZone getTimeZone()返回用于处理响应中 Date/DateTime 类型的时区。

示例

通用 API

getTableSchema(String table)

获取 table 的表架构。

签名

TableSchema getTableSchema(String table)
TableSchema getTableSchema(String table, String database)

参数

table - 要获取模式数据的表名。

database - 目标表所在的数据库。

返回值

返回包含表列列表的 TableSchema 对象。

getTableSchemaFromQuery(String sql)

从 SQL 语句中获取模式(schema)。

签名

TableSchema getTableSchemaFromQuery(String sql)

参数

sql - 需要返回其模式(schema)的 "SELECT" SQL 语句。

返回值

返回一个 TableSchema 对象,其列与 sql 表达式相匹配。

TableSchema

register(Class<?> clazz, TableSchema schema)

为 Java 类编译序列化和反序列化层,以便使用 schema 进行数据的写入和读取。该方法将为 getter/setter 方法对及其对应的列创建序列化器和反序列化器。 列匹配通过从方法名称中提取列名来实现。例如,getFirstName 方法将对应于列 first_namefirstname

签名

void register(Class<?> clazz, TableSchema schema)

参数

clazz - 表示用于读写数据的 POJO 类。

schema - 用于与 POJO 属性进行匹配的数据架构。

示例

client.register(ArticleViewEvent.class, client.getTableSchema(TABLE_NAME));

使用示例

完整的示例代码存储在代码仓库的 example 文件夹中:

迁移指南

旧版客户端(V1)使用 com.clickhouse.client.ClickHouseClient#builder 作为入口点。新版客户端(V2)采用类似的模式,使用 com.clickhouse.client.api.Client.Builder。主要差异如下:

  • 这里不会使用 service loader 来获取具体实现。com.clickhouse.client.api.Client 是一个外观类,作为未来各种实现的统一入口。
  • 配置来源更少:一个由构建器提供,另一个来自操作设置(QuerySettingsInsertSettings)。先前的版本在每个节点上都有配置,并且在某些情况下会从环境变量中加载。

配置参数匹配

V1 中有 3 个与配置相关的枚举类:

  • com.clickhouse.client.config.ClickHouseDefaults - 在大多数使用场景下通常需要设置的配置参数,例如 USERPASSWORD
  • com.clickhouse.client.config.ClickHouseClientOption - 客户端特定的配置参数,例如 HEALTH_CHECK_INTERVAL
  • com.clickhouse.client.http.config.ClickHouseHttpOption - 特定于 HTTP 接口的配置参数,例如 RECEIVE_QUERY_PROGRESS

它们的设计目的是对参数进行分组并提供清晰的分离。但在某些情况下会导致混淆(例如 com.clickhouse.client.config.ClickHouseDefaults#ASYNCcom.clickhouse.client.config.ClickHouseClientOption#ASYNC 之间是否存在差异)。 新的 V2 客户端使用 com.clickhouse.client.api.Client.Builder 作为所有可能客户端配置选项的单一字典。所有配置参数名称列在 com.clickhouse.client.api.ClientConfigProperties 中。

下表列出了新客户端支持的旧选项及其新含义。

图例: ✔ = 支持,✗ = 已弃用

V1 配置项V2 构建器方法备注
ClickHouseDefaults#HOSTClient.Builder#addEndpoint
ClickHouseDefaults#PROTOCOLV2 中仅支持 HTTP
ClickHouseDefaults#DATABASE
ClickHouseClientOption#DATABASE
Client.Builder#setDefaultDatabase
ClickHouseDefaults#USERClient.Builder#setUsername
ClickHouseDefaults#PASSWORDClient.Builder#setPassword
ClickHouseClientOption#CONNECTION_TIMEOUTClient.Builder#setConnectTimeout
ClickHouseClientOption#CONNECTION_TTLClient.Builder#setConnectionTTL
ClickHouseHttpOption#MAX_OPEN_CONNECTIONSClient.Builder#setMaxConnections
ClickHouseHttpOption#KEEP_ALIVE
ClickHouseHttpOption#KEEP_ALIVE_TIMEOUT
Client.Builder#setKeepAliveTimeout
ClickHouseHttpOption#CONNECTION_REUSE_STRATEGYClient.Builder#setConnectionReuseStrategy
ClickHouseHttpOption#USE_BASIC_AUTHENTICATIONClient.Builder#useHTTPBasicAuth

通用差异

  • Client V2 使用更少的自定义类以提高可移植性。例如,V2 可以与任何 java.io.InputStream 的实现配合使用,将数据写入服务器。
  • Client V2 的 async 配置默认为 off。这意味着不会额外创建线程,并让应用程序对客户端拥有更多控制权。对于大多数使用场景,此配置都应保持为 off。启用 async 会为请求创建一个单独的线程。仅在使用由应用程序控制的 executor 时才有意义(参见 com.clickhouse.client.api.Client.Builder#setSharedOperationExecutor)。

写入数据

  • 可以使用任何 java.io.InputStream 的实现。支持 V1 com.clickhouse.data.ClickHouseInputStream,但不推荐使用。
  • 一旦检测到输入流已结束,就会进行相应处理。在此之前,应先关闭该请求的输出流。

V1 插入 TSV 格式的数据。

InputStream inData = getInData();
ClickHouseRequest.Mutation request = client.read(server)
        .write()
        .table(tableName)
        .format(ClickHouseFormat.TSV);
ClickHouseConfig config = request.getConfig();
CompletableFuture<ClickHouseResponse> future;
try (ClickHousePipedOutputStream requestBody = ClickHouseDataStreamFactory.getInstance()
        .createPipedOutputStream(config)) {
    // start the worker thread which transfer data from the input into ClickHouse
    future = request.data(requestBody.getInputStream()).execute();

    // Copy data from inData stream to requestBody stream

    // We need to close the stream before getting a response
    requestBody.close();

    try (ClickHouseResponse response = future.get()) {
        ClickHouseResponseSummary summary = response.getSummary();
        Assert.assertEquals(summary.getWrittenRows(), numRows, "Num of written rows");
    }
}

V2 插入 TSV 格式的数据。

InputStream inData = getInData();
InsertSettings settings = new InsertSettings().setInputStreamCopyBufferSize(8198 * 2); // set copy buffer size
try (InsertResponse response = client.insert(tableName, inData, ClickHouseFormat.TSV, settings).get(30, TimeUnit.SECONDS)) {

  // Insert is complete at this point

} catch (Exception e) {
 // Handle exception
}
  • 只需要调用一个方法,无需创建额外的请求对象。
  • 当所有数据复制完成后,请求体数据流会自动关闭。
  • 现已提供新的低级 API:com.clickhouse.client.api.Client#insert(java.lang.String, java.util.List<java.lang.String>, com.clickhouse.client.api.DataStreamWriter, com.clickhouse.data.ClickHouseFormat, com.clickhouse.client.api.insert.InsertSettings)com.clickhouse.client.api.DataStreamWriter 专为实现自定义数据写入逻辑而设计,例如从队列中读取数据。

读取数据

  • 默认情况下,数据以 RowBinaryWithNamesAndTypes 格式读取。目前在需要进行数据绑定时,仅支持此格式。
  • 可以使用 List<GenericRecord> com.clickhouse.client.api.Client#queryAll(java.lang.String) 方法将数据读取为一组记录。该方法会将数据读取到内存中并释放连接,无需额外处理。GenericRecord 提供对数据的访问能力,并实现了一些类型转换。
Collection<GenericRecord> records = client.queryAll("SELECT * FROM table");
for (GenericRecord record : records) {
    int rowId = record.getInteger("rowID");
    String name = record.getString("name");
    LocalDateTime ts = record.getLocalDateTime("ts");
}

Java 客户端库,用于通过数据库服务器协议进行通信。当前实现仅支持 HTTP 接口。该库提供了自有 API 用于向服务器发送请求。

弃用

此库即将被弃用。新项目请使用最新的 Java 客户端

设置

<!-- https://mvnrepository.com/artifact/com.clickhouse/clickhouse-http-client -->
<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-http-client</artifactId>
    <version>0.7.2</version>
</dependency>

从版本 0.5.0 开始,驱动程序使用新的客户端 HTTP 库,需要将其添加为依赖项。

<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents.client5/httpclient5 -->
<dependency>
    <groupId>org.apache.httpcomponents.client5</groupId>
    <artifactId>httpclient5</artifactId>
    <version>5.3.1</version>
</dependency>

初始化

连接 URL 格式:protocol://host[:port][/database][?param[=value][&param[=value]][#tag[,tag]],示例:

连接到单个节点:

ClickHouseNode server = ClickHouseNode.of("http://localhost:8123/default?compress=0");

连接到具有多个节点的集群:

ClickHouseNodes servers = ClickHouseNodes.of(
    "jdbc:ch:http://server1.domain,server2.domain,server3.domain/my_db"
    + "?load_balancing_policy=random&health_check_interval=5000&failover=2");

查询 API

try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
     ClickHouseResponse response = client.read(servers)
        .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
        .query("select * from numbers limit :limit")
        .params(1000)
        .executeAndWait()) {
            ClickHouseResponseSummary summary = response.getSummary();
            long totalRows = summary.getTotalRowsToRead();
}

流式查询 API

try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
     ClickHouseResponse response = client.read(servers)
        .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
        .query("select * from numbers limit :limit")
        .params(1000)
        .executeAndWait()) {
            for (ClickHouseRecord r : response.records()) {
            int num = r.getValue(0).asInteger();
            // type conversion
            String str = r.getValue(0).asString();
            LocalDate date = r.getValue(0).asDate();
        }
}

请参阅代码仓库中的完整代码示例

插入 API


try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
     ClickHouseResponse response = client.read(servers).write()
        .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
        .query("insert into my_table select c2, c3 from input('c1 UInt8, c2 String, c3 Int32')")
        .data(myInputStream) // `myInputStream` is source of data in RowBinary format
        .executeAndWait()) {
            ClickHouseResponseSummary summary = response.getSummary();
            summary.getWrittenRows();
}

请参阅仓库中的完整代码示例

RowBinary 编码

RowBinary 格式在其页面中进行了说明。

以下是一个 代码示例.

功能

压缩

客户端默认使用 LZ4 压缩,需要以下依赖项:

<!-- https://mvnrepository.com/artifact/org.lz4/lz4-java -->
<dependency>
    <groupId>org.lz4</groupId>
    <artifactId>lz4-java</artifactId>
    <version>1.8.0</version>
</dependency>

您可以选择使用 gzip,只需在连接 URL 中设置 compress_algorithm=gzip 即可。

或者,您可以通过以下几种方式禁用压缩。

  1. 通过在连接 URL 中将 compress 设置为 0 来禁用压缩:http://localhost:8123/default?compress=0
  2. 通过客户端配置禁用:
ClickHouseClient client = ClickHouseClient.builder()
   .config(new ClickHouseConfig(Map.of(ClickHouseClientOption.COMPRESS, false)))
   .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
   .build();

请参阅压缩文档了解更多不同的压缩选项。

多查询

在同一会话内的工作线程中依次执行多个查询:

CompletableFuture<List<ClickHouseResponseSummary>> future = ClickHouseClient.send(servers.apply(servers.getNodeSelector()),
    "create database if not exists my_base",
    "use my_base",
    "create table if not exists test_table(s String) engine=Memory",
    "insert into test_table values('1')('2')('3')",
    "select * from test_table limit 1",
    "truncate table test_table",
    "drop table if exists test_table");
List<ClickHouseResponseSummary> results = future.get();

命名参数

您可以按名称传递参数,而不必仅依赖参数在参数列表中的位置。使用 params 函数即可实现此功能。

try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
     ClickHouseResponse response = client.read(servers)
        .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
        .query("select * from my_table where name=:name limit :limit")
        .params("Ben", 1000)
        .executeAndWait()) {
            //...
        }
}
参数

所有涉及 String 类型的 params 签名(StringString[]Map<String, String>)均假定传入的键为有效的 ClickHouse SQL 字符串。例如:

try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
     ClickHouseResponse response = client.read(servers)
        .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
        .query("select * from my_table where name=:name")
        .params(Map.of("name","'Ben'"))
        .executeAndWait()) {
            //...
        }
}

如果您不想手动将 String 对象解析为 ClickHouse SQL,可以使用位于 com.clickhouse.data 的辅助函数 ClickHouseValues.convertToSqlExpression:

try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
     ClickHouseResponse response = client.read(servers)
        .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
        .query("select * from my_table where name=:name")
        .params(Map.of("name", ClickHouseValues.convertToSqlExpression("Ben's")))
        .executeAndWait()) {
            //...
        }
}

在上述示例中,ClickHouseValues.convertToSqlExpression 会转义内部的单引号,并用有效的单引号将变量包围起来。

其他类型,如 IntegerUUIDArrayEnum,将在 params 中自动转换。

节点发现

Java 客户端提供自动发现 ClickHouse 节点的功能。自动发现默认为禁用状态。如需手动启用,请将 auto_discovery 设置为 true:

properties.setProperty("auto_discovery", "true");

或在连接 URL 中:

jdbc:ch://my-server/system?auto_discovery=true

启用自动发现后,无需在连接 URL 中指定所有 ClickHouse 节点。URL 中指定的节点将作为种子节点,Java 客户端会自动从系统表和/或 clickhouse-keeper 或 zookeeper 中发现其他节点。

以下选项用于配置自动发现功能:

属性默认值描述
auto_discoveryfalse客户端是否应从 system 表和/或 clickhouse-keeper/zookeeper 中发现更多节点。
node_discovery_interval0以毫秒为单位的节点发现间隔,值为零或负数表示仅进行一次发现。
node_discovery_limit100单次可发现的最大节点数;设置为零或负值表示不限制。

负载均衡

Java 客户端根据负载均衡策略选择 ClickHouse 节点来发送请求。通常,负载均衡策略负责以下事项:

  1. 从托管的节点列表中获取一个节点。
  2. 管理节点状态。
  3. (可选)调度一个用于节点发现的后台进程(如果启用了自动发现),并执行健康检查。

以下是配置负载均衡的选项列表:

属性默认值说明
load_balancing_policy""负载均衡策略可以是以下之一:
  • firstAlive - 请求将被发送到受管节点列表中第一个健康节点
  • random - 请求将被发送到受管节点列表中随机选取的一个节点
  • roundRobin - 请求将按照轮询方式依次发送到受管节点列表中的每个节点
  • 实现 ClickHouseLoadBalancingPolicy 的全限定类名 - 自定义负载均衡策略
  • 如果未指定该策略,请求将被发送到受管节点列表中的第一个节点
    load_balancing_tags""用于筛选节点的负载均衡标签。只有具有指定标签的节点才会接收请求
    health_check_interval0健康检查间隔(毫秒)。值为 0 或负数表示仅执行一次。
    health_check_methodClickHouseHealthCheckMethod.SELECT_ONE健康检查方法。可以是以下之一:
  • ClickHouseHealthCheckMethod.SELECT_ONE - 通过 select 1 查询进行检查
  • ClickHouseHealthCheckMethod.PING - 基于协议的检查,通常更快
  • node_check_interval0以毫秒为单位的节点检查间隔,负数将被视为 0。仅当自上次检查以来已过去指定的时间时,才会检查节点状态。
    health_check_intervalnode_check_interval 的区别在于,health_check_interval 选项会调度一个后台任务,对节点列表(全部节点或故障节点)执行状态检查,而 node_check_interval 指定的是针对某个特定节点,自上次检查以来需要经过的时间阈值。
    check_all_nodesfalse是否对所有节点执行健康检查,或仅对故障节点执行检查。

    故障转移和重试

    Java 客户端提供配置选项,用于设置故障转移和失败查询的重试行为:

    参数默认值说明
    故障切换0对单个请求允许执行故障转移的最大次数。值为 0 或负数表示不执行故障转移。发生故障转移时,会根据负载均衡策略将失败的请求发送到其他节点,以从故障中恢复。
    retry0单个请求允许重试的最大次数。为零或负值时表示不进行重试。仅当 ClickHouse 服务器返回 NETWORK_ERROR 错误码时,才会对同一节点重新发送该请求
    repeat_on_session_locktrue当会话被锁定时,是否重复执行直到超时(由 session_timeoutconnect_timeout 控制)。如果 ClickHouse 服务器返回 SESSION_IS_LOCKED 错误码,则会重试该失败请求。

    添加自定义 HTTP 请求头

    Java 客户端支持 HTTP/S 传输层,可在请求中添加自定义 HTTP 头。 应使用 custom_http_headers 属性,多个头之间使用 , 分隔,键值对使用 = 分隔

    Java 客户端支持

    options.put("custom_http_headers", "X-ClickHouse-Quota=test, X-ClickHouse-Test=test");
    

    JDBC 驱动

    properties.setProperty("custom_http_headers", "X-ClickHouse-Quota=test, X-ClickHouse-Test=test");