Synchronous: blocking calls, where the caller waits for a response; the system still isolates each microservice as much as feasible.
Asynchronous: non-blocking calls, using protocols that are interoperable with a wide range of operating systems and cloud environments.
Single receiver – each request is processed by exactly one receiver or service.
Multiple receivers – a request may be handled by several services; this is the structure used by event-driven microservices.
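To make the difference between these styles concrete, here is a minimal in-memory sketch. It does not use Kafka; `requestResponse` and `InMemoryEventBus` are hypothetical names invented for this illustration, and `deliverPending` simply stands in for the work a real broker would do.

```typescript
type Listener = (payload: string) => void;

// Synchronous, single receiver: the caller blocks until the one receiver replies.
function requestResponse(
  receiver: (payload: string) => string,
  payload: string,
): string {
  return receiver(payload);
}

// Asynchronous, multiple receivers: publish() only queues the event and returns
// immediately; subscribers are invoked later, when the bus delivers.
class InMemoryEventBus {
  private readonly listeners = new Map<string, Listener[]>();
  private readonly pending: Array<{ topic: string; payload: string }> = [];

  subscribe(topic: string, listener: Listener): void {
    const current = this.listeners.get(topic) ?? [];
    this.listeners.set(topic, [...current, listener]);
  }

  publish(topic: string, payload: string): void {
    this.pending.push({ topic, payload });
  }

  // Stands in for the broker handing queued events to every subscriber.
  // Returns the number of listener invocations performed.
  deliverPending(): number {
    let delivered = 0;
    while (this.pending.length > 0) {
      const event = this.pending.shift()!;
      for (const listener of this.listeners.get(event.topic) ?? []) {
        listener(event.payload);
        delivered += 1;
      }
    }
    return delivered;
  }
}
```

The sender in the second style never learns who consumed the event, or when. That decoupling is what the Kafka-based setup in the rest of this article provides between real services.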
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
`nest new event-producer-service`
At this stage, let’s add a package for Apache Kafka called ‘kafkajs’, and also ‘@nestjs/microservices’ to support microservices with Nest.
yarn add kafkajs
yarn add @nestjs/microservices
import { Transport, ClientProxyFactory } from "@nestjs/microservices";
import { Producer } from "kafkajs";

// Host port published by the kafka service in docker-compose.
const brokerURLs = ['localhost:29092'];

export const KafkaProducerProvider = {
  provide: "KafkaProducer",
  useFactory: (): Promise<Producer> => {
    const kafkaClient = ClientProxyFactory.create({
      transport: Transport.KAFKA,
      options: {
        client: {
          brokers: brokerURLs,
        },
        producer: {
          allowAutoTopicCreation: true,
        },
      },
    });
    // For the Kafka transport, connect() resolves to the underlying kafkajs producer.
    return kafkaClient.connect();
  },
};
For now, brokerURLs is hard-coded. However, because the provider is defined with useFactory, we will later be able to inject a Config Service instance, for example, to supply the values that are currently hard-coded.
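As a rough sketch of where this could go, the broker list might be resolved from configuration instead of a constant. The helper below is hypothetical and not part of the article's code: the `KAFKA_BROKERS` variable name and the comma-separated format are assumptions made for illustration.

```typescript
// Hypothetical variable name; the article does not define one.
const BROKERS_ENV_VAR = 'KAFKA_BROKERS';
// Same value the article currently hard-codes.
const DEFAULT_BROKERS = ['localhost:29092'];

// Reads a comma-separated broker list from an env-like object and
// falls back to the default when the variable is missing or empty.
function resolveBrokerUrls(env: Record<string, string | undefined>): string[] {
  const raw = env[BROKERS_ENV_VAR];
  if (raw === undefined) {
    return [...DEFAULT_BROKERS];
  }
  const parsed = raw
    .split(',')
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0);
  return parsed.length > 0 ? parsed : [...DEFAULT_BROKERS];
}
```

Inside the factory, this could be called as `resolveBrokerUrls(process.env)`. Alternatively, a Config Service could be listed in the provider's `inject` array and received as an argument of `useFactory`.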
import { Inject, Module, OnModuleDestroy } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { KafkaProducerProvider } from './providers/kafka-producer.provider';
import { Producer } from "kafkajs";

@Module({
  imports: [],
  controllers: [AppController],
  providers: [
    AppService,
    KafkaProducerProvider,
  ],
})
export class AppModule implements OnModuleDestroy {
  constructor(
    @Inject("KafkaProducer")
    private readonly kafkaProducer: Producer,
  ) {}

  // Close the Kafka connection when the module shuts down.
  async onModuleDestroy(): Promise<void> {
    await this.kafkaProducer.disconnect();
  }
}
import { Inject, Injectable } from '@nestjs/common';
import { Producer } from "kafkajs";

@Injectable()
export class AppService {
  constructor(
    @Inject("KafkaProducer")
    private readonly kafkaProducer: Producer,
  ) {}

  getHello(): string {
    return 'Hello World!';
  }

  // Serializes the data as JSON and sends it to the given topic.
  async sendMessage(topic: string, data: unknown, key?: string) {
    return this.kafkaProducer.send({
      topic,
      messages: [
        {
          value: JSON.stringify(data),
          key,
        },
      ],
    });
  }
}
import { Body, Controller, Get, Post } from '@nestjs/common';
import { AppService } from './app.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  @Get()
  getHello(): string {
    return this.appService.getHello();
  }

  // POST /message with a JSON body: { topic, data, key? }
  @Post('message')
  async sendMessage(@Body() body: { topic: string; data: unknown; key?: string }) {
    const { topic, data, key } = body;
    return this.appService.sendMessage(topic, data, key);
  }
}
Let’s generate a new app with the command `nest new event-consumer-service` and install the same packages as for the producer service.
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
    transport: Transport.KAFKA,
    options: {
      client: {
        brokers: ['localhost:29092'],
      },
    },
  });
  app.listen(() => { console.log('Microservices started.'); });
}

bootstrap();
import { Controller, Get } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { AppService } from './app.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  @Get()
  getHello(): string {
    return this.appService.getHello();
  }

  // Handles every message published to the 'test-topic' topic.
  @MessagePattern('test-topic')
  respondToTestTopic(@Payload() message) {
    console.log(message.value);
    return 'Hello World';
  }
}
Now let’s run our services. If everything is set up correctly, when we send POST requests to the producer app using the topic test-topic, we’ll see the logs with our data in the consumer app.
And then the request will appear in the producer service.
Hence, if your requests go through, you have successfully set up communication between the microservices. Just like that.
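For reference, the POST request to the producer could be assembled as in the sketch below. `buildMessageRequest` is a hypothetical helper, and the base URL `http://localhost:3000` is an assumption (3000 is the port a freshly generated Nest app listens on unless it has been changed); the body fields match what the producer's controller destructures.

```typescript
interface MessageRequest {
  url: string;
  init: {
    method: 'POST';
    headers: Record<string, string>;
    body: string;
  };
}

// Builds the URL and options for POST /message on the producer service.
// The JSON body carries the same fields the controller reads: topic, data, key.
function buildMessageRequest(
  baseUrl: string,
  topic: string,
  data: unknown,
  key?: string,
): MessageRequest {
  return {
    // Strip trailing slashes so the path is joined cleanly.
    url: `${baseUrl.replace(/\/+$/, '')}/message`,
    init: {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      // An undefined key is omitted by JSON.stringify.
      body: JSON.stringify({ topic, data, key }),
    },
  };
}
```

With a runtime that provides a global `fetch` (for example Node.js 18 or later), the result can be passed on as `fetch(request.url, request.init)`, where `request` is the value returned by `buildMessageRequest('http://localhost:3000', 'test-topic', { hello: 'world' })`.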