メインコンテンツまでスキップ
メインコンテンツまでスキップ

ClickHouse Rustクライアント

ClickHouseに接続するための公式のRustクライアントで、元々はPaul Loydによって開発されました。クライアントのソースコードはGitHubリポジトリで入手可能です。

概要

  • 行のエンコード/デコードにはserdeを使用しています。
  • serde属性をサポートしています:skip_serializingskip_deserializingrename
  • HTTPトランスポートではRowBinaryフォーマットを使用します。
    • TCP経由でNativeに切り替える計画があります。
  • TLSをサポートしています(native-tlsおよびrustls-tls機能を通じて)。
  • 圧縮と解凍をサポートしています(LZ4)。
  • データの選択や挿入、DDLの実行、およびクライアント側のバッチ処理のためのAPIを提供しています。
  • ユニットテスト用の便利なモックを提供しています。

インストール

クレートを使用するには、Cargo.tomlに以下を追加してください:

[dependencies]
clickhouse = "0.12.2"

[dev-dependencies]
clickhouse = { version = "0.12.2", features = ["test-util"] }

詳細は、crates.ioページもご覧ください。

Cargo機能

  • lz4(デフォルトで有効)— Compression::Lz4およびCompression::Lz4Hc(_)バリアントを有効にします。有効にされた場合、デフォルトで全てのクエリにCompression::Lz4が使用されますが、WATCHを除きます。
  • native-tlshyper-tlsを通じてHTTPSスキーマのURLをサポートし、OpenSSLにリンクします。
  • rustls-tlshyper-rustlsを通じてHTTPSスキーマのURLをサポートし、OpenSSLにリンクしません。
  • inserterclient.inserter()を有効にします。
  • test-util — モックを追加します。詳細はこちらの例をご覧ください。これはdev-dependenciesでのみ使用してください。
  • watchclient.watch機能を有効にします。詳細は対応するセクションを参照してください。
  • uuiduuidクレートで作業するためにserde::uuidを追加します。
  • timetimeクレートで作業するためにserde::timeを追加します。
参考

ClickHouseにHTTPS URLを介して接続する場合、native-tlsまたはrustls-tlsのいずれかの機能を有効にする必要があります。両方が有効にされている場合は、rustls-tls機能が優先されます。

ClickHouseバージョンの互換性

このクライアントはLTSまたはそれ以降のバージョンのClickHouseと、ClickHouse Cloudに対応しています。

v22.6より古いClickHouseサーバーは、行バイナリを一部の稀なケースで不正確に処理します。 v0.11以上を使用し、wa-37420機能を有効にすることでこの問題を解決できます。ただし注意:この機能は新しいClickHouseバージョンでは使用しないでください。

クライアントのさまざまな使用シナリオをカバーすることを目指しており、はクライアントリポジトリでご覧いただけます。概要は例のREADMEで入手可能です。

例や以下の文書に不明な点や欠落がある場合は、お問い合わせください。

使用法

注記

ch2rsクレートは、ClickHouseから行タイプを生成するのに役立ちます。

クライアントインスタンスの作成

ヒント

作成したクライアントを再利用するか、クローンして基盤となるhyper接続プールを再利用してください。

use clickhouse::Client;

let client = Client::default()
    // should include both protocol and port
    .with_url("http://localhost:8123")
    .with_user("name")
    .with_password("123")
    .with_database("test");

HTTPSまたはClickHouse Cloud接続

HTTPSはrustls-tlsまたはnative-tlsのCargo機能のいずれかで動作します。

通常通りにクライアントを作成してください。この例では、環境変数を使用して接続詳細を保存しています:

参考

URLにはプロトコルとポートの両方を含める必要があります。例:https://instance.clickhouse.cloud:8443

fn read_env_var(key: &str) -> String {
    env::var(key).unwrap_or_else(|_| panic!("{key} env variable should be set"))
}

let client = Client::default()
    .with_url(read_env_var("CLICKHOUSE_URL"))
    .with_user(read_env_var("CLICKHOUSE_USER"))
    .with_password(read_env_var("CLICKHOUSE_PASSWORD"));

詳細は次の通りです:

  • クライアントリポジトリのClickHouse Cloudの例は、オンプレミスのHTTPS接続にも適用可能です。

行の選択

use serde::Deserialize;
use clickhouse::Row;
use clickhouse::sql::Identifier;

#[derive(Row, Deserialize)]
struct MyRow<'a> {
    no: u32,
    name: &'a str,
}

let table_name = "some";
let mut cursor = client
    .query("SELECT ?fields FROM ? WHERE no BETWEEN ? AND ?")
    .bind(Identifier(table_name))
    .bind(500)
    .bind(504)
    .fetch::<MyRow<'_>>()?;

