メインコンテンツへスキップ
メインコンテンツへスキップ

データベース/SQL API

標準APIの完全なコード例はこちらで確認できます。

接続設定については、設定を参照してください。 サポートされるデータ型とGoの型マッピングについては、データ型を参照してください。

database/sql、つまり「標準」APIを使うと、標準インターフェースに準拠することで、アプリケーションコードが基盤となるデータベースを意識しないようにする必要がある場面でも、このクライアントを使用できます。ただし、その代償として、抽象化や間接参照のための追加レイヤーや、必ずしもClickHouseに適していないプリミティブが入ることになります。とはいえ、ツールが複数のデータベースに接続する必要がある場面では、こうしたコストは通常許容範囲です。

さらに、このクライアントはトランスポート層としてHTTPの使用もサポートしており、最適なパフォーマンスを得るために、データは引き続きネイティブ形式でエンコードされます。

接続

接続には、clickhouse://<host>:<port>?<query_option>=<value> 形式のDSN文字列と Open メソッドを使う方法、または clickhouse.OpenDB メソッドを使う方法があります。後者は database/sql 仕様の一部ではありませんが、sql.DB インスタンスを返します。このメソッドでは、プロファイリングなど、database/sql 仕様経由では明確に公開できない機能を利用できます。

func Connect() error {
        env, err := GetStdTestEnvironment()
        if err != nil {
                return err
        }
        conn := clickhouse.OpenDB(&clickhouse.Options{
                Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
                Auth: clickhouse.Auth{
                        Database: env.Database,
                        Username: env.Username,
                        Password: env.Password,
                },
        })
        return conn.Ping()
}

func ConnectDSN() error {
        env, err := GetStdTestEnvironment()
        if err != nil {
                return err
        }
        conn, err := sql.Open("clickhouse", fmt.Sprintf("clickhouse://%s:%d?username=%s&password=%s", env.Host, env.Port, env.Username, env.Password))
        if err != nil {
                return err
        }
        return conn.Ping()
}

完全な例

以降のすべての例では、明示的に示す場合を除き、ClickHouse の conn 変数は作成済みで使用可能であるものとします。

接続設定

ほとんどの設定オプションは ClickHouse API と共通です。共通の設定については 設定 を参照してください。以下の SQL 固有の DSN パラメータを利用できます。

  • hosts - ロードバランシングおよびフェイルオーバーのための、単一アドレス host のカンマ区切りリスト - Connecting to Multiple Nodes を参照してください。
  • username/password - 認証情報 - Authentication を参照してください
  • database - 現在のデフォルトデータベースを選択します
  • dial_timeout - 期間を表す文字列です。符号付きの場合もある 10 進数の並びで、各数値には省略可能な小数部と、300ms1s のような単位サフィックスを付けられます。有効な時間単位は mssm です。
  • connection_open_strategy - random/in_order (デフォルトは random) - Connecting to Multiple Nodes を参照してください
    • round_robin - 一連のサーバーからラウンドロビン方式でサーバーを選択します
    • in_order - 指定された順序で最初に利用可能なサーバーを選択します
  • debug - デバッグ出力を有効にします (真偽値)
  • compress - 圧縮アルゴリズムを指定します - none (デフォルト) 、zstdlz4gzipdeflatebrtrue に設定した場合は lz4 が使われます。native 通信でサポートされるのは lz4zstd のみです。
  • compress_level - 圧縮レベル (デフォルトは 0) です。Compression を参照してください。これはアルゴリズムごとに異なります。
    • gzip - -2 (最高速度) から 9 (最高圧縮率)
    • deflate - -2 (最高速度) から 9 (最高圧縮率)
    • br - 0 (最高速度) から 11 (最高圧縮率)
    • zstdlz4 - 無視されます
  • secure - セキュアな SSL 接続を確立します (デフォルトは false)
  • skip_verify - 証明書の検証をスキップします (デフォルトは false)
  • block_buffer_size - block バッファサイズを制御できます。BlockBufferSize を参照してください。 (デフォルトは 2)
func ConnectSettings() error {
        env, err := GetStdTestEnvironment()
        if err != nil {
                return err
        }
        conn, err := sql.Open("clickhouse", fmt.Sprintf("clickhouse://127.0.0.1:9001,127.0.0.1:9002,%s:%d/%s?username=%s&password=%s&dial_timeout=10s&connection_open_strategy=round_robin&debug=true&compress=lz4", env.Host, env.Port, env.Database, env.Username, env.Password))
        if err != nil {
                return err
        }
        return conn.Ping()
}

