1. Install the NestJS CLI
npm i -g @nestjs/cli
2. Create the Consumer project
nest new consumer
3. Install the required packages
yarn add @nestjs/microservices
yarn add kafkajs
4. Development
https://github.com/armyost/kafka-sample/tree/main/consumer
src/main.ts
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.KAFKA,
      options: {
        client: {
          // clientId: 'ccloud-nodejs-client-9e5ae753-136c-4a34-bcb1-0fdfa8f0d579',
          brokers: ['pkc-921jm.us-east-2.aws.confluent.cloud:9092'], // Replace with your Confluent Kafka broker address(es)
          // Optional: Add SASL/SSL configuration for Confluent Cloud
          sasl: {
            mechanism: 'plain', // or 'scram-sha-256', 'scram-sha-512'
            username: 'XXXXXXXXXXX',
            password: 'XXXXXXXXXXXXXXXXXXXXXXXX',
          },
          ssl: true,
        },
        consumer: {
          groupId: 'armyost-consumer-group', // Unique consumer group ID
        },
      },
    },
  );
  await app.listen();
}
bootstrap();
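The listing above hardcodes the broker address and SASL credentials. As a minimal sketch, the same client settings could be built from environment variables instead. The helper name `kafkaClientFromEnv` and the variable names `KAFKA_BROKERS`, `KAFKA_USERNAME`, and `KAFKA_PASSWORD` are assumptions made for this example; they are not part of the sample repository.

```typescript
// Sketch only: the helper and the environment variable names are hypothetical.
interface KafkaClientOptions {
  brokers: string[];
  ssl: boolean;
  sasl: { mechanism: 'plain'; username: string; password: string };
}

function kafkaClientFromEnv(
  env: Record<string, string | undefined>,
): KafkaClientOptions {
  // Fail fast with a clear error when a setting is missing or empty.
  const required = (name: string): string => {
    const value = env[name];
    if (!value) {
      throw new Error(`Missing environment variable: ${name}`);
    }
    return value;
  };

  return {
    // Comma-separated list, e.g. "host1:9092,host2:9092"
    brokers: required('KAFKA_BROKERS')
      .split(',')
      .map((broker) => broker.trim())
      .filter((broker) => broker.length > 0),
    ssl: true,
    sasl: {
      mechanism: 'plain',
      username: required('KAFKA_USERNAME'),
      password: required('KAFKA_PASSWORD'),
    },
  };
}
```

In src/main.ts the result would replace the inline object, for example `client: kafkaClientFromEnv(process.env)`, so that the API key and secret stay out of source control.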
src/kafka-consumer.controller.ts
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload, Ctx, KafkaContext } from '@nestjs/microservices';

@Controller()
export class KafkaConsumerController {
  @MessagePattern('topic_armyost') // Replace with your Kafka topic name
  async handleMyTopicMessage(@Payload() message: any, @Ctx() context: KafkaContext) {
    const originalMessage = context.getMessage();
    console.log('Received message', message);
    // Process the message here
  }

  // You can add more @MessagePattern decorators for other topics
  // @MessagePattern('another-topic')
  // async handleAnotherTopicMessage(@Payload() data: any) {
  //   console.log('Received message from another-topic:', data);
  // }
}
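The handler above types the payload as `any`. A minimal sketch of how the "Process the message here" step could validate the payload first is shown below. It assumes the Producer forwards the JSON body used in the curl test (`key1`, `key2`) unchanged, which this post does not confirm; `SampleMessage`, `isSampleMessage`, and `toSampleMessage` are hypothetical names introduced for illustration.

```typescript
// Sketch only: the message shape is assumed from the curl test payload.
interface SampleMessage {
  key1: string;
  key2: string;
}

// Type guard: true only for objects with string key1 and key2 fields.
function isSampleMessage(value: unknown): value is SampleMessage {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const record = value as Record<string, unknown>;
  return typeof record.key1 === 'string' && typeof record.key2 === 'string';
}

// Accepts either an already-parsed object or a raw JSON string.
// Returns null instead of throwing when the payload is not usable.
function toSampleMessage(payload: unknown): SampleMessage | null {
  let candidate: unknown = payload;
  if (typeof payload === 'string') {
    try {
      candidate = JSON.parse(payload);
    } catch {
      return null;
    }
  }
  return isSampleMessage(candidate) ? candidate : null;
}
```

Inside the handler, `const parsed = toSampleMessage(message);` followed by a null check would let malformed messages be logged and skipped rather than crashing the processing code.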
src/app.module.ts
import { Module } from '@nestjs/common';
import { KafkaConsumerController } from './kafka-consumer.controller';

@Module({
  imports: [],
  controllers: [KafkaConsumerController],
  providers: [],
})
export class AppModule {}
Done. These three TypeScript files are all you need.
Now send a message to the Producer with curl:
curl -d "{""key1"":""value1"", ""key2"":""value2""}" -H "Content-Type: application/json" -X POST http://localhost:3000/messages/send
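The same test request can also be sent from TypeScript. This is a rough sketch that assumes Node.js 18 or later (for the built-in `fetch`) and reuses the endpoint and payload from the curl command; `buildSendRequest` and `sendMessage` are hypothetical helper names.

```typescript
// Endpoint taken from the curl command; everything else is illustrative.
const SEND_URL = 'http://localhost:3000/messages/send';

interface SendRequest {
  url: string;
  init: { method: string; headers: Record<string, string>; body: string };
}

// Builds the POST request without sending it, so it can be inspected or tested.
function buildSendRequest(payload: Record<string, string>): SendRequest {
  return {
    url: SEND_URL,
    init: {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(payload),
    },
  };
}

// Sends the request to the Producer and resolves with the HTTP status code.
async function sendMessage(payload: Record<string, string>): Promise<number> {
  const { url, init } = buildSendRequest(payload);
  const response = await fetch(url, init);
  return response.status;
}
```

Calling `sendMessage({ key1: 'value1', key2: 'value2' })` while the Producer is running should have the same effect as the curl command.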
You can then see the message pass through the topic.

The Consumer log also shows that the message is being consumed correctly.
