250x250
Notice
Recent Posts
Recent Comments
관리 메뉴

탁월함은 어떻게 나오는가?

[Kafka] Nest.js 로 kafka 통신 테스트 해보기 본문

[Snow-ball]server/devOps

[Kafka] Nest.js 로 kafka 통신 테스트 해보기

Snow-ball 2023. 5. 18. 14:47
반응형

오늘은 Nest.js 로 kafka 통신 테스트를 진행해 보겠다. 진행해보기에 앞서 docker-compose로 yml 파일을 사용할 것이기 때문에 "[Kafka] Docker Compose - Single Broker 사용으로 통신해보기" 를 참고하면 좋다.

 

 

* Producer: 메시지 생성에 대한 책임

* Consumer: 메시지 소비에 대한 책임

* Connector: Producer/Cosumer API 및 link topics 2가지 모두를 "재사용" 가능 

* Streams: 입력을 출력 결과로 전환하는 책임

* Admin: Kafka topics를 관리하는데 사용

 

 

 


 

 

 

Test Start!!

 

1) nest js CLI install

1
npm i -g @nestjs/cli
cs

 

 

2) Producer API를 위한 nest.js 프로젝트 생성 

- 개인적으로 npm을 선호하기 때문에 npm 사용 yarn을 사용해도 무관하다.

1
nest new producer
cs

 

 

3) 생성된 프로젝트에 kafkajs 와 microservies 설치해준다.

1
npm i --save kafkajs
cs
1
npm i --save @nestjs/microservices
cs

 

 

 

Producer 프로젝트

4) app.module.ts 파일에서 모듈을 설정해준다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ClientsModule, Transport } from '@nestjs/microservices';
 
@Module({
  imports: [
    ClientsModule.register([
      {
        name'name_jung',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'client_id_jung',
            brokers: ['localhost:29092'],
          },
        },
      },
    ]),
  ],
 
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}
 
cs

 

위의 brokers: ['localhost:29092']는 docker-compose.yml 에 설정한 포트이다. 실서비스에서 사용할때는 .env로 관리하는걸 추천한다.

 

 

5) Controller에서 @Get('kafka-test')를 추가해준다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import { Controller, Get, Inject } from '@nestjs/common';
import { AppService } from './app.service';
import { ClientKafka } from '@nestjs/microservices';
 
@Controller()
export class AppController {
  constructor(
    private readonly appService: AppService,
    @Inject('name_jung'private readonly client: ClientKafka,
  ) {}
 
  @Get()
  getHello(): string {
    return this.appService.getHello();
  }
 
  @Get('kafka-test')
  testKafka() {
    return this.client.emit('medium.rock', {
      foo: 'bar',
      data: new Date().toString(),
      str: 'kafka-test',
    });
  }
 
}
cs

 

 

6) localhost:300/kafka-test 를 Get 쿼리를 날리면 Controller의 'medium.rock' 항목에 JSON 메시지를 내보내려고 한다. 

1
curl --location --request GET 'http://localhost:3000/kafka-test'
cs

 

curl을 사용해서 query를 날리면 응답이 오는것을 확인 할 수 있다.

 

 

 

Consumer 프로젝트

7) Producer 프로젝트를 만들었을 때와 마찬가지로 cli를 사용해준다.

1
nest new consumer
cs

 

 

8) 생성된 프로젝트에 kafkajs 와 microservies 설치해준다.

1
npm i --save kafkajs
cs
1
npm i --save @nestjs/microservices
cs

 

 

9) app.module.ts 에 아래와 같이 코드를 입력해준다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ClientsModule, Transport } from '@nestjs/microservices';
 
@Module({
  imports: [
    ClientsModule.register([
      {
        name'name_jung',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'client_id_jung',
            brokers: ['localhost:29092'],
          },
          consumer: {
            groupId: 'unique_id_jung',
          },
        },
      },
    ]),
  ],
 
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}
 
cs

 

 

10) main.ts 에 아래와 같이 코드를 입력해준다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import {
  KafkaOptions,
  MicroserviceOptions,
  Transport,
from '@nestjs/microservices';
 
async function bootstrap() {
  const microserviceOptions: KafkaOptions = {
    transport: Transport.KAFKA,
    options: {
      client: {
        brokers: ['localhost:29092'],
      },
    },
  };
 
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    microserviceOptions,
  );
  await app.listen();
}
bootstrap();
cs

 

 

11) app.controller.ts 에 아래와 같이 코드를 입력해준다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import { Controller } from '@nestjs/common';
import {
  Ctx,
  KafkaContext,
  MessagePattern,
  Payload,
from '@nestjs/microservices';
 
@Controller()
export class AppController {
  constructor() {}
 
  @MessagePattern('medium.rock')
  readMessage(@Payload() message: any, @Ctx() context: KafkaContext) {
    const originalMessage = context.getMessage();
    const response =
      'Receiving a new message from topic : ' +
      JSON.stringify(originalMessage.value);
    console.log(response);
 
    return response;
  }
}
cs

 

 

12) Producer/Consumer 프로젝트를 npm run start:dev 를 해준다. 그리고 나서 curl 을 사용해서 query를 날려준다.

그러면 cmd > producer에게 날린 쿼리가 consumer에서 출력되는것을 확인할  수 있다.

 

 

13) Producer 의 app.controller를 다음과 같이 수정해준다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import { Controller, Get, Inject } from '@nestjs/common';
import { AppService } from './app.service';
import { ClientKafka } from '@nestjs/microservices';
 
@Controller()
export class AppController {
  constructor(
    private readonly appService: AppService,
    @Inject('name_jung'private readonly client: ClientKafka,
  ) {}
 
  async onModuleInit() {
    ['medium.rock'].forEach((key: string) =>
      this.client.subscribeToResponseOf(`${key}`),
    );
 
    await this.client.connect();
  }
 
  async onModuleDestroy() {
    await this.client.close();
  }
 
  @Get()
  getHello(): string {
    return this.appService.getHello();
  }
 
  @Get('kafka-test')
  testKafka() {
    return this.client.emit('medium.rock', {
      foo: 'bar',
      data: new Date().toString(),
      str: 'kafka-test',
    });
  }
 
  @Get('kafka-test-with-response')
  testKafkaWithResponse() {
    return this.client.send('medium.rock', {
      foo: 'bar',
      data: new Date().toString(),
      str: 'kafka-test-with-response',
    });
  }
}
cs

 

 

14)  curl을 이용해서 새로 추가된 'kafka-test-with-response'를 사용해본다.

1
curl --location --request GET 'http://localhost:3000/kafka-test-with-response'
cs

 

그러면 Conumer 프로젝트에서는 다음과 같이 출력된다.

 

 

 

 

 

 

 

 

reference

* Marcos Henrique da Silva - An Introduction to Kafka with TypeScript using NestJS

 

 

 

반응형
Comments