PaaS/MQ

TypeScript, NestJS로 Confluent Kafka 연결 2-Producer 개발

armyost 2025. 9. 1. 23:51
728x90

1.NestJS는 설치

npm i -g @nestjs/cli

 

2. Consumer 프로젝트를 생성

nest new consumer

 

 

3. 필요 패키지 설치

yarn add @nestjs/microservices
yarn add kafkajs

 

 

4. 개발

https://github.com/armyost/kafka-sample/tree/main/producer

 

kafka-sample/producer at main · armyost/kafka-sample

Contribute to armyost/kafka-sample development by creating an account on GitHub.

github.com

 

src/kafka-producer.service.ts

import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ClientKafka, Transport } from '@nestjs/microservices';

@Injectable()
export class KafkaProducerService implements OnModuleInit, OnModuleDestroy {
  private readonly kafkaClient: ClientKafka;

  constructor() {
    this.kafkaClient = new ClientKafka({
      client: {
        // clientId: 'ccloud-nodejs-client-9e5ae753-136c-4a34-bcb1-0fdfa8f0d579',
        brokers: ['pkc-921jm.us-east-2.aws.confluent.cloud:9092'], // Replace with your Confluent Kafka broker address(es)
        // Optional: Add SASL/SSL configuration for Confluent Cloud
        sasl: {
          mechanism: 'plain', // or 'scram-sha-256', 'scram-sha-512'
          username: 'XXXXXXXXXXXX',
          password: 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX',
        },
        ssl: true,
      },
      producer: {
        allowAutoTopicCreation: false, // Set to false if topics are managed externally
      },
    });
  }

  async onModuleInit() {
    await this.kafkaClient.connect();
  }

  async onModuleDestroy() {
    await this.kafkaClient.close();
  }

  async sendMessage(topic: string, message: any): Promise<void> {
    // KafkaJS expects Buffer or string payloads. Stringify objects.
    const payload = JSON.stringify(message); 
    await this.kafkaClient.emit(topic, payload).toPromise(); 
  }
}

 

 

src/app.module.ts

import { Module } from '@nestjs/common';
import { KafkaProducerService } from './kafka-producer.service';
import { AppController } from './app.controller'; // Example controller

@Module({
  imports: [],
  controllers: [AppController], // Example controller
  providers: [KafkaProducerService],
})
export class AppModule {}

 

 

src/app.controller.ts

import { Controller, Post, Body } from '@nestjs/common';
import { KafkaProducerService } from './kafka-producer.service';

@Controller('messages')
export class AppController {
  constructor(private readonly kafkaProducerService: KafkaProducerService) {}

  @Post('send')
  async sendMessageToKafka(@Body() message: any) {
    const topic = 'topic_armyost'; // Your target Kafka topic
    await this.kafkaProducerService.sendMessage(topic, message);
    return { status: 'Message sent to Kafka' };
  }
}

 

 

완료. 이 3개 TS만 있으면 된다.

 

Consumer Group이 등록되면 아래와 같이 Clients에 표시된다.