メインコンテンツへスキップ
メインコンテンツへスキップ

Arrow Flight インターフェイス

概要

ClickHouse は、gRPC 上で Arrow IPC フォーマットを使用し、効率的な列指向データ転送を実現する高性能 RPC フレームワークである Apache Arrow Flight プロトコルをサポートしています。

この実装には Arrow Flight SQL のサポートも含まれており、Flight SQL プロトコルに対応した BI ツールやアプリケーションから ClickHouse に直接クエリを実行できます。

主な機能:

  • SQL クエリを実行し、結果を Apache Arrow フォーマットで取得できます。
  • Arrow フォーマットを使用してテーブルにデータを挿入できます。
  • Flight SQL コマンドを使用してメタデータ (カタログ、スキーマ、テーブル、主キー) をクエリできます。
  • Flight SQL を介してサーバー側のプリペアドステートメントを作成、バインド、実行、クローズできます。
  • Flight SQL アクションを使用してセッションと設定を管理できます。
  • TLS 暗号化とユーザー名/パスワード認証に対応しています。
  • PollFlightInfo による結果の段階的な取得。
  • CancelFlightInfo によるクエリのキャンセル。

Arrow Flight サーバーを有効にする

Arrow Flight サーバーを有効にするには、ClickHouse サーバーの設定に arrowflight_port を追加します。

<clickhouse>
    <arrowflight_port>9090</arrowflight_port>
</clickhouse>

起動時に、インターフェイスが有効になったことを示すログメッセージが出力されます:

{} <Information> Application: Arrow Flight compatibility protocol: 0.0.0.0:9090

TLS 設定

Arrow Flight インターフェイスで TLS を有効にするには、次の設定を行います:

<clickhouse>
    <arrowflight_port>9090</arrowflight_port>
    <arrowflight>
        <enable_ssl>true</enable_ssl>
        <ssl_cert_file>/path/to/server-cert.pem</ssl_cert_file>
        <ssl_key_file>/path/to/server-key.pem</ssl_key_file>
    </arrowflight>
</clickhouse>

TLS を有効にしている場合、クライアントは grpc:// ではなく grpc+tls:// スキームで接続する必要があります。

認証

Arrow Flight インターフェイスでは、2 つの認証方式がサポートされています。

基本認証

クライアントは、標準の HTTP Authorization: Basic ヘッダーを使用して、ユーザー名とパスワードで認証を行います。認証に成功すると、サーバーはレスポンスヘッダーに Bearer トークンを返します。

Bearer Token認証

後続のリクエストでは、Basic認証で返されたBearerトークンを、Authorization: Bearer <token> ヘッダーを使用して利用できます。トークンは使用のたびに自動的に更新され、default_session_timeout サーバー設定に従って失効します (デフォルト: 60秒) 。

Pythonの例

import pyarrow.flight as flight

client = flight.FlightClient("grpc://localhost:9090")

# Basic auth returns a bearer token for subsequent calls
token_pair = client.authenticate_basic_token("default", "")
options = flight.FlightCallOptions(headers=[token_pair])

TLS を使用する場合:

import pyarrow.flight as flight

with open("ca-cert.pem", "rb") as f:
    tls_root_certs = f.read()

client = flight.FlightClient(
    "grpc+tls://localhost:9090",
    tls_root_certs=tls_root_certs,
)

token_pair = client.authenticate_basic_token("default", "password")
options = flight.FlightCallOptions(headers=[token_pair])

セッション管理

Arrow Flight インターフェイスは、カスタム gRPC メタデータヘッダーを通じて ClickHouse セッションをサポートします。

HeaderDescription
x-clickhouse-session-idセッション識別子。指定した場合、複数のリクエストで同じセッション状態 (一時テーブル、設定) が共有されます。
x-clickhouse-session-timeout秒単位のセッションタイムアウト。max_session_timeout を超えることはできません。
x-clickhouse-session-checkセッションを作成せずに存在確認を行うには、1 に設定します。
x-clickhouse-session-closeリクエスト完了後にセッションを閉じるには、1 に設定します。これを使用するには、サーバー設定で enable_arrow_close_sessiontrue にする必要があります。
注記

