メインコンテンツまでスキップ
メインコンテンツまでスキップ

ClickHouse Rust Client

ClickHouseへの接続のための公式Rustクライアントで、元々はPaul Loydによって開発されました。クライアントのソースコードはGitHubリポジトリにあります。

概要

  • 行のエンコーディング/デコーディングにserdeを使用。
  • serde属性をサポート:skip_serializingskip_deserializingrename
  • RowBinary形式をHTTPトランスポート上で使用。
    • TCP経由でNativeに切り替える計画があります。
  • TLS(native-tlsおよびrustls-tls機能を通じて)をサポート。
  • 圧縮および解凍(LZ4)をサポート。
  • データの選択または挿入、DDLの実行、およびクライアントサイドのバッチ処理用のAPIを提供。
  • ユニットテスト用の便利なモックを提供。

インストール

クレートを使用するには、Cargo.tomlに以下を追加します。

[dependencies]
clickhouse = "0.12.2"

[dev-dependencies]
clickhouse = { version = "0.12.2", features = ["test-util"] }

他の情報: crates.ioページ

Cargo機能

  • lz4(デフォルトで有効) — Compression::Lz4Compression::Lz4Hc(_)バリアントを有効にします。これが有効な場合、WATCHを除くすべてのクエリに対してデフォルトでCompression::Lz4が使用されます。
  • native-tls — OpenSSLにリンクして、HTTPSスキーマのURLをサポートします。
  • rustls-tls — OpenSSLにリンクせず、hyper-rustlsを介してHTTPSスキーマのURLをサポートします。
  • inserterclient.inserter()を有効にします。
  • test-util — モックを追加します。を参照。開発依存関係でのみ使用してください。
  • watchclient.watch機能を有効にします。詳細は該当するセクションを参照してください。
  • uuiduuidクレートと連携するためにserde::uuidを追加します。
  • timetimeクレートと連携するためにserde::timeを追加します。
参考

HTTPS URL経由でClickHouseに接続する際は、native-tlsまたはrustls-tls機能のいずれかを有効にする必要があります。 両方が有効な場合は、rustls-tls機能が優先されます。

ClickHouseバージョンの互換性

このクライアントは、LTSまたはそれ以降のClickHouseバージョン、ならびにClickHouse Cloudに対応しています。

ClickHouseサーバーがv22.6未満の場合、RowBinaryを奇妙に処理します。この問題を解決するには、v0.11以上を使用し、wa-37420機能を有効にすることができます。注:この機能は新しいClickHouseバージョンでは使用しないでください。

クライアントの使用に関するさまざまなシナリオをカバーすることを目指して、クライアントリポジトリのに提供しています。概要は例のREADMEにあります。

例や次のドキュメントに不明点や不足があれば、お問い合わせください。

使用法

注記

ch2rsクレートは、ClickHouseから行型を生成するのに便利です。

クライアントインスタンスの作成

ヒント

作成したクライアントを再利用するか、それらをクローンして、基盤となるhyper接続プールを再利用してください。

use clickhouse::Client;

let client = Client::default()
    // プロトコルとポートの両方を含める必要があります
    .with_url("http://localhost:8123")
    .with_user("name")
    .with_password("123")
    .with_database("test");

HTTPSまたはClickHouse Cloud接続

HTTPSは、rustls-tlsまたはnative-tlsのCargo機能のいずれかで動作します。

その後、通常通りクライアントを作成します。この例では、環境変数を使用して接続の詳細を格納しています:

参考

URLには、プロトコルとポートの両方を含める必要があります。例:https://instance.clickhouse.cloud:8443

fn read_env_var(key: &str) -> String {
    env::var(key).unwrap_or_else(|_| panic!("{key} env variable should be set"))
}

let client = Client::default()
    .with_url(read_env_var("CLICKHOUSE_URL"))
    .with_user(read_env_var("CLICKHOUSE_USER"))
    .with_password(read_env_var("CLICKHOUSE_PASSWORD"));

他にも:

行を選択

use serde::Deserialize;
use clickhouse::Row;
use clickhouse::sql::Identifier;

#[derive(Row, Deserialize)]
struct MyRow<'a> {
    no: u32,
    name: &'a str,
}

let table_name = "some";
let mut cursor = client
    .query("SELECT ?fields FROM ? WHERE no BETWEEN ? AND ?")
    .bind(Identifier(table_name))
    .bind(500)
    .bind(504)
    .fetch::<MyRow<'_>>()?;

