Движок распределённой таблицы
Чтобы создать движок распределённой таблицы в ClickHouse Cloud, можно использовать табличные функции remote и remoteSecure.
Синтаксис Distributed(...) в ClickHouse Cloud использовать нельзя.
Таблицы с движком Distributed не хранят собственные данные, но позволяют выполнять распределённую обработку запросов на нескольких серверах. Чтение автоматически распараллеливается. Во время чтения используются индексы таблиц на удалённых серверах, если они есть.
Создание таблицы
Из таблицы
Когда таблица Distributed указывает на таблицу на текущем сервере, вы можете заимствовать её схему:
Параметры движка Distributed
| Параметр | Описание |
|---|---|
cluster | Имя кластера в конфигурационном файле сервера |
database | Имя удалённой базы данных |
table | Имя удалённой таблицы |
sharding_key (необязательно) | Ключ шардинга. Указание sharding_key необходимо в следующих случаях:
|
policy_name (необязательно) | Имя политики, которое будет использоваться для хранения временных файлов при фоновой отправке |
См. также
- настройка distributed_foreground_insert
- MergeTree для примеров
Настройки движка Distributed
| Параметр | Описание | Значение по умолчанию |
|---|---|---|
fsync_after_insert | Выполнять fsync для данных файла после фоновой вставки в Distributed. Гарантирует, что ОС сбросила все вставленные данные в файл на диске инициирующего узла. | false |
fsync_directories | Выполнять fsync для каталогов. Гарантирует, что ОС обновила метаданные каталога после операций, связанных с фоновыми вставками в таблицу Distributed (после вставки, после отправки данных на шард и т. д.). | false |
skip_unavailable_shards | Если true, ClickHouse молча пропускает недоступные шарды. Шард помечается как недоступный, когда: 1) к нему нет доступа из‑за ошибки соединения; 2) шард не может быть разрешён через DNS; 3) таблица не существует на шарде. | false |
bytes_to_throw_insert | Если количество сжатых байт, ожидающих фоновой операции INSERT, превысит это значение, будет выброшено исключение. 0 — не выбрасывать. | 0 |
bytes_to_delay_insert | Если количество сжатых байт, ожидающих фоновой операции INSERT, превысит это значение, выполнение запроса будет задержано. 0 — не задерживать. | 0 |
max_delay_to_insert | Максимальная задержка вставки данных в таблицу Distributed в секундах, если имеется много данных (байт), ожидающих фоновой отправки. | 60 |
background_insert_batch | То же, что и distributed_background_insert_batch. | 0 |
background_insert_split_batch_on_failure | То же, что и distributed_background_insert_split_batch_on_failure. | 0 |
background_insert_sleep_time_ms | То же, что и distributed_background_insert_sleep_time_ms. | 0 |
background_insert_max_sleep_time_ms | То же, что и distributed_background_insert_max_sleep_time_ms. | 0 |
flush_on_detach | Сбрасывать данные на удалённые узлы при DETACH/DROP/остановке сервера. | true |
Параметры надёжности (fsync_...):
- Влияют только на фоновые
INSERT(то есть приdistributed_foreground_insert=false), когда данные сначала сохраняются на диск инициирующего узла, а затем в фоне отправляются на шарды. - Могут существенно снизить производительность
INSERT. - Влияют на запись данных, хранящихся внутри каталога распределённой таблицы, на узел, который принял вашу вставку. Если вам нужны гарантии записи данных в базовые таблицы MergeTree, см. параметры надёжности (
...fsync...) вsystem.merge_tree_settings.
Для параметров ограничений вставки (..._insert) см. также:
- параметр
distributed_foreground_insert - параметр
prefer_localhost_replica bytes_to_throw_insertобрабатывается раньше, чемbytes_to_delay_insert, поэтому его не следует устанавливать в значение меньше значенияbytes_to_delay_insert.
Пример
Данные будут читаться со всех серверов в кластере logs из таблицы default.hits, расположенной на каждом сервере кластера. Данные не только читаются, но и частично обрабатываются на удалённых серверах (насколько это возможно). Например, для запроса с GROUP BY данные будут агрегироваться на удалённых серверах, а промежуточные состояния агрегатных функций будут отправлены на сервер, выполняющий запрос. Затем данные будут дополнительно агрегированы.
Вместо имени базы данных вы можете использовать константное выражение, которое возвращает строку. Например: currentDatabase().
Кластеры
Кластеры настраиваются в конфигурационном файле сервера:
Здесь определяется кластер с именем logs, который состоит из двух шардов, каждый из которых содержит по две реплики. Шарды — это серверы, в которых хранятся разные части данных (чтобы прочитать все данные, необходимо обратиться ко всем шардам). Реплики — это серверы-дубликаты (чтобы прочитать все данные, можно обратиться к данным на любой из реплик).
Имена кластеров не должны содержать точек.
Для каждого сервера указываются параметры host, port, а также, при необходимости, user, password, secure, compression, bind_host:
| Parameter | Description | Default Value |
|---|---|---|
host | Адрес удалённого сервера. Можно использовать либо домен, либо IPv4- или IPv6-адрес. Если вы указываете домен, сервер выполняет DNS-запрос при запуске и сохраняет результат на всё время своей работы. Если DNS-запрос завершился с ошибкой, сервер не запускается. Если вы изменили DNS-запись, перезапустите сервер. | - |
port | TCP-порт для работы сервера (tcp_port в конфигурации, обычно имеет значение 9000). Не путать с http_port. | - |
user | Имя пользователя для подключения к удалённому серверу. У этого пользователя должны быть права на подключение к указанному серверу. Доступ настраивается в файле users.xml. Дополнительные сведения см. в разделе Access rights. | default |
password | Пароль для подключения к удалённому серверу (не маскируется). | '' |
secure | Использовать ли защищённое SSL/TLS-подключение. Обычно также требуется указать порт (порт по умолчанию для защищённого подключения — 9440). Сервер должен слушать на <tcp_port_secure>9440</tcp_port_secure> и быть настроен с корректными сертификатами. | false |
compression | Использовать сжатие данных. | true |
bind_host | Исходный адрес, используемый при подключении к удалённому серверу с этого узла. Поддерживается только IPv4-адрес. Предназначен для продвинутых сценариев развертывания, когда требуется задать исходный IP-адрес, используемый распределёнными запросами ClickHouse. | - |
При указании реплик одна из доступных реплик будет выбрана для каждого шарда при чтении. Можно настроить алгоритм балансировки нагрузки (предпочтения при выборе реплики для доступа) — см. настройку load_balancing. Если не удалось установить подключение с сервером, выполняется попытка подключения с коротким таймаутом. Если подключиться не удалось, выбирается следующая реплика и так далее для всех реплик. Если попытка подключения не удалась для всех реплик, попытка будет повторена тем же образом несколько раз. Это повышает устойчивость, но не обеспечивает полной отказоустойчивости: удалённый сервер может принять подключение, но не работать или работать некорректно.
Можно указать только один шард (в этом случае обработку запроса корректнее называть удалённой, а не распределённой) или любое количество шардов. В каждом шарде можно указать от одной до любого количества реплик. Можно задать разное количество реплик для каждого шарда.
В конфигурации можно указать произвольное количество кластеров.
Для просмотра ваших кластеров используйте таблицу system.clusters.
Движок Distributed позволяет работать с кластером как с локальным сервером. Однако конфигурацию кластера нельзя задавать динамически — её необходимо настроить в конфигурационном файле сервера. Обычно все серверы в кластере имеют одинаковую конфигурацию кластера (хотя это и не обязательно). Кластеры из конфигурационного файла обновляются на лету, без перезапуска сервера.
Если при каждом выполнении запроса нужно отправлять его в неизвестный заранее набор шардов и реплик, нет необходимости создавать таблицу Distributed — вместо этого используйте табличную функцию remote. См. раздел Table functions.
Запись данных
Существует два способа записи данных в кластер:
Во-первых, вы можете определить, на какие серверы записывать какие данные, и выполнять запись непосредственно на каждый шард. Другими словами, выполнять прямые запросы INSERT в удалённые таблицы в кластере, на которые указывает таблица Distributed. Это наиболее гибкое решение, поскольку вы можете использовать любую схему шардинга, даже нетривиальную, в силу требований предметной области. Это также наиболее оптимальное решение, так как данные могут записываться в разные шарды полностью независимо.
Во-вторых, вы можете выполнять запросы INSERT в таблицу Distributed. В этом случае таблица будет самостоятельно распределять вставленные данные по серверам. Чтобы выполнять запись в таблицу Distributed, у неё должен быть настроен параметр sharding_key (кроме случая, когда существует только один шард).
Для каждого шарда в конфигурационном файле может быть задан <weight>. По умолчанию вес равен 1. Данные распределяются по шардам в объёме, пропорциональном весу шарда. Все веса шардов суммируются, затем вес каждого шарда делится на эту сумму для определения доли каждого шарда. Например, если есть два шарда и первый имеет вес 1, а второй — вес 2, первому будет отправляться одна треть (1 / 3) вставляемых строк, а второму — две трети (2 / 3).
Для каждого шарда в конфигурационном файле может быть задан параметр internal_replication. Если этот параметр установлен в true, операция записи выбирает первую «здоровую» реплику и записывает данные в неё. Используйте это, если таблицы, лежащие в основе таблицы Distributed, являются реплицируемыми таблицами (например, любые движки таблиц Replicated*MergeTree). Одна из реплик таблицы получит запись, и она будет автоматически реплицирована на остальные реплики.
Если internal_replication установлено в false (значение по умолчанию), данные записываются во все реплики. В этом случае таблица Distributed сама реплицирует данные. Это хуже, чем использование реплицируемых таблиц, потому что согласованность реплик не проверяется, и со временем они будут содержать немного отличающиеся данные.
Для выбора шарда, на который будет отправлена строка данных, анализируется выражение шардинга и берётся остаток от его деления на суммарный вес шардов. Строка отправляется на шард, соответствующий полуинтервалу остатков от prev_weights до prev_weights + weight, где prev_weights — суммарный вес шардов с наименьшими номерами, а weight — вес данного шарда. Например, если есть два шарда, и первый имеет вес 9, а второй — вес 10, строка будет отправлена на первый шард для остатков из диапазона [0, 9), и на второй — для остатков из диапазона [9, 19).
Выражение шардинга может быть любым выражением на основе констант и столбцов таблицы, которое возвращает целое число. Например, вы можете использовать выражение rand() для случайного распределения данных или UserID для распределения по остатку от деления идентификатора пользователя (тогда данные одного пользователя будут находиться на одном шарде, что упрощает выполнение IN и JOIN по пользователям). Если один из столбцов распределяется недостаточно равномерно, вы можете обернуть его в хеш-функцию, например intHash64(UserID).
Простой остаток от деления — ограниченное решение для шардинга и подходит не всегда. Он работает для средних и больших объёмов данных (десятки серверов), но не для очень больших объёмов данных (сотни серверов и более). В последнем случае используйте схему шардинга, требуемую предметной областью, вместо использования таблиц Distributed.
О схеме шардинга следует задуматься в следующих случаях:
- Используются запросы, которые выполняют объединение данных (
INилиJOIN) по определённому ключу. Если данные разбиты на шарды по этому ключу, можно использовать локальныеINилиJOINвместоGLOBAL INилиGLOBAL JOIN, что значительно эффективнее. - Используется большое количество серверов (сотни и более) с большим количеством небольших запросов, например запросов по данным отдельных клиентов (например, веб‑сайтов, рекламодателей или партнёров). Чтобы небольшие запросы не влияли на весь кластер, имеет смысл размещать данные одного клиента на одном шарде. В качестве альтернативы можно настроить двухуровневый шардинг: разделить весь кластер на «слои», где слой может состоять из нескольких шардов. Данные одного клиента располагаются в одном слое, но шарды можно добавлять в слой по мере необходимости, а данные распределяются между ними случайным образом. Для каждого слоя создаются таблицы
Distributed, а для глобальных запросов создаётся одна общая распределённая таблица.
Данные записываются в фоновом режиме. При вставке в таблицу блок данных просто записывается в локальную файловую систему. Данные отправляются на удалённые серверы в фоновом режиме как можно скорее. Периодичность отправки данных управляется настройками distributed_background_insert_sleep_time_ms и distributed_background_insert_max_sleep_time_ms. Движок Distributed отправляет каждый файл с вставленными данными отдельно, но можно включить пакетную отправку файлов с помощью настройки distributed_background_insert_batch. Эта настройка повышает производительность кластера за счёт более эффективного использования ресурсов локального сервера и сети. Следует проверять, были ли данные успешно отправлены, просматривая список файлов (данных, ожидающих отправки) в каталоге таблицы: /var/lib/clickhouse/data/database/table/. Количество потоков, выполняющих фоновые задачи, можно задать настройкой background_distributed_schedule_pool_size.
Если сервер был аварийно остановлен или пережил жёсткую перезагрузку (например, из‑за сбоя оборудования) после выполнения INSERT в таблицу Distributed, вставленные данные могут быть потеряны. Если в каталоге таблицы обнаруживается повреждённая часть данных, она переносится в подкаталог broken и больше не используется.
Чтение данных
При выполнении запроса к таблице Distributed запросы SELECT отправляются на все шарды и выполняются независимо от того, как распределены данные по шардам (они могут быть распределены полностью случайным образом). При добавлении нового шарда нет необходимости переносить в него старые данные. Вместо этого можно записывать в него новые данные, используя более высокий вес — данные будут распределены немного неравномерно, но запросы по-прежнему будут выполняться корректно и эффективно.
Когда включён параметр max_parallel_replicas, обработка запросов параллелизуется по всем репликам внутри одного шарда. Дополнительную информацию см. в разделе max_parallel_replicas.
Чтобы узнать больше о том, как обрабатываются распределённые запросы in и global in, см. документацию.
Виртуальные столбцы
_shard_num
_shard_num — содержит значение shard_num из таблицы system.clusters. Тип: UInt32.
См. также
- Описание виртуальных столбцов
- Настройка
background_distributed_schedule_pool_size - Функции
shardNum()иshardCount()