Перейти к основному содержанию
Перейти к основному содержанию

Движок распределённой таблицы

Распределённый движок в Cloud

Чтобы создать движок распределённой таблицы в ClickHouse Cloud, можно использовать табличные функции remote и remoteSecure. Синтаксис Distributed(...) в ClickHouse Cloud использовать нельзя.

Таблицы с движком Distributed не хранят собственные данные, но позволяют выполнять распределённую обработку запросов на нескольких серверах. Чтение автоматически распараллеливается. Во время чтения используются индексы таблиц на удалённых серверах, если они есть.

Создание таблицы

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]])
[SETTINGS name=value, ...]

Из таблицы

Когда таблица Distributed указывает на таблицу на текущем сервере, вы можете заимствовать её схему:

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] AS [db2.]name2 ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]]) [SETTINGS name=value, ...]

Параметры движка Distributed

ПараметрОписание
clusterИмя кластера в конфигурационном файле сервера
databaseИмя удалённой базы данных
tableИмя удалённой таблицы
sharding_key (необязательно)Ключ шардинга.
Указание sharding_key необходимо в следующих случаях:
  • Для операций INSERT в распределённую таблицу (так как движку таблицы нужен sharding_key, чтобы определить, как распределить данные по шардам). Однако, если включена настройка insert_distributed_one_random_shard, то для INSERT ключ шардинга не требуется.
  • Для использования с optimize_skip_unused_shards, так как sharding_key необходим для определения, какие шарды должны запрашиваться
policy_name (необязательно)Имя политики, которое будет использоваться для хранения временных файлов при фоновой отправке

См. также

Настройки движка Distributed

ПараметрОписаниеЗначение по умолчанию
fsync_after_insertВыполнять fsync для данных файла после фоновой вставки в Distributed. Гарантирует, что ОС сбросила все вставленные данные в файл на диске инициирующего узла.false
fsync_directoriesВыполнять fsync для каталогов. Гарантирует, что ОС обновила метаданные каталога после операций, связанных с фоновыми вставками в таблицу Distributed (после вставки, после отправки данных на шард и т. д.).false
skip_unavailable_shardsЕсли true, ClickHouse молча пропускает недоступные шарды. Шард помечается как недоступный, когда: 1) к нему нет доступа из‑за ошибки соединения; 2) шард не может быть разрешён через DNS; 3) таблица не существует на шарде.false
bytes_to_throw_insertЕсли количество сжатых байт, ожидающих фоновой операции INSERT, превысит это значение, будет выброшено исключение. 0 — не выбрасывать.0
bytes_to_delay_insertЕсли количество сжатых байт, ожидающих фоновой операции INSERT, превысит это значение, выполнение запроса будет задержано. 0 — не задерживать.0
max_delay_to_insertМаксимальная задержка вставки данных в таблицу Distributed в секундах, если имеется много данных (байт), ожидающих фоновой отправки.60
background_insert_batchТо же, что и distributed_background_insert_batch.0
background_insert_split_batch_on_failureТо же, что и distributed_background_insert_split_batch_on_failure.0
background_insert_sleep_time_msТо же, что и distributed_background_insert_sleep_time_ms.0
background_insert_max_sleep_time_msТо же, что и distributed_background_insert_max_sleep_time_ms.0
flush_on_detachСбрасывать данные на удалённые узлы при DETACH/DROP/остановке сервера.true
Примечание

Параметры надёжности (fsync_...):

  • Влияют только на фоновые INSERT (то есть при distributed_foreground_insert=false), когда данные сначала сохраняются на диск инициирующего узла, а затем в фоне отправляются на шарды.
  • Могут существенно снизить производительность INSERT.
  • Влияют на запись данных, хранящихся внутри каталога распределённой таблицы, на узел, который принял вашу вставку. Если вам нужны гарантии записи данных в базовые таблицы MergeTree, см. параметры надёжности (...fsync...) в system.merge_tree_settings.

Для параметров ограничений вставки (..._insert) см. также:

  • параметр distributed_foreground_insert
  • параметр prefer_localhost_replica
  • bytes_to_throw_insert обрабатывается раньше, чем bytes_to_delay_insert, поэтому его не следует устанавливать в значение меньше значения bytes_to_delay_insert.

