メインコンテンツまでスキップ
メインコンテンツまでスキップ

アーキテクチャ概要

ClickHouseは真の列指向DBMSです。データはカラム単位で保存され、配列(カラムのベクトルまたはチャンク)を実行する際に処理されます。 可能な限り、個々の値ではなく、配列に対して操作がディスパッチされます。 これを「ベクトル化クエリ実行」と呼び、実際のデータ処理コストを低下させるのに役立ちます。

このアイデアは新しいものではありません。 1957年の APL(A programming language)やその子孫(A +(APL方言)、J(1990年)、K(1993年)、Q(Kx Systemsのプログラミング言語、2003年))にまで遡ります。 配列プログラミングは科学的データ処理で使用されており、このアイデアはリレーショナルデータベースにおいても新しいものではありません。例えば、VectorWiseシステム(Actian CorporationによるActian Vector Analytic Databaseとしても知られる)がこのアイデアを使用しています。

クエリ処理を迅速化するための2つの異なるアプローチがあります:ベクトル化クエリ実行とランタイムコード生成です。後者はすべての間接呼び出しと動的ディスパッチを取り除きます。これらのアプローチのどちらも、必ずしも他のどちらより優れているわけではありません。ランタイムコード生成は多くの操作を融合させ、CPU実行ユニットとパイプラインを完全に活用する場合に優れています。一方で、ベクトル化クエリ実行は一時的なベクトルをキャッシュに書き込み、再読み込みする必要があるため、実用性が低くなる場合があります。もし一時データがL2キャッシュに収まらない場合、これが問題となります。しかし、ベクトル化クエリ実行はCPUのSIMD機能をより活用しやすいです。友人たちが書いた研究論文によれば、この2つのアプローチを組み合わせるのが最も効果的であることが示されています。ClickHouseはベクトル化クエリ実行を使用し、初期段階でランタイムコード生成に対する限定的なサポートがあります。

カラム

IColumnインターフェースは、メモリ内のカラム(実際にはカラムのチャンク)を表現するために使用されます。このインターフェースは、さまざまなリレーショナル演算子の実装に役立つメソッドを提供します。ほとんどの操作は不変であり、元のカラムを修正するのではなく、新たに修正されたカラムを作成します。例えば、IColumn :: filterメソッドはフィルタバイトマスクを受け取ります。これはWHEREおよびHAVINGリレーショナル演算子で使用されます。その他の例としては、ORDER BYをサポートするためのIColumn :: permuteメソッドや、LIMITをサポートするためのIColumn :: cutメソッドがあります。

さまざまなIColumnの実装(ColumnUInt8ColumnStringなど)は、カラムのメモリレイアウトを担当します。メモリレイアウトは通常、連続した配列です。整数型のカラムの場合、それは一つの連続配列(std::vectorのようなもの)で表現されます。 StringArrayカラムについては、連続的に配置された配列のすべての要素用の一つと、各配列の先頭へのオフセット用の二つのベクトルがあります。また、ColumnConstはメモリに1つの値だけを保存しますが、カラムのように見えます。

フィールド

それでも、個々の値を操作することも可能です。個々の値を表現するために、Fieldが使用されます。Fieldは単にUInt64Int64Float64StringArrayの識別されたユニオンです。IColumnには、n番目の値をFieldとして取得するためのoperator []メソッドと、Fieldをカラムの末尾に追加するためのinsertメソッドがあります。これらのメソッドは、一個の値を表す一時的なFieldオブジェクトを扱う必要があるため、非常に効率的ではありません。insertFrominsertRangeFromなど、より効率的なメソッドもあります。

Fieldは、テーブル用の特定のデータ型に関する十分な情報を持っていません。例えば、UInt8UInt16UInt32UInt64はすべてFieldではUInt64として表されます。

漏れた抽象

IColumnはデータの一般的なリレーショナル変換のメソッドを持っていますが、すべてのニーズに応えるわけではありません。例えば、ColumnUInt64には2つのカラムの合計を計算するメソッドがなく、ColumnStringには部分文字列検索を実行するメソッドがありません。これらの無数のルーチンは、IColumnの外部で実装されています。

カラムに対するさまざまな関数は、IColumnメソッドを使用してField値を抽出する一般的かつ非効率的な方法で実装するか、特定のIColumn実装でデータの内部メモリレイアウトに関する知識を利用して特化した方法で実装することができます。これは、特定のIColumn型へのキャスト関数を使用して内部表現を直接扱うことで実現されます。例えば、ColumnUInt64には、内部配列への参照を返すgetDataメソッドがあり、その後、別のルーチンがその配列を直接読み取ったり埋めたりします。「漏れた抽象」の存在により、さまざまなルーチンの効率的な特化が可能になります。

データ型

