[experimental] MaterializedPostgreSQL 

Creates a ClickHouse database with an initial data dump of the PostgreSQL database tables and starts the replication process: a background job applies new changes as they happen on the tables in the remote PostgreSQL database.

The ClickHouse server works as a PostgreSQL replica. It reads the WAL and performs DML queries. DDL is not replicated, but can be handled (described below).

Creating a Database 

CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster]
ENGINE = MaterializedPostgreSQL('host:port', ['database' | database], 'user', 'password') [SETTINGS ...]

Engine Parameters

  • host:port — PostgreSQL server endpoint.
  • database — PostgreSQL database name.
  • user — PostgreSQL user.
  • password — User password.

Dynamically adding new tables to replication 

ATTACH TABLE postgres_database.new_table;

This also works if the materialized_postgresql_tables_list setting is specified.

Dynamically removing tables from replication 

DETACH TABLE postgres_database.table_to_remove;

Settings 

CREATE DATABASE database1
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password')
SETTINGS materialized_postgresql_max_block_size = 65536,
         materialized_postgresql_tables_list = 'table1,table2,table3';

SELECT * FROM database1.table1;

It is also possible to change settings at run time.

ALTER DATABASE postgres_database MODIFY SETTING materialized_postgresql_max_block_size = <new_size>;
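
As a concrete illustration of the statement above, the sketch below raises the block size for the database1 database created in the Settings example. The value 131072 is an arbitrary illustrative choice, not a recommendation.

```sql
-- Assumes the database1 database from the Settings example exists.
-- 131072 is a hypothetical value chosen only for illustration.
ALTER DATABASE database1 MODIFY SETTING materialized_postgresql_max_block_size = 131072;
```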

Requirements 

  1. In the PostgreSQL config file, the wal_level setting must be set to logical and the max_replication_slots parameter must be at least 2.

  2. Each replicated table must have one of the following replica identities:

  • primary key (by default)

  • index

postgres# CREATE TABLE postgres_table (a Integer NOT NULL, b Integer, c Integer NOT NULL, d Integer, e Integer NOT NULL);
postgres# CREATE unique INDEX postgres_table_index on postgres_table(a, c, e);
postgres# ALTER TABLE postgres_table REPLICA IDENTITY USING INDEX postgres_table_index;

The primary key is always checked first. If it is absent, the index defined as the replica identity index is checked.
If an index is used as the replica identity, there must be only one such index in the table.
You can check which type is used for a specific table with the following command:

postgres# SELECT CASE relreplident
          WHEN 'd' THEN 'default'
          WHEN 'n' THEN 'nothing'
          WHEN 'f' THEN 'full'
          WHEN 'i' THEN 'index'
       END AS replica_identity
FROM pg_class
WHERE oid = 'postgres_table'::regclass;
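
The first requirement above (wal_level and max_replication_slots) can be checked from a PostgreSQL session before creating the ClickHouse database. The following is a minimal sketch, assuming a superuser connection. ALTER SYSTEM is one possible way to change the values instead of editing the config file by hand, and both parameters only take effect after a PostgreSQL server restart.

```sql
-- Run against the PostgreSQL server (for example in psql).
SHOW wal_level;              -- expected for replication to work: logical
SHOW max_replication_slots;  -- expected for replication to work: 2 or more

-- Optional: persist new values (requires superuser privileges).
-- Both parameters are applied only after the server is restarted.
ALTER SYSTEM SET wal_level = 'logical';
ALTER SYSTEM SET max_replication_slots = 2;
```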

Example of Use 

CREATE DATABASE postgresql_db
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password');

SELECT * FROM postgresql_db.postgres_table;
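
To see replication in action, one could insert a row on the PostgreSQL side and then read it from ClickHouse. The sketch below assumes that postgres_table has the columns (a, b, c, d, e) from the Requirements section and that the background job has had time to apply the change. The inserted values are illustrative.

```sql
-- In PostgreSQL: add a row to the replicated table.
INSERT INTO postgres_table (a, b, c, d, e) VALUES (1, 2, 3, 4, 5);

-- In ClickHouse: list the replicated tables, then read the new row
-- once the change has been applied.
SHOW TABLES FROM postgresql_db;
SELECT * FROM postgresql_db.postgres_table WHERE a = 1;
```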

Notes 

Failover of the logical replication slot 

Logical replication slots that exist on the primary are not available on standby replicas.
So after a failover, the new primary (the old physical standby) is not aware of any slots that existed on the old primary, and replication from PostgreSQL breaks.
One solution is to manage replication slots yourself and define a permanent replication slot (some information can be found here). Pass the slot name via the materialized_postgresql_replication_slot setting; the slot has to be exported with the EXPORT SNAPSHOT option. Pass the snapshot identifier via the materialized_postgresql_snapshot setting.

Use this only if it is actually needed. If there is no real need for it, or no full understanding of why it is needed, it is better to let the engine create and manage its own replication slot.

Example (from @bchrobot)

  1. Configure replication slot in PostgreSQL.
apiVersion: "acid.zalan.do/v1"
kind: postgresql
metadata:
  name: acid-demo-cluster
spec:
  numberOfInstances: 2
  postgresql:
    parameters:
      wal_level: logical
  patroni:
    slots:
      clickhouse_sync:
        type: logical
        database: demodb
        plugin: pgoutput
  2. Wait for replication slot to be ready, then begin a transaction and export the transaction snapshot identifier:
BEGIN;
SELECT pg_export_snapshot();
  3. In ClickHouse create database:
CREATE DATABASE demodb
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password')
SETTINGS
  materialized_postgresql_replication_slot = 'clickhouse_sync',
  materialized_postgresql_snapshot = '0000000A-0000023F-3',
  materialized_postgresql_tables_list = 'table1,table2,table3';
  4. End the PostgreSQL transaction once replication to ClickHouse DB is confirmed. Verify that replication continues after failover:
kubectl exec acid-demo-cluster-0 -c postgres -- su postgres -c 'patronictl failover --candidate acid-demo-cluster-1 --force'
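
Before exporting the snapshot, it can help to confirm that the slot configured in the first step of the example exists. A minimal sketch, assuming the slot name clickhouse_sync from the example and a PostgreSQL session on the current primary, is to query the pg_replication_slots system view. The same query can be run on the new primary after the failover.

```sql
-- Run on the PostgreSQL primary.
-- An empty result means the slot has not been created (or was lost).
SELECT slot_name, plugin, slot_type, database, active
FROM pg_replication_slots
WHERE slot_name = 'clickhouse_sync';
```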