完全な例

HTTP で接続する

デフォルトでは、接続はネイティブプロトコルで確立されます。HTTP が必要なユーザーは、DSN を変更して HTTP プロトコルを含めるか、接続オプションで Protocol を指定することで有効化できます。

func ConnectHTTP() error {
        env, err := GetStdTestEnvironment()
        if err != nil {
                return err
        }
        conn := clickhouse.OpenDB(&clickhouse.Options{
                Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.HttpPort)},
                Auth: clickhouse.Auth{
                        Database: env.Database,
                        Username: env.Username,
                        Password: env.Password,
                },
                Protocol: clickhouse.HTTP,
        })
        return conn.Ping()
}

func ConnectDSNHTTP() error {
        env, err := GetStdTestEnvironment()
        if err != nil {
                return err
        }
        conn, err := sql.Open("clickhouse", fmt.Sprintf("http://%s:%d?username=%s&password=%s", env.Host, env.HttpPort, env.Username, env.Password))
        if err != nil {
                return err
        }
        return conn.Ping()
}

完全な例

セッション

HTTP のみ

セッションが必要なのは、HTTP トランスポートを使用する場合のみです。ネイティブ TCP 接続では、セッションが自動的に組み込まれています。

HTTP を使用する場合は、session_id を設定として渡し、一時テーブルなどのセッションに紐づく機能を有効にします。

conn := clickhouse.OpenDB(&clickhouse.Options{
    Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.HttpPort)},
    Auth: clickhouse.Auth{
        Database: env.Database,
        Username: env.Username,
        Password: env.Password,
    },
    Protocol: clickhouse.HTTP,
    Settings: clickhouse.Settings{
        "session_id": uuid.NewString(),
    },
})
if _, err := conn.Exec(`DROP TABLE IF EXISTS example`); err != nil {
    return err
}
_, err = conn.Exec(`
    CREATE TEMPORARY TABLE IF NOT EXISTS example (
            Col1 UInt8
    )
`)
if err != nil {
    return err
}
scope, err := conn.Begin()
if err != nil {
    return err
}
batch, err := scope.Prepare("INSERT INTO example")
if err != nil {
    return err
}
for i := 0; i < 10; i++ {
    _, err := batch.Exec(
        uint8(i),
    )
    if err != nil {
        return err
    }
}
rows, err := conn.Query("SELECT * FROM example")
if err != nil {
    return err
}
defer rows.Close()

var (
    col1 uint8
)
for rows.Next() {
    if err := rows.Scan(&col1); err != nil {
        return err
    }
    fmt.Printf("row: col1=%d\n", col1)
}

// NOTE: Do not skip rows.Err() check
if err := rows.Err(); err != nil {
    return err
}

完全な例

実行

接続を確立したら、Exec メソッドを使用して sql 文を実行できます。

conn.Exec(`DROP TABLE IF EXISTS example`)
_, err = conn.Exec(`
    CREATE TABLE IF NOT EXISTS example (
        Col1 UInt8,
        Col2 String
    ) engine=Memory
`)
if err != nil {
    return err
}
_, err = conn.Exec("INSERT INTO example VALUES (1, 'test-1')")

完全な例

このメソッドはContextの受け取りをサポートしていません。デフォルトでは、バックグラウンドContextで実行されます。必要な場合はExecContextを使用してください。詳しくはContextの使用を参照してください。

バッチ insert

Being メソッドで sql.Tx を作成することで、バッチ処理を実現できます。ここから、INSERT 文 を指定して Prepare メソッドを使用すると、batch を取得できます。これにより sql.Stmt が返され、Exec メソッドを使ってそこに行を付加できます。batch は、元の sql.Tx に対して Commit が executed されるまでメモリ内に蓄積されます。

batch, err := scope.Prepare("INSERT INTO example")
if err != nil {
    return err
}
for i := 0; i < 1000; i++ {
    _, err := batch.Exec(
        uint8(42),
        "ClickHouse", "Inc",
        uuid.New(),
        map[string]uint8{"key": 1},             // Map(String, UInt8)
        []string{"Q", "W", "E", "R", "T", "Y"}, // Array(String)
        []interface{}{ // Tuple(String, UInt8, Array(Map(String, String)))
            "String Value", uint8(5), []map[string]string{
                map[string]string{"key": "value"},
                map[string]string{"key": "value"},
                map[string]string{"key": "value"},
            },
        },
        time.Now(),
    )
    if err != nil {
        return err
    }
}
return scope.Commit()

完全な例

行のクエリ

