PaaS/MQ

kafka 스키마 레지스트리

armyost 2022. 7. 8. 16:57
728x90

중앙 데이터 파이프라인 역할을 하는 카프카에서는 단 한명의 사용자가 단 하나의 토픽에만 접근해 읽고 쓰는것이 아니라 수많은 사용자가 수많은 토픽을 이용합니다. 그리고 그 토픽에는 애플리케이션 하나가 아니라 다수의 각기 다른 애플리케이션들이 접근할 것입니다. 따라서 만에 하나 누군가의 실수로 사전에 정의하지 않은 형태의 데이터를 해당 토픽으로 보내기라도 한다면 그와 연결된 모든 시스템이 영향을 받게 될 것이고 이는 매우 심각한 문제로 이어질 수 있습니다. 한가지 예를 더 들어보겠습니다. kafka의 데이터 흐름은 대부분 브로드캐스트 방식입니다. 다시 말하자면 kafka는 데이터를 전송하는 프로듀서를 일방적으로 신뢰할 수밖에 없는 방식입니다. 따라서 프로듀서 관리자는 kafka 토픽의 데이터를 컨슘하는 관리자에게 반드시 데이터 구조를 설명해야 합니다. 데이터를 컨슘하는 관리자가 한명이라면 데이터 구조를 설명하는 일이 어렵지 않겠지만, 대상 부서가 많고 관리자도 자주 바뀐다면 그때마다 데이터 구조를 설명하는 일이 쉬운일은 아닐 것입니다. 이와 같이 데이터를 컨슘하는 여러 부서에게 그 데이터에 대한 정확한 정의와 의미를 알려주는 역할을 하는 것이 바로 스키마 입니다. 

이렇게 스키마를 정의해두면 데이터 트러블슈팅 감소, 용이한 데이터 포맷확인, 데이터 스키마 관련 커뮤니케이션 감소 등 얻을 수 있는 이점들이 많기 때문에 kafka에서 스키마 사용은 권장사항에 속합니다. 

 

스키마 레지스트리 란?

스키마를 활용하는 방법은 스키마 레지스트리라는 별도의 애플리케이션을 이용하는 것입니다. 아파치 오픈소스 라이선스가 아닌 컨플루언트 커뮤니티 라이선스를 갖고 있는데 비상업적인 용도에 한해 스키마 레지스트리를 무료로 사용가능합니다. 

 

스키마 레지스트리는 kafka와 별도로 구성된 독립적 애플리케이션으로서 kafka로 메시지를 전송하는 프로듀서와 직접 통신하며 kafka로부터 메시지를 꺼내오는 컨슈머와도 직접 통신합니다. 

 

클라이언트들이 스키마 정보를 사용하기 위해서는 프로듀서와 컨슈머, 스키마 레지스트리간에 직접 통신이 이뤄져야 합니다. 프로듀서는 스키마 레지스트리에 스키마를 등록하고 스키마 레지스트리는 프로듀서에 의해 등록된 스키마 정보를 kafka의 내부 토픽에 저장합니다. 프로듀서는 스키마 레지스트리에 등록된 스키마의 ID와 메시지를 kafka로 전송하고 컨슈머는 스키마 ID를 스키마 레지스트리로부터 읽어온 후 프로듀서가 전송한 스키마ID와 메시지를 kafka로 전송하고 컨슈머는 스키마 ID를 스키마 레지스트리로부터 읽어온 후 프로듀서가 전송한 스키마ID와 메시지를 조합해 읽을 수 있습니다. 스키마 레지스트리가 지원하는 데이터 포맷을 사용해야 하는데, 가장 대표적인 포맷은 에이브로입니다. 

 

스키마 레지스트리의 에이브로 지원

에이브로(Avro)는 시스템, 프로그래밍 언어, 프로세싱 프레임워크 사이에서 데이터 교환을 도와주는 오픈소스 직렬화 시스템입니다. 아파치 하둡 프로젝트에서 처음 시작된 에이브로는 시스템, 프로그래밍 언어, 프로세싱 프레임워크 사이에서 데이터 교환을 도와주는 오픈소스 직렬화 시스템으로서 빠른 바이너리 데이터 포맷을 지원하며 JSON 형태의 스키마를 정의할 수 있는 매우 간결한 데이터 포맷입니다. 스키마 레지스트리는 에이브로 포맷을 가장먼저 지원했으며 최근에는 JSON, 프로토콜 버퍼 포맷도 지원하고 있습니다. 

