PaaS/MQ

kafka consumer의 내부 동작 원리와 구현

armyost 2022. 6. 22. 16:01
728x90

컨슈머의 오프셋관리 

 

컨슈머의 동작중 가장 핵심은 오프셋 관리입니다. 컨슈머는 카프카에 저장된 메시지를 꺼내오는 역할을 하기 때문에 컨슈머가 메시지를 어디까지 가져왓는지를 표시하는 것은 매우 중요합니다. 예를 들어 코드 배포로 인해 컨슈머가 일시적으로 동작을 멈추고 재시작하는 경우나, 컨슈머가 구동중인 서버에서 문제가 발생해 새로운 컨슈머가 기존 컨슈머의 역할을 대신 하는 경우에는 기존 컨슈머의 마지막 메시지 위치부터 새로운 컨슈머가 메시지를 가져올 수 있어야만 장애로부터 빠르게 복구될 수 있습니다. kafka에서는 메시지의 위치를 나타내는 위치를 오프셋이라고 부르는데, 이 오프셋은 숫자 형태로 나타냅니다. 컨슈머 그룹은 자신의 오프셋 정보를 kafka에서 가장 안전한 저장소인 토픽에 저장합니다. __consumer_offsets 토픽에 각 컨슈머 그룹별로 오프셋 위치 정보가 기록됩니다. 

 

※ 파티션 수와 리플리케이션 팩터 수는 기본값만 사용해도 충분한데, kafka 클러스터 초기에 잘못 설정해 __consumer_offset 토픽의 리플리케이션 팩터 수가 1로 설정되어 있는 경우도 있습니다. 따라서 본인이 운영하는 kafka 클러스터에서 __consumer_offsets 토픽의 리플리케이션 팩터 수가 어떻게 설정되어 있는지를 꼭 확인해보시기 바랍니다. 

 

 

그룹 코디네이터

 

컨슈머들은 하나의 컨슈머 그룹의 구성원으로 속하며, 컨슈머 그룹내의 각 컨슈머들은 서로 자신의 정보를 공유하면서 하나의 공동체로 동작합니다. 컨슈머 그룹내의 컨슈머들은 언제든지 자신이 속한 컨슈머 그룹에서 떠날 수 있으며 새로운 컨슈머가 합류할 수도 있습니다. 따라서 컨슈머 그룹은 이러한 변화를 인지하고 각 컨슈머들에게 작업을 균등하게 분배해야 합니다. 컨슈머 그룹에서 각 컨슈머들에게 작업을 균등하게 분해하는 동작을 컨슈머 리벨런싱이라고 부릅니다. 

 

앞에서 '정확히 한번 전송'을 관리하기 위한 트랜젝션 코디네이터라는 것을 살펴봤습니다. 컨슈머 그룹 역시 안정적인 컨슈머 그룹 관리를 위해 별도의 코디네이터가 존재하는데 이를 kafka에서는 그룹 코디네이터라고 부릅니다. 그룹 코디네이터의 목적은 컨슈머 그룹이 구독한 토픽의 파티션들과 그룹의 멤버들을 트래킹 하는 것입니다. 따라서 파티션 또는 그룹의 멤버에 변화가 생기면 작업을 균등하게 재분배 하기 위해 컨슈머 리벨런싱 동작이 발생합니다. 그룹 코디네티어는 각 컨슈머 그룹별로 존재하며 이러한 그룹 코디네이터는 kafka클러스터내의 브로커중 하나에 위치합니다. 하트비트 옵션을 통해 컨슈머의 상태를 확인합니다.

 

# /root/kafka_2.12-3.2.0/bin/kafka-topics.sh  --bootstrap-server  peter-kafka01.foo.bar:9092 --group peter-consumer01 --describe

 

 

