카프카 커넥트를 데이터 파이프라인으로 사용하는 이유? kafka-sink-connector 오픈소스 언빡싱!

들어가며

안녕하세요, 광고추천팀에서 데이터 엔지니어로 일하고 있는 cory 입니다. 저는 광고추천팀에서 카프카(Kafka) 기반 스트림 데이터 플랫폼을 개발 및 운영하고 있습니다. 

광고추천팀에서는 노출(impression), 클릭(click), 전환(conversion) 등의 광고 로그 데이터를 원천 데이터라고 부르며, 이 원천 데이터 분석을 기반으로 개인화된 광고를 서빙(Serving)하는 작업을 진행합니다. 팀 내 데이터 사이언티스트 크루들이 원활하게 데이터를 분석할 수 있도록 하려면 데이터 처리 프로세스(Extraction Transformation Load, 이하 ETL)가 선행되어야 합니다. 제가 속한 광고추천데이터플랫폼파트에서는 대용량 대규모 데이터인 광고 로그 데이터를 원활히 ETL 프로세싱하기 위해, 제네시스라는 이름의 카프카 기반 데이터 플랫폼을 운영하고 있습니다.

이번 글에서는 제네시스 플랫폼에서 활용하고 있는 커스텀 커넥터인 kafka-sink-connector를 개발한 배경과, 현재 광고추천팀에서 사용하고 있는 방법, 그리고 활용도에 대해서 이야기해 보려 합니다.  

kafka-sink-connector는 2023년 1월에 깃허브에 오픈소스로 공개되었고, 개인 및 회사에서 자유롭게 사용할 수 있습니다. 이 글 마지막에는 kafka-sink-connector의 사용 방법도 간략히 설명합니다.

 

kafka-sink-connector Github repository: https://github.com/kakao/kafka-sink-connector​​

광고 스트림 데이터의 지면별 처리

광고에서는 지면(placement, 또는 at)이라고 불리는 단위가 있습니다. 지면은 웹사이트 또는 모바일 애플리케이션의 특정 영역을 뜻하며, 지면에서 광고를 게재합니다. 대부분의 지면에는 개인화 추천 과정을 거쳐 pCTR(predicted click-through rate; 예상 클릭률)이 높은 추천 점수를 받은 광고를 노출합니다. 

광고추천팀에서 사용하는 광고 데이터는 지면이라고 불리는 개별 광고의 노출(impression), 클릭(click), 전환(conversion) 데이터를 ETL 과정을 거친 후 모델 학습 수행에 사용하고 있습니다. 문제는 데이터가 지면 단위로 분리되어 제공되지 않는다는 점입니다. 광고가 게시될 수 있는 인터넷 세상에는 엄청나게 방대한 양의 지면이 존재하며, 지면들은 시시각각 줄어들고 늘어나기를 반복하고 있습니다. 그렇기 때문에 ‘원천 광고 스트림 데이터’라고 불리는 거대한 규모의 스트림 데이터를 카프카 토픽을 통해 제공되고 있으며, 해당 데이터를 실시간 모델 학습용으로 사용하고 있습니다. 카프카는 대규모 이벤트 데이터를 실시간으로 처리하기 적합한 플랫폼이기 때문에 실시간 광고 데이터 처리에 적극적으로 사용하고 있습니다.

이런 과정을 거쳐 제공하는 ‘원천 광고 스트림 데이터’를 가공하지 않고 그대로 실시간 모델 학습용으로 사용할 수도 있겠지만, 다음과 같은 두 가지 요구사항을 모두 만족하려면 필터링과 분기 같은 작업이 필요합니다.

  1. 지면별 모델과 입찰 전략을 다르게 가져감
  2. 모델별 성능을 최적화 하기 위해 학습에 꼭 필요한 데이터만 존재해야 함
 

위 두 가지 요구사항을 만족시키기 위해서는 지면별로 학습에 필요한 스트림 데이터가 별도로 필요합니다. 즉, ‘원천 광고 스트림 데이터’를 소스(source)로 하고 필요한 데이터만 추출(sink) 하는 스트림 데이터 파이프라인 이 필요하다는 것이죠. 이미 눈치채신 분도 계시겠지만, 이런 지면별 스트림에서 데이터를 추출할 때, 데이터의 분기와 필터링의 생성, 수정 및 삭제는 빈번히 일어납니다. 이런 상황에서 스트림 데이터 파이프라인의 효율적인 운영과 플랫폼화를 위해, 카프카 커넥트를 기반으로 하는 제네시스 플랫폼과 kafka-sink-connector를 만들었습니다.

