Kafka를 사용하다 보면 특정 토픽의 파티션 개수에 의존하여 로직을 설계하는 경우가 있다. 예를 들어, 특정 이벤트를 사용자 ID로 나누어 모듈로 연산 후 적절한 파티션에 메시지를 전송하는 로직이 있다.
partition_count = 3 # 하드코딩된 파티션 개수
partition = user_id % partition_count
이 코드는 단순하고 명확하지만, 운영 중에 Kafka 토픽의 파티션 개수가 변경되면 문제가 발생할 수 있다. 새로운 파티션이 추가되더라도, 애플리케이션이 이를 반영하지 못해 메시지가 올바르게 분배되지 않을 위험이 있어 동적으로 파티션 개수를 가져오는 방법이 필요하다.
Kafka 클라이언트 라이브러리의 AdminClient를 사용하면, Kafka 클러스터에서 메타데이터를 조회하여 특정 토픽의 파티션 개수를 동적으로 가져올 수 있다.
아래는 Python의 confluent_kafka 라이브러리의 AdminClient를 사용한 구현 예제이다.
from confluent_kafka.admin import AdminClient
def get_partition_count(broker_url, topic_name):
admin_client = AdminClient({"bootstrap.servers": broker_url})
# 해당 서버에 있는 모든 토픽이 보여짐
metadata = admin_client.list_topics(timeout=10).topics
# 특정 토픽의 파티션 개수
return len(metadata[topic_name].partitions)
broker_url = "your.kafka.broker:9092" # Kafka 브로커 URL
topic_name = "your_topic" # 대상 토픽 이름
partition_count = get_partition_count(broker_url, topic_name)
partition = user_id % partition_count
- AdminClient
- Kafka 클러스터와 통신하여 메타데이터를 조회하는 역할
- list_topics()
- 카프카 클러스터의 모든 토픽 정보를 가져옴
- 반환된 metadata는 클러스터의 모든 토픽 이름과 각 토픽의 메타데이터를 포함하는 딕셔너리 형태
- 특정 토픽의 partitions 속성을 확인하여 파티션 개수를 반환
이 방법을 사용하면 운영 중에 Kafka 토픽의 파티션 개수가 변경되어도 애플리케이션 코드 수정 없이 대응할 수 있다. 애플리케이션 Pod(컨슈머 또는 프로듀서)를 재시작하면 변경된 파티션 정보를 자동으로 가져오게 된다.
'프로그래밍' 카테고리의 다른 글
InnoDB에서 쿼리 분할해 성능 개선하기 (1) | 2025.02.06 |
---|---|
파이썬 가비지 컬렉터 (3) | 2024.10.17 |
print vs logging (0) | 2023.12.01 |
빅쿼리 와일드카드(*) 사용 (0) | 2023.07.11 |
copy-on-write & defensive copy(함수형 프로그래밍) (1) | 2023.01.23 |