このエンジンは Azure Blob Storage エコシステムとの統合を提供し、ストリーミングデータのインポートを可能にします。
テーブルの作成
CREATE TABLE test (name String, value UInt32)
ENGINE = AzureQueue(...)
[SETTINGS]
[mode = '',]
[after_processing = 'keep',]
[keeper_path = '',]
...
エンジンパラメータ
AzureQueue のパラメータは、AzureBlobStorage テーブルエンジンでサポートされるものと同一です。パラメータについてはこちらを参照してください。
AzureBlobStorage テーブルエンジンと同様に、ローカル環境での Azure Storage 開発には Azurite エミュレーターを利用できます。詳細はこちらを参照してください。
例
CREATE TABLE azure_queue_engine_table
(
`key` UInt64,
`data` String
)
ENGINE = AzureQueue('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;', 'testcontainer', '*', 'CSV')
SETTINGS mode = 'unordered'
Settings
サポートされている設定群は、基本的には S3Queue テーブルエンジンと同じですが、s3queue_ というプレフィックスは付きません。設定の完全な一覧を参照してください。
テーブルに対して構成されている設定の一覧を取得するには、system.azure_queue_settings テーブルを使用します。24.10 以降のバージョンで利用可能です。
以下は、AzureQueue にのみ対応し、S3Queue には適用されない設定です。
after_processing_move_connection_string
宛先が別の Azure コンテナーである場合に、正常に処理されたファイルを移動するための Azure Blob Storage 接続文字列。
指定可能な値:
デフォルト値: 空文字列。
after_processing_move_container
移動先が別の Azure コンテナである場合に、正常に処理されたファイルを移動する移動先コンテナ名。
指定可能な値:
デフォルト値: 空文字列。
例:
CREATE TABLE azure_queue_engine_table
(
`key` UInt64,
`data` String
)
ENGINE = AzureQueue('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;', 'testcontainer', '*', 'CSV')
SETTINGS
mode = 'unordered',
after_processing = 'move',
after_processing_move_connection_string = 'DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;',
after_processing_move_container = 'dst-container';
AzureQueue テーブルエンジンからの SELECT
AzureQueue テーブルでは、デフォルトで SELECT クエリは禁止されています。これは、データを一度読み取ったらキューから削除するという一般的なキューのパターンに従うためです。誤ってデータを失わないようにするため、SELECT は禁止されています。
ただし、場合によっては SELECT が必要になることもあります。その場合は、stream_like_engine_allow_direct_select 設定を True にする必要があります。
AzureQueue エンジンには、SELECT クエリ用の特別な設定 commit_on_select があります。キューから読み取った後もデータを保持したい場合は False に、読み取り後に削除したい場合は True に設定します。
SELECT は、各ファイルを 1 回しかインポートできないため(デバッグ用途を除き)ストリーミングインポートにはあまり有用ではありません。代わりに、マテリアライズドビュー を使用してリアルタイム処理フローを作成する方が実用的です。これを行うには、次のようにします。
- エンジンを使用して、S3 内の指定パスからデータを取り込むテーブルを作成し、それをデータストリームとみなします。
- 目的の構造を持つテーブルを作成します。
- エンジンからのデータを変換し、事前に作成したテーブルに格納するマテリアライズドビューを作成します。
MATERIALIZED VIEW をエンジンと関連付けると、バックグラウンドでデータの取り込みを開始します。
例:
CREATE TABLE azure_queue_engine_table (key UInt64, data String)
ENGINE=AzureQueue('<endpoint>', 'CSV', 'gzip')
SETTINGS
mode = 'unordered';
CREATE TABLE stats (key UInt64, data String)
ENGINE = MergeTree() ORDER BY key;
CREATE MATERIALIZED VIEW consumer TO stats
AS SELECT key, data FROM azure_queue_engine_table;
SELECT * FROM stats ORDER BY key;
仮想カラム
_path — ファイルへのパス。
_file — ファイル名。
仮想カラムの詳細については、こちらを参照してください。
イントロスペクション
テーブル設定 enable_logging_to_queue_log=1 を有効にして、テーブルに対するログ記録を有効化します。
イントロスペクション機能は S3Queue テーブルエンジン と同じですが、いくつか明確な違いがあります:
- サーバーバージョンが >= 25.1 の場合、キューのインメモリ状態には
system.azure_queue を使用します。古いバージョンでは system.s3queue を使用します(こちらにも azure テーブルに関する情報が含まれます)。
- メインの ClickHouse 設定で
system.azure_queue_log を有効化します。例:
<azure_queue_log>
<database>system</database>
<table>azure_queue_log</table>
</azure_queue_log>
この永続テーブルは、system.s3queue と同じ情報を保持しますが、処理済みおよび失敗したファイルに関するものです。
このテーブルの構造は次のとおりです。
CREATE TABLE system.azure_queue_log
(
`hostname` LowCardinality(String) COMMENT 'Hostname',
`event_date` Date COMMENT 'Event date of writing this log row',
`event_time` DateTime COMMENT 'Event time of writing this log row',
`database` String COMMENT 'The name of a database where current S3Queue table lives.',
`table` String COMMENT 'The name of S3Queue table.',
`uuid` String COMMENT 'The UUID of S3Queue table',
`file_name` String COMMENT 'File name of the processing file',
`rows_processed` UInt64 COMMENT 'Number of processed rows',
`status` Enum8('Processed' = 0, 'Failed' = 1) COMMENT 'Status of the processing file',
`processing_start_time` Nullable(DateTime) COMMENT 'Time of the start of processing the file',
`processing_end_time` Nullable(DateTime) COMMENT 'Time of the end of processing the file',
`exception` String COMMENT 'Exception message if happened'
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_time)
COMMENT 'Contains logging entries with the information files processes by S3Queue engine.'
例:
SELECT *
FROM system.azure_queue_log
LIMIT 1
FORMAT Vertical
Row 1:
──────
hostname: clickhouse
event_date: 2024-12-16
event_time: 2024-12-16 13:42:47
database: default
table: azure_queue_engine_table
uuid: 1bc52858-00c0-420d-8d03-ac3f189f27c8
file_name: test_1.csv
rows_processed: 3
status: Processed
processing_start_time: 2024-12-16 13:42:47
processing_end_time: 2024-12-16 13:42:47
exception:
1 row in set. Elapsed: 0.002 sec.