メインコンテンツまでスキップ
メインコンテンツまでスキップ

dltをClickHouseに接続する

Community Maintained

dlt は、様々な乱雑なデータソースから、よく構造化されたライブデータセットにデータをロードするためにPythonスクリプトに追加できるオープンソースライブラリです。

ClickHouseでdltをインストール

dltライブラリをClickHouse依存関係とともにインストールする方法:

pip install "dlt[clickhouse]"

セットアップガイド

1. dltプロジェクトの初期化

以下のように、新しいdltプロジェクトを初期化します:

dlt init chess clickhouse
注記

このコマンドは、ソースとしてチェスを、宛先としてClickHouseを使用して、パイプラインを初期化します。

上記のコマンドは、.dlt/secrets.tomlやClickHouseのための要件ファイルを含むいくつかのファイルとディレクトリを生成します。要件ファイルに指定された必要な依存関係を次のようにして実行することでインストールできます:

pip install -r requirements.txt

または、pip install dlt[clickhouse]を使用して、dltライブラリとClickHouse宛先用の必要な依存関係をインストールします。

2. ClickHouseデータベースのセットアップ

ClickHouseにデータをロードするには、ClickHouseデータベースを作成する必要があります。以下はそのための大まかな手順です:

  1. 既存のClickHouseデータベースを使用するか、新しいデータベースを作成できます。

  2. 新しいデータベースを作成するには、clickhouse-clientコマンドラインツールまたは選択したSQLクライアントを使用してClickHouseサーバーに接続します。

  3. 新しいデータベース、ユーザーを作成し、必要な権限を付与するために次のSQLコマンドを実行します:

CREATE DATABASE IF NOT EXISTS dlt;
CREATE USER dlt IDENTIFIED WITH sha256_password BY 'Dlt*12345789234567';
GRANT CREATE, ALTER, SELECT, DELETE, DROP, TRUNCATE, OPTIMIZE, SHOW, INSERT, dictGet ON dlt.* TO dlt;
GRANT SELECT ON INFORMATION_SCHEMA.COLUMNS TO dlt;
GRANT CREATE TEMPORARY TABLE, S3 ON *.* TO dlt;

3. 認証情報の追加

次に、.dlt/secrets.tomlファイルにClickHouseの認証情報を以下のように設定します:

[destination.clickhouse.credentials]
database = "dlt"                         # The database name you created
username = "dlt"                         # ClickHouse username, default is usually "default"
password = "Dlt*12345789234567"          # ClickHouse password if any
host = "localhost"                       # ClickHouse server host
port = 9000                              # ClickHouse HTTP port, default is 9000
http_port = 8443                         # HTTP Port to connect to ClickHouse server's HTTP interface. Defaults to 8443.
secure = 1                               # Set to 1 if using HTTPS, else 0.

[destination.clickhouse]
dataset_table_separator = "___"          # Separator for dataset table names from dataset.
注記

HTTP_PORT http_portパラメータは、ClickHouseサーバーのHTTPインターフェースに接続する際に使用するポート番号を指定します。これは、ネイティブTCPプロトコルに使用されるデフォルトのポート9000とは異なります。

外部ステージングを使用しない場合(すなわち、パイプラインでステージングパラメータを設定しない場合)には、http_portを設定する必要があります。これは、組み込みのClickHouseローカルストレージステージングが、HTTPを介してClickHouseと通信するclickhouse contentライブラリを使用しているためです。

指定されたポートでHTTP接続を受け入れるようにClickHouseサーバーが構成されていることを確認してください。例えば、http_port = 8443を設定した場合、ClickHouseはポート8443でHTTPリクエストを待っている必要があります。外部ステージングを使用している場合は、clickhouse-connectが使用されないため、http_portパラメータを省略できます。

clickhouse-driverライブラリが使用するのと似たデータベース接続文字列を渡すことができます。上記の認証情報は次のようになります:


# keep it at the top of your toml file, before any section starts.
destination.clickhouse.credentials="clickhouse://dlt:Dlt*12345789234567@localhost:9000/dlt?secure=1"

書き込みの方法

すべての書き込み方法がサポートされています。

dltライブラリの書き込み方法は、データを宛先にどのように書き込むかを定義します。書き込み方法には次の3種類があります:

Replace: この方法は、リソースからのデータで宛先のデータを置き換えます。すべてのクラスとオブジェクトを削除し、データをロードする前にスキーマを再作成します。詳細についてはこちらをご覧ください。

Merge: この書き込み方法は、リソースのデータと宛先のデータをマージします。merge方法では、リソースのprimary_keyを指定する必要があります。詳細についてはこちらをご覧ください。

Append: これはデフォルトの方法です。データは、宛先の既存データに追加され、primary_keyフィールドは無視されます。

データのロード

データは、データソースに応じて最も効率的な方法を使用してClickHouseにロードされます:

  • ローカルファイルに関しては、clickhouse-connectライブラリを使用して、INSERTコマンドを使用してClickHouseテーブルにファイルを直接ロードします。
  • リモートストレージ(S3Google Cloud StorageAzure Blob Storageなど)のファイルでは、ClickHouseテーブル関数(s3、gcs、azureBlobStorageなど)が使用され、ファイルを読み込み、データをテーブルに挿入します。

データセット

Clickhouseは1つのデータベースに複数のデータセットをサポートしていませんが、dltは様々な理由からデータセットに依存しています。Clickhousedltと共に機能させるために、Clickhouseデータベース内にdltによって生成されたテーブルは、データセット名で接頭辞が付けられ、設定可能なdataset_table_separatorによって区切られます。さらに、データを含まない特別なセンチネルテーブルが作成され、dltClickhouse宛先に既に存在する仮想データセットを認識できるようになります。