while let Some(row) = cursor.next().await? { .. }
  • プレースホルダー?fieldsno, nameRowのフィールド)に置き換えられます。
  • プレースホルダー?は以下のbind()呼び出しの値に置き換えられます。
  • 最初の行またはすべての行を取得するために便利なfetch_one::<Row>()およびfetch_all::<Row>()メソッドが使用できます。
  • テーブル名をバインドするためにsql::Identifierを使用できます。

注意:レスポンス全体がストリームされるため、カーソルは行を生成した後でもエラーを返すことがあります。この場合、query(...).with_option("wait_end_of_query", "1")を試して、サーバー側でのレスポンスバッファリングを有効にしてください。詳細もご覧ください。buffer_sizeオプションも便利です。

注意

行を選択する際にwait_end_of_queryを使用する際は注意してください。サーバー側でのメモリ消費が増加し、全体のパフォーマンスが低下する可能性があります。

行の挿入

use serde::Serialize;
use clickhouse::Row;

#[derive(Row, Serialize)]
struct MyRow {
    no: u32,
    name: String,
}

let mut insert = client.insert("some")?;
insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
insert.end().await?;
  • end()が呼び出されない場合、INSERTは中止されます。
  • 行はストリームとして徐々に送信され、ネットワーク負荷を分散します。
  • ClickHouseは、すべての行が同じパーティションに収まる場合にのみバッチを原子的に挿入します。この際、行の数はmax_insert_block_sizeよりも少ない必要があります。

非同期挿入(サーバー側バッチ処理)

ClickHouseの非同期挿入を使用して、クライアント側のデータバッチ処理を回避できます。これは、単にinsertメソッド(またはクライアントインスタンス自体に)にasync_insertオプションを提供することで行えます。

let client = Client::default()
    .with_url("http://localhost:8123")
    .with_option("async_insert", "1")
    .with_option("wait_for_async_insert", "0");

詳細は次の通りです:

Inserter機能(クライアント側バッチ処理)

inserterのCargo機能が必要です。

let mut inserter = client.inserter("some")?
    .with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
    .with_max_bytes(50_000_000)
    .with_max_rows(750_000)
    .with_period(Some(Duration::from_secs(15)));

inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
let stats = inserter.commit().await?;
if stats.rows > 0 {
    println!(
        "{} bytes, {} rows, {} transactions have been inserted",
        stats.bytes, stats.rows, stats.transactions,
    );
}

// don't forget to finalize the inserter during the application shutdown
// and commit the remaining rows. `.end()` will provide stats as well.
inserter.end().await?;
  • Inserterは、いずれかのしきい値(max_bytesmax_rowsperiod)に達した場合、commit()内でアクティブな挿入を終了します。
  • アクティブなINSERTを終了する間隔は、並行する挿入者による負荷のスパイクを避けるためにwith_period_biasを使用して調整できます。
  • Inserter::time_left()は、現在の期間が終了する時を検出するために使用できます。ストリームがアイテムをまれに発生する場合は、制限を再確認するためにInserter::commit()を再度呼び出してください。
  • 時間のしきい値は、quantaクレートを使用してinserterを高速化します。test-utilが有効な場合は使用されません(したがって、カスタムテストでtokio::time::advance()による時間管理が可能です)。
  • commit()コール間のすべての行は、同じINSERT文で挿入されます。
注意

挿入を終了/確定する場合は、フラッシュを忘れないでください:

inserter.end().await?;

DDLの実行

単一ノードの展開では、次のようにDDLを実行するだけで十分です。

client.query("DROP TABLE IF EXISTS some").execute().await?;

ただし、負荷分散装置またはClickHouse Cloudを使用したクラスター展開では、wait_end_of_queryオプションを使用して、すべてのレプリカでDDLが適用されるのを待つことをお勧めします。これは次のように行うことができます。

client
    .query("DROP TABLE IF EXISTS some")
    .with_option("wait_end_of_query", "1")
    .execute()
    .await?;

ClickHouse設定

with_optionメソッドを使用して、さまざまなClickHouse設定を適用できます。例として:

let numbers = client
    .query("SELECT number FROM system.numbers")
    // This setting will be applied to this particular query only;
    // it will override the global client setting.
    .with_option("limit", "3")
    .fetch_all::<u64>()
    .await?;

queryに加えて、insertおよびinserterメソッドでも同様に機能します。また、クライアントインスタンス上で同じメソッドを呼び出すことで、すべてのクエリに対してグローバル設定を設定できます。

クエリID

.with_optionを使用することで、クエリをClickHouseのクエリログで識別するためのquery_idオプションを設定できます。