Arrow Flight は HTTP/2 上で gRPC を使用するため、メタデータヘッダー名は大文字と小文字を区別し、ここに示したとおり、正確に小文字で指定する必要があります (例: x-clickhouse-session-idX-ClickHouse-Session-Id ではありません) 。これは、HTTP/2 のフィールド名を小文字のみに限定している RFC 9113, Section 8.2 による要件です。ヘッダー名が大文字小文字を区別しない HTTP/1.1 とは異なります。

セッションを使用すると、SetSessionOptions アクションを介して ClickHouse の設定を永続的に設定できます (DoAction を参照) 。

サーバー設定リファレンス

設定デフォルト説明
arrowflight_portArrow Flight サーバーのポート。この設定を指定した場合にのみ、サーバーが起動します。
arrowflight.enable_sslfalseTLS 暗号化を有効にします。
arrowflight.ssl_cert_fileTLS 証明書ファイルのパス。TLS を有効にする場合に必須です。
arrowflight.ssl_key_fileTLS 秘密鍵ファイルのパス。TLS を有効にする場合に必須です。
arrowflight.tickets_lifetime_seconds600Flight チケットの有効期限が切れてクリーンアップされるまでの時間 (秒) 。チケットの自動失効を無効にするには 0 に設定します。
arrowflight.cancel_ticket_after_do_getfalsetrue の場合、DoGet で消費された直後にチケットをキャンセルし、メモリを解放します。
arrowflight.poll_descriptors_lifetime_seconds600poll ディスクリプタ の有効期限が切れるまでの時間 (秒) 。自動失効を無効にするには 0 に設定します。
arrowflight.cancel_flight_descriptor_after_poll_flight_infofalsetrue の場合、PollFlightInfo で消費された後に poll ディスクリプタ をキャンセルします。
arrowflight.max_prepared_statements_per_user100ユーザーごとに同時に開ける プリペアドステートメント の最大数。制限を無効にするには 0 に設定します。
arrowflight.prepared_statements_lifetime_seconds-1プリペアドステートメント の有効期間モード。> 0: この値を有効期間として使用し、セッションに紐づく文とセッションレス文の両方で、各リクエストごとに有効期限を更新します。0: 自動失効を無効にします。-1: セッションに紐づく文では、セッションタイムアウトを有効期間として使用し、各リクエストごとに更新します。セッションレス文は自動では失効しません。
enable_arrow_close_sessiontruex-clickhouse-session-close ヘッダーを介してクライアントがセッションを閉じられるようにします。
default_session_timeout60デフォルトのセッションタイムアウト (秒) 。Bearer トークンの有効期限も制御します。
max_session_timeout3600許可されるセッションタイムアウトの最大値 (秒) 。

対応している RPC メソッド

GetFlightInfo

クエリを実行し、結果のschema、データ取得用チケットを含むエンドポイント、行数、およびバイト数を含むFlightInfoを返します。

受け付けるFlightDescriptorは、次のいずれかです。

  • PATH ディスクリプタ: テーブル名として解釈される、1つの部分からなるパスです。SELECT * FROM <table>を生成します。
  • CMD ディスクリプタ: 生のSQLクエリ文字列、またはシリアライズされた Flight SQL protobuf command のいずれかです (Flight SQL Commandsを参照) 。

クエリは最後まで実行され、結果はサーバー側のチケットに保存されます。各データブロックごとに個別のエンドポイント/チケットが生成されるため、クライアントはデータを並列に取得できます。

# Query by table name
descriptor = flight.FlightDescriptor.for_path("my_table")
info = client.get_flight_info(descriptor, options)

# Query by SQL
descriptor = flight.FlightDescriptor.for_command(
    "SELECT * FROM my_table WHERE id > 100"
)
info = client.get_flight_info(descriptor, options)

# Retrieve results
for endpoint in info.endpoints:
    reader = client.do_get(endpoint.ticket, options)
    table = reader.read_all()
    print(table.to_pandas())

PollFlightInfo