Пример

CREATE TABLE hits_all AS hits
ENGINE = Distributed(logs, default, hits[, sharding_key[, policy_name]])
SETTINGS
    fsync_after_insert=0,
    fsync_directories=0;

Данные будут читаться со всех серверов в кластере logs из таблицы default.hits, расположенной на каждом сервере кластера. Данные не только читаются, но и частично обрабатываются на удалённых серверах (насколько это возможно). Например, для запроса с GROUP BY данные будут агрегироваться на удалённых серверах, а промежуточные состояния агрегатных функций будут отправлены на сервер, выполняющий запрос. Затем данные будут дополнительно агрегированы.

Вместо имени базы данных вы можете использовать константное выражение, которое возвращает строку. Например: currentDatabase().

Кластеры

Кластеры настраиваются в конфигурационном файле сервера:

<remote_servers>
    <logs>
        <!-- Межсерверный секрет для распределённых запросов на уровне кластера
             по умолчанию: секрет не задан (аутентификация не выполняется)

             Если задан, распределённые запросы будут проверяться на шардах, поэтому необходимо как минимум:
             - чтобы такой кластер существовал на шарде,
             - чтобы у такого кластера был тот же секрет.

             Кроме того (что более важно), initial_user будет
             использоваться в качестве текущего пользователя для запроса.
        -->
        <!-- <secret></secret> -->
        
        <!-- Необязательно. Разрешены ли распределённые DDL-запросы (конструкция ON CLUSTER) для этого кластера. По умолчанию: true (разрешены). -->        
        <!-- <allow_distributed_ddl_queries>true</allow_distributed_ddl_queries> -->
        
        <shard>
            <!-- Необязательно. Вес шарда при записи данных. По умолчанию: 1. -->
            <weight>1</weight>
            <!-- Необязательно. Имя шарда. Должно быть непустым и уникальным среди шардов кластера. Если не указано, будет пустым. -->
            <name>shard_01</name>
            <!-- Необязательно. Записывать ли данные только на одну из реплик. По умолчанию: false (записывать на все реплики). -->
            <internal_replication>false</internal_replication>
            <replica>
                <!-- Необязательно. Приоритет реплики для балансировки нагрузки (см. также настройку load_balancing). По умолчанию: 1 (меньшее значение означает более высокий приоритет). -->
                <priority>1</priority>
                <host>example01-01-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-01-2</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <weight>2</weight>
            <name>shard_02</name>
            <internal_replication>false</internal_replication>
            <replica>
                <host>example01-02-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-02-2</host>
                <secure>1</secure>
                <port>9440</port>
            </replica>
        </shard>
    </logs>
</remote_servers>

Здесь определяется кластер с именем logs, который состоит из двух шардов, каждый из которых содержит по две реплики. Шарды — это серверы, в которых хранятся разные части данных (чтобы прочитать все данные, необходимо обратиться ко всем шардам). Реплики — это серверы-дубликаты (чтобы прочитать все данные, можно обратиться к данным на любой из реплик).

Имена кластеров не должны содержать точек.

Для каждого сервера указываются параметры host, port, а также, при необходимости, user, password, secure, compression, bind_host:

ParameterDescriptionDefault Value
hostАдрес удалённого сервера. Можно использовать либо домен, либо IPv4- или IPv6-адрес. Если вы указываете домен, сервер выполняет DNS-запрос при запуске и сохраняет результат на всё время своей работы. Если DNS-запрос завершился с ошибкой, сервер не запускается. Если вы изменили DNS-запись, перезапустите сервер.-
portTCP-порт для работы сервера (tcp_port в конфигурации, обычно имеет значение 9000). Не путать с http_port.-
userИмя пользователя для подключения к удалённому серверу. У этого пользователя должны быть права на подключение к указанному серверу. Доступ настраивается в файле users.xml. Дополнительные сведения см. в разделе Access rights.default
passwordПароль для подключения к удалённому серверу (не маскируется).''
secureИспользовать ли защищённое SSL/TLS-подключение. Обычно также требуется указать порт (порт по умолчанию для защищённого подключения — 9440). Сервер должен слушать на <tcp_port_secure>9440</tcp_port_secure> и быть настроен с корректными сертификатами.false
compressionИспользовать сжатие данных.true
bind_hostИсходный адрес, используемый при подключении к удалённому серверу с этого узла. Поддерживается только IPv4-адрес. Предназначен для продвинутых сценариев развертывания, когда требуется задать исходный IP-адрес, используемый распределёнными запросами ClickHouse.-