IDataTypeは、シリアル化とデシリアル化を担当します:バイナリまたはテキスト形式でカラムのチャンクや個々の値を読み書きします。IDataTypeは、テーブル内のデータ型に直接対応します。例えば、DataTypeUInt32DataTypeDateTimeDataTypeStringなどがあります。

IDataTypeIColumnは互いに緩やかに関連しています。異なるデータ型は、同じIColumn実装でメモリ内に表現されることがあります。例えば、DataTypeUInt32DataTypeDateTimeはどちらもColumnUInt32またはColumnConstUInt32によって表されます。さらに、同じデータ型は異なるIColumn実装によって表されることもあります。たとえば、DataTypeUInt8ColumnUInt8またはColumnConstUInt8で表すことができます。

IDataTypeはメタデータのみを保存します。例えば、DataTypeUInt8は何も保存せず(仮想ポインタvptrを除く)、DataTypeFixedStringN(固定サイズの文字列のサイズ)だけを保存します。

IDataTypeには、さまざまなデータ形式用のヘルパーメソッドがあります。例としては、値を可能な引用符を付けてシリアル化するメソッド、JSON用に値をシリアル化するメソッド、XML形式の一部として値をシリアル化するメソッドがあります。データ形式との直接的な対応はありません。例えば、異なるデータ形式PrettyTabSeparatedは、IDataTypeインターフェースのserializeTextEscapedヘルパーメソッドを共用することができます。

ブロック

Blockは、メモリ内のテーブルのサブセット(チャンク)を表すコンテナです。これは、(IColumn, IDataType, カラム名)の三組からなるセットです。クエリの実行中、データはBlockによって処理されます。Blockがあれば、データ(IColumnオブジェクトに格納されています)、そのタイプについての情報(IDataTypeに格納され、どのようにそのカラムを扱うかを示します)、そしてカラム名があります。これは、元のテーブルのカラム名のままであることもあれば、計算結果の一時的な結果を取得するために割り当てられた人工的な名前であることもあります。

ブロック内のカラムに対して関数を計算する場合、結果をブロックに追加するための別のカラムを追加し、関数の引数に対するカラムは触れません。なぜなら、操作は不変だからです。後で不要になったカラムはブロックから削除できますが、修正はできません。これは、共通の部分式を排除するのに便利です。

データの処理ごとにブロックが作成されます。同じ計算のために、カラム名とタイプは異なるブロックで同じままで、カラムデータだけが変更されます。ブロックデータとブロックヘッダーを分離する方が良いです。なぜなら、小さなブロックサイズは一時的な文字列のコピーに対して高いオーバーヘッドを引き起こすためです(shared_ptrやカラム名のため)。

プロセッサー

詳細はhttps://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/IProcessor.hを参照してください。

フォーマット

データフォーマットはプロセッサーで実装されています。

I/O

バイト指向の入出力のために、ReadBufferWriteBufferの抽象クラスがあります。これらはC++のiostreamの代わりに使用されます。心配しないでください。成熟したC++プロジェクトは、人道的な理由からiostreamの他の方法を使用しています。

ReadBufferWriteBufferは、単に連続したバッファと、そのバッファ内の位置を指すカーソルです。実装は、バッファ用のメモリを所有する場合と所有しない場合があります。バッファに以下のデータを埋め込むための仮想メソッドがあります(ReadBufferの場合)またはどこかにバッファをフラッシュするためのメソッドがあります(WriteBufferの場合)。仮想メソッドは滅多に呼び出されません。

ReadBuffer/WriteBufferの実装は、ファイルやファイルディスクリプタ、ネットワークソケットとの作業、圧縮の実装(CompressedWriteBufferは他のWriteBufferで初期化され、データを書き込む前に圧縮を実行します)などの目的で使用されます。名前ConcatReadBufferLimitReadBuffer、およびHashingWriteBufferは便宜上決まっています。

Read/WriteBuffersはバイトのみを扱います。入力/出力をフォーマットするのに役立つ関数は、ReadHelpersおよびWriteHelpersヘッダファイルにあります。例えば、数字を10進形式で書き込むためのヘルパーがあります。

JSON形式で結果セットをstdoutに書き込もうとすると、何が起こるかを見てみましょう。 結果セットは、プル型のQueryPipelineから取得する準備が整っています。 まず、バイトをstdoutに書き込むためにWriteBufferFromFileDescriptor(STDOUT_FILENO)を作成します。 次に、クエリパイプラインからの結果を、JSONRowOutputFormatに接続します。これは、そのWriteBufferで初期化され、行をJSON形式でstdoutに書き込むためのものです。 これは、completeメソッドを介して行うことができ、これによりプル型のQueryPipelineは完了したQueryPipelineになります。 内部的に、JSONRowOutputFormatはさまざまなJSON区切り文字を出力し、IDataType::serializeTextJSONメソッドを呼び出します。このとき、IColumnへの参照と行番号を引数として渡します。結果として、IDataType::serializeTextJSONは、WriteHelpers.hからのメソッドを呼び出します。例えば、数値型にはwriteTextDataTypeStringにはwriteJSONStringが使用されます。