長時間実行されるクエリについて、結果を段階的に取得できるようにします。GetFlightInfo のようにクエリ全体の完了を待つのではなく、PollFlightInfo は結果をブロック単位で返します。

最初の呼び出しでクエリの実行が開始され、レスポンスには次が含まれます。

  • その時点で利用可能なデータブロックの エンドポイント を含む FlightInfo
  • 次回のポーリング用の FlightDescriptor (さらに結果が見込まれる場合)

返されたディスクリプタを使って後続の呼び出しを行うと、追加のブロックを取得できます。これ以上利用可能なデータがない場合、レスポンスには次のディスクリプタは含まれません。

注記

現在の実装では、データブロックが利用可能になるまで待機し、データがない場合に即座に返すことはありません。

GetSchema

完全なクエリを実行せずに、クエリ結果の Arrow schema を返します。GetFlightInfo と同じディスクリプタ型を受け付けます。

descriptor = flight.FlightDescriptor.for_command(
    "SELECT 1 AS x, 'hello' AS y"
)
schema_result = client.get_schema(descriptor, options)
schema = schema_result.schema
print(schema)  # x: int32, y: string

DoGet

指定されたチケットのデータを取得します。指定できるのは、次のいずれかです。

  • GetFlightInfo または PollFlightInfo が返すチケット。
  • チケット値として指定する、生の SQL クエリ文字列。
# Using a ticket from GetFlightInfo
reader = client.do_get(endpoint.ticket, options)
table = reader.read_all()

# Using a raw SQL query as ticket
ticket = flight.Ticket("SELECT number FROM system.numbers LIMIT 10")
reader = client.do_get(ticket, options)
table = reader.read_all()

DoPut

ClickHouse にデータを送信します。FlightDescriptor と Arrow のレコードバッチのストリームを受け取ります。

テーブル名を指定して挿入 (PATH ディスクリプタ) :

schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
batch = pa.record_batch(
    [pa.array([1, 2, 3]), pa.array(["Alice", "Bob", "Charlie"])],
    schema=schema,
)

descriptor = flight.FlightDescriptor.for_path("my_table")
writer, _ = client.do_put(descriptor, schema, options)
writer.write_batch(batch)
writer.close()

SQL による挿入 (CMD ディスクリプタ) :

descriptor = flight.FlightDescriptor.for_command(
    "INSERT INTO my_table FORMAT Arrow"
)
writer, _ = client.do_put(descriptor, schema, options)
writer.write_batch(batch)
writer.close()

Flight SQL CommandStatementUpdate による DDL/DML の実行:

Flight SQL クライアントでは、DDL/DML 文 (CREATE、INSERT、ALTER など) の実行に CommandStatementUpdate を使用します。レスポンスには、影響を受けた行数が含まれます。

Flight SQL CommandStatementIngest による一括取り込み:

サポートされるのは、既存テーブルへの追記のみです (TABLE_NOT_EXIST_OPTION_FAIL + TABLE_EXISTS_OPTION_APPEND) 。このコマンドでは、カタログと一時テーブルはサポートされません。

transaction_idCommandStatementUpdate または CommandStatementIngest ではサポートされません。指定した場合、ClickHouse は NotImplemented エラーを返します。

注記

データ転送で受け付けるのは Arrow フォーマットのみです。SQL でほかのフォーマット (例: FORMAT JSON) を指定すると、エラーになります。

DoAction

名前付きアクションを実行します。サポートされているアクションは次のとおりです。

CancelFlightInfo

FlightInfo に関連付けられた実行中のクエリをキャンセルします。クエリ ID は FlightInfoapp_metadata フィールドから抽出されます。また、そのクエリに関連付けられたポーリングディスクリプタもキャンセルします。

# Start a long-running query via PollFlightInfo, then cancel it
cancel_request = flight.CancelFlightInfoRequest(info)
result = client.cancel_flight_info(cancel_request, options)
# result.status is CancelStatus.CANCELLED if successful

SetSessionOptions

現在のセッションに対する ClickHouse サーバー設定を行います。x-clickhouse-session-id ヘッダーでセッション ID を設定しておく必要があります。

サポートされる値の型: string、boolean、integer、double、string リスト。

