1. Install the NestJS CLI
npm i -g @nestjs/cli
2. Create the Producer project
nest new producer
3. Install the required packages
yarn add @nestjs/microservices
yarn add kafkajs
4. Write the code
https://github.com/armyost/kafka-sample/tree/main/producer
src/kafka-producer.service.ts
import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { lastValueFrom } from 'rxjs';
@Injectable()
export class KafkaProducerService implements OnModuleInit, OnModuleDestroy {
  private readonly kafkaClient: ClientKafka;
  constructor() {
    this.kafkaClient = new ClientKafka({
      client: {
        // clientId: 'ccloud-nodejs-client-9e5ae753-136c-4a34-bcb1-0fdfa8f0d579',
        brokers: ['pkc-921jm.us-east-2.aws.confluent.cloud:9092'], // Replace with your Confluent Kafka broker address(es)
        // SASL/SSL configuration for Confluent Cloud; replace the placeholders with your API key and secret
        sasl: {
          mechanism: 'plain', // or 'scram-sha-256', 'scram-sha-512'
          username: 'XXXXXXXXXXXX',
          password: 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX',
        },
        ssl: true,
      },
      producer: {
        allowAutoTopicCreation: false, // Set to false if topics are managed externally
      },
    });
  }
  async onModuleInit() {
    // Connect to the brokers when the module starts
    await this.kafkaClient.connect();
  }
  async onModuleDestroy() {
    // Disconnect cleanly on shutdown
    await this.kafkaClient.close();
  }
  async sendMessage(topic: string, message: unknown): Promise<void> {
    // Serialize the message to a JSON string before sending
    const payload = JSON.stringify(message);
    // emit() returns an Observable; lastValueFrom replaces the deprecated toPromise()
    await lastValueFrom(this.kafkaClient.emit(topic, payload));
  }
}
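The service above hard-codes the broker address and the SASL credentials. As a minimal sketch of one alternative, the helper below builds the same `client` options from environment variables. The variable names `KAFKA_BROKERS`, `KAFKA_API_KEY`, and `KAFKA_API_SECRET`, the `KafkaClientConfig` interface, and the `buildKafkaClientConfig` function are all assumptions made for illustration and are not part of the original project.

```typescript
// Minimal shape of the `client` options used by the service above.
interface KafkaClientConfig {
  brokers: string[];
  ssl: boolean;
  sasl: { mechanism: 'plain'; username: string; password: string };
}

// Hypothetical helper: KAFKA_BROKERS, KAFKA_API_KEY and KAFKA_API_SECRET
// are assumed variable names, not something the original project defines.
function buildKafkaClientConfig(
  env: Record<string, string | undefined>,
): KafkaClientConfig {
  // Accept a comma-separated broker list such as "host1:9092, host2:9092".
  const brokers = (env.KAFKA_BROKERS ?? '')
    .split(',')
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0);
  const username = env.KAFKA_API_KEY;
  const password = env.KAFKA_API_SECRET;

  // Fail fast at startup instead of on the first connection attempt.
  if (brokers.length === 0 || !username || !password) {
    throw new Error(
      'KAFKA_BROKERS, KAFKA_API_KEY and KAFKA_API_SECRET must be set',
    );
  }
  return {
    brokers,
    ssl: true,
    sasl: { mechanism: 'plain', username, password },
  };
}
```

Under these assumptions, the constructor could pass `client: buildKafkaClientConfig(process.env)` to `new ClientKafka(...)`, which keeps the API key and secret out of source control.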
src/app.module.ts
import { Module } from '@nestjs/common';
import { KafkaProducerService } from './kafka-producer.service';
import { AppController } from './app.controller'; // Example controller
@Module({
  imports: [],
  controllers: [AppController], // Example controller
  providers: [KafkaProducerService],
})
export class AppModule {}
src/app.controller.ts
import { Controller, Post, Body } from '@nestjs/common';
import { KafkaProducerService } from './kafka-producer.service';
@Controller('messages')
export class AppController {
  constructor(private readonly kafkaProducerService: KafkaProducerService) {}
  @Post('send')
  async sendMessageToKafka(@Body() message: any) {
    const topic = 'topic_armyost'; // Your target Kafka topic
    await this.kafkaProducerService.sendMessage(topic, message);
    return { status: 'Message sent to Kafka' };
  }
}
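Because the controller combines `@Controller('messages')` with `@Post('send')`, the endpoint is `POST /messages/send`. The sketch below shows how a client might build that request. The base URL `http://localhost:3000` assumes the default port of a freshly generated Nest project, and the `SendRequest` interface, the `buildSendRequest` function, and the sample payload are hypothetical.

```typescript
// Shape of the arguments that would later be handed to fetch(url, init).
interface SendRequest {
  url: string;
  init: { method: 'POST'; headers: Record<string, string>; body: string };
}

// Hypothetical helper: builds the request for the controller's route.
function buildSendRequest(baseUrl: string, message: unknown): SendRequest {
  // Drop trailing slashes so the path is joined with exactly one '/'.
  const root = baseUrl.replace(/\/+$/, '');
  return {
    url: `${root}/messages/send`,
    init: {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(message),
    },
  };
}

// Assumed base URL and sample payload, for illustration only.
const request = buildSendRequest('http://localhost:3000', {
  orderId: 1,
  status: 'created',
});
console.log(request.url); // prints "http://localhost:3000/messages/send"
```

On Node 18 or later, `await fetch(request.url, request.init)` would send it. If the producer is running, the response body should be the `{ status: 'Message sent to Kafka' }` object returned by the controller.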
Done. These three TypeScript files are all you need.
Once the Consumer Group is registered, it appears under Clients.
