본문으로 바로가기
본문으로 바로가기

Flink Connector

ClickHouse Supported

이것은 ClickHouse에서 지원하는 공식 Apache Flink 싱크 커넥터입니다. 이 커넥터는 Flink의 AsyncSinkBase와 공식 ClickHouse java client를 사용하여 구축되었습니다.

이 커넥터는 Apache Flink의 DataStream API를 지원합니다. Table API 지원은 향후 릴리스에서 제공될 예정입니다.

요구 사항

  • Java 11+ (Flink 1.17+용) 또는 17+ (Flink 2.0+용)
  • Apache Flink 1.17+

이 커넥터는 Flink 1.17+와 Flink 2.0+를 모두 지원할 수 있도록 두 개의 아티팩트로 나뉘어 있습니다. 사용하려는 Flink 버전에 맞는 아티팩트를 선택하십시오:

Flink 버전아티팩트ClickHouse Java Client 버전필요한 Java
latestflink-connector-clickhouse-2.0.00.9.5Java 17+
2.0.1flink-connector-clickhouse-2.0.00.9.5Java 17+
2.0.0flink-connector-clickhouse-2.0.00.9.5Java 17+
1.20.2flink-connector-clickhouse-1.170.9.5Java 11+
1.19.3flink-connector-clickhouse-1.170.9.5Java 11+
1.18.1flink-connector-clickhouse-1.170.9.5Java 11+
1.17.2flink-connector-clickhouse-1.170.9.5Java 11+
참고

이 커넥터는 Flink 1.17.2 이전 버전에서는 테스트되지 않았습니다.

설치 및 설정

의존성으로 추가하기

<dependency>
    <groupId>com.clickhouse.flink</groupId>
    <artifactId>flink-connector-clickhouse-2.0.0</artifactId>
    <version>{{ stable_version }}</version>
    <classifier>all</classifier>
</dependency>
<dependency>
    <groupId>com.clickhouse.flink</groupId>
    <artifactId>flink-connector-clickhouse-1.17</artifactId>
    <version>{{ stable_version }}</version>
    <classifier>all</classifier>
</dependency>

바이너리 다운로드

바이너리 JAR의 파일명 패턴은 다음과 같습니다:

flink-connector-clickhouse-${flink_version}-${stable_version}-all.jar

여기서:

사용 가능한 모든 JAR 릴리스 파일은 Maven Central Repository에서 확인할 수 있습니다.

DataStream API 사용

스니펫

원시 CSV 데이터를 ClickHouse에 삽입하려고 한다고 가정하겠습니다:

public static void main(String[] args) {
    // ClickHouseClient 구성
    ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(url, username, password, database, tableName);

    // ElementConverter 생성
    ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);

    // sink를 생성하고 `setClickHouseFormat`을 사용해 형식을 설정
    ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
            convertorString,
            MAX_BATCH_SIZE,
            MAX_IN_FLIGHT_REQUESTS,
            MAX_BUFFERED_REQUESTS,
            MAX_BATCH_SIZE_IN_BYTES,
            MAX_TIME_IN_BUFFER_MS,
            MAX_RECORD_SIZE_IN_BYTES,
            clickHouseClientConfig
    );

    csvSink.setClickHouseFormat(ClickHouseFormat.CSV);

    // 마지막으로 DataStream을 sink에 연결
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Path csvFilePath = new Path(fileFullName);
    FileSource<String> csvSource = FileSource
            .forRecordStreamFormat(new TextLineInputFormat(), csvFilePath)
            .build();

    env.fromSource(
            csvSource,
            WatermarkStrategy.noWatermarks(),
            "GzipCsvSource"
    ).sinkTo(csvSink);
}

추가 예시와 스니펫은 테스트 코드에서 확인할 수 있습니다:

빠른 시작 예제

ClickHouse 싱크를 쉽게 시작할 수 있도록 Maven 기반 예제를 만들었습니다:

더 자세한 지침은 Example Guide를 참조하십시오.

DataStream API 연결 옵션

ClickHouse 클라이언트 옵션

매개변수설명기본값필수 여부
url완전한 ClickHouse URL해당 없음
usernameClickHouse 데이터베이스 사용자 이름해당 없음
passwordClickHouse 데이터베이스 비밀번호해당 없음
databaseClickHouse 데이터베이스 이름해당 없음
tableClickHouse 테이블 이름해당 없음
optionsJava 클라이언트 구성 옵션 맵빈 맵아니요
serverSettingsClickHouse 서버 세션 설정 맵빈 맵아니요
enableJsonSupportAsStringJSON 데이터 타입에 대해 JSON 형식의 String을 기대하도록 하는 ClickHouse 서버 설정true아니요