let numbers = client
    .query("SELECT number FROM system.numbers LIMIT 1")
    .with_option("query_id", "some-query-id")
    .fetch_all::<u64>()
    .await?;

queryに加えて、insertおよびinserterメソッドでも同様に機能します。

危険

query_idを手動で設定する場合は、一意であることを確認してください。UUIDは良い選択です。

詳細は、クライアントリポジトリのquery_id例をご覧ください。

セッションID

query_idと同様に、session_idを設定して同一のセッション内でステートメントを実行できます。session_idは、クライアントレベルでグローバルに設定することも、queryinsert、またはinserter呼び出しごとに設定することもできます。

let client = Client::default()
    .with_url("http://localhost:8123")
    .with_option("session_id", "my-session");
危険

クラスター展開の場合、「スティッキーセッション」がないため、特定のクラスターノードに接続してこの機能を適切に利用する必要があります。たとえば、ラウンドロビン負荷分散装置は、後続のリクエストが同じClickHouseノードで処理されることを保証しません。

詳細は、クライアントリポジトリのsession_id例をご覧ください。

カスタムHTTPヘッダー

プロキシ認証を使用している場合やカスタムヘッダーを渡す必要がある場合は、次のようにできます。

let client = Client::default()
    .with_url("http://localhost:8123")
    .with_header("X-My-Header", "hello");

詳細は、クライアントリポジトリのカスタムHTTPヘッダーの例をご覧ください。

カスタムHTTPクライアント

これは、基盤となるHTTP接続プール設定を微調整するのに役立ちます。

use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client as HyperClient;
use hyper_util::rt::TokioExecutor;

let connector = HttpConnector::new(); // or HttpsConnectorBuilder
let hyper_client = HyperClient::builder(TokioExecutor::new())
    // For how long keep a particular idle socket alive on the client side (in milliseconds).
    // It is supposed to be a fair bit less that the ClickHouse server KeepAlive timeout,
    // which was by default 3 seconds for pre-23.11 versions, and 10 seconds after that.
    .pool_idle_timeout(Duration::from_millis(2_500))
    // Sets the maximum idle Keep-Alive connections allowed in the pool.
    .pool_max_idle_per_host(4)
    .build(connector);

let client = Client::with_http_client(hyper_client).with_url("http://localhost:8123");
注意

この例はレガシーHyper APIに依存しており、将来的に変更される可能性があります。

詳細は、クライアントリポジトリのカスタムHTTPクライアントの例をご覧ください。

データ型

  • (U)Int(8|16|32|64|128)は、対応する(u|i)(8|16|32|64|128)型またはそれを囲む新しい型にマッピングされます。
  • (U)Int256は直接サポートされていませんが、回避策があります。
  • Float(32|64)は、対応するf(32|64)型またはそれを囲む新しい型にマッピングされます。
  • Decimal(32|64|128)は、対応するi(32|64|128)型またはそれを囲む新しい型にマッピングされます。符号付き小数点数の実装には、fixnumを使用するのが便利です。
  • Booleanは、boolまたはそれを囲む新しい型にマッピングされます。
  • Stringは、&str&[u8]StringVec<u8>、またはSmartStringなどの任意の文字列またはバイト型にマッピングされます。新しい型もサポートされています。バイトを格納するためには、serde_bytesを使用することを検討してください。こちらの方が効率的です。
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow<'a> {
    str: &'a str,
    string: String,
    #[serde(with = "serde_bytes")]
    bytes: Vec<u8>,
    #[serde(with = "serde_bytes")]
    byte_slice: &'a [u8],
}
  • FixedString(N)は、バイトの配列としてサポートされています。例:[u8; N]
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow {
    fixed_str: [u8; 16], // FixedString(16)
}
  • Enum(8|16)は、serde_reprを使用してサポートされています。
use serde_repr::{Deserialize_repr, Serialize_repr};

#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    level: Level,
}

