PaaS/MQ

TypeScript, NestJS로 Confluent Kafka 연결 3-Consumer 개발

armyost 2025. 9. 1. 23:59
728x90

1.NestJS는 설치

npm i -g @nestjs/cli

 

2. Consumer 프로젝트를 생성

nest new consumer

 

 

3. 필요 패키지 설치

yarn add @nestjs/microservices
yarn add kafkajs

 

 

4. 개발

https://github.com/armyost/kafka-sample/tree/main/consumer

 

kafka-sample/consumer at main · armyost/kafka-sample

Contribute to armyost/kafka-sample development by creating an account on GitHub.

github.com

 

src/main.ts

import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.KAFKA,
      options: {
        client: {
          // clientId: 'ccloud-nodejs-client-9e5ae753-136c-4a34-bcb1-0fdfa8f0d579',
          brokers: ['pkc-921jm.us-east-2.aws.confluent.cloud:9092'], // Replace with your Confluent Kafka broker address(es)
          // Optional: Add SASL/SSL configuration for Confluent Cloud
          sasl: {
            mechanism: 'plain', // or 'scram-sha-256', 'scram-sha-512'
            username: 'XXXXXXXXXXX',
            password: 'XXXXXXXXXXXXXXXXXXXXXXXX',
          },
          ssl: true,
        },
        consumer: {
          groupId: 'armyost-consumer-group', // Unique consumer group ID
        },
      },
    },
  );
  await app.listen();
}
bootstrap();

 

 

src/kafka-consumer.controller.ts

import { Controller } from '@nestjs/common';
import { MessagePattern, Payload, Ctx, KafkaContext } from '@nestjs/microservices';

@Controller()
export class KafkaConsumerController {
  @MessagePattern('topic_armyost') // Replace with your Kafka topic name
  async handleMyTopicMessage(@Payload() message: any, @Ctx() context: KafkaContext) {
    const originalMessage = context.getMessage();
    console.log(`Received message`, message);
    // Process the message here
  }

  // You can add more @MessagePattern decorators for other topics
//   @MessagePattern('another-topic')
//   async handleAnotherTopicMessage(@Payload() data: any) {
//     console.log('Received message from another-topic:', data);
//   }
}

 

 

src/app.module.ts

import { Module } from '@nestjs/common';
import { KafkaConsumerController } from './kafka-consumer.controller';

@Module({
  imports: [],
  controllers: [KafkaConsumerController],
  providers: [],
})
export class AppModule {}

 

완료. 이 3개의 TS만 있으면 된다.

 

이제 Producer에 curl 메시지를 보내게 되면.

curl -d "{""key1"":""value1"", ""key2"":""value2""}" -H "Content-Type: application/json" -X POST curl -d "{""key1"":""value1"", ""key2"":""value2""}" -H "Content-Type: application/json" -X POST http://localhost:3000/messages/send

 

 

아래와 같이 Topic에 Message가 지나가는 것을 볼 수 있다.

 

그리고 로그상 Consuming이 잘 되고 있는것을 알 수 있다.