따라서 관리자는 이 세가지 포맷중 한가지를 선택해 스키마 레지스트리를 사용할 수 있습니다. 대중적으로 가장 많이 사용하는 포맷은 JSON이겟지만, 컨플루언트는 다음과 같은 이유로 스키마 레지스트리에서 에이브로 포맷사용을 권장하고 있습니다. 

  • 에이브로는 JSON과 매핑된다. 
  • 에이브로는 매우 간결한 데이터 포맷이다. 
  • JSON은 메시지마다 필드 네임들이 포함되어 전송되므로 효율이 떨어진다.
  • 에이브로는 바이너리 형태이므로 매우 빠르다.

대략적인 개념만 살펴봤지만, 이제부터 스키마 레지스트리와 에이브로 스키마 실습을 직접 진행해보면 지금 언급하는 장점들이 어떤 의미인지 이해할 수 있을 것입니다. 에이브로를 활용해 스키마 예제 파일을 만들겠습니다. 우리는 한반의 학생 명단을 작성할 것인데, 학생의 이름은 문자형으로 학생이 속한 반은 정수형으로 정의하는 예제 파일을 만들어 보겠습니다. 지금 만드는 예제 파일은 이번 장 전반에 걸쳐 조금씩 응용하면서 실습에 이용할 예정입니다. 

{"namespace": "student.avro",
 "type": "record",
 "doc": "This is an example of Avro.",
 "name": "Student",
 "fields": [
 	{"name": "name", "type": "string", "doc": "Name of the student"},
  	{"name": "class", "type": "int", "doc": "Class of the student"}
 ]
}

JSON과 유사해 보이지만, 데이터 필드마다 데이터 타입을 정의할 수 있고, doc를 이용해 각 필드의 의미를 데이터를 사용하고자 하는 사용자들에게 정확하게 전달할 수 있습니다. 이러한 doc 기능을 활용하면 데이터 필드를 정의한 엑셀 문서나 위키 페이지 등을 각 부서가 서로 공유하지 않아도 됩니다. 또한 담당자의 퇴사로 인해 인수인계가 제대로 이뤄지지 않았더라도 다른 사용자나 담당자가 스키마 파일의 정의 내용만 확인하면 필드의 의미를 쉽게 파악할 수 있습니다. 

 

스키마 레지스트리의 설치

스키마 레지스트리 설치에 앞서 kafka 설정을 초기화 하고 진행하려합니다. 참고바랍니다.

 

스키마 레지스트리 파일을 다운로드 합니다. 압축 해제후 심볼릭 링크를 설정합니다.

# wget http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz -0 /opt/confluent-community-6.1.0.tar.gz
# tar zxf /opt/confluent-community-6.1.0.tar.gz -C /usr/local
# ln -s /usr/local/confluent-6.1.0 /usr/local/confluent

 

다음으로는 스키마 레지스트리를 설정해봅시다. 

# vi /usr/local/confluent/etc/schema-registry/schema-registry.properties
---------------------------------
listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=PLAINTEXT://peter-kafka01.foo.bar:9092,peter-kafka02.foo.bar:9092,peter-kafka03.foo.bar:9092
kafkastore.topic=_schemas
schema.compatibility.level=full

schema.compatibility.level이란 스키마 호환성 레벨입니다. 

 

브로커의 _schemas 토픽이 스키마 레지스트리의 저장소로 활용되며, 모든 스키마의 제폭, 버전, ID등이 저장됩니다. 스키마 관리 목적으로 사용되는 메시지들은 순서가 중요하기 때문에 _schemas 토픽의 파티션 수는 항상 1입니다. 이때 주의 깊게 살펴봐야 할 토픽 설정은 cleanup.policy가 compact로 설정된 부분입니다. 

이제 설정이 마무리 됐으니 다음과 같이 systemd를 이용해 reload와 start 명령어로 스키마 레지스트리를 실행해보겠습니다. 

# vi /etc/systemd/system/schema-registry.service

 

스키마 레지스트리가 잘 실행됐는지 알아보려면 리스닝 포트를 확인하는 등 여러방법이 있지만 여기서는 스키마 레지스트리가 제공하는 API를 이용해 확인해보겠습니다. 다음 명령어를 이용해 스키마 레지스트리 설정 정보를 가져옵니다. 