컨슈머 옵션

  • heartbeat.interval.ms : 기본값은 3000이며, 그룹 코디네이터와 하트비트 인터벌 시간입니다. 해당 시간은
  • session.timeout.ms보다 낮게 설정해야 하며, 1/3 수준이 적절합니다. 
  • session.timeout.ms : 기본값은 10000입니다. 어떤 컨슈머가 특정 시간안에 하트비드를 받지 못하면 문제가 발생했다고 판단해 컨슈머 그룹에서 해당 컨슈머는 제거되고 리벨런싱 동작이 일어납니다. 
  • max.poll.interval.ms : 기본값은 300000입니다. 컨슈머는 주기적으로 poll()을 호출해 토픽으로부터 레코드들을 가져오는데, poll()호출후 최대 5분간 poll()호출이 없다면 컨슈머가 문제가 있는것으로 판단해 리벨런싱 동작이 일어납니다. 

 

스태틱 멤버쉽

때로는 하드웨어 점검이나 소프트웨어 업데이트 등의 이유로 컨슈머 그룹내의 컨슈머들을 하나씩 순차적으로 재시작하고 싶은 경우가 있을 것입니다. 하지만 하트비트 주기, 세션 타임아웃 등의 설정으로 인해 하나의 컨슈머가 재시작될때마다 전체 리벨런싱이 일어나며, 리벨런싱 작업이 일어나는 동안 컨슈머들은 일시 중지하므로 이는 매우 번거로운 일이 아닐수 없습니다. 따라서 kafka는 이러한 불필요한 리벨런싱을 방어하기 위해 2.3버전부터 스태틱 멤버십이라는 개념을 도입했습니다. 스태틱 멤버십이란 컨슈머 그룹내에서 컨슈머가 재시작 등으로 그룹에서 나갔다가 다시 합류하더라도 리벨런싱이 일어나지 않게 합니다. 그 뿐 아니라 스태틱 멤버십 기능이 적용된 컨슈머는 그룹에서 떠날 때 그를 코디네이터에게 알리지 않으므로 불필요한 리벨런싱도 발생하지 않습니다. 

 

컨슈머 파티션 할당 전략

- 레인지 파티션 할당 전략

레인지 파티션 할당 전략인 RangeAssigner는 파티션 할당 전략중 기본 값으로서, 각 토픽별로 할당 전략을 사용하게 됩니다. 레인지 파티션 할당 전략은 먼저 구독하는 토픽에 대한 파티션을 순서대로 나열한 후 컨슈머를 순서대로 정렬합니다. 그런 다음 각 컨슈머가 몇개의 파티션을 할당해야 하는지 전체 파티션 수를 컨슈머 수로 나눕니다. 컨슈머 수와 파티션수가 일치하면 균등하게 할당될수 있지만 균등하게 나눠지지 않는 경우에는 앞쪽의 컨슈머들은 추가 파티션을 할당 받게 됩니다. 이렇게 불균형하게 할당되는 레인지 파티션 할당전략은 왜 이용할까요? 레인지 파티션 할당전략은 동일한 레코드 키를 사용하고 하나의 컨슈머 그룹이 동일한 파티션 수를 가진 2개 이상의 토픽을 컨슘할 때 유용할 수 있습니다. 결과적으로 동일한 키 값 abc를 갖고 있는 두 토픽의 파티션을 하나의 컨슈머가 컨슘하는 것입니다.  레인지 파티션 할당 전략은 방금 설명한 사례와 같이 특수한 환경에서 좋지만 컨슈머에 균등하게 파티션이 분배되지 않으므로 컨슈머 그룹은 불균형한 상태로 운영될 수 잇다는 점에 유의해야 합니다. 

 

라운드 로빈 파티션 할당 전략

라운드 로빈 파티션 할당 전략은 파티션 할당 전략 중 가장 간단한 할당 방식입니다. 먼저 컨슘해야 하는 모든 파티션과 컨슈머 그룹내 모든 컨슈머를 나열한 후 라운드 로빈으로 하나씩 파티션과 컨슈머를 할당하는 전략입니다. 토픽내 모든 파티션을 나열하고 컨슈머를 라운드로빈으로 나열하여 매핑합니다. 이로써 균등하게 매핑됩니다. 

 

