파티셔너
카프카의 토픽은 병렬처리가 가능하도록 하기 위해 파티션으로 나뉘고 최소 하나 또는 둘 이상의 파티션으로 구성됩니다. 그리고 프로듀서가 kafka로 전송한 메시지는 해당 토픽 내 각 파티션의 로그 세그먼트에 저장됩니다. 따라서 프로듀서는 토픽으로 메시지를 보낼 때 해당 토픽의 어느 파티션으로 메시지를 보내야할지 결정해야 하는데 이때 사용하는 것이 바로 파티셔너 입니다. 프로듀서가 프로듀서가 파티션을 경정하는 알고리즘은 기본적으로 메시지의 키를 해시처리해 파티션을 구하는 방식을 사용합니다. 따라서 메시지의 키값이 동일하면 해당 메시지들은 모두 같은 파티션으로 전송됩니다.
예상치 못한 많은 양의 메시지가 kafka로 인입되는 경우 kafka는 클라이언트의 처리량을 높이기 위해 토픽의 파티션을 늘릴 수 있는 기능을 제공합니다. 이때 파티션 수가 변경됨과 동시에 메시지의 키와 매핑된 해시 테이블도 변경됩니다. 따라서 프로듀서가 동일한 메시지의 키를 이용해 메시지를 전송하더라도 파티션의 수를 늘린 후에는 다른 파티션으로 전송될 수 있습니다. 이렇게 메시지의 키를 이용해 kafka로 메시지를 전송하는 경우 관리자의 의도와는 다른 방식으로 메시지 전송이 이뤄질 수 있으므로 되로록 파티션 수를 변경하지 않는 것을 권장합니다.
라운드 로빈 전략
프로듀서의 메시지 중 레코드의 키 값은 필숫값이 아니므로, 관리자는 별도의 레코드 키값을 지정하지 않고 메시지를 전송할 수 있습니다. 만약 키값을 지정하지 않는다면 키값은 null이 되고 기본값인 라운드로빈 알고리즘을 사용해 프로듀서는 목적지 토픽의 파티션들로 레코드들을 랜덤 전송합니다.
파티셔너를 거친 후의 레코더들은 배치처리를 위해 프로듀서의 버퍼 메모리 영역에서 잠시 대기한 후 카프카로 전송됩니다. 배치 처리를 위해 잠시 메시지들이 대기하는 과정에서 라운드로빈 전략은 효율을 떨어뜨릴 수 있습니다. 프로듀서가 배치전송을 위한 최소레코드 수를 충족하지 못할경우 프로듀서 내에서 대기하고 있습니다.
스티키 파티셔닝 전략
라운드 로빈 전략에서 지연시간이 불필요하게 증가되는 비효율적인 전송을 개선하고자 2019년 출시된 아파치 카프카 2.4 버전 부터는 스티키 파티셔닝 전략을 사용하게 됩니다. 라운드 로빈 전략에서 배치 전송을 위한 필요 레코드 수를 채우지 못해 카프카로 배치전송을 하지 못했던 것과 달리, 스티키 파티셔닝이란 하나의 파티션에 레코드 수를 먼저 채워서 카프카로 빠르게 배치 전송하는 전략을 말합니다.
파티셔너는 배치를 위한 레코드 수에 도달할 때까지 다른 파티션으로 보내지 않고, 동일한 파티션으로 레코드를 담아놓습니다.
프로듀서의 배치
kafka에서는 토픽의 처리량을 높이기 위한 방법으로 토픽을 파티션으로 나눠 처리하며, kafka 클라이언트인 프로듀서에서는 처리량을 높이기 위해 배치 전송을 권장합니다. 따라서 프로듀서에서는 카프카로 전송하기 전 토픽의 파티션별로 레코드들을 잠시 보관하고 있습니다. 사용자가 프로듀서의 높은 처리량을 목표로 배치 전송을 설정하는 경우 주의해야 할 사항이 있습니다. 바로 버퍼 메모리 크기가 충분히 커야 한다는 점입니다. 즉 buffer.memory 크기는 반드시 batch.size보다 커야 합니다.
프로듀서는 배치전송을 위해 다음과 같은 옵션들을 제공합니다.
- buffer.memory : 카프카로 메시지들을 전송하기 위해 담아두는 프로듀서의 버퍼 메모리 옵션입니다.
- batch.size : 배치 전송을 위해 메시지들을 묶는 단위를 설정하는 배치 크기 옵션입니다.
- linger.ms : 배치 전송을 위해 버퍼 메모리에서 대기하는 메시지들의 최대 대기시간을 설정하는 옵션입니다.
중복없는 전송(멱등성)
메시지 시스템들의 메시지 전송 방식에는 '적어도 한번 전송', '최대 한번 전송', '정확히 한번 전송'이 있습니다.
적어도 한번 전송 과정 : 프로듀서가 ACK를 받지 못하면 재전송을 함. kafka는 기본적으로 이 구조를 사용합니다.
최대 한번 전송 과정 : 프로듀서가 ACK를 받지 못하더라도 재전송을 하지 않습니다. 일부 메시지의 손실을 감안하더라도 중복전송은 하지 않는 경우를 뜻한다. 높은 처리량을 필요로하는 대량의 로그 수집이나 IoT 같은 환경에서 사용하곤 합니다.
중복없는 전송 과정 : 각 메시지에 시퀀스를 헤더에 추가하여 전송. 프로듀서가 재전송한 메시지 헤더에서 시퀀스를 비교하여 이미 수신한 것인지 확인하고 중복된다면 저장하지 않고 Ack만 보냄
이렇듯 중복 없는 전송방식은 매우 유용해 보이지만 사실 결국 중복을 피하기 위한 메시지 비교 동작에는 오버헤드가 존재할 수밖에 없습니다. 기존 대비 20%의 성능이 감소된다고 합니다. 따라서 프로듀서 전송 성능에 그다지 민감하지 않은 상황에서 중복없는 메시지 전송이 필요하다면 이 방식을 설정해 적용할 것을 권장합니다. 중복없는 전송을 위해서는 다음의 일부 설정을 변경해야합니다.
- enable.idempotence=true
- max.in.flight.requests.per.connection=1~5
- acks=all
- retries=5
# vi /root/kafka_2.12-3.2.0/tmp/producer.config
------------ 입력 ------------
enable.idempotence=true
max.in.flight.requests.per.connection=5
retries=5
acks=all
# ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic peter-test03 --producer.config /root/kafka_2.12-3.2.0/tmp/producer.config
# ls /tmp/kafka-logs/peter-test03-0/
정확히 한번전송
프로듀서가 kafka로 정확히 한번 전송할때 프로듀서가 보내는 메시지들은 원자적으로 처리되어 전송에 성공하거나 실패하게 됩니다. 이런 프로듀서의 전송을 위해 kafka에는 컨슈머 그룹 코디네이터와 동일한 개념으로 트랜젝션 코디네이터 라는 것이 서버 측에 존재합니다. 이 트랜젝션 코디네이터의 역할은 프로듀서에 의해 전송된 메시지를 관리하며 커밋 또는 중단 등을 표시합니다. kafka에서는 컨슈머 오프셋 관리를 위해 오프셋 정보를 kafka의 내부 토픽에 저장하는데, 트랜젝션도 동일하게 트랜젝션 로그를 kafka의 내부 토픽인 __transaction_state에 저장합니다. __transaction_state는 kafka의 내부 토픽이지만 이 역시 토픽이므로 파티션 수와 리플리케이션 팬터 수가 존재하며, 브로커의 설정을 통해 관리자가 설정할 수 있습니다.
- transaction.state.log.num.partitions=50
- transaction.state.log.replication.factor=3
이때 프로듀서가 해당 토픽에 트랜젝션 로그를 직접 기록하는 것이 아님을 참고하기 바랍니다. 프로듀서는 트랜젝션 관련 정보를 트랜젝션 코디네이터에게 알리고, 모든 정보의 로그는 트랜젝션 코디네이터가 직접 기록합니다. 정확히 한번 전송을 이용해 전송된 메시지들이 kafka에 저장되면 kafka의 메시지를 다루는 클라이언트들은 해당 메시지들이 정상적으로 커밋된 것인지 또는 실패한 것인지 식별할 수 있어야합니다. kafka에서는 이를 식별하기 위한 정보로서 컨트롤 메시지라는 불리는 특별한 타입의 메시지가 추가로 사용됩니다.
컨트롤 메시지는 페이로드에 애플리케이션 데이터를 포함하지 않으며, 애플리케이션들에게 노출되지 않습니다. 컨트롤 메시지는 오직 브로커와 클라이언트 통신에서만 사용됩니다.
java로 구현한 소스는 다음과 같습니다.
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class ExactlyOnceProducer {
public static void main(String[] args) {
String bootstrapServers = "peter-kafka01.foo.bar:9092";
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 정확히 한번 전송을 위한 설정
props.setProperty(ProducerConfig.ACKS_CONFIG, "all"); // 정확히 한번 전송을 위한 설정
props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // 정확히 한번 전송을 위한 설정
props.setProperty(ProducerConfig.RETRIES_CONFIG, "5"); // 정확히 한번 전송을 위한 설정
props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "peter-transaction-01"); // 정확히 한번 전송을 위한 설정
Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // 프로듀서 트랜잭션 초기화
producer.beginTransaction(); // 프로듀서 트랜잭션 시작
try {
for (int i = 0; i < 1; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("peter-test05", "Apache Kafka is a distributed streaming platform - " + i);
producer.send(record);
producer.flush();
System.out.println("Message sent successfully");
}
} catch (Exception e){
producer.abortTransaction(); // 프로듀서 트랜잭션 중단
e.printStackTrace();
} finally {
producer.commitTransaction(); // 프로듀서 트랜잭션 커밋
producer.close();
}
}
}
consumer.config 파일을 생성하겠습니다.
# vi /root/kafka_2.12-3.2.0/tmp/consumer.config
------------ 입력 ------------
exclude.internal.topics=false
Topic을 만들고 실행시켜보겠습니다.
# /root/kafka_2.12-3.2.0/bin/kafka-topics.sh --bootstrap-server peter-kafka01.foo.bar:9092 --create --topic peter-test05 --partitions 1
// 한번 송신 로직 실행
# java -jar ExactlyOnceProducer.jar
// 생성된 Topic 확인
# /root/kafka_2.12-3.2.0/bin/kafka-topics.sh --bootstrap-server peter-kafka01.foo.bar:9092 --list
// __transaction_state 내용 확인
# /root/kafka_2.12-3.2.0/bin/kafka-console-consumer.sh --bootstrap-server peter-kafka01.foo.bar:9092 --topic __transaction_state --consumer.config /root/kafka_2.12-3.2.0/tmp/consumer.config --from-beginning --formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter"
출력결과
peter-transaction-01::TransactionMetadata(transactionalId=peter-transaction-01, producerId=7, producerEpoch=0, txnTimeoutMs=60000, state=Empty, pendingState=None, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1655231725821)
peter-transaction-01::TransactionMetadata(transactionalId=peter-transaction-01, producerId=7, producerEpoch=0, txnTimeoutMs=60000, state=Ongoing, pendingState=None, topicPartitions=Set(peter-test05-0), txnStartTimestamp=1655231725890, txnLastUpdateTimestamp=1655231725890)
peter-transaction-01::TransactionMetadata(transactionalId=peter-transaction-01, producerId=7, producerEpoch=0, txnTimeoutMs=60000, state=PrepareCommit, pendingState=None, topicPartitions=Set(peter-test05-0), txnStartTimestamp=1655231725890, txnLastUpdateTimestamp=1655231725923)
peter-transaction-01::TransactionMetadata(transactionalId=peter-transaction-01, producerId=7, producerEpoch=0, txnTimeoutMs=60000, state=CompleteCommit, pendingState=None, topicPartitions=Set(), txnStartTimestamp=1655231725890, txnLastUpdateTimestamp=1655231725928)
'PaaS > MQ' 카테고리의 다른 글
kafka 운영과 모니터링 (0) | 2022.06.24 |
---|---|
kafka consumer의 내부 동작 원리와 구현 (0) | 2022.06.22 |
kafka 리플리케이션, 컨트롤러, 로그 (0) | 2022.06.14 |
kafka 프로듀서, 컨슈머 개념과 사용법 (0) | 2022.06.13 |
kafka 소개 및 개념 (0) | 2022.06.13 |