
Connect Apache Airflow to ClickHouse

ClickHouse Supported

Apache Airflow is an open-source platform for authoring, scheduling, and monitoring workflows as code. Workflows are defined as directed acyclic graphs (DAGs) of tasks written in Python.

The apache-airflow-providers-clickhousedb provider connects Airflow to ClickHouse, letting you run queries, create tables, and load data as part of a DAG. It connects over the ClickHouse HTTP interface using the clickhouse-connect client and exposes ClickHouse through Airflow's common SQL framework. As a result, the standard SQLExecuteQueryOperator handles DDL, DML, and analytical queries, and no ClickHouse-specific operator is required.

Install the provider

Install the provider into the environment where your Airflow scheduler and workers run:

pip install apache-airflow-providers-clickhousedb

The provider depends on apache-airflow-providers-common-sql and clickhouse-connect, which are installed alongside it. To pass query results to pandas or polars DataFrames, install the optional extras of the common SQL provider:

pip install 'apache-airflow-providers-common-sql[pandas,polars]'
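
A quick way to confirm that the packages landed in the same environment as the scheduler and workers is to ask Python for their installed versions. The sketch below uses only the standard library; the helper name installed_version is hypothetical, and the distribution names are the ones used in the install commands above.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(dist_name):
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


# Distribution names taken from the pip commands in this section.
REQUIRED = (
    "apache-airflow-providers-clickhousedb",
    "apache-airflow-providers-common-sql",
    "clickhouse-connect",
)

if __name__ == "__main__":
    for name in REQUIRED:
        print(f"{name}: {installed_version(name) or 'NOT INSTALLED'}")
```

Run it with the same interpreter that Airflow uses; a NOT INSTALLED line usually means the provider was installed into a different virtual environment.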

Create a ClickHouse connection

The provider registers a connection type of clickhouse. Create a connection from the Airflow UI under Admin > Connections, or define one through the CLI or an environment variable.

In the UI, select ClickHouse as the connection type and fill in the fields:

| Field | Description | Default |
| --- | --- | --- |
| Host | ClickHouse server hostname, for example abc123.clickhouse.cloud | localhost |
| Port | HTTP(S) port | 8123 (plain), 8443 (TLS) |
| Login | ClickHouse username | default |
| Password | ClickHouse user password | (empty) |
| Database | Default database for the connection. The UI labels this Database; it is the schema field when you define the connection by URI or JSON. | default |

For ClickHouse Cloud or any self-hosted cluster with TLS enabled, enable secure (the Use TLS (HTTPS) field in the connection form, or "secure": true in the extra JSON) and use the TLS port (8443).

Extra connection options

The provider exposes additional options as dedicated fields in the connection form. When you define the connection by URI, JSON, or environment variable instead, supply them as keys in the extra JSON object. All are optional:

| extra key | UI field | Default | Description |
| --- | --- | --- | --- |
| secure | Use TLS (HTTPS) | false | Enable HTTPS/TLS. |
| verify | Verify SSL Certificate | true | Verify the server TLS certificate when secure is true. Set to false for self-signed certificates. |
| connect_timeout | Connection Timeout (seconds) | 10 | HTTP connection timeout in seconds. |
| send_receive_timeout | Query Timeout (seconds) | 300 | Query read/write timeout in seconds. Increase this for long-running analytical queries. |
| compress | Enable LZ4 Compression | true | Enable LZ4 result compression. |
| client_name | Client Name | (empty) | A label appended to the Airflow version identifier in the ClickHouse User-Agent and the client_name column of system.query_log. |
| session_settings | Session Settings (JSON) | (empty) | ClickHouse session settings applied to every query on the connection, for example {"max_execution_time": 300, "max_threads": 8}. |
| client_kwargs | Client kwargs (JSON) | (empty) | Additional keyword arguments forwarded to clickhouse_connect.get_client(), for example an http_proxy. |

Define a connection without the UI

Set the connection through an environment variable. The URI form covers host, credentials, and database:

export AIRFLOW_CONN_CLICKHOUSE_DEFAULT='clickhouse://default:password@localhost:8123/my_database'

All components of the URI must be URL-encoded. For TLS, timeouts, or session settings, use the JSON form, which exposes the Extra fields:

export AIRFLOW_CONN_CLICKHOUSE_DEFAULT='{
    "conn_type": "clickhouse",
    "host": "abc123.clickhouse.cloud",
    "port": 8443,
    "login": "default",
    "password": "secret",
    "schema": "my_database",
    "extra": {
        "secure": true,
        "session_settings": {
            "max_execution_time": 300,
            "max_memory_usage": 10000000000
        }
    }
}'
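
Hand-writing either form is error-prone when a password contains characters such as @ or /, or when the JSON has to survive shell quoting. As a minimal sketch, the two values can be generated with the Python standard library instead. The helper names clickhouse_conn_uri and clickhouse_conn_json are hypothetical, and the credentials are placeholders.

```python
import json
from urllib.parse import quote


def clickhouse_conn_uri(login, password, host, port, database):
    """Build a clickhouse:// URI with every component percent-encoded."""
    # safe="" also encodes "/" and "@", which would otherwise act as URI delimiters.
    return (
        f"clickhouse://{quote(login, safe='')}:{quote(password, safe='')}"
        f"@{quote(host, safe='')}:{port}/{quote(database, safe='')}"
    )


def clickhouse_conn_json(host, port, login, password, database, extra=None):
    """Serialize the JSON form; json.dumps handles quoting and escaping."""
    return json.dumps(
        {
            "conn_type": "clickhouse",
            "host": host,
            "port": port,
            "login": login,
            "password": password,
            "schema": database,
            "extra": extra or {},
        }
    )


if __name__ == "__main__":
    # Placeholder values, for illustration only.
    print(clickhouse_conn_uri("default", "p@ss/word", "localhost", 8123, "my_database"))
    print(
        clickhouse_conn_json(
            "abc123.clickhouse.cloud", 8443, "default", "secret", "my_database",
            extra={"secure": True},
        )
    )
```

Either printed string can be used as the value of AIRFLOW_CONN_CLICKHOUSE_DEFAULT.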

All hooks and operators use the connection ID clickhouse_default unless you specify another.

Run queries with SQLExecuteQueryOperator

Set the operator's conn_id to your ClickHouse connection. The following DAG creates a table, inserts rows, reads them back, and drops the table:

from datetime import datetime

from airflow import DAG
from airflow.providers.common.sql.hooks.sql import fetch_all_handler
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

CLICKHOUSE_CONN_ID = "clickhouse_default"
CLICKHOUSE_TABLE = "airflow_example"

with DAG(
    dag_id="example_clickhouse",
    start_date=datetime(2021, 1, 1),
    default_args={"conn_id": CLICKHOUSE_CONN_ID},
    schedule="@once",
    catchup=False,
) as dag:
    create_table = SQLExecuteQueryOperator(
        task_id="create_table",
        sql=f"""
            CREATE TABLE IF NOT EXISTS {CLICKHOUSE_TABLE} (
                id   UInt32,
                name String,
                ts   DateTime DEFAULT now()
            ) ENGINE = MergeTree()
            ORDER BY id
        """,
    )

    insert_rows = SQLExecuteQueryOperator(
        task_id="insert_rows",
        sql=f"""
            INSERT INTO {CLICKHOUSE_TABLE} (id, name) VALUES
                (1, 'Alice'),
                (2, 'Bob'),
                (3, 'Charlie')
        """,
    )

    read_rows = SQLExecuteQueryOperator(
        task_id="read_rows",
        sql=f"SELECT id, name FROM {CLICKHOUSE_TABLE} ORDER BY id",
        handler=fetch_all_handler,
    )

    drop_table = SQLExecuteQueryOperator(
        task_id="drop_table",
        sql=f"DROP TABLE IF EXISTS {CLICKHOUSE_TABLE}",
    )

    create_table >> insert_rows >> read_rows >> drop_table

fetch_all_handler is the operator's default handler, so read_rows returns the full result set whether or not handler is passed explicitly. To return something other than the full result set, pass a different handler, such as fetch_one_handler for the first row only.
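
A handler is a callable that receives the cursor after the statement has run and returns whatever the task should hand back. The sketch below shows a hypothetical custom handler, fetch_dicts_handler, that returns rows as dictionaries keyed by column name. It assumes the cursor exposes the standard DB-API description attribute and fetchall() method; the demo uses sqlite3 only because it ships with Python, not because it behaves like ClickHouse.

```python
import sqlite3


def fetch_dicts_handler(cursor):
    """Return every row as a dict keyed by column name."""
    # Statements without a result set (DDL, INSERT) leave description as None.
    if cursor.description is None:
        return None
    columns = [col[0] for col in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]


if __name__ == "__main__":
    # sqlite3 is a stand-in DB-API cursor for demonstration purposes.
    conn = sqlite3.connect(":memory:")
    cur = conn.cursor()
    cur.execute("CREATE TABLE airflow_example (id INTEGER, name TEXT)")
    cur.execute("INSERT INTO airflow_example VALUES (1, 'Alice'), (2, 'Bob')")
    cur.execute("SELECT id, name FROM airflow_example ORDER BY id")
    print(fetch_dicts_handler(cur))
```

In a DAG, such a function would be passed as handler=fetch_dicts_handler in place of fetch_all_handler.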

Target a different database per task

When one connection points at a cluster and individual tasks query different databases, override the database through hook_params instead of creating a separate connection:

read_rows = SQLExecuteQueryOperator(
    task_id="read_rows",
    conn_id=CLICKHOUSE_CONN_ID,
    sql="SELECT count() FROM events",
    hook_params={"database": "analytics"},
)
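
When the same query has to run against several databases, the operator arguments can be generated in a loop rather than written out by hand. This is a sketch only: per_database_task_kwargs is a hypothetical helper, and the staging database name is a placeholder. It builds plain dictionaries, so it can be checked without an Airflow installation.

```python
def per_database_task_kwargs(task_prefix, sql, databases, conn_id="clickhouse_default"):
    """Build one set of SQLExecuteQueryOperator keyword arguments per database."""
    return [
        {
            "task_id": f"{task_prefix}_{database}",
            "conn_id": conn_id,
            "sql": sql,
            # Same override as in the single-task example: one connection, many databases.
            "hook_params": {"database": database},
        }
        for database in databases
    ]


# Inside a `with DAG(...)` block, each dict would be expanded into an operator:
#     for kwargs in per_database_task_kwargs(...):
#         SQLExecuteQueryOperator(**kwargs)
if __name__ == "__main__":
    for kwargs in per_database_task_kwargs(
        "count_events", "SELECT count() FROM events", ["analytics", "staging"]
    ):
        print(kwargs)
```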

Use the hook directly

For work that doesn't fit a SQL operator, such as bulk inserts, streaming, or ClickHouse-specific client calls, use ClickHouseHook inside a Python task.

The hook's bulk_insert_rows method uses the native columnar insert path in clickhouse-connect, which is much faster than row-by-row inserts for large datasets. Set batch_size to bound peak memory on very large inputs:

from airflow.providers.clickhousedb.hooks.clickhouse import ClickHouseHook

hook = ClickHouseHook(clickhouse_conn_id="clickhouse_default")

hook.bulk_insert_rows(
    table="events",
    rows=[("user1", "click"), ("user2", "view")],
    column_names=["user_id", "action"],
    batch_size=1000,
)
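
To see why a batch size bounds peak memory, it helps to look at batching in isolation: only one batch of rows is held in memory at a time, however long the input is. The generator below is an illustrative sketch of that idea, not the provider's implementation, and iter_batches is a hypothetical name.

```python
from itertools import islice


def iter_batches(rows, batch_size):
    """Yield lists of at most batch_size rows from any iterable."""
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")
    iterator = iter(rows)
    while True:
        # islice pulls at most batch_size items, so only one batch is materialized.
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


if __name__ == "__main__":
    # A generator source: rows are produced lazily, never all at once.
    source = ((f"user{i}", "click") for i in range(5))
    for batch in iter_batches(source, batch_size=2):
        print(len(batch), batch)
```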

Call get_client() to reach the underlying clickhouse-connect client for anything the hook doesn't expose directly:

client = hook.get_client()
total = client.query("SELECT count() FROM events").result_rows[0][0]
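
Once you hold the client, values can be bound as query parameters instead of being formatted into the SQL string. The sketch below wraps that pattern in a hypothetical helper, count_events_for_user; it relies on the parameters argument of the clickhouse-connect query() method and the {name:Type} placeholder syntax, and it assumes the events table has a String column named user_id, as in the bulk insert example.

```python
def count_events_for_user(client, user_id):
    """Count rows in events for one user, binding user_id as a query parameter."""
    result = client.query(
        # {uid:String} is a typed placeholder filled from the parameters dict.
        "SELECT count() FROM events WHERE user_id = {uid:String}",
        parameters={"uid": user_id},
    )
    # count() yields a single row with a single column.
    return result.result_rows[0][0]
```

With the hook from the example above, the call would look like count_events_for_user(hook.get_client(), "user1").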

Apply session settings

Pass session settings when constructing the hook, either directly or through an operator's hook_params. Settings passed to the constructor are merged on top of any session_settings defined in the connection's Extra field, and the constructor values win on conflicting keys:

hook = ClickHouseHook(
    clickhouse_conn_id="clickhouse_default",
    session_settings={"max_execution_time": 60, "max_threads": 4},
)
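
The merge rule can be pictured as a plain dictionary update in which the constructor values are applied last. The function below is a sketch of that rule for illustration, not the provider's code; merge_session_settings is a hypothetical name.

```python
def merge_session_settings(connection_settings, constructor_settings):
    """Merge two settings dicts; constructor values win on conflicting keys."""
    merged = dict(connection_settings or {})
    # Applied last, so these values override the connection's Extra settings.
    merged.update(constructor_settings or {})
    return merged


if __name__ == "__main__":
    from_extra = {"max_execution_time": 300, "max_memory_usage": 10000000000}
    from_constructor = {"max_execution_time": 60, "max_threads": 4}
    print(merge_session_settings(from_extra, from_constructor))
```

Here max_execution_time ends up as 60, while max_memory_usage from the connection and max_threads from the constructor are both kept.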