이 글은 ksqlDB(당시에는 KSQL이라는 명칭이었습니다)를 학습하기 위해 정리한 연재글입니다.
이 글의 순서는 아래와 같습니다.
상술하였던 아파치 카프카의 API에 대해 상세히 살펴봅시다.
아파치 카프카에서의 Stream이 무슨의미를 지니는지 살펴봅시다.
[k|v] -> [k|v] -> [k|v] -> [k|v] 모양으로 계속 흐르며, [k|v] 한 쌍은 데이터 레코드라고 부릅니다.카프카 스트림의 주요 용어에 대해 살펴보겠습니다.
스트림 처리 애플리케이션(Stream Processing Application)
스트림 프로세서(Stream Processor)
소스 프로세서(Source Processor)
싱크 프로세서(Sink Processor)
KStream과 KTable이란?KStream은 stream의 abstraction입니다.KStream을, changelog 주어진 키에 대해 가장 최신값만 원한다면 KTable을 사용하는 것이 좋습니다.KStream과 KTable의 차이?KStream
postgres.exe 프로세스에 대한 1500번 이벤트가 10월 31일, A클라이언트에서 발생했다 (서로 독립적)KTable
KTable 은 time-evolving stream 과 관련있습니다. 시간이 흐름에 따라 값이 변화하는 스트림에 주로 사용될 수 있겠습니다.Time의 개념스트림 프로세싱의 핵심은 시간을 가지고 작업하는 개념입니다.
event time
ingestion time
processing time
APPLICATION_ID_CONFIG, BOOTSTRAP_SERVERS_CONFIG을 설정합니다.
APPLICATION_ID_CONFIG: 새로 만들 필터링 앱의 이름 (unique)BOOSTRAP_SERVERS_CONFIG: 데이터 스트림을 가져올 카프카의 주소new StreamsBuilder() 구문으로 빌더를 만들고(토폴로지 정의 빌더), KStream 변수타입을 사용해서 어느 토픽에 어떤 필터로 값을 가져올지 정합니다.KafkaStreams 타입의 변수를 만들고 빌더, 설정값을 세팅합니다.kafka stream 설정을 입력합니다.KStream, KTable 및 GlobalKTable을 정의합니다 (앞서 입력한 설정을 추가)KafkaStreams 객체를 선언하고 consume 을 통해 새로운 스트림 생성합니다.import os
import faust
CONSUMER_NAME = "TEST_CLICK_CONSUMER_01"
KAFKA_BROKER = "임의의 브로커 주소"
SRC_TOPIC = "임의의 토픽주소"
# 앱 구동 전, 기본 설정값 세팅
app = faust.App(
CONSUMER_NAME,
broker=f"kafka://{KAFKA_BROKER}",
)
# 값을 가져오기위한 메인토픽
# data sent to 'clicks' topic sharded by URL key.
# e.g. key="http://example.com" value="1"
click_topic = app.topic(
SRC_TOPIC,
key_type=str,
value_type=int,
)
# 새로이 저장할 토픽
# default value for missing URL will be 0 with `default=int`
counts = app.Table(
'click_counts',
default=int,
)
@app.agent(src_topic)
async def count_click(clicks):
async for url, count in clicks.items():
counts[url] += count도커 이미지를 아래와 같이 작성 후 구동하면 파이썬 코드로도 상기 Kafka Streams 처리가 가능합니다.
FROM python:3.9
COPY . /app
WORKDIR /app
RUN \
pip install -r requirements.txt
RUN ["python", "stream.py"]Faust 에 대한 상세한 설명은 여기를 살펴봐주세요.
만일 파이썬 베이스 이미지를 통해 librdkafka 기반의 카프카 처리 라이브러리를 사용해야 한다면, 추가적인 의존성을 필요로 할 수 있습니다. 관련 내용은 이 링크를 참고해주세요.
이번 글을 통해, 아래 내용들을 살펴볼 수 있었습니다:
다음 파트에선 본격적으로 ksqlDB가 무엇인지, 그리고 이를 통해 어떤식으로 실시간 이벤트 처리를 수행할 수 있는지를 대표적인 예시로 살펴보겠습니다.
읽어주셔서 대단히 감사합니다.