テーブル

IStorageインターフェースはテーブルを表します。このインターフェースの異なる実装は異なるテーブルエンジンです。例としてStorageMergeTreeStorageMemoryなどがあります。これらのクラスのインスタンスは、単なるテーブルです。

IStorageの主要なメソッドはreadwriteであり、alterrenamedropなど他のメソッドもあります。readメソッドは、次の引数を受け取ります:テーブルから読み取るカラムのセット、考慮すべきASTクエリ、および希望するストリームの数。Pipeを返します。

ほとんどの場合、readメソッドは指定されたカラムをテーブルから読み取ることだけを担当し、さらなるデータ処理は行いません。 すべての後続のデータ処理は、IStorageの責任外のパイプラインの別の部分によって処理されます。

ただし、注目すべき例外があります:

  • ASTクエリがreadメソッドに渡され、テーブルエンジンはそれを使用してインデックス使用を導出し、テーブルから少ないデータを読み込むことができます。
  • ときどき、テーブルエンジンはデータを特定の段階まで自ら処理することもあります。たとえば、StorageDistributedはリモートサーバーにクエリを送信し、異なるリモートサーバーからのデータをマージできる段階まで処理を依頼し、その前処理されたデータを返すことができます。クエリインタプリタはその後データ処理を完了します。

テーブルのreadメソッドは、複数のProcessorsから成るPipeを返すことができます。これらのProcessorsはテーブルから並行して読み取ることができます。 次に、これらのプロセッサーを様々な他の変換(式評価やフィルタリングなど)と接続できます。これらは独立して計算できます。 その後、これらの上にQueryPipelineを作成し、PipelineExecutorを介して実行します。

また、TableFunctionもあります。これらは、クエリのFROM句で使用するための一時的なIStorageオブジェクトを返す関数です。

テーブルエンジンを実装する方法をすばやく理解するには、StorageMemoryStorageTinyLogのようなシンプルなものを参照してください。

readメソッドの結果として、IStorageQueryProcessingStageを返します。これは、ストレージ内で既に計算されたクエリの部分に関する情報です。

パーサー

手書きの再帰的降下パーサーがクエリを解析します。例えば、ParserSelectQueryは、クエリのさまざまな部分の基本的なパーサーを再帰的に呼び出します。パーサーはASTを生成します。ASTIASTのインスタンスであるノードで構成されています。

パーサージェネレーターは歴史的な理由で使用されていません。

インタプリター

インタプリターは、ASTからクエリ実行パイプラインを作成する責任があります。InterpreterExistsQueryInterpreterDropQueryのようなシンプルなインタプリターと、より高度なInterpreterSelectQueryがあります。

クエリ実行パイプラインは、チャンク(特定タイプのカラムのセット)を消費および生成できるプロセッサーの組み合わせです。 プロセッサーはポートを介して通信し、複数の入力ポートと複数の出力ポートを持つことができます。 詳細な説明はsrc/Processors/IProcessor.hにあります。

例えば、SELECTクエリを解釈した結果は、「プル型」のQueryPipelineであり、結果セットを読み取る特別な出力ポートを持っています。 INSERTクエリの結果は、「プッシュ型」のQueryPipelineであり、挿入データを書くための入力ポートを持っています。 INSERT SELECTクエリの解釈結果は、入力や出力を持たず、SELECTからINSERTへ同時にデータをコピーする「完了した」QueryPipelineです。

InterpreterSelectQueryは、クエリの分析や変換のためにExpressionAnalyzerおよびExpressionActionsメカニズムを使用します。ここで、ほとんどのルールベースのクエリ最適化が行われます。ExpressionAnalyzerは非常に複雑であり、書き直す必要があります。さまざまなクエリの変換や最適化は、モジュラーな変換を許可するために別個のクラスに抽出されるべきです。

インタプリターにおける問題に対処するために、新しいInterpreterSelectQueryAnalyzerが開発されました。これは、新しいInterpreterSelectQueryのバージョンで、ExpressionAnalyzerを使用せず、ASTQueryPipelineとの間にQueryTreeという追加の抽象層を導入します。これは、実際に生産環境で使用できる準備が整っていますが、万が一のために、enable_analyzer設定の値をfalseに設定することでオフにできます。

関数

普通の関数と集約関数があります。集約関数については次のセクションを参照してください。