При указании реплик одна из доступных реплик будет выбрана для каждого шарда при чтении. Можно настроить алгоритм балансировки нагрузки (предпочтения при выборе реплики для доступа) — см. настройку load_balancing. Если не удалось установить подключение с сервером, выполняется попытка подключения с коротким таймаутом. Если подключиться не удалось, выбирается следующая реплика и так далее для всех реплик. Если попытка подключения не удалась для всех реплик, попытка будет повторена тем же образом несколько раз. Это повышает устойчивость, но не обеспечивает полной отказоустойчивости: удалённый сервер может принять подключение, но не работать или работать некорректно.

Можно указать только один шард (в этом случае обработку запроса корректнее называть удалённой, а не распределённой) или любое количество шардов. В каждом шарде можно указать от одной до любого количества реплик. Можно задать разное количество реплик для каждого шарда.

В конфигурации можно указать произвольное количество кластеров.

Для просмотра ваших кластеров используйте таблицу system.clusters.

Движок Distributed позволяет работать с кластером как с локальным сервером. Однако конфигурацию кластера нельзя задавать динамически — её необходимо настроить в конфигурационном файле сервера. Обычно все серверы в кластере имеют одинаковую конфигурацию кластера (хотя это и не обязательно). Кластеры из конфигурационного файла обновляются на лету, без перезапуска сервера.

Если при каждом выполнении запроса нужно отправлять его в неизвестный заранее набор шардов и реплик, нет необходимости создавать таблицу Distributed — вместо этого используйте табличную функцию remote. См. раздел Table functions.

Запись данных

Существует два способа записи данных в кластер:

Во-первых, вы можете определить, на какие серверы записывать какие данные, и выполнять запись непосредственно на каждый шард. Другими словами, выполнять прямые запросы INSERT в удалённые таблицы в кластере, на которые указывает таблица Distributed. Это наиболее гибкое решение, поскольку вы можете использовать любую схему шардинга, даже нетривиальную, в силу требований предметной области. Это также наиболее оптимальное решение, так как данные могут записываться в разные шарды полностью независимо.

Во-вторых, вы можете выполнять запросы INSERT в таблицу Distributed. В этом случае таблица будет самостоятельно распределять вставленные данные по серверам. Чтобы выполнять запись в таблицу Distributed, у неё должен быть настроен параметр sharding_key (кроме случая, когда существует только один шард).

Для каждого шарда в конфигурационном файле может быть задан <weight>. По умолчанию вес равен 1. Данные распределяются по шардам в объёме, пропорциональном весу шарда. Все веса шардов суммируются, затем вес каждого шарда делится на эту сумму для определения доли каждого шарда. Например, если есть два шарда и первый имеет вес 1, а второй — вес 2, первому будет отправляться одна треть (1 / 3) вставляемых строк, а второму — две трети (2 / 3).

Для каждого шарда в конфигурационном файле может быть задан параметр internal_replication. Если этот параметр установлен в true, операция записи выбирает первую «здоровую» реплику и записывает данные в неё. Используйте это, если таблицы, лежащие в основе таблицы Distributed, являются реплицируемыми таблицами (например, любые движки таблиц Replicated*MergeTree). Одна из реплик таблицы получит запись, и она будет автоматически реплицирована на остальные реплики.

Если internal_replication установлено в false (значение по умолчанию), данные записываются во все реплики. В этом случае таблица Distributed сама реплицирует данные. Это хуже, чем использование реплицируемых таблиц, потому что согласованность реплик не проверяется, и со временем они будут содержать немного отличающиеся данные.

Для выбора шарда, на который будет отправлена строка данных, анализируется выражение шардинга и берётся остаток от его деления на суммарный вес шардов. Строка отправляется на шард, соответствующий полуинтервалу остатков от prev_weights до prev_weights + weight, где prev_weights — суммарный вес шардов с наименьшими номерами, а weight — вес данного шарда. Например, если есть два шарда, и первый имеет вес 9, а второй — вес 10, строка будет отправлена на первый шард для остатков из диапазона [0, 9), и на второй — для остатков из диапазона [9, 19).

