Learn how to use event-driven architecture to create microservices using Kafka, Docker and NestJS.
In this post, we will use event-driven architecture (EDA) to create two microservices using Kafka, Docker and NestJS: one that processes HTTP orders and sends Kafka events, and another that listens for and records notifications. You will learn how to set up Kafka with Docker and integrate it with NestJS, along with essential concepts such as event patterns, offset management, retries and best practices for developing resilient and decoupled systems.
EDA is a design approach that aims to maximize scalability and efficiency. Unlike in traditional request-response architecture, services in an event-driven system communicate by emitting and listening to events. This promotes loose coupling between services, making systems more flexible and scalable. Because of these benefits, EDA is widely used in modern applications at companies like Netflix, Uber and Airbnb.
Some of these benefits are:
Apache Kafka, often just called Kafka, is a distributed event streaming platform. It allows systems to publish and subscribe to real-time streams of events in a highly scalable way. Kafka was originally developed at LinkedIn and is now used by many major companies to power data pipelines and event-driven systems.
By serving as a central hub for events, Kafka helps decouple services and enable reliable communication between them. This makes it a robust foundation for building responsive, resilient and scalable microservices using event-driven architecture.
Kafka operates on three fundamental concepts: topics, brokers and consumer groups.
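To make the relationship between partitions and consumer groups more concrete, here is a simplified model of how the partitions of one topic might be spread across the consumers in a single group. It is only a sketch: the function name assignPartitions and the consumer IDs are made up for illustration, and Kafka’s real assignors (range, round-robin, sticky) are more involved.

```typescript
// Simplified model: spread a topic's partitions across the consumers of one group.
// This is NOT Kafka's actual assignment algorithm, only an illustration of the idea.
function assignPartitions(
  partitionCount: number,
  consumerIds: string[]
): Record<string, number[]> {
  const assignment: Record<string, number[]> = {};
  for (const id of consumerIds) {
    assignment[id] = [];
  }
  if (consumerIds.length === 0) {
    return assignment; // Nobody to assign partitions to
  }
  for (let partition = 0; partition < partitionCount; partition++) {
    // Hand out partitions in turn so the load is shared evenly
    const owner = consumerIds[partition % consumerIds.length];
    assignment[owner].push(partition);
  }
  return assignment;
}

// Three partitions shared by two (hypothetical) consumers in the same group
console.log(assignPartitions(3, ["notifier-a", "notifier-b"]));
```

With three partitions and two consumers, the first consumer ends up with partitions 0 and 2 and the second with partition 1. A fourth consumer in a three-partition topic would receive nothing, which is why the partition count caps how far a consumer group can scale out.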
NestJS makes it easy to implement event-driven microservices by providing powerful abstractions and built-in support for message-based communication. With its microservices module and transport layer system, developers can quickly wire up Kafka and start emitting or listening to events. This means we don’t have to write low-level setup code and can focus on business logic instead. By combining modular architecture with features like dependency injection and decorators, NestJS delivers both structure and flexibility for building distributed systems.
Key features that make NestJS an excellent choice for event-driven systems are:
ClientKafka and @MessagePattern().
Create a docker-compose.yaml file and add the following to it:
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    healthcheck:
      test:
        [
          "CMD",
          "kafka-topics",
          "--bootstrap-server",
          "localhost:9092",
          "--list",
        ]
      interval: 10s
      timeout: 5s
      retries: 5
The docker-compose.yaml file defines two services: zookeeper and kafka.
In this setup, Kafka depends on Zookeeper for cluster coordination. Zookeeper constantly tracks which Kafka brokers are active and healthy; without it, a Zookeeper-based Kafka cluster is unable to manage partitions, elect leaders or recover from failures.
Now, let’s take a look at how the instructions in the file translate into running containers. The file sets up several key configurations for both Zookeeper and Kafka:
image: Tells Docker which image to use for each service, in this case confluentinc/cp-zookeeper:latest and confluentinc/cp-kafka:latest. The :latest tag means Docker will pull the newest version available, although for production it’s advised to pin a specific version.
environment: Injects configuration settings directly into the running containers as environment variables:
  ZOOKEEPER_CLIENT_PORT: Defines the port Zookeeper listens on for client connections.
  ZOOKEEPER_TICK_TIME: Sets the basic time unit (in milliseconds) for Zookeeper’s internal operations, such as heartbeats and session timeouts.
  KAFKA_BROKER_ID: Gives this particular Kafka broker a unique ID of 1.
  KAFKA_ZOOKEEPER_CONNECT: Tells Kafka how to find Zookeeper. Notice that it uses the service name zookeeper. Docker Compose provides internal networking, allowing these services to find each other by name.
  KAFKA_ADVERTISED_LISTENERS: Tells clients (and other brokers) the correct address to use for communicating with Kafka. We have two addresses defined here:
    PLAINTEXT://kafka:29092: For communication within the Docker network (using the service name kafka and internal port 29092).
    PLAINTEXT_HOST://localhost:9092: For communication from outside the Docker network (using localhost and port 9092, which is mapped to the host in the ports section).
  KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: Maps the listener names (PLAINTEXT, PLAINTEXT_HOST) to the security protocol (PLAINTEXT, meaning no encryption).
  KAFKA_INTER_BROKER_LISTENER_NAME: Tells Kafka which listener to use when it talks to other brokers in the same cluster. We only have one broker in this example, so it’s set to the internal PLAINTEXT listener.
  KAFKA_NUM_PARTITIONS: Tells Kafka that, by default, any new topic should be split into three partitions.
  KAFKA_DEFAULT_REPLICATION_FACTOR: Sets the default number of data copies for new topics to 1.
  KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: Kafka tracks how far each consumer group has read in an internal offsets topic. A value of 1 means only one copy of that tracking information is maintained.
  KAFKA_AUTO_CREATE_TOPICS_ENABLE: Allows Kafka to create new topics on the fly if an application tries to use one that doesn’t exist yet. It’s beneficial during development stages.
  KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: When several consumers work together to read messages as a consumer group, Kafka organizes their work. This setting controls how long Kafka waits before performing the first coordination (0 means no delay).
  KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR and KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: These settings control the internal topic Kafka uses to track transactions: how many copies of it are kept and how many in-sync replicas must acknowledge a write. Both are set to 1 because we run a single broker.
ports: Maps ports between our host machine (outside Docker) and the containers (inside Docker). The format is HOST_PORT:CONTAINER_PORT.
  Zookeeper 2181:2181 makes it accessible on port 2181 of our local machine.
  Kafka 9092:9092 makes it accessible on port 9092 of our local machine.
depends_on: Tells Docker to start the zookeeper container before starting the kafka container. It only controls start order; it does not guarantee that Zookeeper is fully ready before Kafka starts.
healthcheck: Defines a command that Docker runs periodically inside the Kafka container to verify it is working. It runs kafka-topics --bootstrap-server localhost:9092 --list. If the command succeeds, the container is marked as healthy.
Once you’ve added the code to the docker-compose.yaml file, it’s time to start our containers. In the same project directory, run the command below:
docker compose up --build -d
The --build flag tells Compose to build images before starting the containers. Our file only references prebuilt images, so there is nothing to build and the flag can be omitted. The -d flag starts the containers in detached mode, thereby freeing up our terminal.
For subsequent starts without changes, use:
docker compose up -d
Next, run this command to verify that both containers are running:
docker compose ps
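With the containers running, the two advertised listeners in the Compose file determine which address a client should use. The sketch below captures that rule in code. The ClientLocation type and the bootstrapBrokers function are illustrative names, not part of Kafka or NestJS.

```typescript
// Where a client runs decides which advertised listener it should connect to.
type ClientLocation = "host" | "compose-network";

function bootstrapBrokers(location: ClientLocation): string[] {
  // PLAINTEXT_HOST://localhost:9092 is advertised for processes on the host machine.
  // PLAINTEXT://kafka:29092 is advertised for containers on the same Compose network.
  return location === "host" ? ["localhost:9092"] : ["kafka:29092"];
}

// Our NestJS services run on the host during development
console.log(bootstrapBrokers("host"));
```

Because both services in this post run directly on the host, they use localhost:9092. If they were later moved into containers in the same Compose project, they would need kafka:29092 instead.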
First, create the two projects:
nest new order-service
nest new notification-service
Next, install the dependencies in each project:
npm install @nestjs/microservices kafkajs @nestjs/config
@nestjs/microservices: Adds message-based transports to NestJS alongside HTTP, so our services can produce and consume Kafka messages. It also provides decorators like @MessagePattern() for Kafka topics and handles connection lifecycle (connect/disconnect hooks).
kafkajs: A widely used Kafka client for Node.js. It manages broker connections and message serialization, and handles retries and errors. NestJS’s microservices module needs a Kafka driver underneath, just like Express needs HTTP.
@nestjs/config: Loads our .env files automatically, can validate environment variables and makes them injectable via the ConfigService.
Next, add a .env file in each project:
# order-service/.env
KAFKA_BROKERS=localhost:9092
# notification-service/.env
KAFKA_BROKERS=localhost:9092
KAFKA_CONSUMER_GROUP_ID=notification-consumer-group
KAFKA_BROKERS tells our services where to find Kafka, while KAFKA_CONSUMER_GROUP_ID sets the unique name for the notification service’s consumer group.
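KAFKA_BROKERS can hold several comma-separated addresses, so it helps to parse it defensively. The helper below is a minimal sketch: parseBrokers is a hypothetical name, and falling back to localhost:9092 is an assumption that suits local development only.

```typescript
// Turn a comma-separated KAFKA_BROKERS value into a clean list of broker addresses.
function parseBrokers(
  raw: string | undefined,
  fallback: string[] = ["localhost:9092"] // Assumed default for local development
): string[] {
  if (!raw) {
    return fallback;
  }
  const brokers = raw
    .split(",")
    .map((broker) => broker.trim()) // Tolerate spaces after commas
    .filter((broker) => broker.length > 0); // Drop empty entries such as a trailing comma
  return brokers.length > 0 ? brokers : fallback;
}

console.log(parseBrokers("broker-1:9092, broker-2:9092"));
```

Trimming and filtering guard against values like "localhost:9092," or "a:9092, b:9092", which a plain split(",") would pass to the client as empty or space-padded addresses.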
First, navigate to the order-service project and create an order module with a controller file and service file using the commands below:
nest generate module order
nest generate controller order
nest generate service order
Next, update the order.module.ts file with the following:
// order-service/src/order/order.module.ts
import { Module, Logger } from "@nestjs/common";
import { ClientsModule, Transport } from "@nestjs/microservices";
import { ConfigService } from "@nestjs/config";
import { OrderService } from "./order.service";
import { OrderController } from "./order.controller";

@Module({
  imports: [
    ClientsModule.registerAsync([
      // Use async registration for ConfigService
      {
        name: "KAFKA_PRODUCER_SERVICE", // Injection token for the Kafka client
        useFactory: (configService: ConfigService) => ({
          transport: Transport.KAFKA,
          options: {
            client: {
              clientId: "order-service-producer", // Unique client ID
              brokers: configService
                .getOrThrow<string>("KAFKA_BROKERS")
                .split(","), // Get brokers from .env (just one in this case)
            },
            producer: {
              allowAutoTopicCreation: true, // Convenient for development
              idempotent: true, // Prevents the same message from being saved multiple times
              retry: {
                retries: 5,
                maxRetryTime: 30000,
              },
            },
          },
        }),
        inject: [ConfigService], // Inject ConfigService into the factory
      },
    ]),
  ],
  providers: [OrderService, Logger], // Provide Logger
  controllers: [OrderController],
})
export class OrderModule {}
The code above configures a NestJS producer client for Kafka. It uses ClientsModule.registerAsync to inject broker addresses from the ConfigService dynamically and sets the clientId, along with auto-topic creation options. The registered client (KAFKA_PRODUCER_SERVICE) is exposed for dependency injection, while the module declares OrderService to handle business logic and OrderController for incoming HTTP requests.
Next, update the order.service.ts file with the following:
// order-service/src/order/order.service.ts
import {
  Injectable,
  Inject,
  OnModuleInit,
  OnModuleDestroy,
  Logger,
} from "@nestjs/common";
import { ClientKafka } from "@nestjs/microservices";
import { lastValueFrom } from "rxjs";

@Injectable()
export class OrderService implements OnModuleInit, OnModuleDestroy {
  constructor(
    @Inject("KAFKA_PRODUCER_SERVICE") private readonly kafkaClient: ClientKafka,
    private readonly logger: Logger
  ) {}

  async onModuleInit() {
    // Connect the client when the module initializes
    try {
      await this.kafkaClient.connect();
      this.logger.log(
        "Kafka Producer Client connected successfully",
        OrderService.name
      );
    } catch (error) {
      this.logger.error(
        "Failed to connect Kafka Producer Client",
        (error as Error).stack, // Logger.error expects a stack string here
        OrderService.name
      );
    }
  }

  async onModuleDestroy() {
    // Disconnect the client when the application shuts down
    await this.kafkaClient.close();
    this.logger.log("Kafka Producer Client disconnected", OrderService.name);
  }

  async createOrder(orderData: any) {
    const topic = "order_created";
    const eventPayload = {
      orderId: `ORD-${Date.now()}`, // Simple unique ID
      ...orderData,
      timestamp: new Date().toISOString(),
    };
    try {
      this.logger.log(
        `Publishing event to topic [${topic}]`,
        OrderService.name
      );
      // emit() returns an Observable; awaiting it lets publish failures reach the catch block
      await lastValueFrom(
        this.kafkaClient.emit(topic, JSON.stringify(eventPayload))
      );
      return { success: true, publishedEvent: eventPayload };
    } catch (error) {
      const err = error as Error;
      this.logger.error(
        `Failed to publish event to topic [${topic}]`,
        err.stack,
        OrderService.name
      );
      return { success: false, error: err.message };
    }
  }
}
The code above manages the production of Kafka messages for order events. In the constructor, it injects the Kafka client instance registered with the token KAFKA_PRODUCER_SERVICE and also injects NestJS’s Logger service to log messages for debugging and monitoring.
The onModuleInit method opens the Kafka connection when the service starts, while the onModuleDestroy method closes it and releases its resources when the application shuts down.
The createOrder method constructs an event payload with a unique ID and publishes it to Kafka using the kafkaClient.emit method. Because the underlying Kafka client sends message values as strings or Buffers, we use the JSON.stringify method on our payload before publishing it to Kafka.
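The payload construction and JSON round trip can be tried in isolation. The sketch below is a typed variant of that step, not the service code itself: the OrderInput and OrderCreatedEvent interfaces and the buildOrderCreatedEvent function are names introduced here for illustration, and the field list is assumed from the curl example used later in this post.

```typescript
// Hypothetical shapes, based on the fields used in this post's test request
interface OrderInput {
  productId: string;
  quantity: number;
}

interface OrderCreatedEvent extends OrderInput {
  orderId: string;
  timestamp: string;
}

function buildOrderCreatedEvent(
  order: OrderInput,
  now: Date = new Date()
): OrderCreatedEvent {
  return {
    ...order, // Spread first so the generated fields below cannot be overridden
    orderId: `ORD-${now.getTime()}`,
    timestamp: now.toISOString(),
  };
}

// Serialize for Kafka, then parse as a consumer would
const event = buildOrderCreatedEvent({ productId: "prod_123", quantity: 2 });
const wireValue = JSON.stringify(event);
const received = JSON.parse(wireValue) as OrderCreatedEvent;
console.log(received.orderId === event.orderId);
```

One design choice differs from the service above: the request data is spread before the generated fields, so a request body that happens to contain its own orderId cannot replace the generated one. Passing the clock in as a parameter also makes the function easy to test.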
Next, update the order.controller.ts file with the following:
// order-service/src/order/order.controller.ts
import { Controller, Post, Body, Logger } from "@nestjs/common";
import { OrderService } from "./order.service";

@Controller("orders")
export class OrderController {
  constructor(
    private readonly orderService: OrderService,
    private readonly logger: Logger
  ) {}

  @Post()
  async createOrder(@Body() createOrderDto: any) {
    this.logger.log(
      `Received create order request: ${JSON.stringify(createOrderDto)}`,
      OrderController.name
    );
    const result = await this.orderService.createOrder(createOrderDto);
    return result;
  }
}
The code above handles HTTP POST requests to the /orders endpoint, forwarding order data to Kafka via the OrderService.
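The controller accepts the request body as any, so malformed orders would be published to Kafka unchanged. NestJS applications commonly handle this with a ValidationPipe and DTO classes; the framework-free sketch below only illustrates the idea. The CreateOrderDto shape and the parseCreateOrder function are assumptions based on the test request used in this post.

```typescript
// Assumed request shape, taken from the curl example in this post
interface CreateOrderDto {
  productId: string;
  quantity: number;
}

// Check an unknown request body and return a typed order, or throw a descriptive error.
function parseCreateOrder(body: unknown): CreateOrderDto {
  if (typeof body !== "object" || body === null) {
    throw new Error("Body must be a JSON object");
  }
  const { productId, quantity } = body as Record<string, unknown>;
  if (typeof productId !== "string" || productId.length === 0) {
    throw new Error("productId must be a non-empty string");
  }
  if (typeof quantity !== "number" || quantity % 1 !== 0 || quantity <= 0) {
    throw new Error("quantity must be a positive integer");
  }
  return { productId, quantity };
}

console.log(parseCreateOrder({ productId: "prod_123", quantity: 2 }));
```

Rejecting bad input before it becomes an event matters more in an event-driven system than in a plain HTTP API, because every consumer of the topic would otherwise have to defend against the same malformed data.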
Next, update the app.module.ts file with the following:
// order-service/src/app.module.ts
import { Module } from "@nestjs/common";
import { ConfigModule } from "@nestjs/config";
import { OrderModule } from "./order/order.module";
import { AppController } from "./app.controller";
import { AppService } from "./app.service";

@Module({
  imports: [
    ConfigModule.forRoot({ isGlobal: true }), // Make config global
    OrderModule,
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}
Then update the main.ts file with the following:
// order-service/src/main.ts
import { NestFactory } from "@nestjs/core";
import { AppModule } from "./app.module";
import { Logger } from "@nestjs/common";

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  const port = 3001; // Use a distinct port
  await app.listen(port);
  Logger.log(
    `Order Service (Producer) is running on: http://localhost:${port}`,
    "Bootstrap"
  );
}
bootstrap();
Now, run this command in your terminal to start the server:
npm run start:dev
Now, in another terminal window, navigate to the notification-service directory and update the main.ts file with the following:
// notification-service/src/main.ts
import { NestFactory } from "@nestjs/core";
import { AppModule } from "./app.module";
import { MicroserviceOptions, Transport } from "@nestjs/microservices";
import { Logger } from "@nestjs/common";
import { Partitioners } from "kafkajs";

async function bootstrap() {
  const logger = new Logger("KafkaBootstrap");
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.KAFKA,
      options: {
        client: {
          brokers: process.env.KAFKA_BROKERS?.split(",") ?? ["localhost:9092"],
          retry: {
            initialRetryTime: 1000,
            retries: 8,
            maxRetryTime: 5000,
            multiplier: 2, // Exponential factor (in KafkaJS, `factor` is the randomization factor)
          },
        },
        consumer: {
          groupId:
            process.env.KAFKA_CONSUMER_GROUP_ID ??
            "notification-consumer-group-server",
          allowAutoTopicCreation: true,
          sessionTimeout: 30000,
          rebalanceTimeout: 60000,
          heartbeatInterval: 5000,
        },
        producer: {
          createPartitioner: Partitioners.LegacyPartitioner,
        },
        run: {
          autoCommit: false,
        },
      },
    }
  );
  // Add error handling for Kafka connection
  try {
    await app.listen();
    logger.log("Notification Service successfully connected to Kafka");
  } catch (error) {
    logger.error("Failed to connect to Kafka:", error);
    process.exit(1);
  }
}
bootstrap();
The code above sets up a NestJS microservice as a Kafka consumer. The retry setting uses an exponential backoff strategy to ride out temporary network disruptions without crashing: the wait time starts at 1 second, doubles after each failure and is capped at 5 seconds, for up to 8 retries.
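The retry schedule described above can be worked out with a few lines of code. This is a simplified model rather than KafkaJS’s implementation: it ignores the random jitter that KafkaJS applies to each delay, and the function and parameter names are chosen for illustration.

```typescript
// Compute the wait before each retry: start at initialRetryTime, multiply after
// every failure, and never exceed maxRetryTime. Jitter is deliberately left out.
function backoffSchedule(
  initialRetryTime: number,
  multiplier: number,
  maxRetryTime: number,
  retries: number
): number[] {
  const delays: number[] = [];
  let delay = initialRetryTime;
  for (let attempt = 0; attempt < retries; attempt++) {
    delays.push(Math.min(delay, maxRetryTime));
    delay = delay * multiplier;
  }
  return delays;
}

// Values from the consumer configuration: 1s start, doubling, 5s cap, 8 retries
console.log(backoffSchedule(1000, 2, 5000, 8));
```

For these values the delays are 1000, 2000 and 4000 ms, followed by five waits at the 5000 ms cap, so the client gives up after roughly 32 seconds of waiting in total.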
The groupId identifies the consumer group, which lets Kafka balance message processing among service instances, while sessionTimeout (30s), rebalanceTimeout (60s) and heartbeatInterval (5s) keep the group stable. This setup quickly identifies failed consumers and provides enough time for reassigning partitions.
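These timing values depend on one another. Kafka’s documentation says the heartbeat interval must be lower than the session timeout and is typically set to no more than one third of it. A small check like the following can catch a misconfiguration early; the ConsumerTimings interface and the checkConsumerTimings function are hypothetical helpers, not part of KafkaJS or NestJS.

```typescript
// Hypothetical helper: sanity-check consumer timing settings (all in milliseconds).
interface ConsumerTimings {
  sessionTimeout: number;
  heartbeatInterval: number;
}

function checkConsumerTimings(timings: ConsumerTimings): string[] {
  const problems: string[] = [];
  if (timings.heartbeatInterval >= timings.sessionTimeout) {
    // The consumer would be considered dead before it ever sends a heartbeat
    problems.push("heartbeatInterval must be lower than sessionTimeout");
  } else if (timings.heartbeatInterval * 3 > timings.sessionTimeout) {
    // Leaves little room for a missed heartbeat before the session expires
    problems.push("heartbeatInterval is usually at most one third of sessionTimeout");
  }
  return problems;
}

// Values from the consumer configuration: 5s heartbeat, 30s session timeout
console.log(checkConsumerTimings({ sessionTimeout: 30000, heartbeatInterval: 5000 }));
```

With a 5-second heartbeat and a 30-second session timeout, the check reports no problems: the consumer can miss several heartbeats before the broker considers it failed.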
The createPartitioner: Partitioners.LegacyPartitioner setting keeps the partitioning behavior of earlier KafkaJS versions, so messages are routed to partitions consistently. In our configuration, we’ve set autoCommit: false, opting for manual offset commits so that an offset is only committed after its message has been processed.
Next, update the app.controller.ts file with the following:
// notification-service/src/app.controller.ts
import { Controller, Logger } from "@nestjs/common";
import {
  MessagePattern,
  Payload,
  Ctx,
  KafkaContext,
} from "@nestjs/microservices";

@Controller()
export class AppController {
  constructor(private readonly logger: Logger) {}

  @MessagePattern("order_created") // Listen to the 'order_created' topic
  async handleOrderCreated(
    @Payload() payload: any, // Inject the message payload
    @Ctx() context: KafkaContext // Inject the Kafka message context
  ) {
    const { offset } = context.getMessage();
    const topic = context.getTopic();
    const partition = context.getPartition();
    this.logger.log(
      `Received message from topic [${topic}] partition [${partition}] offset [${offset}]`,
      AppController.name
    );
    this.logger.log(`Payload: ${JSON.stringify(payload)}`, AppController.name);
    try {
      // --- Your Business Logic Here ---
      this.logger.log(
        `Simulating notification dispatch for Order ID: ${payload.orderId}...`,
        AppController.name
      );
      // In a real app: call email service, push notification service, etc.
      this.logger.log(
        `Notification for Order ID: ${payload.orderId} processed.`,
        AppController.name
      );
      // Commit the offset of the NEXT message to read (processed offset + 1)
      await context
        .getConsumer()
        .commitOffsets([
          { topic, partition, offset: (parseInt(offset, 10) + 1).toString() },
        ]);
    } catch (error) {
      const err = error as Error;
      this.logger.error(
        `Error processing notification for order ${payload?.orderId}: ${err.message}`,
        err.stack,
        AppController.name
      );
    }
  }
}
The code above implements a Kafka message handler for the order_created topic using the @MessagePattern decorator. The handler receives two critical parameters through decorators:
@Payload(): Injects the deserialized message content, which is typically a JavaScript object when using JSON.
@Ctx(): Supplies the Kafka message context, which includes metadata such as topic, partition and offset.
Finally, it manually commits the offset after successful processing using commitOffsets(), which is necessary when autoCommit is disabled. The committed value is the processed offset plus one, because a committed offset marks the next message the group should read. Error handling captures and logs processing failures without crashing the consumer, thereby maintaining service availability. Keep in mind that a failed message is only logged here; a later successful commit on the same partition moves past it, so production systems typically add retries or a dead-letter topic.
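The offset arithmetic is easy to get wrong, so it can be worth isolating. The sketch below shows one way to compute the value to commit from the offset string that the Kafka client hands to the handler. The nextOffsetToCommit function is a hypothetical helper, and it deliberately refuses offsets too large for plain Number arithmetic.

```typescript
// Largest integer a JavaScript number can represent exactly (2^53 - 1)
const MAX_SAFE_OFFSET = 9007199254740991;

// Given the offset of the message just processed, return the offset to commit.
// Kafka expects the position of the NEXT message to read, hence the + 1.
function nextOffsetToCommit(processedOffset: string): string {
  if (!/^\d+$/.test(processedOffset)) {
    throw new Error(`Invalid offset: ${processedOffset}`);
  }
  const current = parseInt(processedOffset, 10);
  if (current >= MAX_SAFE_OFFSET) {
    // Kafka offsets are 64-bit; beyond this point Number arithmetic loses precision
    throw new Error("Offset too large for Number arithmetic");
  }
  return String(current + 1);
}

console.log(nextOffsetToCommit("41"));
```

Committing the processed offset itself, without the increment, would cause the last message to be delivered again after a restart or rebalance. For topics that could ever exceed the safe integer range, a BigInt-based increment would be the more robust choice.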
Next, update the app.module.ts file with the code below:
// notification-service/src/app.module.ts
import { Module, Logger } from "@nestjs/common";
import { ConfigModule } from "@nestjs/config";
import { AppController } from "./app.controller";
import { AppService } from "./app.service";

@Module({
  imports: [ConfigModule.forRoot({ isGlobal: true })],
  controllers: [AppController],
  providers: [AppService, Logger], // Provide Logger
})
export class AppModule {}
Now, you can start the server by running this command:
npm run start:dev
To test our project, send a test order with the curl request below:
curl -X POST http://localhost:3001/orders \
  -H "Content-Type: application/json" \
  -d '{"productId": "prod_123", "quantity": 2}'
Let’s check the logs in both services.
The order-service logs should look like this:
[Nest] 11204 - 28/04/2025, 21:49:09 LOG [OrderController] Received create order request: {"productId":"prod_123","quantity":2}
[Nest] 11204 - 28/04/2025, 21:49:09 LOG [OrderService] Publishing event to topic [order_created]
The notification-service logs should look like this:
[Nest] 20876 - 28/04/2025, 21:49:09 LOG [AppController] Received message from topic [order_created] partition [1] offset [0]
[Nest] 20876 - 28/04/2025, 21:49:09 LOG [AppController] Payload: {"orderId":"ORD-1745873349101","productId":"prod_123","quantity":2,"timestamp":"2025-04-28T20:49:09.101Z"}
[Nest] 20876 - 28/04/2025, 21:49:09 LOG [AppController] Simulating notification dispatch for Order ID: ORD-1745873349101...
[Nest] 20876 - 28/04/2025, 21:49:09 LOG [AppController] Notification for Order ID: ORD-1745873349101 processed.
In this post, we built two NestJS microservices that communicate via Kafka—an order service (our producer) and a notification service (our consumer).
Our setup enables resilience with retries and error handling, as well as scalability through consumer groups and partitions. With this foundation, we’ve covered the basics of event-driven architecture with Kafka while avoiding common pitfalls.
Chris Nwamba is a Senior Developer Advocate at AWS focusing on AWS Amplify. He is also a teacher with years of experience building products and communities.