카프카 스트림즈
토픽에 적재된 데이터를 상태기반(Stateful) 또는 비상태기반(Stateless)으로 실시간 변환하여 다른 토픽에 적재하는 라이브러리이다.
스트림즈는 카프카에서 공식적으로 지원하는 라이브러리이기 때문에, 자바 기반의 스트림즈 애플리케이션은 카프카 클러스터와 완벽하게 호환되며 스트림 처리에 필요한 편리한 기능(신규 토픽 생성, 상태 저장, 데이터 조인)들을 제공한다.
스트림즈 애플리케이션 또는 카프카 브로커에 장애가 발생하더라도 장해 허용 시스템을 가지고 있기 때문에 데이터 처리 안정성이 매우 뛰어나다.
스트림즈 애플리케이션은 내부적으로 스레드를 1개 이상 생성할 수 있으며, 스레드는 1개 이상의 태스크를 가진다.
스트림즈의 태스크는 스트림즈 애플리케이션을 실행하면 생기는 데이터 처리 최소 단위이다.
카프카 스트림즈는 컨슈머 스레드를 늘리는 방법과 동일하게 병렬처리를 위해 파티션과 스트림즈 스레드 개수를 늘림으로써 처리량을 늘릴 수 있다.
토폴로지
2개 이상의 노드들과 선으로 이루어진 집합을 뜻한다. 토폴로지의 종류로는 링형(ring), 트리형(tree), 성형(star) 등이 있는데 스트림즈에서 사용하는 토폴로지는 트리 형태와 유사하다.
카프카 스트림즈에서는 토폴로지를 이루는 노드를 하나의 '프로세서(processor)'라고 부르고 노드와 노드를 이은 선을 '스트림(stream)'이라고 부른다. 스트림은 토픽의 데이터를 뜻하는데, 프로듀서와 컨슈머에서 활용했던 레코드와 동일하다.
3가지의 프로세서가 존재한다.
소스 프로세서 - 데이터를 처리하기 위해 최초로 선언해야 하는 노드. 하나 이상의 토픽에서 데이터를 가져오는 역할
스트림 프로세서 - 다른 프로세서가 반환한 데이터를 처리하는 역할. 변환/분기처리와 같은 로직
싱크 프로세서 - 데이터를 특정 카프카 토픽으로 저장하는 역할.
[카프카 스트림즈 토폴로지의 구성요소 이미지]
스트림즈DSL (Domain Specific Language)과 프로세서 API 2가지 방법으로 개발 가능하다.
스트림즈DSL은 스트림 프로세싱에 쓰이는 기능을 자체 API로 제공하며, 그 외에 제공되지 않는 기능은 프로세서 API를 사용하여 구현할 수 있다.
스트림즈DSL로 구현하는 데이터 처리 예시
- 메시지 값을 기반으로 토픽 분기처리
- 지난 10분간 들어온 데이터의 개수 집계
- 토픽과 다른 토픽의 결합으로 새로운 데이터 생성
프로세서 API로 구현하는 데이터 처리 예시
- 메시지 값의 종류에 따라 토픽을 가변적으로 전송
- 일정한 시간 간격으로 데이터 처리
스트림즈DSL
스트림즈DSL에는 레코드의 흐름을 추상화한 3가지 개념인 KStream, KTable, GlobalKTable이 있다.
KStream
레코드의 흐름을 표현한 것으로 메시지 키와 메시지 값으로 구성되어 있다. KStream으로 데이터를 조회하면 토픽에 존재하는 모든 레코드가 출력된다. KStream은 컨슈머로 토픽을 구독하는 것과 동일한 선상에서 사용하는 것이라 볼 수 있다.
KTable
KStream과 다르게 메시지 키를 기준으로 묶어서 사용한다. KStream은 토픽의 모든 레코드를 조회할 수 있지만, KTable은 유니크한 메시지 키를 기준으로 가장 최신 레코드를 사용한다. KTable로 데이터를 조회하면 메시지 키를 기준으로 가장 최신에 추가된 레코드의 데이터가 출력된다. 새로운 데이터를 적재할 때 동일한 메시지 키가 있을 경우 데이터가 업데이트된 거소가 같은 형태이다.
GlobalKTable
KTable과 동일하게 메시지 키를 기준으로 묶어서 사용된다. 그러나 KTable로 선언된 토픽은 1개 파티션이 1개 태스크에 할당되어 사용되고, GlobalKTable로 선언된 토픽은 모든 파티션 데이터가 각 태스크에 할당된다.
KStream과 KTable이 데이터 조인(join)을 수행할 때는 반드시 코파티셔닝(co-partitioning)이 되어 있어야 한다.
* 코파티셔닝(co-partitioning) : 조인을 하는 2개 데이터의 파티션 개수가 동일하고 파티셔닝 전락을 동일하게 맞추는 작업
파티션 개수가 동일하고 파티셔닝 전략이 같은 경우에는 동일한 메시지 키를 가진 데이터가 동일한 태스크에 들어가는 것을 보장한다. 이를 통해 각각의 메시지 키가 동일할 경우 조인을 수행할 수 있다.
하지만 KStream과 KTable로 사용하는 2개의 토픽이 파티션 개수가 다를 수도 있고 파티션 전략이 다를 수 있으며, 이런 경우에는 조인을 수행할 수 없다. 코파티셔닝이 되어있지 않으면 KStream 또는 KTable을 리파티셔닝하는 과정을 거쳐야 한다.
* 리파티셔닝(repartitioning) : 새로운 토픽에 새로운 메시지 키를 가지도록 재배열하는 과정
하지만 GlobalKTable으로 정의된 데이터는 스트림즈 애플리케이션의 모든 태스크에 동일하게 공유되어 사용되기 때문에, KTable 대신 GlobalKTable을 사용한다면 코파티셔닝을 하지 않아도 된다. 단, 각 태스크마다 GlobalKTable로 정의된 모든 데이터를 저장하고 사용하기 때문에 로컬 스토리지의 사용량이 증가하고 브로커에 부하가 생기므로 작은 용량의 데이터일 경우에만 사용하는 것이 좋다.
스트림즈DSL 주요 옵션
필수 옵션
- bootstrap.servers : 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름:포트를 1개 이상 작성한다.
- application.id : 스트림즈 애플리케이션을 구분하기 위한 고유한 아이디를 설정한다. 다른 로직을 가진 스트림즈 애플리케이션들은 서로 다른 application.id 값을 가져야 한다.
선택 옵션
- default.key.serde : 레코드의 메시지 키를 직렬화, 역직렬화하는 클래스를 지정한다.
- num.stream.threads : 스트림 프로세싱 실행 시 실행될 스레드 개수를 지정한다.
- state.dir : rocksDB 저장소가 위치할 디렉토리를 지정한다. (key-value DB로서 카프카 스트림즈가 상태기반 데이터 처리를 할 때 로컬 저장소로 활용)
스트림즈DSL 개발하기
간단한 스트림 프로세싱으로, 특정 토픽의 데이터를 다른 토픽으로 전달하는 것을 구현해보자. stream_log 토픽에서 stream_log_copy 토픽으로 동일한 데이터를 옮겨본다.
1) stream_log 토픽 생성
2) 스트림즈 코드
스트림즈 애플리케이션을 실행한다.
package com.example;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;
public class SimpleStreamApplication {
private static String APPLICATION_NAME = "streams-application"; // application.id 값을 기준으로 병렬처리하기 때문에 지정해야 한다.
private static String BOOTSTRAP_SERVERS = "my-kafka:9092"; // 스트림즈 애플리케이션과 연동할 카프카 클러스터 정보를 입력한다.
private static String STREAM_LOG = "stream_log";
private static String STREAM_LOG_COPY = "stream_log_copy";
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // 메시지 키, 메시지 값의 직렬화/역직렬화할 방식을 지정
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder(); // 스트림 토폴로지를 정의
KStream<String, String> streamLog = builder.stream(STREAM_LOG); // stream_log 토픽으로부터 KStream 객체를 만들기 위해 stream() 메서드 활용
streamLog.to(STREAM_LOG_COPY); // stream_log 토픽을 담은 KStream 객체를 다른 토픽으로 전송
KafkaStreams streams = new KafkaStreams(builder.build(), props); // Kafkastreams 인스턴스를 생성하여 stream_log 토픽의 데이터를 stream_log_copy 토픽으로 전달한다.
streams.start();
}
}
3. stream_log에 데이터를 프로듀스하고, stream_log_copy를 확인한다.
stream_log 토픽의 데이터를 stream_log_copy 토픽으로 보낸 것을 확인할 수 있다.
스트림즈DSL - filter()
소스 프로세서, 싱크 프로세서로 이루어진 토폴로지를 스크림즈DSL로 구현해보았다. 데이터를 처리하기 위해서는 스트림 프로세서가 추가되어야 하는데, 이것을 구현하는 방법을 알아보자.
토픽으로 들어온 문자열 데이터 중 문자열의 길이가 5보다 큰 경우만 필터링하는 스트림즈 애플리케이션을 스트림 프로세서를 사용하여 만들어보자.
메시지 키 또는 메시지 값을 필터링하여 특정 조건에 맞는 데이터를 골라낼 때는 filter() 메서드를 사용하면 된다.
[소스, 스트림, 싱크 토폴로지 구조 이미지]
1) 스트림즈 코드
해당 코드를 실행한다.
package com.example;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;
public class StreamsFilter {
private static String APPLICATION_NAME = "streams-filter-application"; // application.id 값을 기준으로 병렬처리하기 때문에 지정해야 한다.
private static String BOOTSTRAP_SERVERS = "54.184.145.255:9092"; // 스트림즈 애플리케이션과 연동할 카프카 클러스터 정보를 입력한다.
private static String STREAM_LOG = "stream_log";
private static String STREAM_LOG_FILTER = "stream_log_filter";
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // 메시지 키, 메시지 값의 직렬화/역직렬화할 방식을 지정
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder(); // 스트림 토폴로지를 정의
KStream<String, String> streamLog = builder.stream(STREAM_LOG);
KStream<String, String> filteredStream = streamLog.filter( // 데이터를 필터링하는 filter() 메서드를 활용한다. 메시지 값의 길이가 5보다 큰 경우만 필터링하도록 한다.
(key, value) -> value.length() > 5);
filteredStream.to(STREAM_LOG_FILTER); // 필터링된 KStream을 stream_log_filter 토픽에 저장하도록 소스 프로세서를 작성한다.
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
2) stream_log 데이터 입력
stream_log 에 다양한 길이의 메시지 값을 입력한다.
3) stream_log_filter 데이터 확인
입력 오류로 인해 약간 이상하게 출력되었지만 hello 데이터가 필터링된 것을 볼 수 있는 듯하다.
스트림즈DSL - KTable과 KStream을 join()
KTable과 KStream은 메시지 키를 기준으로 조인할 수 있다. 카프카에서는 실시간으로 들어오는 데이터들을 조인할 수 있다.
이름(메시지 키), 주소(메시지 값)을 가지고 있는 KTable과 이름(메시지 키), 주문한 물품(메시지 값)을 가지고 있는 KStream이 있다고 하자. 사용자가 물품을 주문하면 토픽에 저장된 이름:주소로 구성된 KTable과 조인하여 물품과 주소가 조합된 데이터를 새로 생성할 수 있다.
이와 같이 KTable과 KStream을 소스 프로세서로 가져와서 조인을 수행하는 스트림 프로세서를 거쳐, 특정 토픽에 싱크 프로세서로 저장하는 로직을 구현해보자.
[KTable과 KStream 조인 토폴로지 이미지]
'AI & 빅데이터 > kafka' 카테고리의 다른 글
[kafka] 웹 페이지 이벤트 적재 파이프라인 생성 - 1) 요구 사항과 정책 및 기능 정의하기 (0) | 2022.08.22 |
---|---|
[kafka] kafka connect, 카프카 커넥트 (0) | 2022.08.18 |
[kafka] 카프카 클라이언트 개발하기 - 어드민 API (0) | 2022.08.08 |
[kafka] 카프카 클라이언트 개발하기 - 컨슈머 API (0) | 2022.08.06 |
[kafka] 카프카 클라이언트 개발하기 - 프로듀서 API (0) | 2022.08.06 |