optionsserverSettingsMap<String, String> 형식으로 클라이언트에 전달해야 합니다. 둘 중 하나에 빈 맵을 사용하면 각각 클라이언트 또는 서버의 기본값이 사용됩니다.

참고

사용 가능한 모든 Java 클라이언트 옵션은 ClientConfigProperties.java이 문서 페이지에 나와 있습니다.

사용 가능한 모든 서버 세션 설정은 이 문서 페이지에 나와 있습니다.

예시는 다음과 같습니다:

Map<String, String> javaClientOptions = Map.of(
    ClientConfigProperties.CA_CERTIFICATE.getKey(), "<my_CA_cert>",
    ClientConfigProperties.SSL_CERTIFICATE.getKey(), "<my_SSL_cert>",
    ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(), "30000",
    ClientConfigProperties.HTTP_MAX_OPEN_CONNECTIONS.getKey(), "5"
);

Map<String, String> serverSettings = Map.of(
    "insert_deduplicate", "1"
);

ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(
    url,
    username,
    password,
    database,
    tableName,
    javaClientOptions,
    serverSettings,
    false // enableJsonSupportAsString
);

싱크 옵션

다음 옵션은 Flink의 AsyncSinkBase에서 직접 제공됩니다:

ParametersDescriptionDefault ValueRequired
maxBatchSize단일 배치에 삽입되는 최대 레코드 수N/A
maxInFlightRequests싱크가 백프레셔를 적용하기 전에 허용되는 진행 중 요청의 최대 수N/A
maxBufferedRequests백프레셔가 적용되기 전에 싱크에서 버퍼링할 수 있는 최대 레코드 수N/A
maxBatchSizeInBytes배치가 가질 수 있는 최대 크기(바이트)입니다. 전송되는 모든 배치는 이 크기보다 작거나 같게 됩니다N/A
maxTimeInBufferMS플러시되기 전에 레코드가 싱크에 머무를 수 있는 최대 시간N/A
maxRecordSizeInBytes싱크가 허용하는 최대 레코드 크기이며, 이를 초과하는 레코드는 자동으로 거부됩니다N/A

지원되는 데이터 타입

아래 표는 Flink에서 ClickHouse로 데이터를 삽입할 때의 데이터 타입 변환을 빠르게 참조할 수 있도록 제공합니다.

Java 타입ClickHouse 타입지원 여부직렬화 방식
byte/ByteInt8DataWriter.writeInt8
short/ShortInt16DataWriter.writeInt16
int/IntegerInt32DataWriter.writeInt32
long/LongInt64DataWriter.writeInt64
BigIntegerInt128DataWriter.writeInt128
BigIntegerInt256DataWriter.writeInt256
short/ShortUInt8DataWriter.writeUInt8
int/IntegerUInt8DataWriter.writeUInt8
int/IntegerUInt16DataWriter.writeUInt16
long/LongUInt32DataWriter.writeUInt32
long/LongUInt64DataWriter.writeUInt64
BigIntegerUInt64DataWriter.writeUInt64
BigIntegerUInt128DataWriter.writeUInt128
BigIntegerUInt256DataWriter.writeUInt256
BigDecimalDecimalDataWriter.writeDecimal
BigDecimalDecimal32DataWriter.writeDecimal
BigDecimalDecimal64DataWriter.writeDecimal
BigDecimalDecimal128DataWriter.writeDecimal
BigDecimalDecimal256DataWriter.writeDecimal
float/FloatFloatDataWriter.writeFloat32
double/DoubleDoubleDataWriter.writeFloat64
boolean/BooleanBooleanDataWriter.writeBoolean
StringStringDataWriter.writeString
StringFixedStringDataWriter.writeFixedString
LocalDateDateDataWriter.writeDate
LocalDateDate32DataWriter.writeDate32
LocalDateTimeDateTimeDataWriter.writeDateTime
ZonedDateTimeDateTimeDataWriter.writeDateTime
LocalDateTimeDateTime64DataWriter.writeDateTime64
ZonedDateTimeDateTime64DataWriter.writeDateTime64
int/IntegerTime해당 없음
long/LongTime64해당 없음
byte/ByteEnum8DataWriter.writeInt8
int/IntegerEnum16DataWriter.writeInt16
java.util.UUIDUUIDDataWriter.writeIntUUID
StringJSONDataWriter.writeJSON
Array<Type>Array<Type>DataWriter.writeArray
Map<K,V>Map<K,V>DataWriter.writeMap
Tuple<Type,..>Tuple<T1,T2,..>DataWriter.writeTuple
ObjectVariant해당 없음

