책 & 스터디
[파이썬으로 살펴보는 아키텍처 패턴] 챕터 9 메시지 버스를 타고 시내로 나가기
haong_
2024. 5. 15. 22:20
이번 장에서는 애플리케이션의 내부 구조에서 이벤트를 더 근본적인 요소를 만들면서 시작한다
새로운 아키텍처가 필요한 새로운 요구 사항
현실에서는 예기지 못한 상황이 발생할 수 있다
- 재고 조사를 하는 동안 물이 새서 매트리스 3개가 손상된 것을 발견했다
- 포크가 배송 지연 되어 몇 주 동안 세관에 머물러야 했다. 그 중 3개가 안전 검사에서 실패해 폐기했다.
- 금속이 부족해져서 다음 책장을 배치를 생산할 수 없게 됐다.
배치 수량이 변경되면 할당을 해제하고 재할당 해야 한다
구조 변경 상상해보기: 모든 것이 이벤트 핸들러
- 서비스 계층 함수에 의해 처리되는 API 호출
- 내부 이벤트와 그 이벤트에 대한 핸들러
모든 것이 이벤트 핸들러라면? > 서비스 계층 함수도 이벤트라고 생각할 수 있다
- services.allocate() 는 AllocationRequired 이벤트의 핸들러이거나 Allocate 이벤트를 출력으로 보낼 수 있다
- services.add_batch() 도 BatchCreated 이벤트의 핸들러라고 볼 수 있다
새로운 요구 사항도 같은 패턴으로 볼 수 있다
- BatchQuantityChanged 이벤트는 change_batch_quantity() 라는 핸들러를 호출할 수 있다
- 새로운 AllocationRequired 이벤트가 services.allocate() 를 호출할 수 있다
서비스 함수를 메시지 핸들러로 리팩토링하기
- AllocationRequired 와 BatchCreated 추가
'''
events.py
'''
# domain/events.py
@dataclass
class BatchCreated(Event):
ref: str
sku: str
qty: int
eta: Optional[date] = None
...
@dataclass
class AllocationRequired(Event):
orderid: str
sku: str
qty: int
- service.py 를 handler.py로 변경
- 기존 메시지 핸들러인 send_out_of_stock_notification 추가
# service_layer/handler.py
def add_batch(
event: events.BatchCreated,
uow: unit_of_work.AbstractUnitOfWork,
):
with uow:
product = uow.products.get(sku=event.sku)
if product is None:
product = Product(event.sku, batches=[])
uow.products.add(product)
product.batches.append(
Batch(event.ref, event.sku, event.qty, event.eta)
)
uow.commit()
def allocate(
event: events.AllocationRequired,
uow: unit_of_work.AbstractUnitOfWork,
) -> str:
line = OrderLine(event.orderid, event.sku, event.qty)
with uow:
product = uow.products.get(sku=line.sku)
if product is None:
raise InvalidSku(f"Invalid sku {line.sku}")
batchref = product.allocate(line)
uow.commit()
return batchref
def send_out_of_stock_notification(
event: events.OutOfStock,
uow: unit_of_work.AbstractUnitOfWork,
):
email.send(
"stock@made.com",
f"Out of stock for {event.sku}",
)
메시지 버스는 이제 이벤트를 UoW로부터 수집한다
# service_layer/messagebus.py
def handle(
event: events.Event,
uow: unit_of_work.AbstractUnitOfWork
): # 1.
queue = [event] # 2.
while queue:
event = queue.pop(0) # 3.
for handler in HANDLERS[type(event)]:
handler(event, uow=uow) # 4.
queue.extend(uow.collect_new_events()) # 5.
- 메시지 버스는 시작할때마다 uow 전달 받음
- 첫번재 이벤트 처리 할때 대기열 시작
- 대기열 앞에서 이벤트 pop해서 적절한 핸들러에 넘긴다
- 메시지 버스는 UoW를 각 핸들러에게 전달
- 핸들러 종료되면 새롭게 생성된 이벤트를 수집하고 이 이벤트들을 대기열에 추가
# service_layer/unit_of_work.py
from . import messagebus # 1.
class AbstractUnitOfWork(abc.ABC):
...
def commit(self):
self._commit()
# self.publish_events() # 2.
def collect_new_events(self):
for product in self.products.seen:
while product.events:
yield product.events.pop(0) # 3.
- uow 모듈은 더이상 메시지 버스에 의존하지 않는다
- 커밋이 일어나도 publish_events를 자동으로 호출하지 않고 베시지 버스에서 이벤트 대기열을 추적
- UoW는 메시지 버스에 능동적으로 추가하지 않고 이벤트를 제공
모든 테스트를 이벤트 바탕으로 다시 쓰기
# tests/unit/test_handler.py
class TestAddBatch:
def test_for_new_product(self):
uow = FakeUnitOfWork()
messagebus.handle(
events.BatchCreated("b1", "CRUNCHY-ARMCHAIR", 100, None), uow
)
assert uow.products.get("CRUNCHY-ARMCHAIR") is not None
assert uow.committed
def test_for_existing_product(self):
uow = FakeUnitOfWork()
messagebus.handle(events.BatchCreated("b1", "GARISH-RUG", 100, None), uow)
messagebus.handle(events.BatchCreated("b2", "GARISH-RUG", 99, None), uow)
assert "b2" in [b.reference for b in uow.products.get("GARISH-RUG").batches]
class TestAllocate:
def test_returns_allocation(self):
uow = FakeUnitOfWork()
messagebus.handle(
events.BatchCreated("batch1", "COMPLICATED-LAMP", 100, None), uow
)
results = messagebus.handle(
events.AllocationRequired("o1", "COMPLICATED-LAMP", 10), uow
)
assert results.pop(0) == "batch1"
임시 : 결과를 반환해야 하는 메시지 버스
이벤트를 반환하도록 return 추가 (읽기 쓰기 책임 혼합 > 12장에서 CQRS 패턴 알아볼 예정)
def handle(
event: events.Event,
uow: unit_of_work.AbstractUnitOfWork,
):
results = []
queue = [event]
while queue:
event = queue.pop(0)
for handler in HANDLERS[type(event)]:
results.append(handler(event, uow=uow))
queue.extend(uow.collect_new_events())
return results
이벤트로 작동하도록 API 변경
# src/allocation/entrypoints/flask_app.py
@app.route("/allocate", methods=["POST"])
def allocate_endpoint():
try:
event = events.AllocationRequired( # 1.
request.json["orderid"], request.json["sku"], request.json["qty"]
)
results = messagebus.handle(event, unit_of_work.SqlAlchemyUnitOfWork()) # 2.
batchref = results.pop(0)
except InvalidSku as e:
return {"message": str(e)}, 400
return {"batchref": batchref}, 201
- 이벤트를 인스턴스화 하고
- 메시지 버스에 이벤트 전달
새로운 이벤트
배치수량의 변경을 알려주는 이벤트
@dataclass
class BatchQuantityChanged(Event):
ref: str
qty: int
구현
# service_layer/handler.py
def change_batch_quantity(
event: events.BatchQuantityChanged,
uow: unit_of_work.AbstractUnitOfWork,
):
with uow:
product = uow.products.get_by_batchref(batchref=event.ref)
product.change_batch_quantity(ref=event.ref, qty=event.qty)
uow.commit()
도메인 모델의 새 메서드
- 모델에 새 메서드 추가
- 수량을 변경하자마자 인라인으로 할당을 해제하고 새 이벤트 발행
- 기존 할당 함수를 이벤트가 발생하도록 변경
# domain/model.py
class Product:
...
def change_batch_quantity(self, ref: str, qty: int):
batch = next(b for b in self.batches if b.reference == ref)
batch._purchased_quantity = qty
while batch.available_quantity < 0:
line = batch.deallocate_one()
self.events.append(
events.AllocationRequired(line.orderid, line.sku, line.qty)
)
class Batch:
...
def deallocate_one(self) -> OrderLine:
return self._allocations.pop()
마치며
- 이벤트는 시스템 안의 내부 메시지와 입력에 대한 데이터 구조를 정의하는 간단한 데이터 클래스
- 핸들러는 이벤트에 반응하는 방법으로 모델을 호출하거나 외부 서비스를 호출 할 수 잇다
- 이런 아키텍처 패턴의 목적은 아키텍처의 복잡도가 증가하지만 필요한 작업을 수행하기 위해 더 이상 개념적으로나 아키텍처적으로 코드를 변경할 필요 없이 복잡한 요구사항 거의 대부분을 거의 다 처리할 수 있다
장점 | 단점 |
핸들러와 서비스가 똑같은 객체라서 더 단순하다 | 웹 이라는 관점에서 메시지 버스를 보면 여전히 예측하기 어려운 처리 방법이다. 작업이 언제 끝나는지 예측하기 어렵다. |
시스템 입력을 처리하기 좋은 데이터 구조가 있다 | 모델 객체와 이벤트 사이에 필드 구조 중복이 있고, 이에 대한 유지보수가 필요하다. 한 쪽에 필드를 추가한다면 다른 쪽에 속한 객체에 두 개 이상 필드를 추가해야 한다. |