스티키 파티션 할당 전략

컨슈머 그룹의 리벨런싱 동작으로 인해 파티션이 재할당된다면 어떤 상황이 벌어질까요? 레인지 파티션 할당 전략과 라운드 로빈 파티션 할당 전략 모두 파티션 재할당 작업이 발생하면 기존에 매핑됐던 파티션과 동일한 컨슈머가 다시 매핑되리라고는 보장할 수 없습니다. 따라서 이러한 재할당 작업이 발생하더라도 기존에 매핑됐던 파티션과 컨슈머를 최대한 유지하려고 하는 전략이 바로 스티키 파티션 할당 전략입니다. 스티키 파티션 할당 전략은 두가지 목적으로 컨슈머에 파티션을 할당합니다. 첫번째 목적은 가능한 한 균형 잡힌 파티션 할당이고, 두번째 목적은 재할당이 발생할때 되도록 기존의 할당된 파티션 정보를 보장하는 것입니다. 둘 중에서는 두번째 목적보다는 첫번째 목적의 우선순위가 더 높습니다. 따라서 스티키 파티션 할당 전략이라고 해서 무조건 기존의 파티션과 컨슈머를 유지하지는 않습니다. 최대한 컨슈머를 균등하게 분배하는 것을 우선하므로 일부 파티션은 기존의 컨슈머와 매핑을 유지하지 못하고 새로운 컨슈머와 연결될 수 도 있습니다. 스티키 파티션 할당 전략에서 최초의 배치 전략은 라운드 로빈할당 전략과 매우 흡사합니다. 하지만 리벨런싱이 일어났을 때와 비교하면 스티키 파티션 할당 전략은 라운드 로빈 할당 전략과 확연한 차이가 있습니다. 스티키 파티션할당시 하나의 컨슈머가 없어지는 케이스가 발생하더라도 다른 컨슈머에 할당된 것은 유지된체 사라진 컨슈머에 할당되었었던 파티션들만 재할당이 일어납니다. 이런 로직이 가능한 이유는

  • 컨슈머 들의 최대 할당된 파티션수의 차이는 1
  • 기존에 존재하는 파티션 할당은 최대한 유지함
  • 재할당 동작 시 유효하지 않은 모든 파티션 할당은 제거
  • 할당되지 않은 파티션들은 균형을 맞추는 방법으로 컨슈머들에게 할당

협력적 스티키 파티션 할당 전략

기존의 스티키 파티션 전략과 한가지 차이점이 있는데, 바로 컨슈머 그룹 내부의 리벨런싱 동작이 한층 더 고도화 되었다는 것입니다. 지금까지의 컨슈머 리벨런싱 동작에는 내부저긍로 EAGER라는 리벨런스 프로토콜을 사용했고, EAGER 프로토콜은 컨슈머 리벨런싱 동작 시 컨슈머에 할당된 모든 파티션을 항상 취소 했습니다. 이렇게 리벨런싱 동작에서 모든 파티션을 항상 취소하는 이유로는 다음과 같은 두가지를 꼽을 수 있습니다. 첫째로 컨슈머들의 파티션 소유권 변경 때문입니다. 예를들어 A 컨슈머가 갖고 있는 0번 파티션의 소유권을 B 컨슈머에게 할당해야 하는 경우를 생각해봅시다. 하지만 하나의 컨슈머 그룹 내에서는 둘이상의 컨슈머가 동일한 피티션을 소유할 수 없으므로 0번 파티션의 소유권을 B 컨슈머에게 넘길 수 없습니다. 결국 0번 파티션의 소유권을 넘기려면 0번 파티션에 대해 어떤 컨슈머도 소유권을 갖고 있어서는 안됩니다. 둘째로 그룹 내에서 여러 파티션들에 대해 소유권 변경작업 이 동시에 이뤄져야 하므로 이러한 로직을 단순하게 구현하기 위해서 입니다. 

 