# curl -X GET http://peter-kafka03.foo.bar:8081/config

이러한 API를 잘활용하면 애플리케이션 개발을 간소화할 수 있으며 시간도 절약할 수 있다는 장점이 있습니다. 

옵션 설명
GET /schemas 현재 스키마 레지스트리에 등록된 전체 스키마 리스트 조회
GET /schemas/ids/id 스키마 ID로 조회
GET /schemas/ids/id/versions 스키마 ID의 버전
GET /subjects 스키마 레지스트리에 등록된 subject 리스트 subject는 토픽이름-key, 토픽이름-value 형태로 쓰임
GET /subjects/서브젝트 이름/versions 특정 서브젝트의 버전 리스트 조회
GET /config 전역으로 설정된 호환성 레벨 조회
GET /config/서브젝트 이름 서브젝트에 설정된 호환성 조회
DELETE /subjects/서브젝트 이름 특정 서브젝트 전체 삭제
DELETE /subjects/서브젝트 이름/versions/버전 특정 서브젝트에서 특정 버전만 삭제

 

스키마 레지스트리와 클라이언트 동작

1. 에이브로 프로듀서는 컨플루언트에서 제공하는 io.confluent,kafka.serializers.KafkaAvroSerializer 라는 새로운 직렬화를 사용해 스키마 레지스크리의 스키마가 유효한지 여부를 확인합니다. 만약 스키마가 확인되지 않으면 에비르로 프로듀서는 스키마를 등록하고 캐시합니다. 

 

2. 스키마 레지스트리는 현 스키마가 저장소에 저장된 스키마와 동일한 것인지 전화한 스키마인지를 확인합니다. 스키마 레지스트리 자체적으로 각 스키마에 대해 고유 ID를 할당하게 됩니다. 이 ID는 순차적으로 1씩 증가하지만 반드시 연속적이진 않습니다. 스키마에 문제가 없다면 스키마 레지스트리는 프로듀서에게 고유 id를 응답합니다. 

 

3. 이제 프로듀서는 스키마 레지스트리로 부터 받은 스키마 ID를 참고해 메시지를 카프카로 전송합니다. 이때 프로듀서는 스키마의 전체 내용이 아닌 오로지 메시지와 스키마 ID만 보닙니다. JSON은 키:벨류 형태로 전체 메시지를 전송해야 하지만 에이브로를 사용하면 프로듀서가 스키마 ID와 벨류만 메시지로 보내게 되어 kafka로 전송하는 전체 메시지의 크기를 줄일 수 있으며, 이는 JSON보다 에이브로를 사용하는 편이 더 효율적인 이유이기도 합니다. 

 

4. 에이브로 컨슈머는 스키마 ID로 컨플루언트에서 제공하는 io.confluent.kafka.seriallizer.KafkaAvroDeserializer 라는 새로운 역직렬화를 사용해 kafka의 토픽에 저장된 메시지를 읽습니다. 이때 컨슈머가 스키마 ID를 갖고 있지 않다면 스키마 레지스트리로부터 가져옵니다. 

 

이와 같이 프로듀서와 컨슈머는 직접 스키마를 주고받는 등의 통신을 하지는 않지만 각자 스키마 레지스트리와 통신하면서 스키마의 정보를 주고받을 수 있습니다. 프로듀서가 스키마 정보를 스키마 레지스트리에 등록함으로써 프로듀서가 전송하는 메시지의 크기도 줄일 수 있고, 컨슈머가 읽는 메시지의 크기도 줄일 수 있습니다. 그뿐 아니라 사전에 정의되지 않은 형식의 메시지는 전송할 수 없으므로 데이터를 처리하는 쪽에서 협의되지 않은 메시지가 들어오는 경우에 대해 고민하지 않아도 된다는 장점이 있습니다. 

 

파이썬을 이용한 스키마 레지스트리 활용

# pip install confluent-kafka[avro]

 

에이브로 메시지 전송을 위한 기본 파이썬 에이브로 프로듀서

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

value_schema_str = """
{"namespace": "student.avro",
 "type": "record",
 "doc": "This is an example of Avro.",
 "name": "Student",
 "fields": [
     {"name": "name", "type": ["null", "string"], "default": null, "doc": "Name of the student"},
     {"name": "class", "type": "int", "default": 1, "doc": "Class of the student"}
 ]
}
"""