제네시스 플랫폼

제네시스는 카프카 커넥트 기반 데이터 플랫폼으로서 기존 로그스태시 기반 파이프라인을 개선하기 위해 신규 개발한 플랫폼입니다. 특히 파이프라인의 오너십, 모니터링, 배포, 데이터 리니지(lineage, 계보)를 달성하기 위해 만들어졌습니다. 자세한 내용은 제네시스 – 광고추천팀의 카프카 기반 스트리밍 데이터 플랫폼(링크)를 참고해 주세요.

kafka-sink-connector 소개

제네시스의 핵심 도구인 카프카 커넥트는, 카프카와 외부 시스템(데이터베이스 등)과 연동 목적으로 사용하는 도구로 그 사용 범위를 한정지어 설명하곤 합니다. 하지만 “데이터베이스와 연동이 아닌 다른 방식으로도 사용할 수 있지 않을까?” 하고 고민하기 시작했습니다. 

많은 기능들 중에서도 카프카 커넥트가 가지고 있는 확장성, 재사용성, 편리한 운영과 관리 및 고가용성 특징을 고려하면, 카프카 카넥트는 스트림 데이터 파이프라인 그 자체로서도 완벽하다고 생각했습니다.

이런 생각 끝에 우리는 로그스태시(Logstash)의 필터링 기능을 대체할 카프카 커스텀 커넥터(Kafka Custom Connector)를 만들기로 결심했습니다. 그 결과물이 바로 kafka-sink-connector입니다. kafka-sink-connector는 특정 카프카 토픽을 소스로 데이터를 가져와서 필터링, 샘플링, 타임스탬프 주입, 메시지 키 주입 등의 기능을 수행할 수 있는 싱크 커넥터입니다.

커스텀 커넥터란?: 오픈소스 아파치 카프카에서 공식적으로 제공하는 인터페이스를 사용하여 개발자가 만든 싱크 커넥터 또는 소스 커넥터. 플러그인 JAR 파일을 생성하고, 카프카 커넥터에 생성한 JAR 파일을 포함시켜 실행하면, 커넥터를 반복해서 생성되는 파이프라인 도구로 사용할 수 있음.

흔히 주변에서 많이 사용하는 오픈소스 커넥터들과 다르게, 커스텀 커넥터는 파이프라인을 직접 정의하고 코드를 작성하여 개발해야 합니다. 그렇기 때문에 커스텀 커넥터의 내부 동작을 제대로 이해하고 개발하는 것이 중요합니다.

커스텀 커넥터를 개발하는 방법

오픈소스 아파치 카프카에서 공식적으로 제공하는 SinkTask, SinkConnector 인터페이스를 사용하면, 라이센스를 걱정할 필요 없이 커스텀 커넥터를 개발할 수 있습니다. 

커스텀 커넥터를 개발하는 방법에 대해 kafka-sink-connector의 소스코드를 보며 짧게 설명드리겠습니다. 먼저 build.gradle에 connect-api를 추가합니다. connect-api 라이브러리에는 커스텀 커넥터를 개발하기 위한 인터페이스들이 존재합니다.

				
					dependencies {
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'

    implementation "org.apache.kafka:connect-api:${connectApi}"
    // ... 추가로 필요한 디펜던시
}

				
			
다음으로 커넥터에서 사용할 컨피그를 설정합니다. connect-api 라이브러리는 AbstractConfig 클래스를 제공합니다. 이 클래스를 상속받는 컨피그 클래스를 다음 예시 코드처럼 설정합니다. 컨피그를 설정할 때 필요한 정보로는 컨피그 이름, 컨피그 타입, 컨피그 기본 값, 컨피그 중요도 그리고 컨피그 설명이 있습니다. 컨피그가 중요한 이유는, 커넥터를 파이프라인 용도로 반복적으로 생성할 때 컨피그를 사용하기 때문입니다. 그렇기 때문에 요구사항 중에서, 동적 해결할 수 있는 요구사항이 있다면 최대한 컨피그에 선언하여 생성합니다.
				
					public class KafkaSinkConnectorConfig extends AbstractConfig {

    public static final String SOURCE_TOPIC = "topics";
    public static final String SOURCE_TOPIC_DEFAULT_VALUE = "my-topic";
    public static final String SOURCE_TOPIC_DOC = "Define source topic";

    public static final String SINK_TOPIC = "kafka.sink.topic";
    public static final String SINK_TOPIC_DEFAULT_VALUE = "test";
    public static final String SINK_TOPIC_DOC = "Define sink topic";

    // ... 중략

    public static ConfigDef CONFIG = new ConfigDef()
            .define(SOURCE_TOPIC, Type.STRING, SOURCE_TOPIC_DEFAULT_VALUE, Importance.HIGH, SOURCE_TOPIC_DOC)
            .define(SINK_TOPIC, Type.STRING, SINK_TOPIC_DEFAULT_VALUE, Importance.HIGH, SINK_TOPIC_DOC)
            .define(FILTERING_CONDITION, Type.STRING, FILTERING_CONDITION_DEFAULT_VALUE, Importance.HIGH, FILTERING_CONDITION_DOC)
            .define(SINK_BOOTSTRAP_SERVER, Type.STRING, SINK_BOOTSTRAP_SERVER_DEFAULT_VALUE, Importance.HIGH, SINK_BOOTSTRAP_SERVER_DOC)
            // ... 중략
            ;

    public KafkaSinkConnectorConfig(Map<String, String> props) {
        super(CONFIG, props);
    }
}

				
			