참고:

  • 날짜 연산을 수행할 때는 ZoneId를 제공해야 합니다.
  • decimal 연산을 수행할 때는 정밀도와 스케일을 지정해야 합니다.
  • ClickHouse가 Java String을 JSON으로 파싱할 수 있도록 하려면 ClickHouseClientConfig에서 enableJsonSupportAsString을 활성화해야 합니다.
  • 커넥터는 입력 DataStream의 요소를 ClickHouse 페이로드에 매핑하기 위해 ElementConvertor가 필요합니다. 이를 위해 커넥터는 ClickHouseConvertorPOJOConvertor를 제공하며, 위의 DataWriter 직렬화 메서드를 사용해 이 매핑을 구현할 수 있습니다.

지원되는 입력 형식

사용 가능한 ClickHouse 입력 형식 목록은 이 문서 페이지ClickHouseFormat.java에서 확인할 수 있습니다.

커넥터가 DataStream을 ClickHouse 페이로드로 직렬화할 때 사용할 형식을 지정하려면 setClickHouseFormat 함수를 사용하십시오. 예시는 다음과 같습니다.

ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
        convertorString,
        MAX_BATCH_SIZE,
        MAX_IN_FLIGHT_REQUESTS,
        MAX_BUFFERED_REQUESTS,
        MAX_BATCH_SIZE_IN_BYTES,
        MAX_TIME_IN_BUFFER_MS,
        MAX_RECORD_SIZE_IN_BYTES,
        clickHouseClientConfig
);
csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
참고

기본적으로 커넥터는 ClickHouseClientConfigsetSupportDefault가 명시적으로 true 또는 false로 설정된 경우, 각각 RowBinaryWithDefaults 또는 RowBinary를 사용합니다.

메트릭

이 커넥터는 Flink의 기존 메트릭 외에 다음과 같은 추가 메트릭을 노출합니다:

MetricDescriptionTypeStatus
numBytesSend요청 페이로드에서 ClickHouse로 전송된 총 바이트 수입니다. 참고: 이 메트릭은 네트워크를 통해 전송된 직렬화 데이터의 크기를 측정하므로, 처리 후 스토리지에 실제로 기록된 바이트 수를 반영하는 system.query_log의 ClickHouse written_bytes와 다를 수 있습니다Counter
numRecordSendClickHouse로 전송된 총 레코드 수입니다Counter
numRequestSubmitted전송된 총 요청 수입니다(실제로 수행된 flush 횟수)Counter
numOfDroppedBatches재시도할 수 없는 실패로 인해 폐기된 총 배치 수입니다Counter
numOfDroppedRecords재시도할 수 없는 실패로 인해 폐기된 총 레코드 수입니다Counter
totalBatchRetries재시도 가능한 실패로 인한 총 배치 재시도 횟수입니다Counter
writeLatencyHistogram쓰기 성공 지연 시간 분포(ms) 히스토그램입니다Histogram
writeFailureLatencyHistogram쓰기 실패 지연 시간 분포(ms) 히스토그램입니다Histogram
triggeredByMaxBatchSizeCountermaxBatchSize에 도달해 발생한 총 flush 횟수입니다Counter
triggeredByMaxBatchSizeInBytesCountermaxBatchSizeInBytes에 도달해 발생한 총 flush 횟수입니다Counter
triggeredByMaxTimeInBufferMSCountermaxTimeInBufferMS에 도달해 발생한 총 flush 횟수입니다Counter
actualRecordsPerBatch실제 배치 크기 분포 히스토그램입니다Histogram
actualBytesPerBatch배치당 실제 바이트 수 분포 히스토그램입니다Histogram

제한 사항

  • 이 싱크는 현재 at-least-once 전달을 보장합니다. exactly-once semantics 지원 작업은 여기에서 진행 상황을 추적하고 있습니다.
  • 이 싱크는 아직 처리할 수 없는 레코드를 버퍼링하기 위한 데드 레터 큐(DLQ)를 지원하지 않습니다. 현재로서는 커넥터가 삽입에 실패한 레코드를 다시 삽입하려고 시도하며, 그래도 성공하지 못하면 해당 레코드를 버립니다. 이 기능은 여기에서 진행 상황을 추적하고 있습니다.
  • 이 싱크는 아직 Flink의 Table API 또는 Flink SQL을 통한 생성을 지원하지 않습니다. 이 기능은 여기에서 진행 상황을 추적하고 있습니다.