while let Some(row) = cursor.next().await? { .. }
  • プレースホルダ?fieldsno, nameRowのフィールド)に置き換えられます。
  • プレースホルダ?は次のbind()呼び出しで値に置き換えられます。
  • 最初の行またはすべての行を取得するために、便利なfetch_one::<Row>()およびfetch_all::<Row>()メソッドが使用できます。
  • sql::Identifierを使用してテーブル名をバインドできます。

注意:応答全体がストリーミングされるため、カーソルは数行を生成した後でもエラーを返す可能性があります。この場合、クエリをquery(...).with_option("wait_end_of_query", "1")を使用して、サーバー側での応答バッファリングを有効にしてください。詳細buffer_sizeオプションも便利です。

注意

行を選択する際はwait_end_of_queryを慎重に使用してください。サーバー側でのメモリ消費が増加し、全体的なパフォーマンスが低下する可能性があります。

行の挿入

use serde::Serialize;
use clickhouse::Row;

#[derive(Row, Serialize)]
struct MyRow {
    no: u32,
    name: String,
}

let mut insert = client.insert("some")?;
insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
insert.end().await?;
  • end()が呼び出されない場合、INSERTは中止されます。
  • 行はストリームとして段階的に送信され、ネットワーク負荷を分散します。
  • ClickHouseは、すべての行が同じパーティションに収まる場合にのみバッチを原子的に挿入します。また、その数はmax_insert_block_sizeより少なくなければなりません。

非同期挿入(サーバーサイドバッチ処理)

ClickHouseの非同期挿入を使用して、クライアントサイドのバッチ処理を回避できます。これは、insertメソッド(または、Clientインスタンス自体)にasync_insertオプションを提供することで行えます。

let client = Client::default()
    .with_url("http://localhost:8123")
    .with_option("async_insert", "1")
    .with_option("wait_for_async_insert", "0");

他に:

Inserter機能(クライアントサイドバッチ処理)

inserter Cargo機能が必要です。

let mut inserter = client.inserter("some")?
    .with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
    .with_max_bytes(50_000_000)
    .with_max_rows(750_000)
    .with_period(Some(Duration::from_secs(15)));

inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
let stats = inserter.commit().await?;
if stats.rows > 0 {
    println!(
        "{}バイト、{}行、{}トランザクションが挿入されました",
        stats.bytes, stats.rows, stats.transactions,
    );
}

// アプリケーションのシャットダウン時にInserterを最終化し、残りの行をコミットするのを忘れないでください。
// `.end()`も統計を提供します。
inserter.end().await?;
  • Inserterは、任意の閾値(max_bytesmax_rowsperiod)に達した場合にcommit()でアクティブな挿入を終了します。
  • アクティブなINSERTを終了させる間隔は、with_period_biasを使用してバイアスをかけることができ、並列挿入による負荷のスパイクを回避します。
  • Inserter::time_left()を使用して、現在の期間が終了するタイミングを検出できます。アイテムが稀にしか発生しない場合は、Inserter::commit()を再度呼び出して制限を確認します。
  • 時間閾値は、inserterを高速化するためにquantaクレートを使用して実装されます。test-utilが有効な場合は使用されません(したがって、カスタムテストではtokio::time::advance()で時間を管理できます)。
  • commit()呼び出しの間のすべての行は、同じINSERTステートメントで挿入されます。
注意

挿入を終了/最終化したい場合は、フラッシュを忘れないでください:

inserter.end().await?;

DDLの実行

単一ノードのデプロイメントでは、次のようにしてDDLを実行するだけで済みます:

client.query("DROP TABLE IF EXISTS some").execute().await?;

しかし、ロードバランサーまたはClickHouse Cloudでのクラスターデプロイメントでは、すべてのレプリカにDDLが適用されるのを待つことをお勧めします。これにはwait_end_of_queryオプションを使用します。次のように行うことができます:

client
    .query("DROP TABLE IF EXISTS some")
    .with_option("wait_end_of_query", "1")
    .execute()
    .await?;

ClickHouse設定

with_optionメソッドを使用して、さまざまなClickHouse設定を適用できます。例えば:

let numbers = client
    .query("SELECT number FROM system.numbers")
    // この設定はこの特定のクエリにのみ適用されます。
    // グローバルクライアント設定を上書きします。
    .with_option("limit", "3")
    .fetch_all::<u64>()
    .await?;

queryに加え、insertおよびinserterメソッドでも同様に機能します。さらに、同じメソッドをClientインスタンスで呼び出して、すべてのクエリに対するグローバル設定を設定できます。

クエリID

.with_optionを使用して、ClickHouseのクエリログでクエリを識別するためにquery_idオプションを設定できます。

let numbers = client
    .query("SELECT number FROM system.numbers LIMIT 1")
    .with_option("query_id", "some-query-id")
    .fetch_all::<u64>()
    .await?;

