· 2 min read

Question: When new rows are inserted into a source table, they are also sent to all of that table's materialized views. Are inserts into materialized views performed synchronously, meaning that once the server acknowledges the insert to the client, all materialized views have been fully updated and are available for queries?

Answer:

  1. When an INSERT succeeds, the data has been inserted into both the table and all of its materialized views.
  2. The insert is not atomic with respect to materialized views. While the INSERT is in progress, concurrent clients may see an intermediate state in which the data has been inserted into the main table but not into the materialized views, or vice versa.
  3. Async inserts collect the data and perform a regular insert under the hood, returning the same type of answer to the client as regular inserts do. If the client received success from an async insert with wait_for_async_insert enabled (the default), the data has been inserted into both the table and all of its materialized views.
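
A minimal sketch of the behavior described in the answer. The table and view names (events, events_per_user, events_per_user_mv) are hypothetical and chosen only for illustration. Once the INSERT has returned successfully, both SELECT queries see the new rows; a concurrent client querying while the INSERT is still running could see one table updated without the other.

```sql
-- Hypothetical source table.
CREATE TABLE events
(
    ts DateTime,
    user_id UInt64
)
ENGINE = MergeTree
ORDER BY ts;

-- Hypothetical target table for the materialized view.
CREATE TABLE events_per_user
(
    user_id UInt64,
    cnt UInt64
)
ENGINE = SummingMergeTree
ORDER BY user_id;

-- Every block inserted into events is also pushed through this view.
CREATE MATERIALIZED VIEW events_per_user_mv TO events_per_user AS
SELECT user_id, count() AS cnt
FROM events
GROUP BY user_id;

INSERT INTO events VALUES (now(), 1), (now(), 1), (now(), 2);

-- After the INSERT above is acknowledged, both reads reflect the new rows.
SELECT count() FROM events;

SELECT user_id, sum(cnt) AS cnt
FROM events_per_user
GROUP BY user_id
ORDER BY user_id;
```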

Question: How about chained/cascaded materialized views?

Answer: The same rules apply - an INSERT with a successful response means that the data was inserted into every materialized view in the chain. The insert is non-atomic.

· 4 min read

Normally the max_threads setting controls the number of parallel reading threads and parallel query processing threads:

The data is read 'in order', column after column, from disk.

Asynchronous data reading

The new setting allow_asynchronous_read_from_io_pool_for_merge_tree allows the number of reading threads (streams) to be higher than the number of threads in the rest of the query execution pipeline. This speeds up cold queries on low-CPU ClickHouse Cloud services and increases performance for I/O-bound queries. When the setting is enabled, the number of reading threads is controlled by the max_streams_for_merge_tree_reading setting:

The data is read asynchronously, in parallel from different columns.

Note that there is also the max_streams_to_max_threads_ratio setting for configuring the ratio between the number of reading threads (streams) and the number of threads in the rest of the query execution pipeline. In benchmarks, however, it did not help as much as the max_streams_for_merge_tree_reading setting.

Benchmarks

Here and here are some benchmarks regarding speeding up a cold query on a ClickHouse Cloud service.

What about optimize_read_in_order?

With the optimize_read_in_order optimization, ClickHouse can skip re-sorting data in memory if the query's sort order reflects the physical order of data on disk, but that requires reading the data in order (in contrast to asynchronous reading).

optimize_read_in_order has precedence over asynchronous reading

When ClickHouse sees that the optimize_read_in_order optimization can be applied, the allow_asynchronous_read_from_io_pool_for_merge_tree setting is ignored (effectively disabled).

Example demonstrating all of the above

  • Create and load the UK Property Price Paid table

  • Check the current value of max_threads (by default, the number of CPU cores that ClickHouse sees on the node executing the query)

SELECT getSetting('max_threads');


┌─getSetting('max_threads')─┐
│                        10 │
└───────────────────────────┘
  • Check query pipeline with default amount of threads for both reading and processing the data
EXPLAIN PIPELINE
SELECT *
FROM uk_price_paid;

┌─explain──────────────────────┐
│ (Expression)                 │
│ ExpressionTransform × 10     │
│   (ReadFromMergeTree)        │
│   MergeTreeThread × 10 0 → 1 │
└──────────────────────────────┘
  • Check query pipeline with 60 async reading threads and default amount of threads for the rest of the query execution pipeline
EXPLAIN PIPELINE
SELECT *
FROM uk_price_paid
SETTINGS
allow_asynchronous_read_from_io_pool_for_merge_tree = 1,
max_streams_for_merge_tree_reading = 60;


┌─explain────────────────────────┐
│ (Expression)                   │
│ ExpressionTransform × 10       │
│   (ReadFromMergeTree)          │
│   Resize 60 → 10               │
│     MergeTreeThread × 60 0 → 1 │
└────────────────────────────────┘
  • Check query pipeline with 20 threads for both reading and processing the data
EXPLAIN PIPELINE
SELECT *
FROM uk_price_paid
SETTINGS
max_threads = 20;


┌─explain──────────────────────┐
│ (Expression)                 │
│ ExpressionTransform × 20     │
│   (ReadFromMergeTree)        │
│   MergeTreeThread × 20 0 → 1 │
└──────────────────────────────┘
  • Check query pipeline with 60 async reading threads and 20 threads for the rest of the query execution pipeline
EXPLAIN PIPELINE
SELECT *
FROM uk_price_paid
SETTINGS
max_threads = 20,
allow_asynchronous_read_from_io_pool_for_merge_tree = 1,
max_streams_for_merge_tree_reading = 60;


┌─explain────────────────────────┐
│ (Expression)                   │
│ ExpressionTransform × 20       │
│   (ReadFromMergeTree)          │
│   Resize 60 → 20               │
│     MergeTreeThread × 60 0 → 1 │
└────────────────────────────────┘
  • Check query pipeline with 60 async reading threads and 20 threads for the rest of the query execution pipeline when optimize_read_in_order optimization can be applied
EXPLAIN PIPELINE
SELECT *
FROM uk_price_paid
ORDER BY postcode1, postcode2
SETTINGS
max_threads = 20,
allow_asynchronous_read_from_io_pool_for_merge_tree = 1,
max_streams_for_merge_tree_reading = 60;


┌─explain───────────────────────────┐
│ (Expression)                      │
│ ExpressionTransform               │
│   (Sorting)                       │
│   MergingSortedTransform 20 → 1   │
│     (Expression)                  │
│     ExpressionTransform × 20      │
│       (ReadFromMergeTree)         │
│       MergeTreeInOrder × 20 0 → 1 │
└───────────────────────────────────┘


-- note that this is equivalent to disabling allow_asynchronous_read_from_io_pool_for_merge_tree

EXPLAIN PIPELINE
SELECT *
FROM uk_price_paid
ORDER BY postcode1, postcode2
SETTINGS
max_threads = 20,
allow_asynchronous_read_from_io_pool_for_merge_tree = 0,
max_streams_for_merge_tree_reading = 0;


┌─explain───────────────────────────┐
│ (Expression)                      │
│ ExpressionTransform               │
│   (Sorting)                       │
│   MergingSortedTransform 20 → 1   │
│     (Expression)                  │
│     ExpressionTransform × 20      │
│       (ReadFromMergeTree)         │
│       MergeTreeInOrder × 20 0 → 1 │
└───────────────────────────────────┘

-- note that you can enforce allow_asynchronous_read_from_io_pool_for_merge_tree by disabling optimize_read_in_order

EXPLAIN PIPELINE
SELECT *
FROM uk_price_paid
ORDER BY
postcode1 ASC,
postcode2 ASC
SETTINGS
max_threads = 20,
allow_asynchronous_read_from_io_pool_for_merge_tree = 1,
max_streams_for_merge_tree_reading = 60,
optimize_read_in_order = 0;


┌─explain──────────────────────────────┐
│ (Expression)                         │
│ ExpressionTransform                  │
│   (Sorting)                          │
│   MergingSortedTransform 20 → 1      │
│     MergeSortingTransform × 20       │
│       (Expression)                   │
│       ExpressionTransform × 20       │
│         (ReadFromMergeTree)          │
│         Resize 60 → 20               │
│           MergeTreeThread × 60 0 → 1 │
└──────────────────────────────────────┘


· 3 min read

Happy Pi Day! We thought it would be fun to calculate pi using SQL queries in ClickHouse. Here is what we came up with so far...

  1. This one uses the ClickHouse numbers_mt table function to return 1B rows and only takes about 0.4 seconds to compute the calculation:
SELECT 4 * sum(if(number % 2, -1, 1) / ((number * 2) + 1)) AS pi
FROM numbers_mt(1000000000.)

┌────────────────pi─┐
│ 3.141592652589797 │
└───────────────────┘

1 row in set. Elapsed: 0.432 sec. Processed 1.00 billion rows, 8.00 GB (2.32 billion rows/s., 18.53 GB/s.)
  1. The following example processes 10B numbers, so it is not as quick:
SELECT 3 + (4 * sum(if((number % 2) = 0, if((number % 4) = 0, -1 / ((number * (number + 1)) * (number + 2)), 1 / ((number * (number + 1)) * (number + 2))), 0))) AS pi
FROM numbers_mt(2, 10000000000)

┌─────────────────pi─┐
│ 3.1415926525808087 │
└────────────────────┘

1 row in set. Elapsed: 9.825 sec. Processed 10.00 billion rows, 80.00 GB (1.02 billion rows/s., 8.14 GB/s.)
  1. This one is obviously our favorite in ClickHouse (and the most accurate!):
SELECT pi()

┌──────────────pi()─┐
│ 3.141592653589793 │
└───────────────────┘

1 row in set. Elapsed: 0.008 sec.
  1. Someone knew their trigonometry with this one:
SELECT 2 * asin(1) AS pi

┌────────────────pi─┐
│ 3.141592653589793 │
└───────────────────┘

1 row in set. Elapsed: 0.005 sec.
  1. Here is a handy API that lets you specify the number of digits you want:
SELECT *
FROM url('https://api.pi.delivery/v1/pi?start=0&numberOfDigits=100', 'JSONEachRow')

┌───────────────content─┐
│ 3.1415926535897933e99 │
└───────────────────────┘

1 row in set. Elapsed: 0.556 sec.
  1. This one is clever - it uses ClickHouse distance functions:
WITH random_points AS
(
SELECT (rand64(1) / pow(2, 64), rand64(2) / pow(2, 64)) AS point
FROM numbers(1000000000)
)
SELECT (4 * countIf(L2Norm(point) < 1)) / count() AS pi
FROM random_points


┌──────────pi─┐
│ 3.141627208 │
└─────────────┘

1 row in set. Elapsed: 4.742 sec. Processed 1.00 billion rows, 8.00 GB (210.88 million rows/s., 1.69 GB/s.)
  1. If you're a physicist, you will be content with this one:
SELECT 22 / 7

┌─────divide(22, 7)─┐
│ 3.142857142857143 │
└───────────────────┘
  1. Another indirect method (this one came from Alexey Milovidov) that is accurate to 7 decimal places - and it's quick:
WITH
10 AS length,
(number / 1000000000.) * length AS x
SELECT pow((2 * length) * avg(exp(-(x * x))), 2) AS pi
FROM numbers_mt(1000000000.)


┌─────────────────pi─┐
│ 3.1415926890388595 │
└────────────────────┘

1 row in set. Elapsed: 1.245 sec. Processed 1.00 billion rows, 8.00 GB (803.25 million rows/s., 6.43 GB/s.)
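
For readers wondering why the less obvious queries above converge to pi, here are the identities they appear to rely on, written out as a sketch. The symbols N (number of rows), L (the `length` constant in the last query), and p (a random point in the unit square) are notation introduced here, not taken from the queries.

```latex
% First numbers_mt query: the Leibniz series
\pi = 4 \sum_{k=0}^{\infty} \frac{(-1)^{k}}{2k+1}

% Second numbers_mt query: the Nilakantha series (n = 2j runs over even numbers)
\pi = 3 + 4 \sum_{j=1}^{\infty} \frac{(-1)^{j+1}}{2j\,(2j+1)\,(2j+2)}

% L2Norm query: a uniform random point p in the unit square falls inside
% the quarter unit circle with probability pi/4
\Pr\bigl[\lVert p \rVert_2 < 1\bigr] = \frac{\pi}{4}

% Last query: the Gaussian integral, approximated by an average over
% x_i = iL/N on [0, L]; with L = 10 the truncated tail is negligible
\int_{0}^{\infty} e^{-x^{2}}\,dx = \frac{\sqrt{\pi}}{2}
\quad\Longrightarrow\quad
\pi \approx \Bigl( 2L \cdot \frac{1}{N} \sum_{i=0}^{N-1} e^{-x_i^{2}} \Bigr)^{2},
\qquad x_i = \frac{iL}{N}
```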
Note: If you have any more, we'd love for you to contribute. Thanks!

· 2 min read

A columnar database stores the data of each column independently. This allows reading data from disk only for those columns that are used in any given query. The cost is that operations that affect whole rows become proportionally more expensive. The synonym for a columnar database is a column-oriented database management system. ClickHouse is a typical example of such a system.

Key columnar database advantages are:

  • Queries that use only a few columns out of many.
  • Aggregating queries against large volumes of data.
  • Column-wise data compression.

Here is an illustration of the difference between traditional row-oriented systems and columnar databases when building reports:

[Figure: Traditional row-oriented]

[Figure: Columnar]

A columnar database is the preferred choice for analytical applications because it allows a table to have many columns just in case, without paying the cost of unused columns at query time (a traditional OLTP database reads all of the data during queries because the data is stored in rows, not columns). Column-oriented databases are designed for big data processing and data warehousing, and they often scale natively using distributed clusters of low-cost hardware to increase throughput. ClickHouse does this with a combination of distributed and replicated tables.
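
As a rough illustration of per-column storage, the following sketch assumes a MergeTree table named uk_price_paid with town and price columns (an assumption; substitute any table you have). The first query lists the on-disk size of each column separately via the system.columns table. The second touches only two columns, so only their data needs to be read from disk.

```sql
-- Each column is stored (and compressed) independently.
SELECT
    name,
    formatReadableSize(data_compressed_bytes) AS compressed,
    formatReadableSize(data_uncompressed_bytes) AS uncompressed
FROM system.columns
WHERE database = currentDatabase() AND table = 'uk_price_paid'
ORDER BY data_compressed_bytes DESC;

-- Only the town and price columns are needed for this aggregation.
SELECT town, avg(price) AS avg_price
FROM uk_price_paid
GROUP BY town
ORDER BY avg_price DESC
LIMIT 10;
```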

· 2 min read

There are several ways to define a setting for a user in ClickHouse, depending on the use case and how long you want the setting to be configured. Let's look at a few scenarios...

Configure a setting for a single query

A SELECT query can contain a SETTINGS clause where you can define any number of settings. The settings are only applied for that particular query. For example:

SELECT *
FROM my_table
SETTINGS max_threads = 8;

The maximum number of threads will be 8 for this particular query.

Configure a setting for a session

You can define a setting for the lifetime of a client session using a SET statement. This is handy for ad-hoc testing or when you want a setting to live for the lifetime of a few queries, but not longer.

SET max_threads = 8;

SELECT *
FROM my_table;

Configure a setting for a particular user

Use ALTER USER to define a setting just for one user. For example:

ALTER USER my_user_name SETTINGS max_threads = 8;

You can verify that it worked by logging out of your client, logging back in, and then using the getSetting function:

SELECT getSetting('max_threads');
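
Two other ways to inspect the result, as a sketch: the system.settings table shows the effective value for the current session, and SHOW CREATE USER shows what is attached to the user. The name my_user_name is the placeholder from the ALTER USER example, not a real account.

```sql
-- changed = 1 means the value differs from the default.
SELECT name, value, changed
FROM system.settings
WHERE name = 'max_threads';

-- Shows the user definition, including any SETTINGS attached to it.
SHOW CREATE USER my_user_name;
```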

· 2 min read

Problem

The remote() and remoteSecure() table functions allow access to a remote table on another ClickHouse node.

When using these functions on a node that is located more than 100ms (latency wise) away from the remote node, it is common to encounter the following timeout error.

4776d4bd8190 :) SELECT * FROM remoteSecure('HOSTNAME.us-east-2.aws.clickhouse.cloud', DATABASE, TABLE, 'USER', 'USER_PASSWORD')

SELECT *
FROM remoteSecure('HOSTNAME.us-east-2.aws.clickhouse.cloud', DATABASE, TABLE, 'USER', 'USER_PASSWORD')

Query id: 2bd6ddd0-66d9-4d19-830f-87e3cec3724b


0 rows in set. Elapsed: 1.213 sec.

Received exception from server (version 22.6.9):
Code: 519. DB::Exception: Received from localhost:9000. DB::NetException. DB::NetException: All attempts to get table structure failed. Log:

Code: 279. DB::NetException: All connection tries failed. Log:

Code: 209. DB::NetException: Timeout: connect timed out: 18.218.245.169:9440 (hc7d963h1t.us-east-2.aws.clickhouse.cloud:9440, connection timeout 100 ms). (SOCKET_TIMEOUT) (version 22.6.9.11 (official build))
Code: 209. DB::NetException: Timeout: connect timed out: 18.218.245.169:9440 (hc7d963h1t.us-east-2.aws.clickhouse.cloud:9440, connection timeout 100 ms). (SOCKET_TIMEOUT) (version 22.6.9.11 (official build))
Code: 209. DB::NetException: Timeout: connect timed out: 18.218.245.169:9440 (hc7d963h1t.us-east-2.aws.clickhouse.cloud:9440, connection timeout 100 ms). (SOCKET_TIMEOUT) (version 22.6.9.11 (official build))

. (ALL_CONNECTION_TRIES_FAILED) (version 22.6.9.11 (official build))

. (NO_REMOTE_SHARD_AVAILABLE)

Workaround

To increase the connection timeout, set connect_timeout_with_failover_secure_ms to a higher value (e.g. 1 second) than the default of 100 ms.

4776d4bd8190 :) SELECT * FROM remoteSecure('HOSTNAME.us-east-2.aws.clickhouse.cloud:9440', DATABASE, TABLE, 'USER', 'USER_PASSWORD') SETTINGS connect_timeout_with_failover_secure_ms = 1000

SELECT *
FROM remoteSecure('HOSTNAME.us-east-2.aws.clickhouse.cloud:9440', DATABASE, TABLE, 'USER', 'USER_PASSWORD')
SETTINGS connect_timeout_with_failover_secure_ms = 1000

Query id: 8e2f4d41-307b-4e61-abb8-809190023247

┌─x─┐
│ 1 │
└───┘

1 row in set. Elapsed: 2.403 sec.
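
If several queries in a row go to the same distant node, the timeout could instead be raised once for the whole session. This is a sketch that reuses the placeholders from the example above (HOSTNAME, DATABASE, TABLE, USER, USER_PASSWORD). For the non-secure remote() function, the counterpart setting is presumably connect_timeout_with_failover_ms; treat that as an assumption and check it against your version.

```sql
-- Applies to every subsequent query in the current session.
SET connect_timeout_with_failover_secure_ms = 1000;

SELECT *
FROM remoteSecure('HOSTNAME.us-east-2.aws.clickhouse.cloud:9440', DATABASE, TABLE, 'USER', 'USER_PASSWORD');
```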

· One min read

It’s a combination of “Clickstream” and “Data wareHouse”. It comes from the original use case at Yandex.Metrica, where ClickHouse was supposed to keep records of all clicks by people from all over the Internet, and it still does the job. You can read more about this use case on the ClickHouse history page.

This two-part meaning has two consequences:

  • The only correct way to write ClickHouse is with a capital H.
  • If you need to abbreviate it, use CH. For historical reasons, abbreviating it as CK is also popular in China, mostly because one of the first talks about ClickHouse in Chinese used this form.

Info: Many years after ClickHouse got its name, this approach of combining two words that are meaningful on their own was highlighted as the best way to name a database in research by Andy Pavlo, an Associate Professor of Databases at Carnegie Mellon University. ClickHouse shared his “best database name of all time” award with Postgres.

· 2 min read

The short answer is “yes”. ClickHouse has multiple mechanisms that free up disk space by removing old data. Each mechanism is aimed at different scenarios.

TTL

ClickHouse can automatically drop values when some condition is met. This condition is configured as an expression based on any columns, usually just a static offset from a timestamp column.

The key advantage of this approach is that it does not need any external system to trigger it: once TTL is configured, data removal happens automatically in the background.

Note: TTL can be used not only to delete data (move it to /dev/null), but also to move it between different storage systems, such as from SSD to HDD.

More details on configuring TTL.
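
A minimal sketch of a table-level TTL using a static offset from a timestamp column. The table name events_with_ttl and its columns are hypothetical.

```sql
-- Rows become eligible for automatic removal 30 days after event_time.
CREATE TABLE events_with_ttl
(
    event_time DateTime,
    payload String
)
ENGINE = MergeTree
ORDER BY event_time
TTL event_time + INTERVAL 30 DAY;

-- The TTL of an existing table can be changed later.
ALTER TABLE events_with_ttl MODIFY TTL event_time + INTERVAL 90 DAY;
```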

DELETE FROM

DELETE FROM allows standard DELETE queries to be run in ClickHouse. The rows targeted in the filter clause are marked as deleted, and removed from future result sets. Cleanup of the rows happens asynchronously.

Note: DELETE FROM is an experimental feature and must be enabled with:

SET allow_experimental_lightweight_delete = true;
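
With the setting enabled, a delete could look like the following sketch. The events table and its event_time column are assumed to exist and are purely illustrative.

```sql
-- Matching rows are marked as deleted and stop appearing in query results;
-- the physical cleanup happens asynchronously.
DELETE FROM events WHERE event_time < now() - INTERVAL 90 DAY;
```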

ALTER DELETE

ALTER DELETE removes rows using asynchronous batch operations. Unlike DELETE FROM, queries run after the ALTER DELETE and before the batch operations complete will include the rows targeted for deletion. For more details see the ALTER DELETE docs.

ALTER DELETE can be issued to flexibly remove old data. If you need to do it regularly, the main downside is the need for an external system to submit the query. There are also some performance considerations, since mutations rewrite complete parts even if there is only a single row to be deleted.

This is the most common approach to making a ClickHouse-based system GDPR-compliant.

More details on mutations.
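
A sketch of an ALTER DELETE mutation and a way to watch its progress through the system.mutations table. The events table and the user_id filter are hypothetical.

```sql
-- Submits an asynchronous mutation that rewrites the affected parts.
ALTER TABLE events DELETE WHERE user_id = 42;

-- is_done = 1 once all affected parts have been rewritten.
SELECT mutation_id, command, is_done
FROM system.mutations
WHERE database = currentDatabase() AND table = 'events'
ORDER BY create_time DESC;
```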

DROP PARTITION

ALTER TABLE ... DROP PARTITION provides a cost-efficient way to drop a whole partition. It is less flexible and needs a proper partitioning scheme configured at table creation, but it still covers most common cases. Like mutations, it needs to be executed from an external system for regular use.

More details on manipulating partitions.
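
A sketch of the partition-based approach, assuming monthly partitions. The table name events_by_month and the partition value 202301 are illustrative only.

```sql
-- The partitioning scheme has to be chosen when the table is created.
CREATE TABLE events_by_month
(
    event_time DateTime,
    payload String
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_time)
ORDER BY event_time;

-- List the partitions that currently hold data.
SELECT partition, sum(rows) AS rows
FROM system.parts
WHERE active AND database = currentDatabase() AND table = 'events_by_month'
GROUP BY partition
ORDER BY partition;

-- Drop the whole January 2023 partition.
ALTER TABLE events_by_month DROP PARTITION 202301;
```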

TRUNCATE

It’s rather radical to drop all data from a table, but in some cases it might be exactly what you need.

More details on table truncation.

· 3 min read

The main requirement for inserting into ClickHouse: never send too many INSERT statements per second. Ideally, send one insert every second or every few seconds.

So you can insert 100K rows per second, but only with one big bulk INSERT statement. When you send hundreds or thousands of INSERT statements per second to a *MergeTree table, you will always get some errors, and this cannot be changed by adjusting settings.

If you can't combine many small inserts into one big bulk INSERT statement on the client side, create a Buffer table in front of the *MergeTree table.
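
A sketch of what such a Buffer table could look like. The names my_table and my_table_buffer, the database name default, and the threshold values are assumptions for illustration. The buffer is flushed to the destination table when all of the min thresholds or any one of the max thresholds is reached.

```sql
-- Hypothetical destination table.
CREATE TABLE my_table
(
    ts DateTime,
    value Float64
)
ENGINE = MergeTree
ORDER BY ts;

-- Buffer(database, table, num_layers,
--        min_time, max_time, min_rows, max_rows, min_bytes, max_bytes)
CREATE TABLE my_table_buffer AS my_table
ENGINE = Buffer(default, my_table, 1, 10, 100, 10000, 1000000, 10000000, 100000000);

-- Small inserts go to the buffer and reach my_table in larger batches.
INSERT INTO my_table_buffer VALUES (now(), 1.5);
```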

  1. Each insert creates a folder in /var/lib/clickhouse/.../table_name/. Inside that folder there are 2 files per column: one with the (compressed) data, the second with an index. Data is physically sorted by primary key inside those files. Those folders are called 'parts'.

  2. ClickHouse merges those smaller parts into bigger parts in the background. It chooses parts to merge according to some rules. After two (or more) parts are merged, one bigger part is created and the old parts are queued for removal. The settings you list allow fine-tuning the rules for merging parts. The goal of the merging process is to leave one big part for each partition (or a few big parts per partition that are not worth merging because they are too big). Please also check that comment.

  3. If you create new parts too fast (for example, by doing a lot of small inserts) and ClickHouse cannot merge them quickly enough (so new parts arrive faster than ClickHouse can merge them), you get the exception 'Merges are processing significantly slower than inserts'. You can try to increase the limit, but then you may run into filesystem problems caused by too many files or directories (such as the inode limit).

  4. If you insert into a lot of partitions at once, the problem is multiplied by the number of partitions affected by the insert.

  5. You can try to adjust the behaviour of ClickHouse with one of the listed settings, or with max_insert_block_size / max_block_size / insert_format_max_block_size / max_client_network_bandwidth. But the better solution is simply to insert data at the expected tempo: one insert per 1-2 sec, each insert containing 10K-500K rows of data.

  6. So the proper way to solve "Merges are processing significantly slower than inserts" is to adjust the number of inserts per second and the number of rows in each insert. Use batch inserts to combine small inserts into one bigger insert if data comes row by row. Throttle huge inserts if you have too much data to insert at once. Don't change ClickHouse internals unless you really understand what they mean.

  7. If your data comes in faster than 500K rows per second, you most probably need more servers in the cluster to serve that traffic, not an adjustment of settings.

  8. The speed of background merges usually depends on storage speed, the compression settings used, the MergeTree variant (i.e. the merge algorithm: plain merge / aggregating / summing / collapsing etc.), and the sorting key used.
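
To see whether parts are piling up faster than merges can absorb them, one option is to count active parts per partition in the system.parts table. This is a sketch; the table name my_table is a placeholder.

```sql
-- A steadily growing number of active parts in a partition suggests
-- that inserts are outpacing background merges.
SELECT
    partition,
    count() AS active_parts,
    sum(rows) AS total_rows
FROM system.parts
WHERE active AND database = currentDatabase() AND table = 'my_table'
GROUP BY partition
ORDER BY active_parts DESC;
```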

· One min read

In order to execute the same query on all nodes of a ClickHouse Cloud service, we can use clusterAllReplicas.

For example, in order to get entries from a (node-local) system table from all nodes, you can use:

SELECT ... FROM clusterAllReplicas(default, system.TABLE) ...;
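
As a concrete sketch of this pattern, the following counts the currently running queries on each node by reading the node-local system.processes table. The cluster name default follows the example above.

```sql
-- hostName() identifies which node each row came from.
SELECT hostName() AS host, count() AS running_queries
FROM clusterAllReplicas(default, system.processes)
GROUP BY host
ORDER BY host;
```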

Similarly, you can execute the same SYSTEM statement on all nodes with a single statement, by using the ON CLUSTER clause:

SYSTEM ... ON CLUSTER default;

For example, to drop the filesystem cache on all nodes, you can use:

SYSTEM DROP FILESYSTEM CACHE ON CLUSTER default;