サポートされているファイル形式

  • jsonlは、直接のロードとステージングの両方において推奨される形式です。
  • parquetは、直接のロードとステージングの両方をサポートしています。

clickhouse宛先には、デフォルトのSQL宛先からいくつかの特定の偏差があります:

  1. Clickhouseには実験的なobjectデータ型がありますが、若干予測不可能であることが明らかになったため、dlt Clickhouse宛先では複雑なデータ型をテキスト列としてロードします。この機能が必要な場合は、私たちのSlackコミュニティに連絡し、追加を検討します。
  2. Clickhousetimeデータ型をサポートしていません。時間はtext列にロードされます。
  3. Clickhousebinaryデータ型をサポートしていません。そのため、バイナリデータはtext列にロードされます。jsonlからのロードでは、バイナリデータはbase64文字列になります。parquetからロードするとき、binaryオブジェクトはtextに変換されます。
  4. Clickhouseは、nullでない populatedなテーブルにカラムを追加することを許可しています。
  5. Clickhouseは、floatまたはdoubleデータ型を使用している場合、特定の条件下で丸めエラーを生成する可能性があります。丸めエラーが許容できない場合は、decimalデータ型を使用してください。例えば、jsonlにて12.7001の値をdouble列にロードすると、予測可能な丸めエラーが発生します。

サポートされているカラムヒント

ClickHouseは次のカラムヒントをサポートしています:

  • primary_key - カラムを主キーの一部としてマークします。複数のカラムがこのヒントを持つことで、複合主キーを作成できます。

テーブルエンジン

デフォルトでは、ClickHouseではReplicatedMergeTreeテーブルエンジンを使用してテーブルが作成されます。clickhouseアダプタを使用して、代替のテーブルエンジンをtable_engine_typeで指定できます:

from dlt.destinations.adapters import clickhouse_adapter

@dlt.resource()
def my_resource():
  ...

clickhouse_adapter(my_resource, table_engine_type="merge_tree")

サポートされている値は次のとおりです:

  • merge_tree - MergeTreeエンジンを使用してテーブルを作成します。
  • replicated_merge_tree (デフォルト) - ReplicatedMergeTreeエンジンを使用してテーブルを作成します。

ステージングサポート

ClickHouseは、Amazon S3、Google Cloud Storage、Azure Blob Storageをファイルのステージング宛先としてサポートしています。

dltは、Parquetまたはjsonlファイルをステージング場所にアップロードし、ClickHouseテーブル関数を使用して、ステージングされたファイルから直接データをロードします。

ステージング宛先の認証情報を構成する方法については、ファイルシステムのドキュメントを参照してください:

ステージングが有効な状態でパイプラインを実行するには:

pipeline = dlt.pipeline(
  pipeline_name='chess_pipeline',
  destination='clickhouse',
  staging='filesystem',  # add this to activate staging
  dataset_name='chess_data'
)

Google Cloud Storageをステージングエリアとして使用する

dltは、ClickHouseにデータをロードする際にGoogle Cloud Storage(GCS)をステージングエリアとして使用することをサポートしています。これは、ClickHouseのGCSテーブル関数によって自動的に処理され、dltが内部で使用します。

clickhouseのGCSテーブル関数は、ハッシュベースのメッセージ認証コード(HMAC)キーを使用した認証のみをサポートしています。これを有効にするために、GCSはAmazon S3 APIをエミュレートするS3互換モードを提供しています。ClickHouseは、これを利用してGCSバケットにS3統合を通じてアクセスします。

dltでHMAC認証を使用したGCSステージングを設定するには:

  1. Google Cloudガイドに従って、GCSサービスアカウントのHMACキーを作成します。

  2. dltプロジェクトのClickHouse宛先設定で、HMACキーとともにclient_emailproject_idprivate_keyを設定しますconfig.tomlに:

[destination.filesystem]
bucket_url = "gs://dlt-ci"

[destination.filesystem.credentials]
project_id = "a-cool-project"
client_email = "[email protected]"
private_key = "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkaslkdjflasjnkdcopauihj...wEiEx7y+mx\nNffxQBqVVej2n/D93xY99pM=\n-----END PRIVATE KEY-----\n"

[destination.clickhouse.credentials]
database = "dlt"
username = "dlt"
password = "Dlt*12345789234567"
host = "localhost"
port = 9440
secure = 1
gcp_access_key_id = "JFJ$$*f2058024835jFffsadf"
gcp_secret_access_key = "DFJdwslf2hf57)%$02jaflsedjfasoi"

注意:HMACキーbashgcp_access_key_idgcp_secret_access_keyに加え、サービスアカウントのclient_emailproject_idprivate_key[destination.filesystem.credentials]の下に提供する必要があります。これは、GCSステージングサポートが一時的な回避策として実装されており、まだ最適化されていないためです。

dltはこれらの認証情報をClickHouseに渡し、認証とGCSアクセスを処理させます。

将来的にClickHouse dlt宛先のGCSステージングのセットアップを簡素化し改善するための作業が進行中です。適切なGCSステージングサポートは、以下のGitHub課題で追跡されています:

  • ファイルシステム宛先が機能するようにgcsのS3互換モードで
  • Google Cloud Storageステージングエリアのサポート

Dbtサポート

dbtとの統合は、dbt-clickhouseを介して一般的にサポートされています。

dlt状態の同期

この宛先は、dlt状態の同期を完全にサポートしています。