queryに加え、insertおよびinserterメソッドでも同様に機能します。

危険

query_idを手動で設定する場合は、それがユニークであることを確認してください。UUIDが良い選択です。

他にも:query_idの例をクライアントリポジトリで確認してください。

セッションID

query_idと同様に、同じセッション内でステートメントを実行するためにsession_idを設定できます。session_idは、クライアントレベルでグローバルに設定するか、queryinsert、またはinserter呼び出しごとに設定することができます。

let client = Client::default()
    .with_url("http://localhost:8123")
    .with_option("session_id", "my-session");
危険

クラスターデプロイメントでは、"sticky sessions"がないため、この機能を適切に利用するには、_特定のクラスターノード_に接続する必要があります。例えば、ラウンドロビンロードバランサーは、次のリクエストが同じClickHouseノードによって処理されることを保証しません。

他にも:session_idの例をクライアントリポジトリで確認してください。

カスタムHTTPヘッダー

プロキシ認証を使用している場合やカスタムヘッダーを渡す必要がある場合、次のように行うことができます。

let client = Client::default()
    .with_url("http://localhost:8123")
    .with_header("X-My-Header", "hello");

他にも:カスタムHTTPヘッダーの例をクライアントリポジトリで確認してください。

カスタムHTTPクライアント

これは、基盤となるHTTP接続プールの設定を微調整するのに便利です。

use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client as HyperClient;
use hyper_util::rt::TokioExecutor;

let connector = HttpConnector::new(); // またはHttpsConnectorBuilder
let hyper_client = HyperClient::builder(TokioExecutor::new())
    // クライアント側で特定のアイドルソケットをどれくらい保持するか(ミリ秒)。
    // ClickHouseサーバーのKeepAliveタイムアウトよりかなり小さいことが想定されています。
    // これは、前の23.11バージョンではデフォルトで3秒、以降は10秒でした。
    .pool_idle_timeout(Duration::from_millis(2_500))
    // プール内で保持される最大アイドルKeep-Alive接続を設定します。
    .pool_max_idle_per_host(4)
    .build(connector);

let client = Client::with_http_client(hyper_client).with_url("http://localhost:8123");
注意

この例は古いHyper APIに依存しており、今後変更される可能性があります。

他にも:カスタムHTTPクライアントの例をクライアントリポジトリで確認してください。

データ型

  • (U)Int(8|16|32|64|128)は、対応する(u|i)(8|16|32|64|128)型またはそれに基づく新しい型にマッピングされます。
  • (U)Int256は直接サポートされていませんが、回避策があります
  • Float(32|64)は、対応するf(32|64)型またはそれに基づく新しい型にマッピングされます。
  • Decimal(32|64|128)は、対応するi(32|64|128)型またはそれに基づく新しい型にマッピングされます。符号付き固定小数点数のfixnumや他の実装を使用する方が便利です。
  • Booleanは、bool型またはその周りの新しい型にマッピングされます。
  • Stringは、任意の文字列またはバイト型にマッピングされます。例えば、&str&[u8]StringVec<u8>、またはSmartString。新しい型もサポートされています。バイトを格納する場合は、より効率的なserde_bytesを使用することを検討してください。
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow<'a> {
    str: &'a str,
    string: String,
    #[serde(with = "serde_bytes")]
    bytes: Vec<u8>,
    #[serde(with = "serde_bytes")]
    byte_slice: &'a [u8],
}
  • FixedString(N)は、バイトの配列としてサポートされています。例えば、[u8; N]
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow {
    fixed_str: [u8; 16], // FixedString(16)
}
  • Enum(8|16)は、serde_reprを使用してサポートされます。
use serde_repr::{Deserialize_repr, Serialize_repr};

#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    level: Level,
}