ClickHouse 버전 호환성 및 보안

  • 이 커넥터는 일일 CI 워크플로를 통해 최신 버전과 head를 포함한 여러 최신 ClickHouse 버전에서 테스트됩니다. 테스트 대상 버전은 새로운 ClickHouse 릴리스가 적용됨에 따라 주기적으로 업데이트됩니다. 커넥터가 매일 어떤 버전에서 테스트되는지는 여기에서 확인하십시오.
  • 알려진 보안 취약점과 취약점 보고 방법은 ClickHouse 보안 정책을 참조하십시오.
  • 보안 수정 사항과 새로운 개선 사항을 놓치지 않도록 커넥터를 지속적으로 업그레이드할 것을 권장합니다.
  • 마이그레이션 관련 문제가 있으면 GitHub issue를 생성해 주십시오. 확인 후 응답하겠습니다.
  • 최적의 성능을 위해 DataStream 요소 타입이 Generic 타입이 아니어야 합니다. Flink의 타입 구분에 대해서는 여기를 참조하십시오. Generic이 아닌 요소는 Kryo로 인해 발생하는 직렬화 오버헤드를 피할 수 있어 ClickHouse로의 처리량을 향상시킵니다.
  • maxBatchSize는 최소 1000으로, 이상적으로는 10,000~100,000 범위로 설정하는 것을 권장합니다. 자세한 내용은 대량 삽입에 대한 이 가이드를 참조하십시오.
  • ClickHouse에서 OLTP 스타일의 중복 제거 또는 upsert를 수행하려면 이 문서 페이지를 참조하십시오. 참고: 이는 아래의 중복 배치에서 자세히 설명하는, 재시도 시 발생하는 배치 중복 제거와 혼동하면 안 됩니다.

문제 해결

CANNOT_READ_ALL_DATA

다음과 같은 오류가 발생할 수 있습니다:

com.clickhouse.client.api.ServerException: Code: 33. DB::Exception: Cannot read all data. Bytes read: 9205. Bytes expected: 1100022.: (at row 9) : While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)

원인: 일반적으로 CANNOT_READ_ALL_DATA 오류는 ClickHouse 테이블 스키마가 Flink 레코드 스키마와 서로 달라졌음을 의미합니다. 이는 둘 중 하나 또는 둘 다가 하위 호환되지 않는 방식으로 변경될 때 발생할 수 있습니다.

해결 방법: ClickHouse 테이블 또는 커넥터 입력 데이터 타입 중 하나(또는 둘 다)의 스키마를 업데이트하여 서로 호환되도록 하십시오. 필요한 경우, Java 타입을 ClickHouse 타입으로 조회하는 방법은 type mapping을 참조하십시오. 참고: 아직 전송 중인 레코드가 남아 있다면 커넥터를 다시 시작할 때 Flink 상태를 재설정해야 합니다.

낮은 처리량

ClickHouse에 쓸 때 커넥터의 처리량이 작업의 병렬성(Flink 태스크 수)에 비례해 확장되지 않을 수 있습니다.

원인: ClickHouse의 백그라운드 파트 머지 프로세스로 인해 insert 속도가 느려질 수 있습니다. 이는 구성된 배치 크기가 너무 작거나, 커넥터가 너무 자주 플러시하거나, 또는 이 두 가지가 함께 작용할 때 발생할 수 있습니다.

해결 방법: numRequestSubmittedactualRecordsPerBatch 메트릭을 모니터링하여 배치 크기(maxBatchSize)를 어떻게 조정할지, 그리고 얼마나 자주 플러시할지를 판단하십시오. 또한 배치 크기 권장 사항은 고급 및 권장 사용법도 참조하십시오.

ClickHouse 테이블에서 일부 행이 누락됩니다

원인: 재시도할 수 없는 오류가 발생했거나, 구성된 재시도 횟수(ClickHouseClientConfig.setNumberOfRetries()를 통해 설정 가능) 내에 배치를 삽입하지 못해 배치가 폐기되었습니다. 참고: 기본적으로 커넥터는 배치를 폐기하기 전에 최대 3번까지 다시 삽입을 시도합니다.

해결 방법: 근본 원인을 파악하려면 TaskManager 로그 및/또는 스택 추적을 확인하십시오.

기여 및 지원

프로젝트에 기여하거나 문제를 보고하려는 경우, 의견을 보내주시면 감사하겠습니다! 문제를 등록하거나, 개선 사항을 제안하거나, pull request를 제출하려면 GitHub repository를 방문하십시오.

기여를 환영합니다! 시작하기 전에 저장소의 contribution guide를 확인하십시오. ClickHouse Flink 커넥터 개선에 도움을 주셔서 감사합니다!