単一の行をクエリするには、QueryRow メソッドを使用します。これは *sql.Row を返し、このオブジェクトに対して Scan を呼び出す際に、各カラムの値を格納する変数へのポインタを渡します。QueryRowContext バリアントでは、background 以外の Context を渡すことができます。Context の使用 を参照してください。

row := conn.QueryRow("SELECT * FROM example")
var (
    col1             uint8
    col2, col3, col4 string
    col5             map[string]uint8
    col6             []string
    col7             interface{}
    col8             time.Time
)
if err := row.Scan(&col1, &col2, &col3, &col4, &col5, &col6, &col7, &col8); err != nil {
    return err
}

完全な例

複数の行を反復処理するには、Query メソッドを使います。これは *sql.Rows 構造体を返し、Next を呼び出して各行を順に処理できます。対応する QueryContext では、Context を渡すことができます。

rows, err := conn.Query("SELECT * FROM example")
if err != nil {
    return err
}
defer rows.Close()

var (
    col1             uint8
    col2, col3, col4 string
    col5             map[string]uint8
    col6             []string
    col7             interface{}
    col8             time.Time
)
for rows.Next() {
    if err := rows.Scan(&col1, &col2, &col3, &col4, &col5, &col6, &col7, &col8); err != nil {
        return err
    }
    fmt.Printf("row: col1=%d, col2=%s, col3=%s, col4=%s, col5=%v, col6=%v, col7=%v, col8=%v\n", col1, col2, col3, col4, col5, col6, col7, col8)
}
// NOTE: Do not skip rows.Err() check
if err := rows.Err(); err != nil {
    return err
}

完全な例

Async insert

非同期 insert は、ExecContext メソッドで insert を実行することで実現できます。以下に示すように、非同期モードを有効にした Context を渡す必要があります。これにより、クライアントがサーバーによる insert の完了を待機するか、それともデータが受信された時点で応答するかをユーザーが指定できます。これは実質的に、パラメータ wait_for_async_insert を制御します。

const ddl = `
    CREATE TABLE example (
            Col1 UInt64
        , Col2 String
        , Col3 Array(UInt8)
        , Col4 DateTime
    ) ENGINE = Memory
    `
if _, err := conn.Exec(ddl); err != nil {
    return err
}
ctx := clickhouse.Context(context.Background(), clickhouse.WithStdAsync(false))
{
    for i := 0; i < 100; i++ {
        _, err := conn.ExecContext(ctx, fmt.Sprintf(`INSERT INTO example VALUES (
            %d, '%s', [1, 2, 3, 4, 5, 6, 7, 8, 9], now()
        )`, i, "Golang SQL database driver"))
        if err != nil {
            return err
        }
    }
}

完全な例

パラメータバインディング

標準APIは、ClickHouse APIと同じパラメータバインディング機能をサポートしており、ExecQueryQueryRow メソッド (および対応する Context 版) にパラメータを渡せます。位置指定、名前付き、番号付きのパラメータをサポートしています。

var count uint64
// positional bind
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 >= ? AND Col3 < ?", 500, now.Add(time.Duration(750)*time.Second)).Scan(&count); err != nil {
    return err
}
// 250
fmt.Printf("Positional bind count: %d\n", count)
// numeric bind
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 <= $2 AND Col3 > $1", now.Add(time.Duration(150)*time.Second), 250).Scan(&count); err != nil {
    return err
}
// 100
fmt.Printf("Numeric bind count: %d\n", count)
// named bind
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 <= @col1 AND Col3 > @col3", clickhouse.Named("col1", 100), clickhouse.Named("col3", now.Add(time.Duration(50)*time.Second))).Scan(&count); err != nil {
    return err
}
// 50
fmt.Printf("Named bind count: %d\n", count)

完全な例

なお、特別なケースに関する注意事項は、引き続き適用されます。

Context の使用

標準 API では、ClickHouse API と同様に、Context を介して期限、キャンセルシグナル、そのほかのリクエストスコープの値を渡すことができます。ClickHouse API と異なり、これはメソッドの Context バリアントを使って実現します。つまり、デフォルトではバックグラウンド Context を使用する Exec のようなメソッドには、先頭のパラメータとして Context を渡せる ExecContext というバリアントがあります。これにより、アプリケーションフローの任意のステージで Context を渡せます。たとえば、ConnContext を使って接続を確立する際や、QueryRowContext を使ってクエリ結果の行を取得する際に Context を渡せます。使用可能なすべてのメソッドの例を以下に示します。