#[derive(Debug, Serialize_repr, Deserialize_repr)]
#[repr(u8)]
enum Level {
    Debug = 1,
    Info = 2,
    Warn = 3,
    Error = 4,
}
  • UUIDは、uuid::Uuidにマッピングされ、serde::uuidを使用します。uuid機能が必要です。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(with = "clickhouse::serde::uuid")]
    uuid: uuid::Uuid,
}
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(with = "clickhouse::serde::ipv4")]
    ipv4: std::net::Ipv4Addr,
}
  • Dateは、u16またはそれに基づく新しい型にマッピングされ、1970-01-01以来の経過日数を表します。また、serde::time::dateを使用して、time::Dateもサポートされています。これにはtime機能が必要です。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    days: u16,
    #[serde(with = "clickhouse::serde::time::date")]
    date: Date,
}
  • Date32は、i32またはそれに基づく新しい型にマッピングされ、1970-01-01以来の経過日数を表します。また、serde::time::date32を使用して、time::Dateもサポートされています。これにはtime機能が必要です。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    days: i32,
    #[serde(with = "clickhouse::serde::time::date32")]
    date: Date,
}
  • DateTimeは、u32またはそれに基づく新しい型にマッピングされ、UNIXエポックからの経過秒数を表します。また、serde::time::datetimeを使用して、time::OffsetDateTimeもサポートされています。これにはtime機能が必要です。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    ts: u32,
    #[serde(with = "clickhouse::serde::time::datetime")]
    dt: OffsetDateTime,
}
  • DateTime64(_)は、i32またはそれに基づく新しい型にマッピングされ、UNIXエポックからの経過時間を表します。また、serde::time::datetime64::*を使用して、time::OffsetDateTimeもサポートされています。これにはtime機能が必要です。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    ts: i64, // `DateTime64(X)`に応じて秒/µs/ms/nsの経過時間
    #[serde(with = "clickhouse::serde::time::datetime64::secs")]
    dt64s: OffsetDateTime,  // `DateTime64(0)`
    #[serde(with = "clickhouse::serde::time::datetime64::millis")]
    dt64ms: OffsetDateTime, // `DateTime64(3)`
    #[serde(with = "clickhouse::serde::time::datetime64::micros")]
    dt64us: OffsetDateTime, // `DateTime64(6)`
    #[serde(with = "clickhouse::serde::time::datetime64::nanos")]
    dt64ns: OffsetDateTime, // `DateTime64(9)`
}
  • Tuple(A, B, ...)(A, B, ...)またはそれに基づく新しい型にマッピングされます。
  • Array(_)は任意のスライスにマッピングされます。例えばVec<_>&[_]。新しい型もサポートされています。
  • Map(K, V)Array((K, V))のように動作します。
  • LowCardinality(_)はシームレスにサポートされます。
  • Nullable(_)Option<_>にマッピングされます。clickhouse::serde::*ヘルパーには::optionを追加します。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(with = "clickhouse::serde::ipv4::option")]
    ipv4_opt: Option<Ipv4Addr>,
}
  • Nestedは、リネーミングを伴う複数の配列を提供することでサポートされます。
// CREATE TABLE test(items Nested(name String, count UInt32))
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(rename = "items.name")]
    items_name: Vec<String>,
    #[serde(rename = "items.count")]
    items_count: Vec<u32>,
}
  • Geoタイプがサポートされています。Pointはタプル(f64, f64)のように振る舞い、他のタイプは単なるポイントのスライスです。
type Point = (f64, f64);
type Ring = Vec<Point>;
type Polygon = Vec<Ring>;
type MultiPolygon = Vec<Polygon>;
type LineString = Vec<Point>;
type MultiLineString = Vec<LineString>;

#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    point: Point,
    ring: Ring,
    polygon: Polygon,
    multi_polygon: MultiPolygon,
    line_string: LineString,
    multi_line_string: MultiLineString,
}
  • VariantDynamic、(新しい) JSONデータ型はまだサポートされていません。

モック

このクレートは、CHサーバーをモックし、DDL、SELECTINSERTおよびWATCHクエリをテストするためのユーティリティを提供します。この機能はtest-util機能で有効にできます。開発依存関係としてのみ使用してください

を参照してください。

トラブルシューティング

CANNOT_READ_ALL_DATA

CANNOT_READ_ALL_DATAエラーの最も一般的な原因は、アプリケーション側の行定義がClickHouseのものと一致しないことです。

次のテーブルを考えてみてください:

CREATE OR REPLACE TABLE event_log (id UInt32)
ENGINE = MergeTree
ORDER BY timestamp

その後、EventLogがアプリケーション側で不一致な型とともに定義されている場合、例えば:

#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
    id: String, // <- 代わりにu32にする必要があります!
}

データを挿入する際に、次のようなエラーが発生する可能性があります:

Error: BadResponse("Code: 33. DB::Exception: Cannot read all data. Bytes read: 5. Bytes expected: 23.: (at row 1)\n: While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)")

この例では、EventLog構造体の定義を正しく修正することで解決されます:

#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
    id: u32
}

既知の制限

  • VariantDynamic、(新しい) JSONデータ型はまだサポートされていません。
  • サーバーサイドのパラメータバインディングはまだサポートされていません。詳細はthis issueを参照してください。

お問い合わせ

質問や支援が必要な場合は、コミュニティSlackまたはGitHubのIssuesを通じて気軽にご連絡ください。