設定名が不明な場合は、エラー INVALID_NAME が返されます。値を解析できない場合は、エラー INVALID_VALUE が返されます。

GetSessionOptions

現在のセッションにおけるすべての ClickHouse 設定とその値を返します。設定名から文字列値へのマップを返します (内部的には system.settings をクエリします) 。

CreatePreparedStatement

サーバー側のプリペアドステートメントを作成し、ステートメントハンドルを返します。リクエストには、? プレースホルダーを含む SQL クエリテキストが含まれます。

このアクションでは transaction_id はサポートされていません。指定した場合、ClickHouse は NotImplemented エラーを返します。

クエリ文の場合、レスポンスには次が含まれることがあります。

  • dataset_schema: result set のスキーマ。
  • parameter_schema: ステートメントパラメーターのスキーマ。

有効なクエリに対して スキーマ inference が失敗した場合でも (たとえば、そのクエリではプレースホルダーを NULL に置き換えることが有効でない場合) 、ClickHouse は引き続きプリペアドステートメントを作成し、dataset_schema なしでハンドルを返します。

プリペアドステートメントは単一のセッションではなく、認証されたユーザーに属します。同じユーザーとして複数のセッションを開いている場合、それらのどのセッションからでも同じステートメントハンドルを実行、再バインド、クローズできます。

他のユーザーは、自分が作成していないステートメントハンドルを実行、バインド、またはクローズできません。

arrowflight.prepared_statements_lifetime_seconds は有効期限の動作を制御します。

  • > 0: 設定された値をステートメントの有効期間として使用します。セッションに紐付くステートメントとセッションを持たないステートメントの両方で、各リクエスト時に有効期限が更新されます。
  • 0: プリペアドステートメントは自動的に期限切れになりません。
  • -1 (default): ステートメントがセッション内で作成された場合、その有効期間はそのセッションのタイムアウトに従い、そのセッション内の各リクエスト時に更新されます。ステートメントがセッションなしで作成された場合、自動的に期限切れにはなりません。

期限切れになったステートメントは削除され、arrowflight.max_prepared_statements_per_user のカウント対象にも含まれなくなります。

ClosePreparedStatement

リクエストに空でないステートメントハンドルが含まれている場合、プリペアドステートメントを閉じ、関連するサーバー側リソースを解放します。

ClickHouse は、ハンドルが空の場合に ClosePreparedStatement による一括クローズもサポートしています。

  • x-clickhouse-session-id が存在する場合、そのセッション内で認証済みユーザーのすべてのプリペアドステートメントを閉じます。
  • セッション ID が存在しない場合、認証済みユーザーのセッションに属さないプリペアドステートメントのみを閉じます。

プリペアドステートメントがセッション内で (x-clickhouse-session-id 経由で) 作成された場合、そのセッションが閉じられると自動的に閉じられます。

Flight SQL コマンド

CMD ディスクリプタにシリアライズされた Flight SQL protobuf メッセージが含まれている場合、ClickHouse は以下のコマンドを処理します。

GetFlightInfo / GetSchema でサポートされるコマンド

CommandDescription
CommandStatementQuery任意の SQL クエリを実行します。transaction_id はサポートされていません。
CommandGetSqlInfoサーバーのメタデータ (名前、バージョン、Arrow のバージョン、機能) を取得します。
CommandGetCatalogsカタログを一覧表示します。空の結果を返します (ClickHouse はカタログを使用しません) 。
CommandGetDbSchemasデータベースを一覧表示します。省略可能な db_schema_filter_pattern (SQL の LIKE パターン) をサポートします。
CommandGetTablesテーブルを一覧表示します。スキーマ、テーブル名、テーブルタイプのフィルターと、スキーマ を含める省略可能な設定をサポートします。
CommandGetTableTypesテーブルエンジンの種類 (system.table_engines から) を一覧表示します。
CommandGetPrimaryKeys指定したテーブルの主キーカラムを取得します。
CommandPreparedStatementQueryハンドルで準備済みの SELECT スタイルのプリペアドステートメントを実行します。

DoPut でサポート