다음으로 작성할 것은 SinkConnector SinkTask 클래스를 상속받는 커스텀 커넥터 클래스입니다. SinkConnectorSinkTask를 실행시키고 컨피그를 분배하기 위한 도구로 사용하며, 실질적인 데이터 처리와는 무관합니다. 그렇기 때문에 데이터를 처리하는 로직은 SinkTask에 위치한다고 볼 수 있습니다. SinkTask 클래스를 상속할 경우 다음과 같은 3가지 메소드를 작성해야 합니다. 각 메소드에 대한 설명은 주석을 참고해 주시기 바랍니다.
				
					public abstract class SinkTask implements Task {
    // 태스크가 실행될 때 리소스를 초기화하는 로직이 구현되어야 하는 메서드. 태스크가 실행될 때 최초 한번 실행됩니다.
    public abstract void start(Map<String, String> props);
    
    // 카프카 컨슈머의 poll()과 유사하게 SinkRecord가 Collection 형태로 넘어와서 파라미터로 사용할 수 있습니다.
    // 실질적인 토픽의 레코드별 처리 로직이 여기에 들어간다고 볼 수 있습니다.
    public abstract void put(Collection<SinkRecord> records);
    
    // 태스크가 종료될 때 리소스를 종료하는 로직이 포함될 메서드. 태스크가 종료될 때 한번 실행됩니다.
    public abstract void stop();
}
				
			
SinkTask 클래스를 상속받은 KafkaSinkTask 클래스의 내부 로직 중 put() 내부 구문은 다음과 같습니다. SinkRecord에서 필요한 데이터를 추출, 필터링, 샘플링하여 sendRecord()를 통해 또 다른 클래스로 전달하는 것을 알 수 있습니다.
				
					@Override
