· 3 min read

You have hit the parts_to_throw_insert limit on a MergeTree table. You can monitor the number of active parts for a given table with:

SELECT count(*) FROM system.parts WHERE table = '<table_name>' AND active

The main rule for inserting into ClickHouse: never send too many INSERT statements per second. Ideally, one insert per second (or per few seconds).

So you can insert 100K rows per second, but only with one big bulk INSERT statement. When you send hundreds or thousands of INSERT statements per second to a *MergeTree table, you will always get errors at some point, and this cannot be changed by adjusting settings.

If you can't combine many inserts into one big bulk INSERT statement on the client side, then you should create a Buffer table in front of the *MergeTree table.
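As a rough sketch (the database and table names db and events here are hypothetical), a Buffer table in front of a MergeTree table can look like this:

-- db.events is the target *MergeTree table; clients insert into db.events_buffer instead.
-- The Buffer engine accumulates rows in memory and flushes them to db.events once one of
-- the min/max thresholds (seconds, rows, bytes) is reached.
CREATE TABLE db.events_buffer AS db.events
ENGINE = Buffer(db, events, 16, 10, 100, 10000, 1000000, 10000000, 100000000);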

  1. Each insert creates a folder in /var/lib/clickhouse/.../table_name/. Inside that folder there are two files per column: one with the data (compressed) and one with the index. The data is physically sorted by the primary key inside those files. These folders are called 'parts'.

  2. ClickHouse merges those smaller parts into bigger parts in the background. It chooses which parts to merge according to a set of rules. After merging two (or more) parts, one bigger part is created and the old parts are queued for removal. The settings you list allow fine-tuning the rules of merging parts. The goal of the merging process is to leave one big part per partition (or a few big parts per partition that are not worth merging because they are too big). Please also check that comment.

  3. If you create new parts too fast (for example, by doing lots of small inserts) and ClickHouse is not able to merge them fast enough (new parts arrive faster than ClickHouse can merge them), then you get the exception 'Merges are processing significantly slower than inserts' (see the monitoring queries after this list). You can try to increase the limit, but you may then run into filesystem problems caused by too many files / directories (such as hitting the inode limit).

  4. If you insert into a lot of partitions at once, the problem is multiplied by the number of partitions affected by the insert.

  5. You can try to adjust the behaviour of ClickHouse with one of the listed settings, or with max_insert_block_size / max_block_size / insert_format_max_block_size / max_client_network_bandwidth. But the better solution is simply to insert data at the expected tempo: one insert every 1-2 seconds, each insert containing 10K-500K rows of data.

  6. So the proper solution to "Merges are processing significantly slower than inserts" is to adjust the number of inserts per second and the number of rows in each insert. Use batch inserts to combine small inserts into one bigger insert if data arrives row-by-row. Throttle huge inserts if you have too much data to insert at once. Don't change ClickHouse internals unless you really understand what they mean.

  7. If your data arrives faster than 500K rows per second, most probably you need more servers in the cluster to serve that traffic, not an adjustment of settings.

  8. The speed of background merges usually depends on the storage speed, the compression settings used, the MergeTree option (the merge algorithm: plain merge / aggregating / summing / collapsing, etc.), and the sorting key used.
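To see whether merges are keeping up with inserts, you can compare the number of active parts per partition with the merges currently running. A minimal sketch (replace '<table_name>' with your table, as in the query at the top of this article):

-- Active parts per partition for one table
SELECT partition_id, count() AS active_parts
FROM system.parts
WHERE table = '<table_name>' AND active
GROUP BY partition_id
ORDER BY active_parts DESC;

-- Merges currently in progress for the same table
SELECT elapsed, progress, num_parts, result_part_name
FROM system.merges
WHERE table = '<table_name>';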

· One min read

ClickHouse supports a wide range of data formats for input and output. There are multiple JSON variations among them, but the most commonly used for data ingestion is JSONEachRow. It expects one JSON object per row, each object separated by a newline.

Examples

Using the HTTP interface:

$ echo '{"foo":"bar"}' | curl 'http://localhost:8123/?query=INSERT%20INTO%20test%20FORMAT%20JSONEachRow' --data-binary @-

Using the CLI interface:

$ echo '{"foo":"bar"}'  | clickhouse-client --query="INSERT INTO test FORMAT JSONEachRow"

Instead of inserting data manually, you might consider using one of the client libraries.

Useful Settings

  • input_format_skip_unknown_fields allows inserting JSON even if there are additional fields not present in the table schema (they are discarded).
  • input_format_import_nested_json allows inserting nested JSON objects into columns of Nested type.
Note

Settings are specified as GET parameters for the HTTP interface or as additional command-line arguments prefixed with -- for the CLI interface.
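For example, to enable input_format_skip_unknown_fields for a single insert (reusing the test table from the examples above; the extra field in the payload is just illustrative):

$ echo '{"foo":"bar","extra":"ignored"}' | curl 'http://localhost:8123/?query=INSERT%20INTO%20test%20FORMAT%20JSONEachRow&input_format_skip_unknown_fields=1' --data-binary @-

$ echo '{"foo":"bar","extra":"ignored"}' | clickhouse-client --input_format_skip_unknown_fields=1 --query="INSERT INTO test FORMAT JSONEachRow"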

· 4 min read

In no particular order, here are some handy queries for troubleshooting ClickHouse and figuring out what is happening. We also have a great blog with some essential queries for monitoring ClickHouse.

View which settings have been changed from the default

SELECT
    name,
    value
FROM system.settings
WHERE changed

Get the size of all your tables

SELECT
    table,
    formatReadableSize(sum(bytes)) AS size
FROM system.parts
WHERE active
GROUP BY table

The response looks like:

┌─table───────────┬─size──────┐
│ stat │ 38.89 MiB │
│ customers │ 525.00 B │
│ my_sparse_table │ 40.73 MiB │
│ crypto_prices │ 32.18 MiB │
│ hackernews │ 6.23 GiB │
└─────────────────┴───────────┘

Row count and average day size of your table

SELECT
    table,
    formatReadableSize(size) AS size,
    rows,
    days,
    formatReadableSize(avgDaySize) AS avgDaySize
FROM
(
    SELECT
        table,
        sum(bytes) AS size,
        sum(rows) AS rows,
        min(min_date) AS min_date,
        max(max_date) AS max_date,
        max_date - min_date AS days,
        size / (max_date - min_date) AS avgDaySize
    FROM system.parts
    WHERE active
    GROUP BY table
    ORDER BY rows DESC
)

Column compression percentage, as well as the size of the primary index in memory

You can see how well your data is compressed, column by column. This query also returns the size of your primary indexes in memory, which is useful to know because primary indexes must fit in memory.

SELECT
    parts.*,
    columns.compressed_size,
    columns.uncompressed_size,
    columns.compression_ratio,
    columns.compression_percentage
FROM
(
    SELECT
        table,
        formatReadableSize(sum(data_uncompressed_bytes)) AS uncompressed_size,
        formatReadableSize(sum(data_compressed_bytes)) AS compressed_size,
        round(sum(data_compressed_bytes) / sum(data_uncompressed_bytes), 3) AS compression_ratio,
        round(100 - ((sum(data_compressed_bytes) * 100) / sum(data_uncompressed_bytes)), 3) AS compression_percentage
    FROM system.columns
    GROUP BY table
) AS columns
RIGHT JOIN
(
    SELECT
        table,
        sum(rows) AS rows,
        max(modification_time) AS latest_modification,
        formatReadableSize(sum(bytes)) AS disk_size,
        formatReadableSize(sum(primary_key_bytes_in_memory)) AS primary_keys_size,
        any(engine) AS engine,
        sum(bytes) AS bytes_size
    FROM system.parts
    WHERE active
    GROUP BY
        database,
        table
) AS parts ON columns.table = parts.table
ORDER BY parts.bytes_size DESC

Number of queries sent by client in the last 10 minutes

Feel free to increase or decrease the time interval in the toIntervalMinute(10) function:

SELECT
    client_name,
    count(),
    query_kind,
    toStartOfMinute(event_time) AS event_time_m
FROM system.query_log
WHERE (type = 'QueryStart') AND (event_time > (now() - toIntervalMinute(10)))
GROUP BY
    event_time_m,
    client_name,
    query_kind
ORDER BY
    event_time_m DESC,
    count() ASC

Number of parts in each partition

SELECT
    concat(database, '.', table),
    partition_id,
    count()
FROM system.parts
WHERE active
GROUP BY
    database,
    table,
    partition_id

Finding long running queries

This can help find queries that are stuck:

SELECT
    elapsed,
    initial_user,
    client_name,
    hostname(),
    query_id,
    query
FROM clusterAllReplicas(default, system.processes)
ORDER BY elapsed DESC

Using the query_id of the worst-running query, we can get a stack trace that can help with debugging.

SET allow_introspection_functions=1;

SELECT
    arrayStringConcat(
        arrayMap(
            (x, y) -> concat(x, ': ', y),
            arrayMap(x -> addressToLine(x), trace),
            arrayMap(x -> demangle(addressToSymbol(x)), trace)
        ),
        '\n'
    ) AS trace
FROM system.stack_trace
WHERE query_id = '0bb6e88b-9b9a-4ffc-b612-5746c859e360';

View the most recent errors

SELECT *
FROM system.errors
ORDER BY last_error_time DESC

The response looks like:

┌─name──────────────────┬─code─┬─value─┬─────last_error_time─┬─last_error_message──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬─last_error_trace─┬─remote─┐
│ UNKNOWN_TABLE │ 60 │ 3 │ 2023-03-14 01:02:35 │ Table system.stack_trace doesn't exist │ [] │ 0 │
│ BAD_GET │ 170 │ 1 │ 2023-03-14 00:58:55 │ Requested cluster 'default' not found │ [] │ 0 │
│ UNKNOWN_IDENTIFIER │ 47 │ 1 │ 2023-03-14 00:49:12 │ Missing columns: 'parts.table' 'table' while processing query: 'table = parts.table', required columns: 'table' 'parts.table' 'table' 'parts.table' │ [] │ 0 │
│ NO_ELEMENTS_IN_CONFIG │ 139 │ 2 │ 2023-03-14 00:42:11 │ Certificate file is not set. │ [] │ 0 │
└───────────────────────┴──────┴───────┴─────────────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴──────────────────┴────────┘

Top 10 queries that are using the most CPU and memory

SELECT
    type,
    event_time,
    initial_query_id,
    formatReadableSize(memory_usage) AS memory,
    `ProfileEvents.Values`[indexOf(`ProfileEvents.Names`, 'UserTimeMicroseconds')] AS userCPU,
    `ProfileEvents.Values`[indexOf(`ProfileEvents.Names`, 'SystemTimeMicroseconds')] AS systemCPU,
    normalizedQueryHash(query) AS normalized_query_hash
FROM system.query_log
ORDER BY memory_usage DESC
LIMIT 10

How much disk space are my projections using

SELECT
    name,
    parent_name,
    formatReadableSize(bytes_on_disk) AS bytes,
    formatReadableSize(parent_bytes_on_disk) AS parent_bytes,
    bytes_on_disk / parent_bytes_on_disk AS ratio
FROM system.projection_parts

Show disk storage, number of parts, number of rows in system.parts and marks across databases

SELECT
    database,
    table,
    partition,
    count() AS parts,
    formatReadableSize(sum(bytes_on_disk)) AS bytes_on_disk,
    formatReadableQuantity(sum(rows)) AS rows,
    sum(marks) AS marks
FROM system.parts
WHERE (database != 'system') AND active
GROUP BY
    database,
    table,
    partition
ORDER BY database ASC

List details of recently written new parts

The details include when they got created, how large they are, how many rows, and more:

SELECT
    modification_time,
    rows,
    formatReadableSize(bytes_on_disk),
    *
FROM clusterAllReplicas(default, system.parts)
WHERE (database = 'default') AND active AND (level = 0)
ORDER BY modification_time DESC
LIMIT 100

· 3 min read

Happy Pi Day! We thought it would be fun to calculate pi using SQL queries in ClickHouse. Here is what we came up with so far...

  1. This one uses the ClickHouse numbers_mt table function to return 1B rows and takes well under a second to compute the calculation:
SELECT 4 * sum(if(number % 2, -1, 1) / ((number * 2) + 1)) AS pi
FROM numbers_mt(1000000000.)

┌────────────────pi─┐
│ 3.141592652589797 │
└───────────────────┘

1 row in set. Elapsed: 0.432 sec. Processed 1.00 billion rows, 8.00 GB (2.32 billion rows/s., 18.53 GB/s.)
  2. The following example processes 10B numbers, just not as quickly:
SELECT 3 + (4 * sum(if((number % 2) = 0, if((number % 4) = 0, -1 / ((number * (number + 1)) * (number + 2)), 1 / ((number * (number + 1)) * (number + 2))), 0))) AS pi
FROM numbers_mt(2, 10000000000)

┌─────────────────pi─┐
│ 3.1415926525808087 │
└────────────────────┘

1 row in set. Elapsed: 9.825 sec. Processed 10.00 billion rows, 80.00 GB (1.02 billion rows/s., 8.14 GB/s.)
  3. This one is obviously our favorite in ClickHouse (and the most accurate!):
SELECT pi()

┌──────────────pi()─┐
│ 3.141592653589793 │
└───────────────────┘

1 row in set. Elapsed: 0.008 sec.
  4. Someone knew their trigonometry with this one:
SELECT 2 * asin(1) AS pi

┌────────────────pi─┐
│ 3.141592653589793 │
└───────────────────┘

1 row in set. Elapsed: 0.005 sec.
  5. Here is a handy API that lets you specify the number of digits you want:
SELECT *
FROM url('https://api.pi.delivery/v1/pi?start=0&numberOfDigits=100', 'JSONEachRow')

┌───────────────content─┐
│ 3.1415926535897933e99 │
└───────────────────────┘

1 row in set. Elapsed: 0.556 sec.
  6. This one is clever - it uses ClickHouse distance functions:
WITH random_points AS
(
SELECT (rand64(1) / pow(2, 64), rand64(2) / pow(2, 64)) AS point
FROM numbers(1000000000)
)
SELECT (4 * countIf(L2Norm(point) < 1)) / count() AS pi
FROM random_points


┌──────────pi─┐
│ 3.141627208 │
└─────────────┘

1 row in set. Elapsed: 4.742 sec. Processed 1.00 billion rows, 8.00 GB (210.88 million rows/s., 1.69 GB/s.)
  7. If you're a physicist, you will be content with this one:
SELECT 22 / 7

┌─────divide(22, 7)─┐
│ 3.142857142857143 │
└───────────────────┘
  8. Another indirect method (this one came from Alexey Milovidov) that is accurate to 7 decimal places - and it's quick:
WITH
10 AS length,
(number / 1000000000.) * length AS x
SELECT pow((2 * length) * avg(exp(-(x * x))), 2) AS pi
FROM numbers_mt(1000000000.)


┌─────────────────pi─┐
│ 3.1415926890388595 │
└────────────────────┘

1 row in set. Elapsed: 1.245 sec. Processed 1.00 billion rows, 8.00 GB (803.25 million rows/s., 6.43 GB/s.)
Note

If you have any more, we'd love for you to contribute. Thanks!

· 2 min read

Question: When new rows are inserted into a source table, they are also sent to all of the materialized views of that source table. Are inserts into materialized views performed synchronously? In other words, once the server acknowledges the insert to the client, have all materialized views been fully updated and become available for queries?

Answer:

  1. When an INSERT succeeds, the data is inserted both into the table and into all of its materialized views.
  2. The insert is not atomic with respect to materialized views. While the INSERT is in progress, concurrent clients may see an intermediate state in which the data has been inserted into the main table but not into the materialized views, or vice versa.
  3. If you are using async inserts, they collect the data and perform a regular insert under the hood, returning the same type of answer to the client as for regular inserts. If the client received a success response from an async insert with the wait_for_async_insert option enabled (the default), the data has been inserted into both the table and all of its materialized views.

Question: How about chained/cascaded materialized views?

Answer: The same rules apply - an INSERT with a successful response means that the data was inserted into every materialized view in the chain. The insert is non-atomic.
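As a minimal sketch (all table and view names here are hypothetical), a chained setup looks like this; once the INSERT below returns successfully, the row is visible in src, mv1_target, and mv2_target:

CREATE TABLE src (id UInt64, val String) ENGINE = MergeTree ORDER BY id;

-- First materialized view, writing into its own target table
CREATE TABLE mv1_target (id UInt64, val String) ENGINE = MergeTree ORDER BY id;
CREATE MATERIALIZED VIEW mv1 TO mv1_target AS SELECT id, val FROM src;

-- Chained materialized view, reading from the first view's target table
CREATE TABLE mv2_target (id UInt64, val String) ENGINE = MergeTree ORDER BY id;
CREATE MATERIALIZED VIEW mv2 TO mv2_target AS SELECT id, val FROM mv1_target;

INSERT INTO src VALUES (1, 'a');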

· 4 min read

Normally the max_threads setting controls the number of parallel reading threads and parallel query processing threads:

The data is read 'in order', column after column, from disk.

Asynchronous data reading

The new setting allow_asynchronous_read_from_io_pool_for_merge_tree allows the number of reading threads (streams) to be higher than the number of threads in the rest of the query execution pipeline. This speeds up cold queries on low-CPU ClickHouse Cloud services and increases performance for I/O-bound queries. When the setting is enabled, the number of reading threads is controlled by the max_streams_for_merge_tree_reading setting:

The data is read asynchronously, in parallel from different columns.

Note that there is also the max_streams_to_max_threads_ratio setting for configuring the ratio between the number of reading threads (streams) and the number of threads in the rest of the query execution pipeline. However, in benchmarks it did not help as much as the max_streams_for_merge_tree_reading setting.

What about optimize_read_in_order?

With the optimize_read_in_order optimization, ClickHouse can skip resorting data in memory if the query's sort order reflects the physical order of data on disk, but that requires reading the data in order (in contrast to asynchronous reading):

optimize_read_in_order has precedence over asynchronous reading

When ClickHouse sees that the optimize_read_in_order optimization can be applied, the allow_asynchronous_read_from_io_pool_for_merge_tree setting is ignored / disabled.

Example demonstrating all of the above

  • Create and load the UK Property Price Paid table

  • Check the set value of max_threads (by default, the number of CPU cores that ClickHouse sees on the node executing the query):

SELECT getSetting('max_threads');


┌─getSetting('max_threads')─┐
│ 10 │
└───────────────────────────┘
  • Check query pipeline with the default number of threads for both reading and processing the data
EXPLAIN PIPELINE
SELECT *
FROM uk_price_paid;

┌─explain──────────────────────┐
│ (Expression) │
│ ExpressionTransform × 10 │
│ (ReadFromMergeTree) │
│ MergeTreeThread × 10 0 → 1 │
└──────────────────────────────┘
  • Check query pipeline with 60 async reading threads and the default number of threads for the rest of the query execution pipeline
EXPLAIN PIPELINE
SELECT *
FROM uk_price_paid
SETTINGS
allow_asynchronous_read_from_io_pool_for_merge_tree = 1,
max_streams_for_merge_tree_reading = 60;


┌─explain────────────────────────┐
│ (Expression) │
│ ExpressionTransform × 10 │
│ (ReadFromMergeTree) │
│ Resize 60 → 10 │
│ MergeTreeThread × 60 0 → 1 │
└────────────────────────────────┘
  • Check query pipeline with 20 threads for both reading and processing the data
EXPLAIN PIPELINE
SELECT *
FROM uk_price_paid
SETTINGS
max_threads = 20;


┌─explain──────────────────────┐
│ (Expression) │
│ ExpressionTransform × 20 │
│ (ReadFromMergeTree) │
│ MergeTreeThread × 20 0 → 1 │
└──────────────────────────────┘
  • Check query pipeline with 60 async reading threads and 20 threads for the rest of the query execution pipeline
EXPLAIN PIPELINE
SELECT *
FROM uk_price_paid
SETTINGS
max_threads = 20,
allow_asynchronous_read_from_io_pool_for_merge_tree = 1,
max_streams_for_merge_tree_reading = 60;


┌─explain────────────────────────┐
│ (Expression) │
│ ExpressionTransform × 20 │
│ (ReadFromMergeTree) │
│ Resize 60 → 20 │
│ MergeTreeThread × 60 0 → 1 │
└────────────────────────────────┘
  • Check query pipeline with 60 async reading threads and 20 threads for the rest of the query execution pipeline when optimize_read_in_order optimization can be applied
EXPLAIN PIPELINE
SELECT *
FROM uk_price_paid
ORDER BY postcode1, postcode2
SETTINGS
max_threads = 20,
allow_asynchronous_read_from_io_pool_for_merge_tree = 1,
max_streams_for_merge_tree_reading = 60;


┌─explain───────────────────────────┐
│ (Expression) │
│ ExpressionTransform │
│ (Sorting) │
│ MergingSortedTransform 20 → 1 │
│ (Expression) │
│ ExpressionTransform × 20 │
│ (ReadFromMergeTree) │
│ MergeTreeInOrder × 20 0 → 1 │
└───────────────────────────────────┘


-- note that this is equivalent to disabling allow_asynchronous_read_from_io_pool_for_merge_tree

EXPLAIN PIPELINE
SELECT *
FROM uk_price_paid
ORDER BY postcode1, postcode2
SETTINGS
max_threads = 20,
allow_asynchronous_read_from_io_pool_for_merge_tree = 0,
max_streams_for_merge_tree_reading = 0;


┌─explain───────────────────────────┐
│ (Expression) │
│ ExpressionTransform │
│ (Sorting) │
│ MergingSortedTransform 20 → 1 │
│ (Expression) │
│ ExpressionTransform × 20 │
│ (ReadFromMergeTree) │
│ MergeTreeInOrder × 20 0 → 1 │
└───────────────────────────────────┘

-- note that you can enforce allow_asynchronous_read_from_io_pool_for_merge_tree by disabling optimize_read_in_order

EXPLAIN PIPELINE
SELECT *
FROM uk_price_paid
ORDER BY
postcode1 ASC,
postcode2 ASC
SETTINGS
max_threads = 20,
allow_asynchronous_read_from_io_pool_for_merge_tree = 1,
max_streams_for_merge_tree_reading = 60,
optimize_read_in_order = 0;


┌─explain──────────────────────────────┐
│ (Expression) │
│ ExpressionTransform │
│ (Sorting) │
│ MergingSortedTransform 20 → 1 │
│ MergeSortingTransform × 20 │
│ (Expression) │
│ ExpressionTransform × 20 │
│ (ReadFromMergeTree) │
│ Resize 60 → 20 │
│ MergeTreeThread × 60 0 → 1 │
└──────────────────────────────────────┘


· 2 min read

There are several ways to define a setting for a user in ClickHouse, depending on the use case and how long you want the setting to be configured. Let's look at a few scenarios...

Configure a setting for a single query

A SELECT query can contain a SETTINGS clause where you can define any number of settings. The settings are only applied for that particular query. For example:

SELECT *
FROM my_table
SETTINGS max_threads = 8;

The maximum number of threads will be 8 for this particular query.

Configure a setting for a session

You can define a setting for the lifetime of a client session using a SET statement. This is handy for ad-hoc testing, or when you want a setting to live for the lifetime of a few queries but not longer.

SET max_threads = 8;

SELECT *
FROM my_table;

Configure a setting for a particular user

Use ALTER USER to define a setting just for one user. For example:

ALTER USER my_user_name SETTINGS max_threads = 8;

You can verify it worked by logging out of your client, logging back in, and then using the getSetting function:

SELECT getSetting('max_threads');
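Alternatively, you can query system.settings (used earlier in this collection) to see the current value and whether it has been changed from the default:

SELECT name, value, changed
FROM system.settings
WHERE name = 'max_threads';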

· One min read

In order to execute the same query on all nodes of a ClickHouse Cloud service, we can use clusterAllReplicas.

For example, in order to get entries from a (node-local) system table from all nodes, you can use:

SELECT ... FROM clusterAllReplicas(default, system.TABLE) ...;
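For example, to see the hostname and uptime of each node (a minimal sketch, assuming the default cluster name of a ClickHouse Cloud service):

SELECT hostname() AS node, uptime() AS uptime_seconds
FROM clusterAllReplicas(default, system.one);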

Similarly, you can execute the same SYSTEM statement on all nodes with a single statement, by using the ON CLUSTER clause:

SYSTEM ... ON CLUSTER default;

For example for dropping the filesystem cache from all nodes, you can use:

SYSTEM DROP FILESYSTEM CACHE ON CLUSTER default;

· One min read

ClickHouse provides a simple and intuitive way to write filtered aggregates. For example, compare the standard SQL way to write filtered aggregates (which work fine in ClickHouse) with the shorthand syntax using the -If aggregate function combinator, which can be appended to any aggregate function:

--standard SQL
SELECT
    avg(number) FILTER (WHERE number > 50)
FROM numbers(100)

--ClickHouse using an aggregate combinator
SELECT
    avgIf(number, number > 50)
FROM numbers(100)

Similarly, there is a -Distinct aggregate combinator:

--standard SQL
SELECT avg(DISTINCT number)

--ClickHouse using an aggregate combinator
SELECT avgDistinct(number)

Why are filtered aggregates important? Because they allow you to implement the "segment comparison" feature in web analytics services. For example:

WITH
    Region = 'us' AS segment1,
    Browser = 'Chrome' AS segment2
SELECT
    uniqIf(UserID, segment1),
    uniqIf(UserID, segment2)
WHERE segment1 OR segment2
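The query above is a schematic (it omits the FROM clause). Here is a self-contained toy version of the same idea using the numbers table function, with even numbers and multiples of three standing in for the two segments:

WITH
    (number % 2) = 0 AS segment1,
    (number % 3) = 0 AS segment2
SELECT
    countIf(segment1) AS segment1_count,
    countIf(segment2) AS segment2_count
FROM numbers(100)
WHERE segment1 OR segment2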

Check out the aggregate function combinator page in the docs for more details.

· One min read

ClickHouse uses threads from the Global Thread pool to process queries and also perform background operations like merges and mutations. If there is no idle thread to process a query, then a new thread is created in the pool.

The maximum size of the global thread pool is determined by the max_thread_pool_size setting, which defaults to 10,000. You can modify this value in your config - here we set it to 20,000:

<max_thread_pool_size>20000</max_thread_pool_size>

If you modify max_thread_pool_size, we recommend changing thread_pool_queue_size to be the same value. The thread_pool_queue_size setting is the maximum number of jobs that can be scheduled on the Global Thread pool:

<thread_pool_queue_size>20000</thread_pool_queue_size>

You can also free up resources if your server has a lot of idle threads - using the max_thread_pool_free_size setting. The default is 1,000, which means your Global Thread pool will never have more than 1,000 idle threads. The following example increases the value to 2,000:

<max_thread_pool_free_size>2000</max_thread_pool_free_size>

Check out the docs for more details on the settings above and other settings that affect the Global Thread pool.
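To get a rough idea of how busy the Global Thread pool is, you can check the related metrics in system.metrics (a quick sketch; the exact set of GlobalThread* metrics can vary between versions):

SELECT metric, value
FROM system.metrics
WHERE metric LIKE 'GlobalThread%';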