Command説明
CommandStatementUpdateDDL/DML 文 (CREATE、INSERT、ALTER など) を実行します。影響を受けた行数を返します。transaction_id はサポートされていません。
CommandStatementIngestArrow データを既存のテーブルに一括挿入します。サポートされるのは追記モードのみです。transaction_id はサポートされていません。
CommandPreparedStatementQueryDoPut 経由で送信する際に、準備済み文のパラメータ値をバインドし、ステートメントハンドルを含む DoPutPreparedStatementResult を返します。受け付けるパラメータセットは 1 つ (1 行) のみで、バインドする値の数は ? プレースホルダーの数と完全に一致している必要があります。
CommandPreparedStatementUpdateハンドルで準備済みの DDL/DML プリペアドステートメントを実行し、影響を受けた行数を返します。

ClickHouse では未対応

これらのコマンドは、ClickHouse で提供されていない機能に対応しているため、Arrow Flight SQL インターフェイスではサポートされません。

CommandReason
CommandGetCrossReferenceClickHouse はリレーショナルデータベースではなく、外部キー制約も実装していないため、クロスリファレンスのメタデータは利用できません。
CommandGetExportedKeysClickHouse はリレーショナルデータベースではなく、外部キー制約も実装していないため、エクスポートされたキーのメタデータは利用できません。
CommandGetImportedKeysClickHouse はリレーショナルデータベースではなく、外部キー制約も実装していないため、インポートされたキーのメタデータは利用できません。
CommandStatementSubstraitPlanClickHouse は Substrait プランをサポートしていません。

完全なサンプル

import pyarrow as pa
import pyarrow.flight as flight

# Connect and authenticate
client = flight.FlightClient("grpc://localhost:9090")
token = client.authenticate_basic_token("default", "")
options = flight.FlightCallOptions(headers=[token])

# Insert data using DoPut with a PATH descriptor
schema = pa.schema([("id", pa.uint32()), ("value", pa.string())])
batch = pa.record_batch(
    [pa.array([1, 2, 3], type=pa.uint32()), pa.array(["a", "b", "c"])],
    schema=schema,
)
descriptor = flight.FlightDescriptor.for_path("test")
writer, _ = client.do_put(descriptor, schema, options)
writer.write_batch(batch)
writer.close()

# Query data using GetFlightInfo + DoGet
descriptor = flight.FlightDescriptor.for_command(
    "SELECT * FROM test ORDER BY id"
)
info = client.get_flight_info(descriptor, options)
for endpoint in info.endpoints:
    reader = client.do_get(endpoint.ticket, options)
    table = reader.read_all()
    print(table.to_pandas())

出力:

   id value
0   1     a
1   2     b
2   3     c

データフォーマット

すべてのデータは Apache Arrow IPC フォーマットで転送されます。サポートされるのは Arrow フォーマットのみで、その他の ClickHouse フォーマット (例: FORMAT JSONFORMAT CSV) を指定するとエラーが発生します。

ClickHouse のデータ型は、シリアライゼーション時に Arrow の型へマッピングされます。設定 output_format_arrow_unsupported_types_as_binary は、サポート対象外の ClickHouse 型をバイナリブロブとしてシリアライズするかどうかを制御します。

互換性

Arrow Flight インターフェイスは、Arrow Flight または Arrow Flight SQL プロトコルをサポートするあらゆるクライアントやツールと互換性があり、以下が含まれます。

  • Python (pyarrow)
  • Java (org.apache.arrow.flight)
  • C++ (arrow::flight)
  • Go (apache/arrow/go)
  • ADBC (Arrow Database Connectivity) ドライバー
  • DBeaver、および Flight SQL をサポートするその他のツール

お使いのツールでネイティブの ClickHouse コネクタ (例: JDBC、ODBC、ネイティブプロトコル) を利用できる場合は、パフォーマンスまたはフォーマットの互換性のために Arrow Flight が特に必要でない限り、そちらの使用を優先してください。

クライアント側の ArrowFlight 機能

ClickHouse は、外部の Arrow Flight サーバーからデータを読み取るための Flight クライアントとして動作することもできます。次を参照してください。

関連項目