public void put(Collection<SinkRecord> records) {
    if (samplingEnabled) {
        for (SinkRecord record : records) {
            if (record.value() != null) {
                try {
                    String value = record.value().toString();
                    boolean samplingCondition = Math.random() < samplingPercentage;
                    if (samplingCondition
                            && isFilteringMatch(value)
                    ) {
                        sendRecord(value);
                    }
            // ... 중략
        }
    }
}
private void sendRecord(String value) {
    ProducerRecord<String, String> sendRecord = getSinkRecord(value);
    producer.send(sendRecord, new ProducerCallback());
}

				
			
위 코드를 보면, 커스텀 커넥터라는 단어에 겁먹을 필요가 없다는 걸 알 수 있습니다. 여러분들이 이제껏 많이 개발했거나, 보았던 카프카 컨슈머의 코드와 비교해 보세요. 매우 비슷한 부분도 있고, 더 쉬워 보이는 부분도 있지 않나요? 위 코드를 보면, 컨슈머의 poll()로 리턴되는 ConsumerRecords 대신 Collection로 가져와서 로직을 수행하는 것을 볼 수 있습니다. 다른 부분은 컨슈머는 그 자체로서 애플리케이션 실행 단위가 되지만, 커넥터는 스레드 단위로 커넥트 클러스터 위에서 태스크로 실행된다는 점입니다.

kafka-sink-connector 설명

kafka-sink-connector는 다음과 같은 기능을 가지고 있습니다.
  • 레코드를 다른 카프카 클러스터의 특정 토픽으로 전달
  • JSON 기반 데이터 필터링
  • 샘플링
  • 타임스탬프 파싱 및 주입
  • 메시지 키 파싱 및 주입
  • 고성능 프로듀서 옵션(linger.ms, batch.size)
위 기능들은 언제, 어떻게 사용 가능한지 사례별로 응용 방법을 소개하겠습니다.

kafka-sink-connector 응용 방법

1. 필터링과 분기

데이터 필터링, 분기는 앞서 설명드린 광고추천팀 사례처럼 거대한 원천 데이터에서 필요한 데이터만 추출할 때 유용합니다. kafka-sink-connector는 jsonPath를 사용하여 JSON 기반 필터링을 수행합니다. 예를 들어 다음과 같은 데이터들이 원천 토픽(레코드의 메시지 값)에 존재한다고 가정해 봅시다.
				
					{
    "color":"red",
    "name":"apple"
}
{
    "color":"yellow",
    "name":"banana"
}
{
    "color":"orange",
    "name":"tangerine"
}

				
			

위 데이터 중 색이 빨간 과일을 특정 토픽으로 전달해야 할 때는 다음과 같은 컨피그로 커넥터를 생성할 수 있습니다.

				
					{
    "name": "red-color-filtering-pipeline",
    "config":
    {
        "connector.class": "com.kakao.connector.kafka.KafkaSinkConnector",
        "kafka.filtering.condition": "$.[?(@.color == 'red')]",
        "kafka.sink.bootstrap": "another-kafka:9092",
        "kafka.sink.topic": "red-color-fruit",
        "tasks.max": "1",
        "topics": "fruits"
    }
}

				
			
이후에 fruits 토픽에 "color":"red" 인 데이터가 들어간다면 모두 red-color-fruit 토픽으로 전달되겠죠. 싱크 되는 데이터의 토픽은 kafka.sink.topic으로 설정하고 해당 토픽이 위치하는 카프카 클러스터는 kafka.sink.bootstrap으로 설정합니다. 만약 데이터양이 많아져서 병렬 처리 성능을 높이고 싶다면 tasks.max와 해당 토픽(여기서는 fruits)의 파티션 개수를 늘리면 됩니다. 다음은 광고추천팀에서 원천 데이터를 필터링, 분기하려고 kafka-sink-connector를 사용하여 생성한 파이프라인을 계보로 시각화한 스크린샷입니다. 보안 사유로 인해 토픽 이름을 가린 점은 양해 부탁드립니다.

2. 코파티셔닝

코파티셔닝(co-parititoning)은 카프카 스트림즈를 활용할 때 가장 많이 접하게 되는 용어입니다. 스트림-스트림 조인(join), 스트림-테이블 조인과 같이 KStream, KTable을 활용하여 스트림 데이터를 처리할 때, 코파티셔닝이 되어 있는 두 개의 토픽이 필요합니다. 코파티셔닝이란 서로 다른 두 개의 토픽이 파티션 개수가 같고 동일한 파티셔닝 전략(partition strategy)을 사용하면서, 조인이 되고자하는 데이터가 메시지 키에 주입된 상태를 뜻합니다. 이렇게 두 개의 토픽이 코파티셔닝된 상태여야만, 카프카 스트림즈 내부의 태스크에서 조인을 수행할 수 있습니다. 그래서 코파티셔닝은 조인을 하기 전 반드시 선행되어야만 합니다.

문제는 우리가 사용하는 모든 토픽들이 코파티셔닝된 상태라고 보장할 수 없다는 점입니다. 우리가 직접 프로듀스하고 있는 토픽을 기반으로 데이터를 처리한다면 이 문제는 생각보다 쉽게 해결될 수 있습니다. 파티션 개수를 맞추고 파티셔닝 전략을 맞추면 대부분의 일이 끝나기 때문이죠. 문제는 다른 팀, 다른 조직에서 제공하는 외부 토픽을 사용하는 경우입니다. 이러한 경우 코파티셔닝을 맞추기가 불가능 한 경우가 종종 존재합니다. 또한 스트림 데이터 처리를 위해 레코드에 이벤트 시간을 기준으로 타임스탬프를 주입하는 게 필요할지도 모르겠군요.

이런 문제를 해결하려면 kafka-sink-connector의 타임스탬프/메시지 키 파싱 및 주입 기능을 사용하면 됩니다. 예를 들어 다음과 같은 데이터가 원천으로 들어온다고 가정해 봅시다.

				
					{
    "time":"20230105 193813 741",
    "id":"11341",
    "action":"buy",
    "product":"red sport car"
}

				
			

여기서 레코드의 타임스탬프를 이벤트 발생 시간으로 맞추고, 조인이 되어야 하는 유저의 키를 메시지 키로 주입하려면 다음과 같이 설정하면 됩니다.

				
					{
    "name": "timestmap-messageKey-parsing-pipeline",
    "config":
    {
        "connector.class": "com.kakao.connector.kafka.KafkaSinkConnector",
        "kafka.filtering.condition": "$.[?( @.action=='buy' )]",
        "kafka.sink.bootstrap": "another-kafka:9092",
        "kafka.sink.topic": "user-action-buy-copartition",
        "kafka.key.enabled": "true",
        "kafka.key.field": "@.id",
        "kafka.timestamp.enabled": "true",
        "kafka.timestamp.field": "@.time",
        "kafka.timestamp.format": "yyyyMMdd HHmmss SSS",
        "tasks.max": "1",
        "topics": "user-actions"
    }
}

				
			

이 스트림 데이터 파이프라인을 통해 추출된 user-action-buy-copartition 토픽은 스트림 데이터 처리를 위한 모든 준비가 완료되었습니다. 이제 카프카 스트림즈를 사용하여 스트림 프로세싱을 진행하면 됩니다.

3. 미러링과 샘플링

데이터 미러링을 하려면, 최신 카프카 커넥트가 제공하는 미러메이커2(MirrorMaker2)를 사용해도 됩니다. 그러나 미러메이커2를 사용하는 방법은 과하다고 느끼는 분도 있을 겁니다. 미러메이커2는 재해복구(Disaster recovery)에 좀 더 적합하고, 관련 기능을 많이 내재하고 있기 때문입니다. 그래서 컨슈머 그룹의 오프셋과 토픽의 오프셋, 타임스탬프까지 완벽하게 동기화(sync) 할 필요 없이, 단순히 데이터를 다른 카프카에 전달만 해야 할 경우에는 kafka-sink-connector가 오히려 적합할 수 있습니다. 

예를 들어 상용 환경에 있는 user-action 이벤트 데이터 중 일부를 분석하기 위해 50% 샘플링하여 분석 전용 카프카 클러스터에 가져온다고 가정해 봅시다. 그러한 경우에는 다음과 같이 커넥터를 설정하여 만들 수 있습니다.

				
					{
    "name": "anaysis-sampling-pipeline",
    "config":
    {
        "connector.class": "com.kakao.connector.kafka.KafkaSinkConnector",
        "kafka.filtering.condition": "$.[?( @.action=='buy' )]",
        "kafka.sink.bootstrap": "analysis-kafka:9092",
        "kafka.sink.topic": "analysis",
        "kafka.sampling.enabled": "true",
        "kafka.sampling.percentage": "0.5",
        "tasks.max": "1",
        "topics": "user-actions"
    }
}

				
			

맺음말

이상으로 kafka-sink-connector에 대해 알아보았습니다. 더불어 커넥터를 활용할 수 있는 방안도 컨피그와 함께 자세히 살펴보았습니다. 오픈소스로 공개된 kafka-sink-connector는 카프카 커넥트를 이미 사용하고 있는 조직은 파이프라인을 손쉽게 확장할 수 있도록, 커넥트를 도입하지 않은 조직은 커넥트를 쉽게 도입할 수 있도록 도와줍니다. 이 글을 읽으면서 kafka-sink-connector가 할 수 있는 기능이 요구사항을 해결하는데 적합하다는 생각이 들었다면 한번 사용해 보시는 건 어떠신가요? 아무쪼록 kafka-sink-connector가 카프카와 함께 확장성이 높은 비상태(Stateless) 기반 스트림 데이터 파이프라인을 만들고 운영하는 데 큰 도움이 되었으면 좋겠습니다. 감사합니다.

이 자리를 빌려 오픈소스를 발행하고 오픈하는데 큰 도움을 준 광고추천팀 광고추천데이터플랫폼파트 ethan.1752(박경환)께 감사의 인사를 드립니다.

카카오톡 공유 보내기 버튼

Latest Posts

대학생 멘토링: 우물 밖 내다보기

https://www.youtube.com/watch?v=LGdb4Q5t-gw 안녕하세요. 저는 카카오톡 톡플랫폼개발팀에서 백엔드 서버 개발자로 일하고 있는 nano.son(손은호)입니다. 2023년 1월 16일, 제 모교인 경북대학교의 학생들과 판교 아지트에서 멘토링을 진행했습니다.