7장으로 1부가 끝나고 2부에서는 시스템이 커졌을 때 이벤트 기반 아키텍처를 통해 애그리게이트와 애플리케이션을 서로 분리하는 방법을 알아볼 예정이다
재고 부족으로 주문 할당이 불가능 한경우 이메일로 이를 통지하는 새로운 (기존과 서비스와 관계 없는) 요구사항 추가된 상황
지저분해지지 않게 막기
웹 컨트롤러가 지저분해지는 일을 막자
- 핵심 도메인과 아무 관련없는 요구사항을 아무 생각없이 웹 컨트롤러에 넣기 쉽다
- 하지만 이렇게 이것저것 끼워넣으면 코드가 빠르게 더러워진다
모델이 지저분해지는 일을 막자
- 도메인 모델은 실제 할당할 수 있는 것보다 더 많은 상품을 할당 할 수 없다는 규칙에만 집중 할 것
- 도메인 모델의 규칙을 변경하지 않아도 이메일대신 SMS로도 통지를 보낼 수 있어야 함
서비스 계층이 지저분해지는 일을 막자
- 서비스 계층에서 예외를 잡아서 다시 email send를 발생시키는 것도 불편하다
단일 책임 원칙
- 위 방법들은 단일 책임 원칙(single responsibility principle)에 위배됨
- 처리하려는 유스케이스는 할당이기 때문에 이메일을 보내는 일은 새로운 책임으로 분리해야 한다
메시지 버스에 전부 다 싣자
- 도메인 이벤트와 메시지 버스 패턴으로 해결해보자
이벤트를 기록하는 모델
- 모델은 이메일을 신경쓰지 않고 이벤트 기록을 담당
- 이벤트에 응답하고 새로운 연산을 실행하기 위해 메시지 버스 사용
이벤트는 간단한 데이터 클래스다
- 이벤트는 값 객체에 속하는 순수 데이터 구조로 아무 동작이 없음
- 항상 이벤트를 도메인 모델 일부로 간주하고 도메인 언어로 이름 붙여야함
- 기존 model.py에서 domain 디렉토리를 추가하고 domain/model.py와 domain/events.py 로 분리하여 리팩토링하기 좋은 시점
모델은 이벤트를 발생시킨다
product 할당을 요청했을 때 할당이 불가능하면 이벤트가 발생
# src/allocation/domain/model.py
class Product:
def __init__(self, sku: str, batches: List[Batch], version_number: int = 0):
self.sku = sku
self.batches = batches
self.version_number = version_number
self.events = [] # 1. type: List[events.Event]
def allocate(self, line: OrderLine) -> str:
try:
#...
except StopIteration:
self.events.append(events.OutOfStock(line.sku)) # 2.
# 3. raise OutOfStock(f"Out of stock for sku {line.sku}")
return None
- 새로운 .events 애트리뷰트 사용
- 품절 이벤트가 발생한 시점에 이벤트를 기록
- 이벤트가 예외가 담당했던 일을 수행하기 때문에 예외는 더 이상 필요 없다
메시지 버스는 이벤트를 핸들러에 매핑한다
- 메시지 버스는 이 이벤트가 발생하면 다음 핸들러 함수를 호출해야한다고 말한다
- 핸들러는 수신된 이벤트를 구독한다
- 즉 간단한 발행/구독(pub/sub) 시스템
# src/allocate/service_layer/messagebus.py
def handle(event: events.Event):
for handler in HANDLERS.get(type(event), []):
handler(event)
def send_out_of_stock_notification(event: events.OutOfStock):
email.send_mail(
"stock@made.com",
f"Out of stock for {event.sku}",
)
HANDLERS = {
events.OutOfStock: [send_out_of_stock_notification],
} # type: Dict[Type[events.Event], List[Callable]]
* 여기서 구현한 메시지 버스는 핸들러가 한번에 하나씩만 실행되기 때문에 동시성을 제공하지 못함
코드베이스 이해를 위해 개념적으로 작업을 분리하고 각 UoW를 가능한 한 작게 유지하는 예제
첫번째 선택지 : 서비스 계층에서 모델 이벤트를 가져와 메시지 버스에 싣기
# service_layer/service.py
from . import messagebus
...
def allocate(
orderid: str, sku: str, qty: int,
uow: unit_of_work.AbstractUnitOfWork,
) -> str:
line = OrderLine(orderid, sku, qty)
with uow:
product = uow.products.get(sku=line.sku)
if product is None:
raise InvalidSku(f"Invalid sku {line.sku}")
try: #(1)
batchref = product.allocate(line)
uow.commit()
return batchref
finally: #(1)
for event in product.events:
messagebus.handle(event)
- 이는 간단하게 서비스 finally 문에서 이벤트 핸들 코드를 추가해서 구현
- 하지만 못생긴 try/finally 문은 여전함
두번째 선택지 : 서비스 계층에서 자신만의 이벤트 발행
OutOfStock 이벤트를 직접 만들어서 발행
def allocate(
orderid: str, sku: str, qty: int,
uow: unit_of_work.AbstractUnitOfWork,
) -> str:
line = OrderLine(orderid, sku, qty)
with uow:
product = uow.products.get(sku=line.sku)
if product is None:
raise InvalidSku(f"Invalid sku {line.sku}")
batchref = product.allocate(line)
uow.commit() #(1)
if batchref is None:
messagebus.handle(events.OutOfStock(line.sku))
return batchref
세번째 선택지 : UoW에서 메시지 버스에 이벤트 발행
UoW에는 이미 try/finally 문이 있고 저장소에 대한 접근을 제공하므로 어떤 애그리게이트가 작업을 수행하는지 알고 있음
# service_layer.unit_of_word.py
class AbstractUnitOfWork(abc.ABC):
...
def commit(self):
self._commit() # 1.
self.publish_events() # 2.
def publish_events(self): # 2.
for product in self.products.seen: # 3.
while product.events:
event = product.events.pop(0)
messagebus.handle(event)
@abc.abstractmethod
def _commit(self):
raise NotImplementedError
...
class SqlAlchemyUnitOfWork(AbstractUnitOfWork):
...
def _commit(self):
self.session.commit() # 1.
- 비공개 _commit() 으로 커밋 메서드 변경
- 커밋한 다음 저장소에 전달된 모든 객체를 살펴보고 그 중 이벤트를 메시지 버스에 전달
- 새로운 seen이란 값 추가해 모든 애그리게이트를 추적
# adapters/repository.py
class AbstractRepository(abc.ABC):
def __init__(self):
self.seen = set() # 1. type: Set[model.Product]
def add(self, product: model.Product): # 2.
self._add(product)
self.seen.add(product)
def get(self, sku) -> model.Product: # 3.
product = self._get(sku)
if product:
self.seen.add(product)
return product
@abc.abstractmethod
def _add(self, product: model.Product): # 2.
raise NotImplementedError
@abc.abstractmethod
def _get(self, sku) -> model.Product: # 3.
raise NotImplementedError
class SqlAlchemyRepository(AbstractRepository):
def __init__(self, session):
super().__init__()
self.session = session
def _add(self, product): # 2.
self.session.add(product)
def _get(self, sku): # 3.
return self.session.query(model.Product).filter_by(sku=sku).first()
- UoW에서 이벤트를 발행하기 위해 .seen이라는 집합을 통해 사용한 Product 객체를 저장
- 부모의 add() 메서드는 .seen에 객체 저장, 하위 클래스에서 ._add() 구현
- 비슷하게 .get()도 ._get() 함수에게 동작을 위임, 하위 클래스에서 ._get() 구현
UoW와 repository가 이런식으로 협력하여 자동으로 살아있는 객체를 추적하고 그로부터 발생한 이벤트 처리하면 서비스 계층은 다시 깨끗해진다
def allocate(
orderid: str, sku: str, qty: int,
uow: unit_of_work.AbstractUnitOfWork,
) -> str:
line = OrderLine(orderid, sku, qty)
with uow:
product = uow.products.get(sku=line.sku)
if product is None:
raise InvalidSku(f"Invalid sku {line.sku}")
batchref = product.allocate(line)
uow.commit()
return batchref
마치며
장점
|
단점
|
메시지 버스를 사용하면 어떤 요청에 대한 응답으로 여러 동작을 수행하는 경우 관심사를 멋지게 분리 가능
|
메시지 버스가 머리에 맴도는 또 다른 요소. 작업 단위가 이벤트를 발생시키는 방식의 구현을 깔끔하지만, 신비롭다. 사람들에게 이메일을 보내지만 작업을 계속 진행하고 싶은 경우 commit을 언제 호출해야 할지 불분명하다.
|
이벤트 핸들러는 핵심 애플리케이션 로직과 깔끔하게 분리되 ㄹ수 있다. 따라서 나중에 이벤트 핸들러 구현을 쉽게 변경할 수 있다.
|
더구나 감춰진 이벤트 처리 코드가 동기적으로 실행된다. 즉 어떤 이벤트에 대한 모든 핸들러가 끝나기 전에는 서비스 계층 함수도 끝날 수 없다는 의미 → 이건 어차피 실무에선 분리하기 때문에 별 의미 없다고 봄
|
도메인 이벤트는 실세계를 모델링하기에 아주 좋은 방법, 관계자들과 모델을 만들 때 도메인 이벤트를 비즈니스 언어의 일부분으로 사용할 수 있다.
|
일반적으로, 이벤트 기반 워크플로는 연속적으로 여러 핸들러로 분할된 후 시스템에서 요청을 어떻게 처리하는 지 살펴볼 수 있는 단일 지점이 존재하지 않아 혼란을 야기할 수 있다.
|
|
이벤트 핸들러들이 순환적으로 서로 의존해서 무한 루프가 발생할 여지가 생긴다.
|
트랜잭션으로 격리된 두 요소가 있다면 이벤트를 통해 이 둘을 서로 최종 일관성 있게 만들 수 있다
'책 & 스터디' 카테고리의 다른 글
[파이썬으로 살펴보는 아키텍처 패턴] 챕터 12 명령-질의 책임 분리(CQRS) (0) | 2024.05.19 |
---|---|
[파이썬으로 살펴보는 아키텍처 패턴] 챕터 9 메시지 버스를 타고 시내로 나가기 (1) | 2024.05.15 |
[파이썬으로 살펴보는 아키텍처 패턴] 챕터 7 애그리게이트와 일관성 경계 (1) | 2024.05.13 |
[파이썬으로 살펴보는 아키텍처 패턴] 챕터 6 작업단위 패턴 (0) | 2024.03.31 |
[파이썬으로 살펴보는 아키텍처 패턴] 챕터 5 높은 기어비와 낮은 기어비의 TDD (1) | 2024.03.31 |