Context を使って期限、キャンセルシグナル、クエリ ID、QUOTA キー、接続設定を渡す方法の詳細については、ClickHouse API の Context の使用 を参照してください。

ctx := clickhouse.Context(context.Background(), clickhouse.WithSettings(clickhouse.Settings{
    "async_insert": "1",
}))

// queries can be cancelled using the context
ctx, cancel := context.WithCancel(context.Background())
go func() {
    cancel()
}()
if err = conn.QueryRowContext(ctx, "SELECT sleep(3)").Scan(); err == nil {
    return fmt.Errorf("expected cancel")
}

// set a deadline for a query - this will cancel the query after the absolute time is reached. Again terminates the connection only,
// queries will continue to completion in ClickHouse
ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(-time.Second))
defer cancel()
if err := conn.PingContext(ctx); err == nil {
    return fmt.Errorf("expected deadline exceeeded")
}

// set a query id to assist tracing queries in logs e.g. see system.query_log
var one uint8
ctx = clickhouse.Context(context.Background(), clickhouse.WithQueryID(uuid.NewString()))
if err = conn.QueryRowContext(ctx, "SELECT 1").Scan(&one); err != nil {
    return err
}

conn.ExecContext(context.Background(), "DROP QUOTA IF EXISTS foobar")
defer func() {
    conn.ExecContext(context.Background(), "DROP QUOTA IF EXISTS foobar")
}()
ctx = clickhouse.Context(context.Background(), clickhouse.WithQuotaKey("abcde"))
// set a quota key - first create the quota
if _, err = conn.ExecContext(ctx, "CREATE QUOTA IF NOT EXISTS foobar KEYED BY client_key FOR INTERVAL 1 minute MAX queries = 5 TO default"); err != nil {
    return err
}

// queries can be cancelled using the context
ctx, cancel = context.WithCancel(context.Background())
// we will get some results before cancel
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{
    "max_block_size": "1",
}))
rows, err := conn.QueryContext(ctx, "SELECT sleepEachRow(1), number FROM numbers(100);")
if err != nil {
    return err
}
defer rows.Close()

var (
    col1 uint8
    col2 uint8
)

for rows.Next() {
    if err := rows.Scan(&col1, &col2); err != nil {
        if col2 > 3 {
            fmt.Println("expected cancel")
            return nil
        }
        return err
    }
    fmt.Printf("row: col2=%d\n", col2)
    if col2 == 3 {
        cancel()
    }
}
// NOTE: Do not skip rows.Err() check
if err := rows.Err(); err != nil {
    return err
}

完全な例

動的スキャン

ClickHouse API と同様に、カラム型の情報を利用して、Scan に渡せる適切な型の変数インスタンスを実行時に作成できます。これにより、型が不明なカラムでも読み取ることができます。

const query = `
SELECT
        1     AS Col1
    , 'Text' AS Col2
`
rows, err := conn.QueryContext(context.Background(), query)
if err != nil {
    return err
}
defer rows.Close()

columnTypes, err := rows.ColumnTypes()
if err != nil {
    return err
}
vars := make([]interface{}, len(columnTypes))
for i := range columnTypes {
    vars[i] = reflect.New(columnTypes[i].ScanType()).Interface()
}
for rows.Next() {
    if err := rows.Scan(vars...); err != nil {
        return err
    }
    for _, v := range vars {
        switch v := v.(type) {
        case *string:
            fmt.Println(*v)
        case *uint8:
            fmt.Println(*v)
        }
    }
}
// NOTE: Do not skip rows.Err() check
if err := rows.Err(); err != nil {
    return err
}

完全な例

外部テーブル

外部テーブルを使うと、クライアントはSELECTクエリとともに ClickHouse にデータを送信できます。このデータは一時テーブルに格納され、評価のためにクエリ内で使用できます。

クエリとともにクライアントから外部データを送信するには、ユーザーは ext.NewTable を使って外部テーブルを作成し、これを Context 経由で渡す前に構築する必要があります。

table1, err := ext.NewTable("external_table_1",
    ext.Column("col1", "UInt8"),
    ext.Column("col2", "String"),
    ext.Column("col3", "DateTime"),
)
if err != nil {
    return err
}

for i := 0; i < 10; i++ {
    if err = table1.Append(uint8(i), fmt.Sprintf("value_%d", i), time.Now()); err != nil {
        return err
    }
}

table2, err := ext.NewTable("external_table_2",
    ext.Column("col1", "UInt8"),
    ext.Column("col2", "String"),
    ext.Column("col3", "DateTime"),
)

