이 글은 ksqlDB
(당시에는 KSQL
이라는 명칭이었습니다)를 학습하기 위해 정리한 연재글입니다.
이 글의 순서는 아래와 같습니다.
상술하였던 아파치 카프카의 API에 대해 상세히 살펴봅시다.
아파치 카프카에서의 Stream이 무슨의미를 지니는지 살펴봅시다.
[k|v] -> [k|v] -> [k|v] -> [k|v]
모양으로 계속 흐르며, [k|v]
한 쌍은 데이터 레코드라고 부릅니다.카프카 스트림의 주요 용어에 대해 살펴보겠습니다.
스트림 처리 애플리케이션(Stream Processing Application)
스트림 프로세서(Stream Processor)
소스 프로세서(Source Processor)
싱크 프로세서(Sink Processor)
KStream
과 KTable
이란?KStream
은 stream의 abstraction입니다.KStream
을, changelog 주어진 키에 대해 가장 최신값만 원한다면 KTable
을 사용하는 것이 좋습니다.KStream
과 KTable
의 차이?KStream
postgres.exe
프로세스에 대한 1500번 이벤트가 10월 31일, A클라이언트에서 발생했다 (서로 독립적)KTable
KTable
은 time-evolving stream 과 관련있습니다. 시간이 흐름에 따라 값이 변화하는 스트림에 주로 사용될 수 있겠습니다.Time
의 개념스트림 프로세싱의 핵심은 시간을 가지고 작업하는 개념입니다.
event time
ingestion time
processing time
APPLICATION_ID_CONFIG
, BOOTSTRAP_SERVERS_CONFIG
을 설정합니다.
APPLICATION_ID_CONFIG
: 새로 만들 필터링 앱의 이름 (unique)BOOSTRAP_SERVERS_CONFIG
: 데이터 스트림을 가져올 카프카의 주소new StreamsBuilder()
구문으로 빌더를 만들고(토폴로지 정의 빌더), KStream
변수타입을 사용해서 어느 토픽에 어떤 필터로 값을 가져올지 정합니다.KafkaStreams
타입의 변수를 만들고 빌더, 설정값을 세팅합니다.kafka stream
설정을 입력합니다.KStream
, KTable
및 GlobalKTable
을 정의합니다 (앞서 입력한 설정을 추가)KafkaStreams
객체를 선언하고 consume 을 통해 새로운 스트림 생성합니다.import os
import faust
CONSUMER_NAME = "TEST_CLICK_CONSUMER_01"
KAFKA_BROKER = "임의의 브로커 주소"
SRC_TOPIC = "임의의 토픽주소"
# 앱 구동 전, 기본 설정값 세팅
app = faust.App(
CONSUMER_NAME,
broker=f"kafka://{KAFKA_BROKER}",
)
# 값을 가져오기위한 메인토픽
# data sent to 'clicks' topic sharded by URL key.
# e.g. key="http://example.com" value="1"
click_topic = app.topic(
SRC_TOPIC,
key_type=str,
value_type=int,
)
# 새로이 저장할 토픽
# default value for missing URL will be 0 with `default=int`
counts = app.Table(
'click_counts',
default=int,
)
@app.agent(src_topic)
async def count_click(clicks):
async for url, count in clicks.items():
counts[url] += count
도커 이미지를 아래와 같이 작성 후 구동하면 파이썬 코드로도 상기 Kafka Streams 처리가 가능합니다.
FROM python:3.9
COPY . /app
WORKDIR /app
RUN \
pip install -r requirements.txt
RUN ["python", "stream.py"]
Faust 에 대한 상세한 설명은 여기를 살펴봐주세요.
만일 파이썬 베이스 이미지를 통해 librdkafka 기반의 카프카 처리 라이브러리를 사용해야 한다면, 추가적인 의존성을 필요로 할 수 있습니다. 관련 내용은 이 링크를 참고해주세요.
이번 글을 통해, 아래 내용들을 살펴볼 수 있었습니다:
다음 파트에선 본격적으로 ksqlDB가 무엇인지, 그리고 이를 통해 어떤식으로 실시간 이벤트 처리를 수행할 수 있는지를 대표적인 예시로 살펴보겠습니다.
읽어주셔서 대단히 감사합니다.