普通の関数は行数を変更せず、各行を独立して処理しているかのように動作します。実際、関数は個々の行ではなく、データのBlockに対して呼び出され、ベクトル化クエリ実行を実現します。

[blockSize](/sql-reference/functions/other-functions#blockSize)[rowNumberInBlock](/sql-reference/functions/other-functions#rowNumberInBlock)、および[runningAccumulate](/sql-reference/functions/other-functions#runningaccumulate)といった付随的な関数もあり、これらはブロック処理を利用し、行の独立性を破っています。

ClickHouseは強い型付けを持っているため、暗黙的な型変換はありません。関数が特定の型の組み合わせをサポートしていない場合、例外をスローします。しかし、関数は多くの異なる型の組み合わせに対して機能します(オーバーロード可能です)。たとえば、plus関数(+演算子を実装するため) は任意の数値型の組み合わせで機能します:UInt8 + Float32UInt16 + Int8など。また、一部の可変引数関数は、任意の数の引数を受け取ることができます。たとえば、concat関数です。

関数を実装することは、サポートされるデータ型やサポートされるIColumnsを明示的にディスパッチするため、やや不便な場合があります。たとえば、plus関数には、数値型の各組み合わせや左および右の引数が定数か非定数かで、C++テンプレートの展開によって生成されたコードがあります。

ランタイムコード生成を実装する良い場所です。これにより、テンプレートコードの膨張を回避でき、融合関数(融合乗算-加算など)や、1回のループイテレーションで複数の比較を行うことが可能になります。

ベクトル化クエリ実行のため、関数はショートサーキットされません。たとえば、WHERE f(x) AND g(y)と書いた場合、f(x)がゼロである行であっても、両方の側が計算されます(f(x)がゼロの定数式でない限り)。しかし、f(x)条件の選択性が高く、f(x)の計算がg(y)よりもはるかに安価であれば、マルチパス計算を実施する方が良いでしょう。最初にf(x)を計算し、結果でカラムをフィルタリングし、次に小さなフィルタリングされたデータチャンクのためにのみg(y)を計算します。

集約関数

集約関数は状態を持つ関数です。これらは渡された値をどこかの状態に累積し、その状態から結果を取得できるようにします。これらはIAggregateFunctionインターフェースによって管理されます。状態は非常にシンプルな場合もあれば(AggregateFunctionCountの状態は単なる1つのUInt64値です)、かなり複雑な場合もあります(AggregateFunctionUniqCombinedの状態は、線形配列、ハッシュテーブル、およびHyperLogLog確率的データ構造の組み合わせです)。

状態は、ハイカードinalityのGROUP BYクエリを実行する際に複数の状態を扱うためにArena(メモリプール)に割り当てられます。状態は、何らかの注意が必要なコンストラクタとデストラクタを持つことがあります。複雑な集約状態は、追加のメモリを自分自身で割り当てることができます。状態の作成および破棄と、それらの所有権と破棄順序の適切な引き渡しに注目する必要があります。

集約状態は、分散クエリ実行中にネットワークを越えて渡すためや、十分なRAMがないディスクに書き込むためにシリアル化およびデシリアル化できます。集約関数のデータ型であるDataTypeAggregateFunctionにエクスポートして、データの増分集計を可能にすることさえできます。

集約関数の状態に対するシリアル化データ形式は、現在はバージョン管理されていません。集約状態が一時的に保持されている場合は問題ありませんが、我々は増分集計用のAggregatingMergeTreeテーブルエンジンを持ち、既に生産環境で使用されています。そのため、将来的に集約関数のシリアル化形式を変更する際には後方互換性が求められます。

サーバー

サーバは、いくつかの異なるインターフェースを実装しています:

  • 外部クライアント用のHTTPインターフェース。
  • ネイティブClickHouseクライアントおよび分散クエリ実行中のサーバー間通信のためのTCPインターフェース。
  • レプリケーションのためのデータ転送インターフェース。

内部的には、コルーチンやファイバーのない単純なマルチスレッドサーバーです。サーバーは高いレートの単純なクエリを処理するようには設計されておらず、比較的低いレートの複雑なクエリを処理するように設計されており、それぞれが分析のために大量のデータを処理できます。

サーバーはクエリ実行に必要な環境を備えたContextクラスを初期化します:使用可能なデータベースのリスト、ユーザーとアクセス権、設定、クラスター、プロセスリスト、クエリログなど。インタプリターはこの環境を使用します。

サーバーTCPプロトコルに対して完全な後方および前方の互換性を維持しています:古いクライアントは新しいサーバーと対話でき、新しいクライアントは古いサーバーと対話できます。しかし、我々は永遠にこれを維持したくなく、約1年後には古いバージョンのサポートを削除します。

注記

ほとんどの外部アプリケーションには、HTTPインターフェースを使用することをお勧めします。これはシンプルで使いやすいためです。TCPプロトコルは内部データ構造に密接に結びついています:データのブロックを渡すための内部形式を使用し、圧縮データのためのカスタムフレーミングを使用します。Cライブラリをそのプロトコルのためにリリースしていないのは、ClickHouseのコードベースの大部分にリンクする必要があり、実用的でないからです。

設定

ClickHouse ServerはPOCO C++ Librariesに基づき、Poco::Util::AbstractConfigurationを使用してその設定を表現します。設定は、DaemonBaseクラスから派生したPoco::Util::ServerApplicationクラスによって保持されます。このクラスはDB::Serverクラスを実装し、clickhouse-serverそのものを実現します。したがって、設定はServerApplication::config()メソッドを使用してアクセスできます。

設定は複数のファイル(XMLまたはYAML形式)から読み取られ、ConfigProcessorクラスによって単一のAbstractConfigurationにマージされます。設定はサーバーの起動時に読み込まれ、設定ファイルのいずれかが更新されたり削除されたり追加されたりした場合に再読み込みされることがあります。ConfigReloaderクラスは、これらの変更を定期的に監視し、再読み込み手順も担当します。また、SYSTEM RELOAD CONFIGクエリも設定を再読み込みさせます。

クエリやServer以外のサブシステムの設定は、Context::getConfigRef()メソッドを使用してアクセスできます。サーバーの再起動なしに設定を再読み込みできるすべてのサブシステムは、Server::main()メソッド内で再読み込みコールバックに自身を登録する必要があります。新しい設定にエラーがある場合、ほとんどのサブシステムは新しい設定を無視し、警告メッセージをログに記録し、以前に読み込まれた設定で動作し続けます。AbstractConfigurationの性質上、特定のセクションへの参照を渡すことはできないため、通常はString config_prefixが代わりに使用されます。

スレッドとジョブ

クエリを実行し、副次的な活動を行うために、ClickHouseはスレッドプールの1つからスレッドを割り当て、頻繁なスレッドの作成と破棄を避けます。目的やジョブの構造に応じて、いくつかのスレッドプールがあります:

  • クライアントセッション用のサーバープール。
  • 一般的なジョブ、バックグラウンド活動、スタンドアロンスレッドのためのグローバルスレッドプール。
  • 主にIOでブロックされ、CPU集約的でないジョブのためのIOスレッドプール。
  • 定期的なタスクのためのバックグラウンドプール。
  • ステップに分割できる先読み可能なタスクのためのプール。

サーバープールは、Server::main()メソッド内で定義されたPoco::ThreadPoolクラスのインスタンスです。このプールは最大max_connectionスレッドを持つことができます。各スレッドは単一のアクティブ接続に専念します。

グローバルスレッドプールはGlobalThreadPoolシングルトンクラスです。これからスレッドを割り当てるためにThreadFromGlobalPoolが使用されます。このクラスはstd::threadに似たインターフェースを持ちますが、グローバルプールからスレッドを引き出し、すべての必要な初期化を行います。これは次の設定で構成されています:

  • max_thread_pool_size - プール内のスレッド数の制限。
  • max_thread_pool_free_size - 新しいジョブを待っているアイドルスレッドの制限。
  • thread_pool_queue_size - 予約されたジョブ数の制限。

グローバルプールはユニバーサルであり、以下で説明するすべてのプールはこれを基に実装されています。これはプールの階層と考えることができます。すべての専門プールはThreadPoolクラスを使用してグローバルプールからスレッドを取得します。したがって、すべての専門プールの主な目的は、同時ジョブの数に制限を適用し、ジョブのスケジューリングを行うことです。プール内のスレッド数がジョブ数よりも少ない場合、ThreadPoolは優先順位付きのキューにジョブを蓄積します。各ジョブには整数の優先順位があり、デフォルトの優先順位はゼロです。優先順位値が高いすべてのジョブは、低い優先順位のジョブが実行される前に開始されます。しかし、すでに実行中のジョブには違いがないため、優先順位はプールが過負荷になっているときのみ重要です。

IOスレッドプールは、IOThreadPool::get()メソッドを介してアクセス可能な単純なThreadPoolとして実装されています。これは、グローバルプールと同様に、max_io_thread_pool_sizemax_io_thread_pool_free_size、およびio_thread_pool_queue_size設定で構成されます。IOスレッドプールの主な目的は、IOジョブによってグローバルプールが枯渇することを避け、クエリがCPUを完全に活用できるようにすることです。S3へのバックアップはかなりの量のIO操作を行い、対話型クエリに影響を与えないようにするため、max_backups_io_thread_pool_sizemax_backups_io_thread_pool_free_sizebackups_io_thread_pool_queue_size設定で構成された別のBackupsIOThreadPoolがあります。

定期的なタスク実行には、BackgroundSchedulePoolクラスがあります。BackgroundSchedulePool::TaskHolderオブジェクトを使用してタスクを登録でき、このプールは同時に二つのジョブを実行しないことを保証します。また、特定の未来の瞬間にタスクの実行を延期したり、一時的にタスクを無効にしたりすることもあります。グローバルContextは、このクラスの異なる目的のためにいくつかのインスタンスを提供します。一般的な目的のタスクにはContext::getSchedulePool()が使用されます。

前読み可能なタスクのための専門パラメータプールもあります。IExecutableTaskタスクは、ジョブの順序付けられたステップのシーケンスに分割できます。短いタスクが長いタスクよりも優先されるようにこれらのタスクをスケジューリングするには、MergeTreeBackgroundExecutorが使用されます。その名の通り、これはマージや変更、取得、移動といったバックグラウンドのMergeTree関連操作のために使用されます。プールインスタンスは、Context::getCommonExecutor()やその他の類似のメソッドを用いてアクセスできます。

ジョブに使用されるプールが何であれ、開始時にそのジョブのThreadStatusインスタンスが作成されます。これは、スレッドごとの情報をカプセル化します:スレッドID、クエリID、パフォーマンスカウンター、リソース消費、その他の便利なデータなど。ジョブはCurrentThread::get()コールによって、スレッドローカルポインタを介してこれをアクセスできますので、すべての関数に渡す必要はありません。

もしスレッドがクエリ実行に関連している場合、ThreadStatusに添付される最も重要なものはクエリコンテキストContextPtrです。各クエリにはサーバープール内にマスタースレッドがあります。マスタースレッドは、ThreadStatus::QueryScope query_scope(query_context)オブジェクトを保持してアタッチします。マスタースレッドはまた、ThreadGroupStatusオブジェクトで表されるスレッドグループを作成します。このクエリ実行中に割り当てられたすべての追加スレッドは、CurrentThread::attachTo(thread_group)コールによって、それのスレッドグループに接続されます。スレッドグループは、単一のタスクに割り当てられたすべてのスレッドによるメモリ消費を追跡し、プロファイルイベントカウンターを集約するために使用されます(詳細についてはMemoryTrackerおよびProfileEvents::Countersクラスを参照してください)。

同時実行制御

並行化できるクエリは、max_threads設定を使用して自らを制限します。この設定のデフォルト値は、単一のクエリが最良の方法で全てのCPUコアを利用できるよう選択されます。しかし、もし複数の同時クエリがあり、それぞれがデフォルトのmax_threads設定値を使用した場合はどうでしょうか?その場合、クエリはCPUリソースを共有します。OSはスレッドを常に切り替えて公平性を確保しますが、これにはある程度のパフォーマンスペナルティが伴います。ConcurrencyControlは、これらのペナルティに対処し、多くのスレッドを割り当てるのを避けるのに役立ちます。設定concurrent_threads_soft_limit_numは、CPUの圧力がかかる前に同時に割り当てられるスレッド数を制限するために使用されます。

CPUのスロットという概念が導入されます。スロットは同時実行の単位です:スレッドが実行されるには、事前にスロットを取得し、スレッドが停止するときにそれを解放する必要があります。スロットの数は、サーバ内で全体として限られています。複数の同時クエリがあり、要求の合計がスロットの総数を超える場合、ConcurrencyControlは公正にCPUスロットスケジューリングを行う責任を担います。

各スロットは、次の状態を持つ独立した状態機械として見なすことができます:

  • free:スロットは任意のクエリに割り当てることができます。
  • granted:スロットは特定のクエリによってallocatedされていますが、まだいかなるスレッドにも取得されていません。
  • acquired:スロットは特定のクエリによってallocatedされ、スレッドによって取得されています。

注意すべきことは、allocatedスロットが2つの異なる状態、grantedacquiredにあることです。前者は、実質的に短いはずの遷移状態です(スロットがクエリに割り当てられてから、スロットがそのクエリの任意のスレッドによってアップスケーリング手続きが行われるまで)。

ConcurrencyControlのAPIは、次の関数から構成されています:

  1. クエリのためのリソース割当てを作成します:auto slots = ConcurrencyControl::instance().allocate(1, max_threads);これは、最初のスロットが即時に許可されますが、残りのスロットは後で許可されるかもしれませんので、1つ以上のスロットが割り当てられます。つまり、制限はソフトであり、すべてのクエリは少なくとも1つのスレッドを取得します。
  2. 各スレッドのために、割当てからスロットを取得する必要があります:while (auto slot = slots->tryAcquire()) spawnThread([slot = std::move(slot)] { ... });
  3. スロットの総数を更新します:ConcurrencyControl::setMaxConcurrency(concurrent_threads_soft_limit_num)。サーバーを再起動せずに実行中に行えます。

このAPIにより、クエリは少なくとも1つのスレッドから始め、後でmax_threadsまでスケールアップすることができます。

Distributed Query Execution

クラスター設定内のサーバーはほとんど独立しています。クラスター内の1つまたはすべてのサーバーに Distributed テーブルを作成できます。 Distributed テーブルはデータ自体を保存せず、クラスター内の複数のノードにあるすべてのローカルテーブルへの「ビュー」を提供するだけです。 Distributed テーブルからSELECTすると、そのクエリは書き換えられ、負荷分散設定に従ってリモートノードを選択し、クエリが送信されます。 Distributed テーブルは、リモートサーバーにクエリを処理するよう要求し、異なるサーバーからの中間結果をマージできる段階まで処理を行います。その後、中間結果を受け取り、それらをマージします。分散テーブルは、可能な限り多くの作業をリモートサーバーに分散し、ネットワーク上で多くの中間データを送信しません。

IN や JOIN 句にサブクエリがある場合、そしてそれぞれが Distributed テーブルを使用する場合、事態はより複雑になります。これらのクエリの実行には異なる戦略があります。

分散クエリ実行のためのグローバルクエリプランはありません。各ノードは、ジョブの一部に対するローカルクエリプランを持っています。単純な1パスの分散クエリ実行のみが存在します:リモートノードにクエリを送信し、その後結果をマージします。しかし、これは高カーディナリティの GROUP BY や大きな一時データ量を伴う複雑なクエリには適していません。そのような場合、サーバー間でデータを「再シャッフル」する必要があり、追加の調整が必要です。ClickHouseはそのようなクエリ実行をサポートしておらず、改善する必要があります。

Merge Tree

MergeTree は、主キーによるインデックスをサポートするストレージエンジンのファミリーです。主キーは任意のカラムまたは式のタプルになることができます。 MergeTree テーブル内のデータは「パーツ」に保存されます。各パーツは主キー順にデータを保存するため、データは主キーのタプルによって辞書順に並べられます。すべてのテーブルカラムは、これらのパーツ内の別々の column.bin ファイルに保存されます。ファイルは圧縮ブロックで構成され、各ブロックは通常、平均値のサイズに応じて64KBから1MBの未圧縮データを含みます。ブロックは、カラム値が順に配置されているものです。カラム値は各カラムで同じ順序になっているため(主キーが順序を定義)、複数のカラムを反復処理する際には、対応する行に対する値を取得できます。

主キー自体は「スパース」です。それはすべての行を指し示すのではなく、特定のデータ範囲のみに対応します。別の primary.idx ファイルには、N番目の行の主キーの値があります。ここでNは index_granularity(通常、N = 8192)と呼ばれます。また、各カラムには、データファイル内のN番目の行に対するオフセットである「マーク」を持つ column.mrk ファイルがあります。各マークは、圧縮ブロックの開始位置へのオフセットと、データの開始位置へのオフセットのペアです。通常、圧縮ブロックはマークに整列され、解凍されたブロックのオフセットはゼロです。 primary.idx のデータは常にメモリに常駐しており、 column.mrk ファイルのデータはキャッシュされます。

MergeTree 内のパートから何かを読み取るつもりのときは、 primary.idx データを確認し、要求されたデータを含む可能性のある範囲を特定し、その後 column.mrk データを確認して、これらの範囲を読み始めるためのオフセットを計算します。スパースなため、余分なデータが読み取られる場合があります。ClickHouseは単純なポイントクエリの高負荷には適していません。各キーに対して index_granularity 行が含まれる全範囲を読み取る必要があり、各カラムに対して全圧縮ブロックを解凍する必要があります。インデックスをスパースにしたのは、単一サーバーで兆単位の行を目立ったメモリ消費なしに保持できなければならなかったからです。また、主キーがスパースであるため、ユニークではなく、INSERT時にテーブル内のキーの存在を確認できません。テーブル内に同じキーを持つ行が多数存在する可能性があります。

MergeTree にデータを INSERT すると、そのデータの集まりは主キー順に整列され、新しいパートを形成します。バックグラウンドスレッドは定期的にいくつかのパーツを選択し、単一のソートされたパートにマージして、パーツの数を比較的低く保ちます。これが MergeTree と呼ばれる理由です。もちろん、マージは「書き込み増幅」を引き起こします。すべてのパーツは不変です。作成および削除されるだけで、修正はされません。SELECTが実行されると、テーブルのスナップショット(一連のパーツ)を保持します。マージ後、障害発生時に回復を容易にするために、古いパーツも一時的に保持しますので、マージされたパートが壊れていると思われる場合は、それを元のパーツと置き換えることができます。

MergeTreeは LSM ツリーではありません。なぜなら、MEMTABLE や LOG を含まないからです:挿入されたデータはファイルシステムに直接書き込まれます。この振る舞いにより、MergeTree はバッチでのデータ挿入により適しています。したがって、少量の行を頻繁に挿入することは、MergeTree にとって理想的ではありません。たとえば、1秒あたり数行は問題ありませんが、1秒あたり千回行うことは MergeTree にとって最適ではありません。ただし、小さな挿入のための非同期挿入モードがあります。この制限を克服するためにこのようにしました。なぜなら、私たちのアプリケーションで既にバッチでデータを挿入しているからです。

バックグラウンドマージ中に追加の作業を行っている MergeTree エンジンがいくつかあります。例として、CollapsingMergeTree および AggregatingMergeTree があります。これは、更新の特別なサポートとして扱うことができます。これらは実際の更新ではないことを心に留めておいてください。ユーザーは通常、バックグラウンドマージが実行される時間を制御できず、 MergeTree テーブル内のデータはほとんど常に単一のパートではなく、複数のパートに保存されます。

Replication

ClickHouse のレプリケーションは、テーブル単位で構成できます。同じサーバー上に一部はレプリケートされたテーブルと一部はレプリケートされていないテーブルを持つことができます。また、1つのテーブルが二重レプリケーションされている一方で、別のテーブルは三重レプリケーションされている場合もあります。

レプリケーションは、ReplicatedMergeTree ストレージエンジンで実装されています。ストレージエンジンのパラメータとして ZooKeeper でのパスが指定されます。同じパスを持つすべてのテーブルは、互いのレプリカになります。これにより、データは同期され、一貫性が保たれます。レプリカは、テーブルを作成または削除することで動的に追加および削除できます。

レプリケーションは非同期のマルチマスター方式を使用しています。 ZooKeeper とセッションを持つ任意のレプリカにデータを挿入でき、そのデータは他のすべてのレプリカに非同期に複製されます。ClickHouse は UPDATE をサポートしていないため、レプリケーションは競合がありません。デフォルトでは挿入の過半数の承認はありませんので、一つのノードが故障した場合には直前に挿入されたデータが失われる可能性があります。 insert_quorum 設定を使って挿入の過半数を有効にできます。

レプリケーションのメタデータは ZooKeeper に保存されます。アクションは、パートを取得すること、パーツをマージすること、パーティションを削除することなど、実行するアクションのリストを示すレプリケーションログがあります。各レプリカは、そのキューにレプリケーションログをコピーし、キューからアクションを実行します。たとえば、挿入時には、「パートを取得」のアクションがログに作成され、すべてのレプリカがそのパートをダウンロードします。マージは、バイト単位で同一の結果を得るために、レプリカ間で調整されます。すべての部分は、すべてのレプリカで同じ方法でマージされます。リーダーの1人が最初に新しいマージを開始し、「マージパーツ」アクションをログに書き込みます。複数のレプリカ(またはすべて)が同時にリーダーになることができます。レプリカがリーダーにならないように制限するには、merge_tree 設定の replicated_can_become_leader を使用します。リーダーはバックグラウンドマージのスケジューリングを担当します。

レプリケーションは物理的です:ノード間で転送されるのは圧縮パーツのみで、クエリではありません。ほとんどのケースでは、マージは各レプリカで独立して処理され、ネットワークコストを削減してネットワーク増幅を回避します。大きなマージパーツは、重要なレプリケーション遅延がある場合にのみ、ネットワーク経由で送信されます。

さらに、各レプリカは、パーツのセットとそのチェックサムとして自分の状態を ZooKeeper に保存します。ローカルファイルシステムの状態が ZooKeeper の参照状態から外れた場合、レプリカは他のレプリカから不足しているパーツや壊れたパーツをダウンロードして一貫性を回復します。ローカルファイルシステムに予期しないデータや壊れたデータがある場合、ClickHouse はそれを削除せず、別のディレクトリに移動して忘れます。

注記

ClickHouse クラスターは独立したシャードで構成されており、各シャードはレプリカで構成されています。クラスターは エラスティックではない ため、新しいシャードを追加した後、データは自動的にシャード間で再バランスされません。その代わり、クラスターの負荷は均一でないように調整されることが想定されています。この実装は、より多くの制御を提供し、比較的小さなクラスター(数十ノード)には適しています。しかし、我々が生産で使用している数百ノードのクラスターでは、このアプローチは重要な欠点となります。クラスター全体に広がるテーブルエンジンを実装し、自動的に分割およびバランスが取れる動的にレプリケートされた領域を持つ必要があります。