이 내용은 “파이썬으로 살펴보는 아키텍처 패턴” 을 읽고 작성한 내용입니다. 블로그 게시글과, 작성한 코드를 함께 보시면 더욱 좋습니다.
11장은 해당 코드를 살펴봐주세요. 코드 링크
제목 길다ㅋㅋ
이전 장에서는 실제로 ‘배치 수량이 변경됨’ 이라는 이벤트를 어떻게 받을 수 있는지, 재할당에 대해 외부 세계에 어떻게 통지할 수 있는지에 대해서는 논하지 않았다.
현재까지 만든건 웹 API가 있는 마이크로서비스 한 개다. 다른 시스템과 이야기하는 다른 방법을 생각해보자. 선적이 지연되거나, 수량이 변경되거나 하는건 시스템이 어떻게 알 수 있을까? 시스템이 창고 시스템에게 주문이 할당되었고 다른 고객에게 운송되어야 한다고 어떻게 이야기할 수 있을까?
이번 장에서는 이벤트 비유를 확장하여 시스템으로 들어오거나 시스템에서 나가는 메시지까지 포용하는 방안을 살펴본다. 여지껏 애플리케이션의 핵심은 메시지 처리기가 되도록 바꾸었다. 이제는 외부로도 이를 처리할 수 있도록 작업해보자.
외부 이벤트가 들어오는 것은 외부 메시지 버스(이 책에서는 Redis의 pub/sub 대기열을 예제로 사용한다)를 통해 subscribe 하고, 출력은 이벤트 형태로 외부 메시지 버스에 publish 한다.
책의 저자는 마이크로서비스 아키텍처(이하 MSA)를 구축하는 사람과 자주 이야기하며 기존 앱을 마이그레이션하는 논의를 자주 한다고 한다. 이 때 본능적으로 하는 일은 시스템을 명사화 하는 것이다.
현재까지의 시스템에 도입된 명사들을 생각해보자. 재고 배치, 주문, 상품, 고객 등이 있다. 이를 그림과 같이 나누었다. (참고: ‘할당’이라는 동작 대신, ‘배치’라는 명사를 기준으로 이름이 붙어졌음)
이 시스템의 ‘물건’ 마다 연관된 서비스가 있고, 그 서비스는 HTTP API를 노출한다.
아래 command flow 1을 통해 정상경로(happy path) 를 진행해보자.
3
번째 주문일 경우 고객 레코드를 변경하여 일반 고객을 VIP로 승격시킨다각 단계를 이런 커맨드로 생각해볼 수 있겠다:
ReserveStock
ConfirmReservation
DispatchGoods
MakeCustomerVIP
이런 스타일의 아키텍처에서는 DB 테이블 단위로 마이크로서비스를 만들고, HTTP API를 빈약한(비즈니스 로직이 없는) 모델에 대한 CRUD 인터페이스로 취급하며, 서비스 중심의 설계를 처음 하는 사람들이 취하는 방식이다.
간단하면 잘 돌겠지만, 금방 복잡해진다(!!!). 왜냐면 실패 케이스에 대한 고려가 없기 때문이다. 이런 케이스에 대해 살펴보자:
이런 기능들을 어디에 넣어야할까? 대충 봤을 땐(!) 창고 시스템이 하면 될 것 같다. 아래와 같은 command flow 2가 나올 것이다.
잘 돌아간다. 그렇지만 의존성 그래프가 지저분해진다. 왜인지 보자:
이 경우 시스템이 제공해야 하는 다른 워크플로우의 숫자만큼 곱한다. 이래선 빠르게 결과를 만들어내는 것만 못한 시스템이 나온다..!
‘모든 것은 망가진다(Things break)‘는 소프트웨어 엔지니어링에서 일반적인 규칙이다. 어떤 요청이 실패하면 시스템에 어떤 일이 발생하는지 살펴보자.
예를 들어, 사용자가 MISBEGOTTEN-RUG
에 대해 3개를 주문받고 네트워크 오류가 발생했다 가정하자:
이에 대한 두 가지 처리방법이 있다.
본 책에서는 결합(coupling) 이란 말을 사용하나, 시스템 상 현재 예제와 같은 관계를 동시생산(connascence)1 라고도 일컫는다. 이는 다른 유형의 결합을 묘사할 때 사용하는 용어다.
동시생산은 나쁘지 않다. 그렇지만 어떤 동시생산 케이스는 다른 케이스보다 더 강하다. 보통은 두 클래스가 밀접하게 연관되어(closely related)있으면 강한 동시생산을 지역적으로만 한정시키고 그렇지 않으면 약한 생산으로 떼어놓고자 한다.
위에서 살펴본 커맨드 플로우 2 예시에서는 실행의 동시생산(connascence of execution)을 살펴볼 수 있다. 연산이 성공하려면 여러 구성요소의 정확한 작업 순서를 알고 있어야 한다.
여기서는 오류가 발생하는 경우에서는 타이밍의 동시생산(connascence of timing)을 살펴볼 수 있다. 한 가지 일이 일어난 직후 바로 다음 일이 일어나야 한다.
RPC 이야기는 이해못해서 기재하지 않음 이름의 동시생산(connascence of name)에 대해 언급한다.
소프트웨어가 다른 소프트웨어와 통신하지 않는 경우를 제외하고는 결합을 완전히 피할 수 없다. 다만 부적절한 결합만큼은 피해야 한다. 동시생산은 서로 다른 아키텍처 스타일에 내재된 결합의 강도와 유형을 이해하기 위한 멘탈 모델(mental model)을 제공한다.
적절한 결합을 얻기 위해선 명사가 아니라 동사로 생각해야 한다는 점을 살펴봤다. 도메인 모델은 비즈니스 프로세스를 모델링하기 위함이다. 도메인 모델은 어떤 물건에 대한 정적인 데이터 모델이 아닌 동사에 관한 모델이다.
따라서 주문에 대한 시스템과 배치에 대한 시스템을 생각하는 것이 아니라, 주문 행위(ordering) 에 대한 시스템과 할당행위(allocating)에 대한 시스템을 생각한다.
이런 식으로 사물을 구별하면 어떤 시스템이 어떤 일을 하는지에 대해 생각하기 쉽다. 주문 행위에 대해 생각해보면, 주문을 넣었을 때 주문이 들어간다는 무조건 되어야 한다. 다른 모든 일은 언젠가 발생한다는 것만 보장할 수 있다면 나중에 발생할 수 있다.
📒 NOTE 📒
애그리게이트와 커맨드 설계 시 수행했던 책임 분리가 바로 이것이다.
마이크로서비스 또한 일관성 경계(consistency boundaries)여야 한다. 두 서비스에는 최종 일관성을 받아들일 수 있고, 이는 동기화된 호출에 의존하지 않아도 된다는 뜻이다. 각 서비스는 외부 세게에서 커맨드를 받고 결과를 저장하기 위해 이벤트를 발생시킨다. 이런 이벤트를 수신하는 다른 서비스는 워크플로우의 다음 단계를 트리거링한다.
이런 식으로 쉽게 복잡해지기 쉬운 구조2를 방지하기 위해 시간적으로 결합된(temporally coupled) 메시지가 업스트림 시스템으로부터 외부 메시지로 도착하길 바란다. 시스템은 이벤트를 리슨하는 다운스트림 시스템을 위해 Allocated
이벤트를 publish한다.
왜 이런 구조가 더 나은지에 대한 근거는 아래와 같다:
그렇다면 이를 할 수 있는 메시지 버스가 필요하다. 이는 이벤트를 안팎으로 처리할 수 있는 인프라를 의미한다. 흔히들 메시지 브로커(message broker) 라고 부른다. 메시지 브로커의 역할은 publisher로부터 메시지를 받아서 subscriber에게 전달3(deliver)하는 것이다.
made.com(진짜 영국의 가구서비스임)에서는 Event Store 라는 서비스를 쓴다. Kafka나 RabbitMQ도 좋은 대안이다. 책에서는 Redis pub/sub channel를 사용할 것이다.
📒 NOTE 📒
메시징 플랫폼을 선택하는 주요 방안으로는 주로 메시지 순서, 실패 처리, 멱등성(idempotency) 등이 있다. 이는 14.8절에서 다시 살펴보자!
그렇다면 새로운 흐름에 대한 시퀀스 다이어그램을 살펴보자. Redis는 전체 프로세스를 시작하는 BatchQuantityChanged
를 제공하고, 마지막에는 Allocated
이벤트를 Redis에 publish 한다.
어떤 식으로 수행하는지 살펴보자. API를 사용하여 배치를 만들고, 인바운드-아웃바운드 메시지를 테스트할 것이다
여기서 잠깐!
나는 현재 DB 커넥션도 Dependency Injector를 통해 구현했는데, Redis도 마찬가지일 것이다. 이런 예시를 프로젝트에 맞게 구현할 것이다. 아래 방안을 작성하고자 한다:
이 로직을 테스트하기 위해선 아래 테스트코드가 필요하다:
@pytest.mark.asyncio
async def test_change_batch_quantity_leading_to_reallocation(client):
# 두 배치와 할당을 수행하여 한 쪽에 할당하는 주문으로 시작한다.
orderid, sku = random_orderid(), random_sku()
earlier_batch, later_batch = random_batchref("old"), random_batchref("newer")
await post_to_add_batch(client, earlier_batch, sku, qty=10, eta="2023-01-01")
await post_to_add_batch(client, later_batch, sku, qty=10, eta="2023-01-02")
response = await post_to_allocate(client, orderid, sku, 10)
assert response.json()["batchref"] == earlier_batch
subscription = await redis_client.subscribe_to("line_allocated") # 1)
await redis_client.publish_message( # 2)
"change_batch_quantity",
{"batchref": earlier_batch, "qty": 20},
)
messages = []
async with async_timeout.timeout(3): # 3)
message = await subscription.get_message(timeout=1)
if message:
messages.append(message)
print(messages)
assert len(messages) == 1
data = json.loads(messages[-1]["data"])
assert data['order_id'] == orderid
assert data["batchref"] == later_batch
line_allocated
라는 채널을 listen 한다.change_batch_quantity
라는 채널에 아래 dict 값을 가진 이벤트를 전송함을 의미한다.tenacity
를 사용해서 3번 정도를 더 수신하도록 기다린다Redis pub/sub 리스너, 혹은 이벤트 소비자(event consumer)는 외부 서비스로부터 메시지를 받고 변환하여 이를 이벤트로 만든다.
아래는 Redis메시지 리스너의 간단한 버전이다:
async def main():
orm.start_mappers()
pubsub = r.pubsub(ignore_subscribe_messages=True)
await pubsub.subscribe("change_batch_quantity") # 1)
async for m in pubsub.listen():
await handle_change_batch_quantity(m)
async def handle_change_batch_quantity(m):
logging.debug("handling %s", m)
data = json.loads(m["data"]) # 2)
cmd = commands.ChangeBatchQuantity(ref=data["batchref"], qty=data["qty"])
await messagebus.handle(cmd, uow=unit_of_work.SqlAlchemyUnitOfWork())
if __name__ == "__main__":
asyncio.run(main())
change_batch_quantity
채널을 subscribe 한다그렇다면 Redis 이미지 publisher 또한 살펴보자.
async def publish(channel, event: events.Event): # 1)
logging.debug("publishing: channel=%s, event=%s", channel, event)
await r.publish(channel, json.dumps(asdict(event)))
Allocated
라는 이벤트를 살펴보자
@dataclass
class Allocated(Event):
orderid: str
sku: str
qty: int
batchref: str
이 이벤트로는 주문 라인 상세정보, 어떤 배치에 주문라인이 할당되었는지 등 할당에 대해 알아야 할 필요가 있는 모든 내용을 저장한다.
이를 모델의 allocate()
메소드에 추가한다. 이를 위한 테스트를 함께 추가하자.
def test_product_allocate_should_emit_an_event():
batch = Batch('batch1', 'SMALL-FORK', 10, eta=today)
product = Product(sku='SMALL-FORK', batches=[batch])
allocation = product.allocate(OrderLine('order1', 'SMALL-FORK', 1))
expected_event = events.Allocated(
orderid="order1",
sku="SMALL-FORK",
qty=1,
batchref=batch.reference,
)
assert product.messages[-1] == expected_event
assert allocation is "batch1"
이런 류의 테스트가 있어야겠고, Product
애그리게이트에는 이벤트 emit하는 로직이 있어야 할 것이다.
class Product:
def __init__(
self,
sku: str,
batches: List[Batch],
version_number: int = 0,
):
...
def allocate(
self,
line: OrderLine,
) -> str:
...
self.version_number += 1
self.messages.append( # 1)
events.Allocated(
orderid=line.orderid,
sku=line.sku,
qty=line.qty,
batchref=batch.reference,
)
)
return batch.reference
...
Allocated
이벤트를 담아야한다.그 후에는 메시지 버스에도 관련 핸들러를 추가해주고, 이벤트 발행할 때는 레디스 wrapper가 제공하는 헬퍼함수를 쓰자.
class MessageBus:
EVENT_HANDLERS = {
events.Allocated: [handlers.publish_allocate_event], # 1)
events.OutOfStock: [handlers.send_out_of_stock_notification],
} # type: Dict[Type[events.Event], List[Callable]]
async def publish_allocate_event(
event: events.Allocated,
uow: unit_of_work.AbstractUnitOfWork,
channel: redis.AsyncRedis,
):
await channel.publish("line_allocated", event) # 2)
레디스 커넥션 처리는 Dependency Injector로 이렇게 했다
class Container(containers.DeclarativeContainer):
__self__ = providers.Self()
config = providers.Configuration()
config.from_pydantic(Settings())
wiring_config = containers.WiringConfiguration( # 1)
packages=[
"pt2.ch11.src.allocation.entrypoints",
]
)
redis_pool = providers.Resource( # 2)
redis.init_redis_pool,
redis_uri=config.broker.REDIS_URI,
)
redis = providers.Factory( # 3)
redis.AsyncRedis,
session=redis_pool,
)
AsyncRedis
라는 클래스의 의존성을 주입한다class AsyncRedis:
def __init__(
self,
session: redis.Redis,
):
self._session = session # 1)
async def publish( # 2)
self,
channel,
event: events.Event,
):
await self._session.publish(channel, json.dumps(asdict(event)))
from fastapi import Depends # 3)
@app.post(
"/allocate",
status_code=status.HTTP_201_CREATED,
)
@inject
async def allocate_endpoint(
order_line: OrderLineRequest,
channel: redis.AsyncRedis = Depends(Provide[Container.redis]), # 3)
):
....
batchref = await messagebus.handle(
...
channel=channel,
)
@inject
async def signup_user(
channel: redis.AsyncRedis = Provide[Container.cache], # 4)
):
return jsonify({"status": await cache.ping()})
redis.publish(channel, event)
형식으로 사용하면 된다. 이 때 Wiring을 위해 아래 3, 4와 같은 구문을 사용한다from fastapi import Depends
를 해주고 의존성 주입을 한다내부 외부 이벤트의 구분이 명확할 필요가 있다. 일부 이벤트는 밖에서 들어오지만, 일부 이벤트는 승격되며 외부에 이벤트를 publish 할 수도 있다. 하지만 모든 이벤트가 외부에 이벤트를 emit하지는 않는다. 이벤트 소싱에 대해서는 저자가 작성한 글을 함께 읽어보자.
TIP
외부로 나가는 이벤트는 검증을 적용하는 것이 중요한 부분에 속한다. Appendix E 도 함께 살펴보자.
이벤트는 외부에서 들어올 수도, 외부로 emit할 수도 있다. publish
핸들러는 이벤트를 Redis 메시지 채널의 메시지로 변환한다. 이런즉 이벤트를 사용해 외부 세계와 이야기를 나누는 식의 시간적인 결합을 이용하자. 그렇다면 애플리케이션 통합 시 상당한 유연성을 얻을 수 있다. 하지만 흐름이 명시적이지 않고 디버깅이나 변경이 어려워질 수도 있다. 이 말을 누가했냐고? 마틴 파울러가 했다.
이벤트 기반 마이크로서비스 통합의 트레이드오프를 살펴보자:
장점 | 단점 |
---|---|
분산된 큰 진흙 공을 피할 수 있다 | 전체 정보 흐름을 알아보기 어렵다 |
서비스가 서로 결합되지 않는다. 개별 서비스 변경 및 새 서비스 추가가 쉽다 | 일관성은 처리할 필요가 있는 새로운 개념이다 |
(이걸 해결하기 위해 SAGA 패턴이나 이벤트 소싱, CQRS가 있다고 하는데, 더 공부해보자) | |
메시지 신뢰성과 at-least-once(최소 한 번) versus at-most-once(최대 한 번)을 서로 생각해봐야 한다 |