#[derive(Debug, Serialize_repr, Deserialize_repr)]
#[repr(u8)]
enum Level {
    Debug = 1,
    Info = 2,
    Warn = 3,
    Error = 4,
}
  • UUIDは、serde::uuidを使用してuuid::Uuidにマッピングされます。uuid機能が必要です。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(with = "clickhouse::serde::uuid")]
    uuid: uuid::Uuid,
}
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(with = "clickhouse::serde::ipv4")]
    ipv4: std::net::Ipv4Addr,
}
  • Dateは、u16またはそれを囲む新しい型にマッピングされ、1970-01-01以降の日数を表します。また、time::Dateは、serde::time::dateを使用してサポートされており、time機能が必要です。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    days: u16,
    #[serde(with = "clickhouse::serde::time::date")]
    date: Date,
}
  • Date32は、i32またはそれを囲む新しい型にマッピングされ、1970-01-01以降の日数を表します。また、time::Dateは、serde::time::date32を使用してサポートされており、time機能が必要です。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    days: i32,
    #[serde(with = "clickhouse::serde::time::date32")]
    date: Date,
}
  • DateTimeは、u32またはそれを囲む新しい型にマッピングされ、UNIXエポック以降の経過秒数を表します。また、time::OffsetDateTimeは、serde::time::datetimeを使用してサポートされており、time機能が必要です。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    ts: u32,
    #[serde(with = "clickhouse::serde::time::datetime")]
    dt: OffsetDateTime,
}
  • DateTime64(_)は、i32またはそれを囲む新しい型にマッピングされ、UNIXエポック以降の経過時間を表します。また、time::OffsetDateTimeは、serde::time::datetime64::*を使用してサポートされており、time機能が必要です。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    ts: i64, // elapsed s/us/ms/ns depending on `DateTime64(X)`
    #[serde(with = "clickhouse::serde::time::datetime64::secs")]
    dt64s: OffsetDateTime,  // `DateTime64(0)`
    #[serde(with = "clickhouse::serde::time::datetime64::millis")]
    dt64ms: OffsetDateTime, // `DateTime64(3)`
    #[serde(with = "clickhouse::serde::time::datetime64::micros")]
    dt64us: OffsetDateTime, // `DateTime64(6)`
    #[serde(with = "clickhouse::serde::time::datetime64::nanos")]
    dt64ns: OffsetDateTime, // `DateTime64(9)`
}
  • Tuple(A, B, ...)は、(A, B, ...)またはそれを囲む新しい型にマッピングされます。
  • Array(_)は、任意のスライス(例:Vec<_>&[_])にマッピングされます。新しい型もサポートされています。
  • Map(K, V)Array((K, V))のように振る舞います。
  • LowCardinality(_)はシームレスにサポートされています。
  • Nullable(_)は、Option<_>にマッピングされます。clickhouse::serde::*ヘルパーには::optionを追加してください。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(with = "clickhouse::serde::ipv4::option")]
    ipv4_opt: Option<Ipv4Addr>,
}
  • Nestedは、名前の付け替えを使って複数の配列を提供することでサポートされています。
// CREATE TABLE test(items Nested(name String, count UInt32))
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    #[serde(rename = "items.name")]
    items_name: Vec<String>,
    #[serde(rename = "items.count")]
    items_count: Vec<u32>,
}
  • Geo型がサポートされています。Pointはタプル(f64, f64)のように振る舞い、他の型はポイントのスライスです。
type Point = (f64, f64);
type Ring = Vec<Point>;
type Polygon = Vec<Ring>;
type MultiPolygon = Vec<Polygon>;
type LineString = Vec<Point>;
type MultiLineString = Vec<LineString>;

#[derive(Row, Serialize, Deserialize)]
struct MyRow {
    point: Point,
    ring: Ring,
    polygon: Polygon,
    multi_polygon: MultiPolygon,
    line_string: LineString,
    multi_line_string: MultiLineString,
}
  • VariantDynamic、(新しい)JSONデータ型はまだサポートされていません。

モッキング

このクレートは、CHサーバーのモックを作成し、DDL、SELECTINSERT、およびWATCHクエリをテストするためのユーティリティを提供します。この機能は、test-util機能を使用して有効化できます。これは開発依存性としてのみ使用してください。

をご覧ください。

トラブルシューティング

CANNOT_READ_ALL_DATA

CANNOT_READ_ALL_DATAエラーの最も一般的な原因は、アプリケーション側の行定義がClickHouseの定義と一致しないことです。

以下のテーブルを考慮してください:

CREATE OR REPLACE TABLE event_log (id UInt32)
ENGINE = MergeTree
ORDER BY timestamp

次に、アプリケーション側でEventLogが不一致の型で定義されていると、例えば以下のようになります:

#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
    id: String, // <- should be u32 instead!
}

データを挿入すると、次のエラーが発生することがあります:

Error: BadResponse("Code: 33. DB::Exception: Cannot read all data. Bytes read: 5. Bytes expected: 23.: (at row 1)\n: While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)")

この例では、EventLog構造体の正しい定義によって解決されます:

#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
    id: u32
}

既知の制限

  • VariantDynamic、(新しい)JSONデータ型はまだサポートされていません。
  • サーバー側のパラメータバインディングはまだサポートされていません。これに関してはこの問題で進捗を追跡できます。

お問い合わせ

質問や支援が必要な場合は、Community SlackまたはGitHub issuesを通じてお気軽にお問い合わせください。