for i := 0; i < 10; i++ {
    table2.Append(uint8(i), fmt.Sprintf("value_%d", i), time.Now())
}
ctx := clickhouse.Context(context.Background(),
    clickhouse.WithExternalTable(table1, table2),
)
rows, err := conn.QueryContext(ctx, "SELECT * FROM external_table_1")
if err != nil {
    return err
}
defer rows.Close()

for rows.Next() {
    var (
        col1 uint8
        col2 string
        col3 time.Time
    )
    rows.Scan(&col1, &col2, &col3)
    fmt.Printf("col1=%d, col2=%s, col3=%v\n", col1, col2, col3)
}
// NOTE: Do not skip rows.Err() check
if err := rows.Err(); err != nil {
    return err
}

var count uint64
if err := conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM external_table_1").Scan(&count); err != nil {
    return err
}
fmt.Printf("external_table_1: %d\n", count)
if err := conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM external_table_2").Scan(&count); err != nil {
    return err
}
fmt.Printf("external_table_2: %d\n", count)
if err := conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM (SELECT * FROM external_table_1 UNION ALL SELECT * FROM external_table_2)").Scan(&count); err != nil {
    return err
}
fmt.Printf("external_table_1 UNION external_table_2: %d\n", count)

完全な例

OpenTelemetry

ClickHouse は、TCP と HTTP の両方のトランスポートで トレースコンテキスト伝播 をサポートしています。clickhouse.WithSpan を使うと、Context 経由でスパンをクエリに関連付けることができます。

HTTP トランスポートの制限

ClickHouse サーバーは標準の traceparent / tracestate HTTP ヘッダーを受け入れますが、clickhouse-go の HTTP トランスポートは現時点ではそれらを送信しないため、HTTP 経由では WithSpan は効果がありません。回避策として、接続オプションの HttpHeaders を使ってヘッダーを手動で設定できます。

var count uint64
rows := conn.QueryRowContext(clickhouse.Context(context.Background(), clickhouse.WithSpan(
    trace.NewSpanContext(trace.SpanContextConfig{
        SpanID:  trace.SpanID{1, 2, 3, 4, 5},
        TraceID: trace.TraceID{5, 4, 3, 2, 1},
    }),
)), "SELECT COUNT() FROM (SELECT number FROM system.numbers LIMIT 5)")
if err := rows.Scan(&count); err != nil {
    return err
}
// NOTE: Do not skip rows.Err() check
if err := rows.Err(); err != nil {
    return err
}
fmt.Printf("count: %d\n", count)

完全な例

圧縮

標準 API は、ネイティブの ClickHouse API と同じ圧縮アルゴリズム、つまりブロックレベルの lz4 および zstd 圧縮をサポートします。これに加えて、HTTP 接続では gzipdeflatebr による圧縮もサポートされます。これらのいずれかが有効な場合、挿入時およびクエリ応答ではブロック単位で圧縮が行われます。一方、ping やクエリリクエストなどのその他のリクエストは圧縮されません。これは lz4 および zstd のオプションと同様です。

接続の確立に OpenDB メソッドを使う場合は、Compression 設定を渡すことができます。これには、圧縮レベルを指定する機能も含まれます (以下を参照) 。DSN を使用して sql.Open 経由で接続する場合は、compress パラメータを使います。指定できる値は、gzipdeflatebrzstdlz4 などの特定の圧縮アルゴリズム、またはブールフラグです。true に設定する と、lz4 が使用されます。デフォルト は none、つまり圧縮は無効です。

conn := clickhouse.OpenDB(&clickhouse.Options{
    Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.HttpPort)},
    Auth: clickhouse.Auth{
        Database: env.Database,
        Username: env.Username,
        Password: env.Password,
    },
    Compression: &clickhouse.Compression{
        Method: clickhouse.CompressionBrotli,
        Level:  5,
    },
    Protocol: clickhouse.HTTP,
})

完全な例

conn, err := sql.Open("clickhouse", fmt.Sprintf("http://%s:%d?username=%s&password=%s&compress=gzip&compress_level=5", env.Host, env.HttpPort, env.Username, env.Password))

完全な例

適用する圧縮レベルは、DSN パラメータ compress&#95;level または Compression オプションの Level フィールドで制御できます。デフォルトは 0 ですが、アルゴリズムによって異なります。

  • gzip - -2 (最高速度) ~ 9 (最高圧縮率)
  • deflate - -2 (最高速度) ~ 9 (最高圧縮率)
  • br - 0 (最高速度) ~ 11 (最高圧縮率)
  • zstd, lz4 - 無視されます