value_schema = avro.loads(value_schema_str)
value = {"name": "Peter", "class": 1} # 전송할 메시지

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

avroProducer = AvroProducer({
    'bootstrap.servers': 'peter-kafka01.foo.bar,peter-kafka02.foo.bar,peter-kafka03.foo.bar',
    'on_delivery': delivery_report,
    'schema.registry.url': 'http://peter-kafka03.foo.bar:8081'
    }, default_value_schema=value_schema)

avroProducer.produce(topic='peter-avro2', value=value)
avroProducer.flush()
# python python-avro-producer.py

 

에이브로 메시지를 가져오기 위한 기본 파이썬 에이브로 컨슈머

from confluent_kafka import avro
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

value_schema_str = """
{"namespace": "student.avro",
 "type": "record",
 "doc": "This is an example of Avro.",
 "name": "Student",
 "fields": [
     {"name": "name", "type": ["null", "string"], "default": null, "doc": "Name of the student"},
     {"name": "class", "type": "int", "default": 1, "doc": "Class of the student"}
 ]
}
"""

value_schema = avro.loads(value_schema_str)

c = AvroConsumer({
    'bootstrap.servers': 'peter-kafka01.foo.bar,peter-kafka02.foo.bar,peter-kafka03.foo.bar',
    'group.id': 'python-groupid01',
    'auto.offset.reset': 'earliest',
    'schema.registry.url': 'http://peter-kafka03.foo.bar:8081'},reader_value_schema=value_schema)

c.subscribe(['peter-avro2'])

while True:
    try:
        msg = c.poll(10)

    except SerializerError as e:
        print("Message deserialization failed for {}: {}".format(msg, e))
        break

    if msg is None:
        continue

    if msg.error():
        print("AvroConsumer error: {}".format(msg.error()))
        continue

    print(msg.value())

c.close()
# python python-avro-consumer.py

 

에이브로 컨슈머는 에이브로 프로듀서가 보낸 메시지를 잘 가져온것을 확인할 수 있습니다. 스키마 레지스트리의 API를 이용해 스키마가 스키마 레지스트리에 잘 기록되었는지 확인해보겠습니다. 

# curl http://peter-kafka03.foo.bar:8081/schemas | python -m json.tool

 

스키마 레지스트리 호환성

스키마 레지스트리는 버전별 스키마에 대한 관리를 효율적으로 해주며, 각 스키마에 대해 고유한 ID와 버전 정보를 관리합니다. 바로 앞서 확인한 것처럼 스키마 레지스트리에서는 하나의 서비ㅡ젝트에 대한 버전 정보별로 진화하는 각 스키마를 관리해줍니다. 또한 스키마가 진화함에 따라 호환성 레벨을 검사해야 하는데, 스키마 레지스트리에서는 대표적으로 BACKWARD, FORWARD, FULL 등의 호환성 레벨을 제공합니다. 

 

BACKWARD 호환성

스키마 레지스트리를 사용하다 보면 데이터 포맷의 변경이 필요하며, 데이터 포맷을 변경하면 스키마는 진화하게 됩니다. 이렇게 스키마가 진화함에 따라 스키마 레지스트리 내에는 버전별 스키마를 갖게 됩니다. 첫번째로 BACKWARD 호환성을 살펴보겠습니다. BACKWARD 호환성이란 진화된 스키마를 적용한 컨슈머가 진화 전의 스키마가 적용된 프로듀서가 보낸 메시지를 읽을 수 있도록 허용하는 호환성을 말합니다.

 

FORWARD 호환성

FORWARD 호환성이란 BACKWARD와는 대비되는 성질을 지니며, 진화된 스키마가 적용된 프로듀서가 보낸 메시지를 진화 전의 스키마가 적용된 컨슈머가 읽을 수 있게 하는 호환성을 말합니다. 

 

FULL 호환성

BACKWARD, FORWARD 모두를 지원합니다.

'PaaS > MQ' 카테고리의 다른 글

엔터프라이즈용 kafka의 환경구성  (0) 2022.07.08
kafka 커넥트  (0) 2022.07.08
kafka systemctl 등록  (0) 2022.07.08
kafka와 보안 - 권한(ACL)  (0) 2022.07.08
kafka와 보안 - 인증(SASL)  (0) 2022.07.08