Выражение шардинга может быть любым выражением на основе констант и столбцов таблицы, которое возвращает целое число. Например, вы можете использовать выражение rand() для случайного распределения данных или UserID для распределения по остатку от деления идентификатора пользователя (тогда данные одного пользователя будут находиться на одном шарде, что упрощает выполнение IN и JOIN по пользователям). Если один из столбцов распределяется недостаточно равномерно, вы можете обернуть его в хеш-функцию, например intHash64(UserID).

Простой остаток от деления — ограниченное решение для шардинга и подходит не всегда. Он работает для средних и больших объёмов данных (десятки серверов), но не для очень больших объёмов данных (сотни серверов и более). В последнем случае используйте схему шардинга, требуемую предметной областью, вместо использования таблиц Distributed.

О схеме шардинга следует задуматься в следующих случаях:

  • Используются запросы, которые выполняют объединение данных (IN или JOIN) по определённому ключу. Если данные разбиты на шарды по этому ключу, можно использовать локальные IN или JOIN вместо GLOBAL IN или GLOBAL JOIN, что значительно эффективнее.
  • Используется большое количество серверов (сотни и более) с большим количеством небольших запросов, например запросов по данным отдельных клиентов (например, веб‑сайтов, рекламодателей или партнёров). Чтобы небольшие запросы не влияли на весь кластер, имеет смысл размещать данные одного клиента на одном шарде. В качестве альтернативы можно настроить двухуровневый шардинг: разделить весь кластер на «слои», где слой может состоять из нескольких шардов. Данные одного клиента располагаются в одном слое, но шарды можно добавлять в слой по мере необходимости, а данные распределяются между ними случайным образом. Для каждого слоя создаются таблицы Distributed, а для глобальных запросов создаётся одна общая распределённая таблица.

Данные записываются в фоновом режиме. При вставке в таблицу блок данных просто записывается в локальную файловую систему. Данные отправляются на удалённые серверы в фоновом режиме как можно скорее. Периодичность отправки данных управляется настройками distributed_background_insert_sleep_time_ms и distributed_background_insert_max_sleep_time_ms. Движок Distributed отправляет каждый файл с вставленными данными отдельно, но можно включить пакетную отправку файлов с помощью настройки distributed_background_insert_batch. Эта настройка повышает производительность кластера за счёт более эффективного использования ресурсов локального сервера и сети. Следует проверять, были ли данные успешно отправлены, просматривая список файлов (данных, ожидающих отправки) в каталоге таблицы: /var/lib/clickhouse/data/database/table/. Количество потоков, выполняющих фоновые задачи, можно задать настройкой background_distributed_schedule_pool_size.

Если сервер был аварийно остановлен или пережил жёсткую перезагрузку (например, из‑за сбоя оборудования) после выполнения INSERT в таблицу Distributed, вставленные данные могут быть потеряны. Если в каталоге таблицы обнаруживается повреждённая часть данных, она переносится в подкаталог broken и больше не используется.

Чтение данных

При выполнении запроса к таблице Distributed запросы SELECT отправляются на все шарды и выполняются независимо от того, как распределены данные по шардам (они могут быть распределены полностью случайным образом). При добавлении нового шарда нет необходимости переносить в него старые данные. Вместо этого можно записывать в него новые данные, используя более высокий вес — данные будут распределены немного неравномерно, но запросы по-прежнему будут выполняться корректно и эффективно.

Когда включён параметр max_parallel_replicas, обработка запросов параллелизуется по всем репликам внутри одного шарда. Дополнительную информацию см. в разделе max_parallel_replicas.

Чтобы узнать больше о том, как обрабатываются распределённые запросы in и global in, см. документацию.

Виртуальные столбцы

_shard_num

_shard_num — содержит значение shard_num из таблицы system.clusters. Тип: UInt32.

Примечание

Поскольку табличные функции remote и cluster внутренне создают временную таблицу Distributed, _shard_num доступен и в них.

См. также