책 & 스터디

[파이썬으로 살펴보는 아키텍처 패턴] 챕터 9 메시지 버스를 타고 시내로 나가기

haong_ 2024. 5. 15. 22:20
이번 장에서는 애플리케이션의 내부 구조에서 이벤트를 더 근본적인 요소를 만들면서 시작한다 

새로운 아키텍처가 필요한 새로운 요구 사항

현실에서는 예기지 못한 상황이 발생할 수 있다 

  • 재고 조사를 하는 동안 물이 새서 매트리스 3개가 손상된 것을 발견했다 
  • 포크가 배송 지연 되어 몇 주 동안 세관에 머물러야 했다. 그 중 3개가 안전 검사에서 실패해 폐기했다.
  • 금속이 부족해져서 다음 책장을 배치를 생산할 수 없게 됐다. 

배치 수량이 변경되면 할당을 해제하고 재할당 해야 한다 

구조 변경 상상해보기: 모든 것이 이벤트 핸들러

  • 서비스 계층 함수에 의해 처리되는 API 호출
  • 내부 이벤트와 그 이벤트에 대한 핸들러 

모든 것이 이벤트 핸들러라면? > 서비스 계층 함수도 이벤트라고 생각할 수 있다 

  • services.allocate() 는 AllocationRequired 이벤트의 핸들러이거나 Allocate 이벤트를 출력으로 보낼 수 있다
  • services.add_batch() 도 BatchCreated 이벤트의 핸들러라고 볼 수 있다

새로운 요구 사항도 같은 패턴으로 볼 수 있다

  • BatchQuantityChanged 이벤트는 change_batch_quantity() 라는 핸들러를 호출할 수 있다
  • 새로운 AllocationRequired 이벤트가 services.allocate() 를 호출할 수 있다 

서비스 함수를 메시지 핸들러로 리팩토링하기

  • AllocationRequired 와 BatchCreated 추가 
'''
events.py
'''
# domain/events.py

@dataclass
class BatchCreated(Event):
    ref: str
    sku: str
    qty: int
    eta: Optional[date] = None

...

@dataclass
class AllocationRequired(Event):
    orderid: str
    sku: str
    qty: int
  • service.py 를 handler.py로 변경 
  • 기존 메시지 핸들러인 send_out_of_stock_notification 추가 
# service_layer/handler.py

def add_batch(
    event: events.BatchCreated,
    uow: unit_of_work.AbstractUnitOfWork,
):
    with uow:
        product = uow.products.get(sku=event.sku)
        if product is None:
            product = Product(event.sku, batches=[])
            uow.products.add(product)
        product.batches.append(
            Batch(event.ref, event.sku, event.qty, event.eta)
        )
        uow.commit()


