
[experimental] MaterializedPostgreSQL

Creates a ClickHouse database with tables from a PostgreSQL database. First, a database with the MaterializedPostgreSQL engine creates a snapshot of the PostgreSQL database and loads the required tables. The required tables can include any subset of tables from any subset of schemas in the specified database. Along with the snapshot, the database engine acquires the LSN, and once the initial dump of tables is complete, it starts pulling updates from the WAL. After the database is created, tables newly added to the PostgreSQL database are not automatically added to replication. They have to be added manually with the ATTACH TABLE db.table query.

Replication is implemented with the PostgreSQL Logical Replication Protocol, which does not allow replicating DDL but does make it possible to detect replication-breaking changes (column type changes, adding or removing columns). Such changes are detected, and the affected tables stop receiving updates. These tables can be reloaded automatically in the background if the materialized_postgresql_allow_automatic_update setting is turned on (available starting from version 22.1). For now, the safest way is to use ATTACH/DETACH queries to reload the table completely. If DDL does not break replication (for example, renaming a column), the table will still receive updates (insertion is done by position).
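
The manual reload described above might look like the following minimal sketch. The names postgres_db and table1 are placeholders, not taken from a real setup; the statements use the DETACH TABLE and ATTACH TABLE forms that appear later in this document.

```sql
-- Sketch only: postgres_db and table1 are hypothetical names.
-- Remove the table whose replication stopped after a schema change ...
DETACH TABLE postgres_db.table1;

-- ... then add it back so that it is reloaded completely.
ATTACH TABLE postgres_db.table1;
```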

note

This database engine is experimental. To use it, set allow_experimental_database_materialized_postgresql to 1 in your configuration files or by using the SET command:

SET allow_experimental_database_materialized_postgresql=1

Creating a Database​

CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster]
ENGINE = MaterializedPostgreSQL('host:port', 'database', 'user', 'password') [SETTINGS ...]

Engine Parameters

  • host:port - PostgreSQL server endpoint.
  • database - PostgreSQL database name.
  • user - PostgreSQL user.
  • password - User password.

Example of Use​

CREATE DATABASE postgres_db
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password');

SHOW TABLES FROM postgres_db;

┌─name───┐
│ table1 │
└────────┘

SELECT * FROM postgres_db.table1;

Dynamically adding new tables to replication​

After a MaterializedPostgreSQL database is created, it does not automatically detect new tables in the corresponding PostgreSQL database. Such tables can be added manually:

ATTACH TABLE postgres_database.new_table;

warning

Before version 22.1, adding a table to replication left behind a temporary replication slot (named {db_name}_ch_replication_slot_tmp) that was not removed. If you attach tables in a ClickHouse version earlier than 22.1, make sure to delete the slot manually (SELECT pg_drop_replication_slot('{db_name}_ch_replication_slot_tmp')). Otherwise, disk usage will grow. This issue is fixed in 22.1.
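
On versions before 22.1, a cleanup along these lines could be run on the PostgreSQL side after attaching a table. It is a sketch that assumes the ClickHouse database is named postgres_database (a placeholder); pg_replication_slots and pg_drop_replication_slot are standard PostgreSQL facilities.

```sql
-- Run in PostgreSQL, not in ClickHouse.
-- List leftover temporary slots created by the engine.
SELECT slot_name, active
FROM pg_replication_slots
WHERE slot_name LIKE '%ch_replication_slot_tmp';

-- Drop the leftover slot; replace postgres_database with your {db_name}.
SELECT pg_drop_replication_slot('postgres_database_ch_replication_slot_tmp');
```

Note that PostgreSQL refuses to drop a slot that is still active, so the first query is worth running before the second.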

Dynamically removing tables from replication​

It is possible to remove specific tables from replication:

DETACH TABLE postgres_database.table_to_remove;

PostgreSQL schema​

PostgreSQL schemas can be configured in three ways (starting from version 21.12).

  1. One schema for one MaterializedPostgreSQL database engine. Requires the setting materialized_postgresql_schema. Tables are accessed via table name only:
CREATE DATABASE postgres_database
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password')
SETTINGS materialized_postgresql_schema = 'postgres_schema';

SELECT * FROM postgres_database.table1;
  2. Any number of schemas with a specified set of tables for one MaterializedPostgreSQL database engine. Requires the setting materialized_postgresql_tables_list. Each table is written along with its schema. Tables are accessed via schema name and table name together:
CREATE DATABASE database1
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password')
SETTINGS materialized_postgresql_tables_list = 'schema1.table1,schema2.table2,schema1.table3',
materialized_postgresql_tables_list_with_schema = 1;

SELECT * FROM database1.`schema1.table1`;
SELECT * FROM database1.`schema2.table2`;

In this case, every table in materialized_postgresql_tables_list must be written with its schema name, and materialized_postgresql_tables_list_with_schema = 1 is required.

Warning: in this case, dots in table names are not allowed.

  3. Any number of schemas with the full set of tables for one MaterializedPostgreSQL database engine. Requires the setting materialized_postgresql_schema_list.
CREATE DATABASE database1
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password')
SETTINGS materialized_postgresql_schema_list = 'schema1,schema2,schema3';

SELECT * FROM database1.`schema1.table1`;
SELECT * FROM database1.`schema1.table2`;
SELECT * FROM database1.`schema2.table2`;

Warning: in this case, dots in table names are not allowed.

Requirements​

  1. In the PostgreSQL config file, the wal_level setting must be set to logical and the max_replication_slots parameter must be at least 2.

  2. Each replicated table must have one of the following replica identities:

  • primary key (by default)

  • index

postgres# CREATE TABLE postgres_table (a Integer NOT NULL, b Integer, c Integer NOT NULL, d Integer, e Integer NOT NULL);
postgres# CREATE UNIQUE INDEX postgres_table_index ON postgres_table(a, c, e);
postgres# ALTER TABLE postgres_table REPLICA IDENTITY USING INDEX postgres_table_index;

The primary key is always checked first. If it is absent, then the index, defined as replica identity index, is checked. If the index is used as a replica identity, there has to be only one such index in a table. You can check what type is used for a specific table with the following command:

postgres# SELECT CASE relreplident
WHEN 'd' THEN 'default'
WHEN 'n' THEN 'nothing'
WHEN 'f' THEN 'full'
WHEN 'i' THEN 'index'
END AS replica_identity
FROM pg_class
WHERE oid = 'postgres_table'::regclass;

warning

Replication of TOAST values is not supported. The default value for the data type will be used.
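
The first requirement above can be checked from a PostgreSQL session, as in this sketch. SHOW and ALTER SYSTEM are standard PostgreSQL commands; ALTER SYSTEM needs superuser rights, and changes to either parameter take effect only after a server restart. The value 2 is simply the minimum stated in the requirement.

```sql
-- Run in PostgreSQL.
SHOW wal_level;               -- should be: logical
SHOW max_replication_slots;   -- should be: 2 or more

-- One way to change them instead of editing postgresql.conf by hand
-- (restart the server afterwards).
ALTER SYSTEM SET wal_level = 'logical';
ALTER SYSTEM SET max_replication_slots = 2;
```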

Settings​

  1. materialized_postgresql_tables_list

    Sets a comma-separated list of PostgreSQL database tables to be replicated via the MaterializedPostgreSQL database engine.

    Default value: empty list, meaning the whole PostgreSQL database will be replicated.

  2. materialized_postgresql_schema

    Default value: empty string. (Default schema is used)

  3. materialized_postgresql_schema_list

    Default value: empty list. (Default schema is used)

  4. materialized_postgresql_allow_automatic_update

    Do not use this setting in versions earlier than 22.1.

    Allows reloading a table in the background when schema changes are detected. DDL queries on the PostgreSQL side are not replicated via the ClickHouse MaterializedPostgreSQL engine, because the PostgreSQL logical replication protocol does not allow it, but the fact that DDL changes occurred is detected transactionally. By default, replication of the affected tables stops once DDL is detected. If this setting is enabled, those tables are instead reloaded in the background from a database snapshot without data loss, and replication continues for them.

    Possible values:

    • 0 - The table is not automatically updated in the background when schema changes are detected.
    • 1 - The table is automatically updated in the background when schema changes are detected.

    Default value: 0.

  5. materialized_postgresql_max_block_size

    Sets the number of rows collected in memory before flushing data into the replicated table of the MaterializedPostgreSQL database.

    Possible values:

    • Positive integer.

    Default value: 65536.

  6. materialized_postgresql_replication_slot

    A user-created replication slot. Must be used together with materialized_postgresql_snapshot.

  7. materialized_postgresql_snapshot

    A text string identifying a snapshot from which the initial dump of PostgreSQL tables will be performed. Must be used together with materialized_postgresql_replication_slot.

    CREATE DATABASE database1
    ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password')
    SETTINGS materialized_postgresql_tables_list = 'table1,table2,table3';

    SELECT * FROM database1.table1;

    The settings can be changed, if necessary, using a DDL query. However, the materialized_postgresql_tables_list setting cannot be changed this way. To update the list of tables in this setting, use the ATTACH TABLE query.

    ALTER DATABASE postgres_database MODIFY SETTING materialized_postgresql_max_block_size = <new_size>;
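
As an illustration of the settings above, the following sketch creates a database with automatic reloading enabled and an explicit block size. It assumes ClickHouse 22.1 or later; the host, database name, and credentials are the placeholder values used elsewhere in this document, and 65536 is simply the documented default.

```sql
-- Sketch: placeholder host, database, and credentials.
-- Requires ClickHouse 22.1 or later for the automatic-update setting.
CREATE DATABASE database1
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password')
SETTINGS materialized_postgresql_allow_automatic_update = 1,
         materialized_postgresql_max_block_size = 65536;
```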

Notes​

Failover of the logical replication slot​

Logical replication slots that exist on the primary are not available on standby replicas. So if there is a failover, the new primary (the old physical standby) won't be aware of any slots that existed on the old primary. This breaks replication from PostgreSQL. A solution is to manage replication slots yourself and define a permanent replication slot (some information can be found here). You need to pass the slot name via the materialized_postgresql_replication_slot setting, and the slot has to be exported with the EXPORT SNAPSHOT option. The snapshot identifier needs to be passed via the materialized_postgresql_snapshot setting.

Please note that this should be used only if it is actually needed. If there is no real need for it, or no full understanding of why it is needed, it is better to let the database engine create and manage its own replication slot.

Example (from @bchrobot)

  1. Configure replication slot in PostgreSQL.

    apiVersion: "acid.zalan.do/v1"
    kind: postgresql
    metadata:
      name: acid-demo-cluster
    spec:
      numberOfInstances: 2
      postgresql:
        parameters:
          wal_level: logical
      patroni:
        slots:
          clickhouse_sync:
            type: logical
            database: demodb
            plugin: pgoutput
  2. Wait for replication slot to be ready, then begin a transaction and export the transaction snapshot identifier:

    BEGIN;
    SELECT pg_export_snapshot();
  3. In ClickHouse create database:

    CREATE DATABASE demodb
    ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password')
    SETTINGS
    materialized_postgresql_replication_slot = 'clickhouse_sync',
    materialized_postgresql_snapshot = '0000000A-0000023F-3',
    materialized_postgresql_tables_list = 'table1,table2,table3';
  4. End the PostgreSQL transaction once replication to ClickHouse DB is confirmed. Verify that replication continues after failover:

    kubectl exec acid-demo-cluster-0 -c postgres -- su postgres -c 'patronictl failover --candidate acid-demo-cluster-1 --force'
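
For steps 2 and 4, checks along the following lines could be run in PostgreSQL. This is a sketch: the slot name clickhouse_sync comes from the manifest above, pg_replication_slots is the standard PostgreSQL view, and the COMMIT simply ends the transaction opened in step 2.

```sql
-- Run in PostgreSQL. Confirm the slot exists before exporting the snapshot,
-- and again on the new primary after failover.
SELECT slot_name, plugin, slot_type, database, active
FROM pg_replication_slots
WHERE slot_name = 'clickhouse_sync';

-- In the session from step 2, once ClickHouse has loaded the tables:
COMMIT;
```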

Required permissions​

  1. CREATE PUBLICATION -- create query privilege.

  2. CREATE_REPLICATION_SLOT -- replication privilege.

  3. pg_drop_replication_slot -- replication privilege or superuser.

  4. DROP PUBLICATION -- owner of publication (username in MaterializedPostgreSQL engine itself).

It is possible to avoid executing commands 2 and 3, and thus avoid needing those permissions, by using the settings materialized_postgresql_replication_slot and materialized_postgresql_snapshot. Do so with great care.
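
The role setup for permissions 1 to 3 might be sketched as follows, run by a PostgreSQL superuser. postgres_user and postgres_database are the placeholder names from the examples in this document. In PostgreSQL, CREATE PUBLICATION requires the CREATE privilege on the database, and managing replication slots requires the REPLICATION role attribute; adding tables to a publication additionally requires owning them, which is not shown here.

```sql
-- Run in PostgreSQL as a superuser.
-- Allows creating and dropping replication slots.
ALTER ROLE postgres_user WITH REPLICATION;

-- Allows CREATE PUBLICATION in this database.
GRANT CREATE ON DATABASE postgres_database TO postgres_user;
```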

Access to tables:

  1. pg_publication

  2. pg_replication_slots

  3. pg_publication_tables