이렇게 리벨런싱 동작에서 컨슈머에 할당된 모든 파티션을 취소했으므로 컨슈머 그룹 전체 에서 컨슈머와 파티션 재할당이 가능했고 한번의 리벨런싱 동작으로 모든 컨슈머와 파티션 매핑을 할 수 있었습니다. 하지만 리벨런싱에서 모든 파티션 할당을 취소하는 동작은 리소스를 많이 사용하는 컨슈머 그룹에서는 큰문제가 됩니다. 바로 컨슈머들의 다운타임 입니다. 다운타임동안에는 LAG이 급격하게 증가합니다. 

 

이러한 이슈를 개선코자 kafka2.3버전부터 kafka 커넥트에 새로운 협력적 스티키 파니션 할당 전략이 적용됐고 kafka 2.4 버전부터 컨슈머 클라이언트에도 적용 됐습니다. 혁력적 스티키는 내부 리벨런싱 프로토콜인 EAGER가 아닌 COOPERATIVE 프로토콜을 적용하기 시작했고 이 프로토콜은 리벨런싱이 동작하기 전의 컨슈머 상태를 유지할 수 있게 했습니다. 협력적 스티키 파티션 할당 전략은 안전하게 파티션의 소유권 이동을 위해 리벨런싱 작업이 수차례에 결쳐 진행하는 것도 나쁘지 않다는 아이디어에서 출발했습니다. 순서는 다음과 같습니다. 

 

1. 컨슈머 그룹에 새로운 컨슈머가 합류하면서 리벨런싱이 트리거 됩니다.

2. 컨슈머 그룹 내 컨슈머들은 그룹 합류 요청과 자신들이 컨슘하는 토픽의 파티션 정보를 그룹 코디네이터로 전송합니다. 

3. 그룹 코디네이터는 해당 정보를 조합해 컨슈머 그룹의 리더에게 전송합니다. 

4. 컨슈머 그룹의 리더는 현재 컨슈머들이 소유한 파티션 정보를 활용해 제외해야 할 파티션정보를 담은 새로운 파티션 할당정보를 컨슈머 그룹 멤버들에게 전달합니다. 

5. 새로운 파티션 할당 정보를 받은 컨슈머 그룹 멤버들은 현재의 파티션 할당 전략과 차이를 비교해보고 필요없는 파티션을 골라 제외합니다. 

6. 제외된 파티션 할당을 위해 컨슈머들은 다시 합류 요청을 합니다. 여기서 두번째 리벨런싱이 트리거 됩니다. 

7. 컨슈머 그룹의 리더는 제외된 파티션을 적절한 컨슈머에게 할당합니다. 

 

기존 리벨런싱 방식이었던 EAGER와 COOPERATIVE 프로토콜 방식의 성능 비교 결과도 공개됐는데 한번에 일괄 작업하는 EAGER 방식보다 COOPERATIVE 프로토콜 방식이 더 빠른 시간안에 짧은 다운타임을 가지고 리벨런싱을 완료할 수 있었습니다. 협력적 스티키 전략은 이렇게 장점이 많은 전략이므로 향후 릴리즈될 아파치 3.0버전에서는 파티션 할당 전략의 기본값으로 채택될 가능성이 높습니다. 

 

정확히 한번 컨슈머 동작

프로듀서의 '정확히 한번 전송' 동작을 위해 브로커 측에서는 전체 트랜젝션을 관리하면서 프로듀서의 동작을 보조하는 별도의 트랜젝션 코디네이터가 필요했습니다. 트랜젝션 코디네이터는 프로듀서의 정확히 한번 전송이 성공하면 해당 레코드의 트랜젝션 성공을 표시하는 특수한 메시지를 추가합니다. 따라서 컨슈머는 트랜잭션 코디네이터가 특수한 메시지를 표시한 레코드만 읽는다면, 정확히 한번 읽을 수 있습니다.