def allocate(
    event: events.AllocationRequired,
    uow: unit_of_work.AbstractUnitOfWork,
) -> str:
    line = OrderLine(event.orderid, event.sku, event.qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise InvalidSku(f"Invalid sku {line.sku}")
        batchref = product.allocate(line)
        uow.commit()
        return batchref
        
        
 def send_out_of_stock_notification(
    event: events.OutOfStock,
    uow: unit_of_work.AbstractUnitOfWork,
):
    email.send(
        "stock@made.com",
        f"Out of stock for {event.sku}",
    )

메시지 버스는 이제 이벤트를 UoW로부터 수집한다 

# service_layer/messagebus.py

def handle(
    event: events.Event,
    uow: unit_of_work.AbstractUnitOfWork
): # 1. 
    queue = [event]  # 2. 
    while queue:
        event = queue.pop(0)  # 3. 
        for handler in HANDLERS[type(event)]:  
            handler(event, uow=uow)  # 4. 
            queue.extend(uow.collect_new_events())  # 5.
  1. 메시지 버스는 시작할때마다 uow 전달 받음 
  2. 첫번재 이벤트 처리 할때 대기열 시작
  3. 대기열 앞에서 이벤트 pop해서 적절한 핸들러에 넘긴다 
  4. 메시지 버스는 UoW를 각 핸들러에게 전달
  5. 핸들러 종료되면 새롭게 생성된 이벤트를 수집하고 이 이벤트들을 대기열에 추가 
# service_layer/unit_of_work.py

from . import messagebus # 1. 

class AbstractUnitOfWork(abc.ABC):
	...

    def commit(self):
        self._commit()
        # self.publish_events() # 2. 

    def collect_new_events(self):
        for product in self.products.seen:
            while product.events:
                yield product.events.pop(0) # 3.
  1. uow 모듈은 더이상 메시지 버스에 의존하지 않는다
  2. 커밋이 일어나도 publish_events를 자동으로 호출하지 않고 베시지 버스에서 이벤트 대기열을 추적
  3. UoW는 메시지 버스에 능동적으로 추가하지 않고 이벤트를 제공 

모든 테스트를 이벤트 바탕으로 다시 쓰기 

# tests/unit/test_handler.py

class TestAddBatch:
    def test_for_new_product(self):
        uow = FakeUnitOfWork()
        messagebus.handle(
            events.BatchCreated("b1", "CRUNCHY-ARMCHAIR", 100, None), uow
        )
        assert uow.products.get("CRUNCHY-ARMCHAIR") is not None
        assert uow.committed

    def test_for_existing_product(self):
        uow = FakeUnitOfWork()
        messagebus.handle(events.BatchCreated("b1", "GARISH-RUG", 100, None), uow)
        messagebus.handle(events.BatchCreated("b2", "GARISH-RUG", 99, None), uow)
        assert "b2" in [b.reference for b in uow.products.get("GARISH-RUG").batches]


class TestAllocate:
    def test_returns_allocation(self):
        uow = FakeUnitOfWork()
        messagebus.handle(
            events.BatchCreated("batch1", "COMPLICATED-LAMP", 100, None), uow
        )
        results = messagebus.handle(
            events.AllocationRequired("o1", "COMPLICATED-LAMP", 10), uow
        )
        assert results.pop(0) == "batch1"

임시 : 결과를 반환해야 하는 메시지 버스 

이벤트를 반환하도록 return 추가 (읽기 쓰기 책임 혼합 > 12장에서 CQRS 패턴 알아볼 예정) 

def handle(
    event: events.Event,
    uow: unit_of_work.AbstractUnitOfWork,
):
    results = []
    queue = [event]
    while queue:
        event = queue.pop(0)
        for handler in HANDLERS[type(event)]:
            results.append(handler(event, uow=uow))
            queue.extend(uow.collect_new_events())
    return results

이벤트로 작동하도록 API 변경

# src/allocation/entrypoints/flask_app.py

@app.route("/allocate", methods=["POST"])
def allocate_endpoint():
    try:
        event = events.AllocationRequired( # 1. 
            request.json["orderid"], request.json["sku"], request.json["qty"]
        )
        results = messagebus.handle(event, unit_of_work.SqlAlchemyUnitOfWork()) # 2.
        batchref = results.pop(0)
    except InvalidSku as e:
        return {"message": str(e)}, 400

    return {"batchref": batchref}, 201
  1. 이벤트를 인스턴스화 하고
  2. 메시지 버스에 이벤트 전달 

새로운 이벤트 

배치수량의 변경을 알려주는 이벤트 

@dataclass
class BatchQuantityChanged(Event):
    ref: str
    qty: int

구현

# service_layer/handler.py

def change_batch_quantity(
    event: events.BatchQuantityChanged,
    uow: unit_of_work.AbstractUnitOfWork,
):
    with uow:
        product = uow.products.get_by_batchref(batchref=event.ref)
        product.change_batch_quantity(ref=event.ref, qty=event.qty)
        uow.commit()

도메인 모델의 새 메서드 

  • 모델에 새 메서드 추가 
  • 수량을 변경하자마자 인라인으로 할당을 해제하고 새 이벤트 발행 
  • 기존 할당 함수를 이벤트가 발생하도록 변경
    # domain/model.py
    
class Product:
    ...
    def change_batch_quantity(self, ref: str, qty: int):
        batch = next(b for b in self.batches if b.reference == ref)
        batch._purchased_quantity = qty
        while batch.available_quantity < 0:
            line = batch.deallocate_one()
            self.events.append(
                events.AllocationRequired(line.orderid, line.sku, line.qty)
            )
 
 class Batch:
 	...
    def deallocate_one(self) -> OrderLine:
        return self._allocations.pop()

마치며 

  • 이벤트는 시스템 안의 내부 메시지와 입력에 대한 데이터 구조를 정의하는 간단한 데이터 클래스 
  • 핸들러는 이벤트에 반응하는 방법으로 모델을 호출하거나 외부 서비스를 호출 할 수 잇다 
  • 이런 아키텍처 패턴의 목적은 아키텍처의 복잡도가 증가하지만 필요한 작업을 수행하기 위해 더 이상 개념적으로나 아키텍처적으로 코드를 변경할 필요 없이 복잡한 요구사항 거의 대부분을 거의 다 처리할 수 있다 
장점 단점
핸들러와 서비스가 똑같은 객체라서 더 단순하다 웹 이라는 관점에서 메시지 버스를 보면 여전히 예측하기 어려운 처리 방법이다. 작업이 언제 끝나는지 예측하기 어렵다.
시스템 입력을 처리하기 좋은 데이터 구조가 있다 모델 객체와 이벤트 사이에 필드 구조 중복이 있고, 이에 대한 유지보수가 필요하다. 한 쪽에 필드를 추가한다면 다른 쪽에 속한 객체에 두 개 이상 필드를 추가해야 한다.