일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- 재테크
- 자바스크립트
- 서평
- 책을알려주는남자
- 자바
- 채권
- Java
- C
- 알고리즘트레이닝
- 주식
- C++
- 책알남
- 프로그래머스 알고리즘 공부
- 독서
- algorithmStudy
- 알고리즘 공부
- algorithmtraining
- algorithmTest
- 성분
- 경제
- 돈
- 화장품
- 투자
- 다독
- JavaScript
- 독후감
- 알고리즘공부
- 백준알고리즘
- 프로그래밍언어
- 지혜를가진흑곰
- Today
- Total
탁월함은 어떻게 나오는가?
[Kafka] Nest.js 로 kafka 통신 테스트 해보기 본문
오늘은 Nest.js 로 kafka 통신 테스트를 진행해 보겠다. 진행해보기에 앞서 docker-compose로 yml 파일을 사용할 것이기 때문에 "[Kafka] Docker Compose - Single Broker 사용으로 통신해보기" 를 참고하면 좋다.
* Producer: 메시지 생성에 대한 책임
* Consumer: 메시지 소비에 대한 책임
* Connector: Producer/Cosumer API 및 link topics 2가지 모두를 "재사용" 가능
* Streams: 입력을 출력 결과로 전환하는 책임
* Admin: Kafka topics를 관리하는데 사용
Test Start!!
1) nest js CLI install
1
|
npm i -g @nestjs/cli
|
cs |
2) Producer API를 위한 nest.js 프로젝트 생성
- 개인적으로 npm을 선호하기 때문에 npm 사용 yarn을 사용해도 무관하다.
1
|
nest new producer
|
cs |
3) 생성된 프로젝트에 kafkajs 와 microservies 설치해준다.
1
|
npm i --save kafkajs
|
cs |
1
|
npm i --save @nestjs/microservices
|
cs |
Producer 프로젝트
4) app.module.ts 파일에서 모듈을 설정해준다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ClientsModule, Transport } from '@nestjs/microservices';
@Module({
imports: [
ClientsModule.register([
{
name: 'name_jung',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'client_id_jung',
brokers: ['localhost:29092'],
},
},
},
]),
],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}
|
cs |
위의 brokers: ['localhost:29092']는 docker-compose.yml 에 설정한 포트이다. 실서비스에서 사용할때는 .env로 관리하는걸 추천한다.
5) Controller에서 @Get('kafka-test')를 추가해준다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
import { Controller, Get, Inject } from '@nestjs/common';
import { AppService } from './app.service';
import { ClientKafka } from '@nestjs/microservices';
@Controller()
export class AppController {
constructor(
private readonly appService: AppService,
@Inject('name_jung') private readonly client: ClientKafka,
) {}
@Get()
getHello(): string {
return this.appService.getHello();
}
@Get('kafka-test')
testKafka() {
return this.client.emit('medium.rock', {
foo: 'bar',
data: new Date().toString(),
str: 'kafka-test',
});
}
}
|
cs |
6) localhost:300/kafka-test 를 Get 쿼리를 날리면 Controller의 'medium.rock' 항목에 JSON 메시지를 내보내려고 한다.
1
|
curl --location --request GET 'http://localhost:3000/kafka-test'
|
cs |
curl을 사용해서 query를 날리면 응답이 오는것을 확인 할 수 있다.
Consumer 프로젝트
7) Producer 프로젝트를 만들었을 때와 마찬가지로 cli를 사용해준다.
1
|
nest new consumer
|
cs |
8) 생성된 프로젝트에 kafkajs 와 microservies 설치해준다.
1
|
npm i --save kafkajs
|
cs |
1
|
npm i --save @nestjs/microservices
|
cs |
9) app.module.ts 에 아래와 같이 코드를 입력해준다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ClientsModule, Transport } from '@nestjs/microservices';
@Module({
imports: [
ClientsModule.register([
{
name: 'name_jung',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'client_id_jung',
brokers: ['localhost:29092'],
},
consumer: {
groupId: 'unique_id_jung',
},
},
},
]),
],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}
|
cs |
10) main.ts 에 아래와 같이 코드를 입력해준다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import {
KafkaOptions,
MicroserviceOptions,
Transport,
} from '@nestjs/microservices';
async function bootstrap() {
const microserviceOptions: KafkaOptions = {
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:29092'],
},
},
};
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
microserviceOptions,
);
await app.listen();
}
bootstrap();
|
cs |
11) app.controller.ts 에 아래와 같이 코드를 입력해준다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
import { Controller } from '@nestjs/common';
import {
Ctx,
KafkaContext,
MessagePattern,
Payload,
} from '@nestjs/microservices';
@Controller()
export class AppController {
constructor() {}
@MessagePattern('medium.rock')
readMessage(@Payload() message: any, @Ctx() context: KafkaContext) {
const originalMessage = context.getMessage();
const response =
'Receiving a new message from topic : ' +
JSON.stringify(originalMessage.value);
console.log(response);
return response;
}
}
|
cs |
12) Producer/Consumer 프로젝트를 npm run start:dev 를 해준다. 그리고 나서 curl 을 사용해서 query를 날려준다.
그러면 cmd > producer에게 날린 쿼리가 consumer에서 출력되는것을 확인할 수 있다.
13) Producer 의 app.controller를 다음과 같이 수정해준다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
import { Controller, Get, Inject } from '@nestjs/common';
import { AppService } from './app.service';
import { ClientKafka } from '@nestjs/microservices';
@Controller()
export class AppController {
constructor(
private readonly appService: AppService,
@Inject('name_jung') private readonly client: ClientKafka,
) {}
async onModuleInit() {
['medium.rock'].forEach((key: string) =>
this.client.subscribeToResponseOf(`${key}`),
);
await this.client.connect();
}
async onModuleDestroy() {
await this.client.close();
}
@Get()
getHello(): string {
return this.appService.getHello();
}
@Get('kafka-test')
testKafka() {
return this.client.emit('medium.rock', {
foo: 'bar',
data: new Date().toString(),
str: 'kafka-test',
});
}
@Get('kafka-test-with-response')
testKafkaWithResponse() {
return this.client.send('medium.rock', {
foo: 'bar',
data: new Date().toString(),
str: 'kafka-test-with-response',
});
}
}
|
cs |
14) curl을 이용해서 새로 추가된 'kafka-test-with-response'를 사용해본다.
1
|
curl --location --request GET 'http://localhost:3000/kafka-test-with-response'
|
cs |
그러면 Conumer 프로젝트에서는 다음과 같이 출력된다.
reference
* Marcos Henrique da Silva - An Introduction to Kafka with TypeScript using NestJS
'[Snow-ball]server > devOps' 카테고리의 다른 글
[Rabbitmq] Nest.js로 rabbitmq 테스트 해보기 (0) | 2023.05.22 |
---|---|
[Kafka] Docker Compose - Single Broker 사용으로 통신해보기 (2) | 2023.05.17 |
[DevOps] 메시지 큐잉이란? (0) | 2023.05.16 |