
Learn how to use event-driven architecture to create microservices using Kafka, Docker and NestJS.

In this post, we will use event-driven architecture (EDA) to create two microservices using Kafka, Docker and NestJS: one that processes HTTP orders and sends Kafka events, and another that listens for and records notifications. You will learn how to set up Kafka with Docker and integrate it with NestJS, along with essential concepts such as event patterns, offset management, retries and best practices for developing resilient and decoupled systems.

What Is Event-Driven Architecture?

EDA is a design approach that aims to maximize scalability and efficiency. Unlike traditional request-response architecture, services communicate by emitting and listening to events. This promotes loose coupling between services, making systems more flexible and scalable. Because of its benefits for scalability and efficiency, EDA is widely used in modern applications at companies like Netflix, Uber and Airbnb.

Some of these benefits are:

  • Decoupling: With EDA, services don’t need to know about each other’s existence. This means that each service can evolve independently without being tightly integrated.
  • Resilience: Because services communicate asynchronously, a temporary failure in one service does not crash the entire system.
  • Scalability: Each service can independently scale and handle increased loads without affecting others.
  • Better responsiveness: Offloading tasks to background consumers keeps user-facing services fast and efficient.

What Is Kafka?

Apache Kafka, often just called Kafka, is a distributed event streaming platform. It allows systems to publish and subscribe to real-time streams of events in a highly scalable way. Kafka was originally developed at LinkedIn and is now an open-source Apache project adopted by many major companies to power data pipelines and event-driven systems.

By serving as a central hub for events, Kafka helps decouple services and enable reliable communication between them. This makes it a robust foundation for building responsive, resilient and scalable microservices using event-driven architecture.

Important Kafka Concepts

Kafka operates on three fundamental concepts: topics, brokers and consumer groups.

  • Topics are streams of categorized messages that are partitioned to enable parallel processing and replicated to improve data resilience. This structure not only allows for rapid read and write operations but also helps protect against data loss through replication.
  • Brokers serve as the individual servers responsible for storing topics. They work together within a cluster to distribute workloads and recover from failures with ease. Thanks to brokers, Kafka can maintain fault tolerance even at scale, processing millions of messages each second.
  • Consumer groups enable collaborative processing of topics, where various services can share the load by handling different subsets of partitions. This approach optimizes throughput by balancing workloads and monitoring progress, transforming Kafka into a highly efficient system for real-time data pipelines.
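
To make these concepts concrete, here is a minimal sketch using the kafkajs admin API (the same client library we install later in this post). It creates a topic with three partitions and a single replica, mirroring the development defaults we'll configure below; the topic name is just an example:

// create-topic.ts: a minimal kafkajs admin sketch (illustrative only)
import { Kafka } from "kafkajs";

async function createTopic() {
  const kafka = new Kafka({ brokers: ["localhost:9092"] });
  const admin = kafka.admin();
  await admin.connect();

  // Three partitions allow parallel consumption; one replica is fine for dev
  await admin.createTopics({
    topics: [
      { topic: "order_created", numPartitions: 3, replicationFactor: 1 },
    ],
  });

  await admin.disconnect();
}

createTopic().catch(console.error);

Consumers that join the same consumer group will then split those three partitions among themselves automatically.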

How Does NestJS Help Build Event-Driven Microservices?

NestJS makes it easy to implement event-driven microservices by providing powerful abstractions and built-in support for message-based communication. With its microservices module and transport layer system, developers can quickly wire up Kafka and start emitting or listening to events. This means we don’t have to write low-level setup code and can focus on business logic instead. By combining modular architecture with features like dependency injection and decorators, NestJS delivers both structure and flexibility for building distributed systems.

Key features that make NestJS an excellent choice for event-driven systems are:

  • Microservices module: NestJS offers a dedicated module for registering and configuring microservices with ease.
  • Kafka transport layer: Kafka integration is built in, with support for emitting and handling events using ClientKafka and @MessagePattern().
  • Dependency injection (DI): NestJS’s DI system makes it simple to share Kafka clients and services across your app.
  • Less boilerplate: With decorators, lifecycle hooks and automatic message serialization, you write less code and stay productive.
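
As a quick preview of what this looks like in practice, here is a simplified consumer-side sketch of the pattern we build out fully later in this post (the class name is illustrative):

// A minimal sketch: bind a handler to a Kafka topic
import { Controller } from "@nestjs/common";
import { MessagePattern, Payload } from "@nestjs/microservices";

@Controller()
export class OrdersListener {
  @MessagePattern("order_created") // Runs for each message on this topic
  handleOrderCreated(@Payload() payload: any) {
    console.log("Order event received:", payload);
  }
}

On the producer side, an injected ClientKafka instance publishes events with a single emit() call, as we'll see in the order service.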

Spin Up Kafka Locally

Create a docker-compose.yaml file and add the following to it:

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    healthcheck:
      test:
        [
          "CMD",
          "kafka-topics",
          "--bootstrap-server",
          "localhost:9092",
          "--list",
        ]
      interval: 10s
      timeout: 5s
      retries: 5

The docker-compose.yaml file defines two services:

  • Zookeeper (Kafka’s dependency for coordination)
  • Kafka (the actual message broker)

Kafka depends on Zookeeper to coordinate the cluster. Zookeeper constantly tracks which Kafka brokers are active and healthy; without it, Kafka is unable to manage partitions, elect leaders or recover from failures.

Now, let’s take a look at how the instructions in the file translate into running containers. The file sets up several key configurations for both Zookeeper and Kafka:

  • image: This tells Docker which software blueprint to use for each service, in this case confluentinc/cp-zookeeper:latest and confluentinc/cp-kafka:latest. The :latest tag means Docker will pull the newest version available, although for production it's advised to pin a specific version.
  • environment: Here, we inject configuration settings directly into the running containers as environment variables:
    • ZOOKEEPER_CLIENT_PORT: Defines the port Zookeeper is listening to.
    • ZOOKEEPER_TICK_TIME: Sets a basic time unit (in milliseconds) for Zookeeper’s internal operations, like how often it checks connections.
    • KAFKA_BROKER_ID: Gives this particular Kafka broker a unique ID of 1.
    • KAFKA_ZOOKEEPER_CONNECT: Tells Kafka how to find Zookeeper. Notice that it uses the service name zookeeper. Docker Compose provides internal networking, allowing these services to find each other by name.
    • KAFKA_ADVERTISED_LISTENERS: Tells clients (and other brokers) the correct address to use for communicating with Kafka. We have two addresses defined here:
      • PLAINTEXT://kafka:29092: For communication within the Docker network (using the name kafka and internal port 29092).
      • PLAINTEXT_HOST://localhost:9092: For communication from outside the Docker network (using localhost and the port 9092 that we mapped earlier).
    • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: Maps the listener names (PLAINTEXT, PLAINTEXT_HOST) to the security protocol (PLAINTEXT, meaning no encryption).
    • KAFKA_INTER_BROKER_LISTENER_NAME: This tells Kafka which of its communication addresses it should use if it needs to talk to other Kafka brokers that might be part of the same cluster. Since we only have one broker in this example, it’s set to use the internal PLAINTEXT address.
    • KAFKA_NUM_PARTITIONS: This tells Kafka that, by default, any new message category (topic) should be split into three sections (partitions).
    • KAFKA_DEFAULT_REPLICATION_FACTOR: This sets the default number of data copies for new topics to 1.
    • KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: Kafka tracks how far applications have read messages in an internal offsets topic. Setting this replication factor to 1 means only one copy of that tracking information is maintained, which is acceptable for a single-broker development setup.
    • KAFKA_AUTO_CREATE_TOPICS_ENABLE: This allows Kafka to create new topics on the fly if an application tries to use one that doesn’t exist yet. It’s beneficial during development stages.
    • KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: When several consumers work together to read messages as a consumer group, Kafka organizes their work. This setting controls whether there is a delay before Kafka performs the first coordination (0 means no delay).
    • KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR and KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: These settings determine how Kafka manages transactions. They define how many copies of the transaction tracking data should be kept.
  • ports: Here we map ports between our host machine (outside Docker) and the containers (inside Docker). The format is HOST_PORT:CONTAINER_PORT.
    • For Zookeeper, 2181:2181 makes it accessible on port 2181 of our local machine.
    • For Kafka, 9092:9092 makes it accessible on port 9092 of our local machine.
  • depends_on: This tells Docker to start the zookeeper container before starting the kafka container. It just means Zookeeper starts first, not necessarily that Zookeeper is fully ready before Kafka starts.
  • healthcheck: This section defines a command that Docker runs periodically inside the Kafka container to verify it is working. It runs kafka-topics --bootstrap-server localhost:9092 --list. If it is successful, then the container is marked as healthy.

Once you’ve added the code to the docker-compose.yaml file, it’s time to start our containers. In the same project directory, run the command below:

docker compose up --build -d

The -d flag starts the containers in detached mode, freeing up our terminal. The --build flag only matters for services built from a local Dockerfile; because both services here use prebuilt images, it is harmless but optional.

For subsequent starts without changes, use:

docker compose up -d

Next, run this command to verify that both containers are running:

docker compose ps
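
You can also peek inside the broker with the same kafka-topics CLI that the healthcheck uses. For example, to list the topics Kafka currently knows about:

docker compose exec kafka kafka-topics --bootstrap-server localhost:9092 --list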

Set Up NestJS Projects

First, create the two projects:

nest new order-service
nest new notification-service

Next, install the dependencies in each project:

npm install @nestjs/microservices kafkajs @nestjs/config

  • @nestjs/microservices: This turns NestJS from an HTTP server into a message consumer/producer, which makes it understand Kafka messages. It also provides decorators like @MessagePattern() for Kafka topics and handles the connection lifecycle (connect/disconnect hooks).
  • kafkajs: A widely used Kafka client for Node.js. It manages broker connections and message serialization, and handles retries and errors. NestJS's Kafka transport uses it as the underlying driver, much like Express underlies NestJS's default HTTP server.
  • @nestjs/config: Loads our .env files automatically, validates environment variables and makes them injectable via the ConfigService.

Next, add a .env file in each project:

# order-service/.env
KAFKA_BROKERS=localhost:9092
# notification-service/.env
KAFKA_BROKERS=localhost:9092
KAFKA_CONSUMER_GROUP_ID=notification-consumer-group

KAFKA_BROKERS tells our services where to find Kafka, while KAFKA_CONSUMER_GROUP_ID sets the unique name for the notification service’s consumer group.

Producer Service (order-service)

First, navigate to the order-service project and create an order module with a controller file and service file using the command below:

nest generate module order
nest generate controller order
nest generate service order

Next, update the order.module.ts file with the following:

// order-service/src/order/order.module.ts
import { Module, Logger } from "@nestjs/common";
import { ClientsModule, Transport } from "@nestjs/microservices";
import { ConfigService } from "@nestjs/config";
import { OrderService } from "./order.service";
import { OrderController } from "./order.controller";

@Module({
  imports: [
    ClientsModule.registerAsync([
      // Use async registration for ConfigService
      {
        name: "KAFKA_PRODUCER_SERVICE", // Injection token for the Kafka client
        useFactory: (configService: ConfigService) => ({
          transport: Transport.KAFKA,
          options: {
            client: {
              clientId: "order-service-producer", // Unique client ID
              brokers: configService
                .getOrThrow<string>("KAFKA_BROKERS")
                .split(","), // Get brokers from .env (just one in this case)
            },
            producer: {
              allowAutoTopicCreation: true, // Convenient for development
              idempotent: true, // Retries won't write duplicate copies of the same message
              retry: {
                retries: 5,
                maxRetryTime: 30000,
              },
            },
          },
        }),
        inject: [ConfigService], // Inject ConfigService into the factory
      },
    ]),
  ],
  providers: [OrderService, Logger], // Provide Logger
  controllers: [OrderController],
})
export class OrderModule {}

The code above configures a NestJS producer client for Kafka. It uses ClientsModule.registerAsync to inject broker addresses from the ConfigService dynamically and sets the clientId, along with idempotence, retry and auto-topic creation options. The registered client (KAFKA_PRODUCER_SERVICE) is exposed for dependency injection, while the module declares OrderService to handle business logic and OrderController for incoming HTTP requests.

Next, update the order.service.ts file with the following:

// order-service/src/order/order.service.ts
import {
  Injectable,
  Inject,
  OnModuleInit,
  OnModuleDestroy,
  Logger,
} from "@nestjs/common";
import { ClientKafka } from "@nestjs/microservices";
import { lastValueFrom } from "rxjs";

@Injectable()
export class OrderService implements OnModuleInit, OnModuleDestroy {
  constructor(
    @Inject("KAFKA_PRODUCER_SERVICE") private readonly kafkaClient: ClientKafka,
    private readonly logger: Logger
  ) {}

  async onModuleInit() {
    // Connect the client when the module initializes
    try {
      await this.kafkaClient.connect();
      this.logger.log(
        "Kafka Producer Client connected successfully",
        OrderService.name
      );
    } catch (error) {
      this.logger.error(
        "Failed to connect Kafka Producer Client",
        error,
        OrderService.name
      );
    }
  }

  async onModuleDestroy() {
    // Disconnect the client when the application shuts down
    await this.kafkaClient.close();
    this.logger.log("Kafka Producer Client disconnected", OrderService.name);
  }

  async createOrder(orderData: any) {
    const topic = "order_created";
    const eventPayload = {
      orderId: `ORD-${Date.now()}`, // Simple unique ID
      ...orderData,
      timestamp: new Date().toISOString(),
    };

    try {
      this.logger.log(
        `Publishing event to topic [${topic}]`,
        OrderService.name
      );
      // Await the emit Observable so a failed publish is caught below
      await lastValueFrom(
        this.kafkaClient.emit(topic, JSON.stringify(eventPayload))
      );
      return { success: true, publishedEvent: eventPayload };
    } catch (error) {
      this.logger.error(
        `Failed to publish event to topic [${topic}]`,
        error,
        OrderService.name
      );
      return { success: false, error: error.message };
    }
  }
}

The code above manages the production of Kafka messages for order events. In the constructor, it injects the Kafka client instance registered with the token KAFKA_PRODUCER_SERVICE and also injects NestJS’s Logger service to log messages for debugging and monitoring.

The onModuleInit method establishes the Kafka connection when the service starts, while the onModuleDestroy method releases it when the application shuts down.

The createOrder method constructs an event payload with a unique ID and publishes it to Kafka using the kafkaClient.emit method. Because emit returns an RxJS Observable, we wrap it in lastValueFrom and await it so that a failed publish lands in the catch block rather than being silently dropped. And because Kafka only accepts string or Buffer payloads, we call JSON.stringify on the payload before publishing it.

Next, update the order.controller.ts file with the following:

// order-service/src/order/order.controller.ts
import { Controller, Post, Body, Logger } from "@nestjs/common";
import { OrderService } from "./order.service";

@Controller("orders")
export class OrderController {
  constructor(
    private readonly orderService: OrderService,
    private readonly logger: Logger
  ) {}

  @Post()
  async createOrder(@Body() createOrderDto: any) {
    this.logger.log(
      `Received create order request: ${JSON.stringify(createOrderDto)}`,
      OrderController.name
    );
    const result = await this.orderService.createOrder(createOrderDto);
    return result;
  }
}

The code above handles HTTP POST requests to the /orders endpoint, forwarding order data to Kafka via the OrderService.
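
For brevity, the request body is typed as any, but in a real service you'd validate it. Here is a minimal sketch of a typed DTO, assuming the class-validator package is installed and a global ValidationPipe is enabled (neither is part of this tutorial's setup):

// order-service/src/order/dto/create-order.dto.ts (hypothetical file)
import { IsString, IsInt, Min } from "class-validator";

export class CreateOrderDto {
  @IsString()
  productId: string;

  @IsInt()
  @Min(1)
  quantity: number;
}

The handler signature would then become createOrder(@Body() createOrderDto: CreateOrderDto).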

Next, update the app.module.ts file with the following:

// order-service/src/app.module.ts
import { Module } from "@nestjs/common";
import { ConfigModule } from "@nestjs/config";
import { OrderModule } from "./order/order.module";
import { AppController } from "./app.controller";
import { AppService } from "./app.service";

@Module({
  imports: [
    ConfigModule.forRoot({ isGlobal: true }), // Make config global
    OrderModule,
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

Then update the main.ts file with the following:

// order-service/src/main.ts
import { NestFactory } from "@nestjs/core";
import { AppModule } from "./app.module";
import { Logger } from "@nestjs/common";

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  const port = 3001; // Use a distinct port
  await app.listen(port);
  Logger.log(
    `Order Service (Producer) is running on: http://localhost:${port}`,
    "Bootstrap"
  );
}
bootstrap();

Now, run this command in your terminal to start the server:

npm run start:dev

Consumer Service (notification-service)

Now, in another terminal window, navigate to the notification-service directory and update the main.ts file with the following:

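// notification-service/src/main.ts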
import { NestFactory } from "@nestjs/core";
import { AppModule } from "./app.module";
import { MicroserviceOptions, Transport } from "@nestjs/microservices";
import { ConfigService } from "@nestjs/config";
import { Logger } from "@nestjs/common";
import { Partitioners } from "kafkajs";

async function bootstrap() {
  const logger = new Logger("KafkaBootstrap");

  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.KAFKA,
      options: {
        client: {
          brokers: process.env.KAFKA_BROKERS?.split(",") ?? ["localhost:9092"],
          retry: {
            initialRetryTime: 1000,
            retries: 8,
            maxRetryTime: 5000,
            factor: 2,
          },
        },
        consumer: {
          groupId:
            process.env.KAFKA_CONSUMER_GROUP_ID ??
            "notification-consumer-group-server",
          allowAutoTopicCreation: true,
          sessionTimeout: 30000,
          rebalanceTimeout: 60000,
          heartbeatInterval: 5000,
        },
        producer: {
          createPartitioner: Partitioners.LegacyPartitioner,
        },
        run: {
          autoCommit: false,
        },
      },
    }
  );

  // Add error handling for Kafka connection
  try {
    await app.listen();
    logger.log("Notification Service successfully connected to Kafka");
  } catch (error) {
    logger.error("Failed to connect to Kafka:", error);
    process.exit(1);
  }
}

bootstrap();

The code above sets up a NestJS microservice as a Kafka consumer. The retry setting uses an exponential backoff strategy—where the wait time doubles after each failure, starting at 1 second and maxing out at 5 seconds, with a total of 8 retries—to manage temporary network disruptions without crashing.

The groupId lets multiple instances of the service share message processing, while sessionTimeout (30s), rebalanceTimeout (60s) and heartbeatInterval (5s) keep the consumer group stable. Together they let Kafka detect failed consumers quickly while still allowing enough time to reassign partitions.

The createPartitioner: Partitioners.LegacyPartitioner setting keeps the partitioning behavior of KafkaJS 1.x, so keyed messages continue to route to the same partitions (it also silences the partitioner warning that KafkaJS 2.x logs by default). Finally, we've set autoCommit: false, opting for manual offset commits so a message isn't marked as consumed if processing fails.

Next, update the app.controller.ts file with the following:

// notification-service/src/app.controller.ts
import { Controller, Logger } from "@nestjs/common";
import {
  MessagePattern,
  Payload,
  Ctx,
  KafkaContext,
} from "@nestjs/microservices";

@Controller()
export class AppController {
  constructor(private readonly logger: Logger) {}

  @MessagePattern("order_created") // Listen to the 'order_created' topic
  async handleOrderCreated(
    @Payload() payload: any, // Inject the message payload
    @Ctx() context: KafkaContext // Inject the Kafka message context
  ) {
    const { offset } = context.getMessage();
    const topic = context.getTopic();
    const partition = context.getPartition();
    this.logger.log(
      `Received message from topic [${topic}] partition [${partition}] offset [${offset}]`,
      AppController.name
    );

    this.logger.log(`Payload: ${JSON.stringify(payload)}`, AppController.name);

    try {
      // --- Your Business Logic Here ---
      this.logger.log(
        `Simulating notification dispatch for Order ID: ${payload.orderId}...`,
        AppController.name
      );
      // In a real app: call email service, push notification service, etc.
      this.logger.log(
        `Notification for Order ID: ${payload.orderId} processed.`,
        AppController.name
      );
      await context
        .getConsumer()
        .commitOffsets([
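          // Commit offset + 1: the committed value is the next offset to read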
          { topic, partition, offset: (parseInt(offset) + 1).toString() },
        ]);
    } catch (error) {
      this.logger.error(
        `Error processing notification for order ${payload?.orderId}: ${error.message}`,
        error.stack,
        AppController.name
      );
    }
  }
}

The code above implements a Kafka message handler for the order_created topic using the @MessagePattern decorator. The handler receives two critical parameters through decorators:

  • @Payload(): Injects the deserialized message content, which is typically a JavaScript object when using JSON.
  • @Ctx(): Supplies the Kafka message context, which includes metadata such as topic, partition and offset.

Finally, it manually commits the offset after successful processing using commitOffsets(), which is necessary when autoCommit is disabled. Error handling captures and logs processing failures without crashing the consumer, thereby maintaining service availability.
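
As written, a message that fails is only logged; its offset is never committed, and a later successful commit on the same partition will move the committed offset past it. A common refinement is to publish such poison messages to a dead-letter topic and then commit, so they aren't lost and the partition isn't blocked. Here is a minimal sketch of that idea using kafkajs directly; the order_created.dlq topic and this helper are hypothetical, not part of the tutorial's code:

// dlq.ts: a hypothetical dead-letter publisher (sketch only)
import { Kafka } from "kafkajs";

const kafka = new Kafka({ brokers: ["localhost:9092"] });
const dlqProducer = kafka.producer();
let connected = false;

export async function sendToDlq(value: string | Buffer) {
  if (!connected) {
    await dlqProducer.connect(); // Lazily connect on first use
    connected = true;
  }
  // Forward the failed message unchanged to the dead-letter topic
  await dlqProducer.send({
    topic: "order_created.dlq",
    messages: [{ value }],
  });
}

Inside the catch block above, you would call sendToDlq with the original message value and then commit the offset, just as the success path does.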

Next, update the app.module.ts file with the code below:

// notification-service/src/app.module.ts
import { Module, Logger } from "@nestjs/common";
import { ConfigModule } from "@nestjs/config";
import { AppController } from "./app.controller";
import { AppService } from "./app.service";

@Module({
  imports: [ConfigModule.forRoot({ isGlobal: true })],
  controllers: [AppController],
  providers: [AppService, Logger], // Provide Logger
})
export class AppModule {}

Now, you can start the server by running this command:

npm run start:dev

Testing

To test our project, send a test order with the curl request below:

curl -X POST http://localhost:3001/orders \
  -H "Content-Type: application/json" \
  -d '{"productId": "prod_123", "quantity": 2}'

Let’s check the logs in both services.

The order-service logs should look like this:

[Nest] 11204  - 28/04/2025, 21:49:09     LOG [OrderController] Received create order request: {"productId":"prod_123","quantity":2}
[Nest] 11204  - 28/04/2025, 21:49:09     LOG [OrderService] Publishing event to topic [order_created]

The notification-service logs should look like this:

[Nest] 20876  - 28/04/2025, 21:49:09     LOG [AppController] Received message from topic [order_created] partition [1] offset [0]
[Nest] 20876  - 28/04/2025, 21:49:09     LOG [AppController] Payload: {"orderId":"ORD-1745873349101","productId":"prod_123","quantity":2,"timestamp":"2025-04-28T20:49:09.101Z"}
[Nest] 20876  - 28/04/2025, 21:49:09     LOG [AppController] Simulating notification dispatch for Order ID: ORD-1745873349101...
[Nest] 20876  - 28/04/2025, 21:49:09     LOG [AppController] Notification for Order ID: ORD-1745873349101 processed.

Conclusion

In this post, we built two NestJS microservices that communicate via Kafka—an order service (our producer) and a notification service (our consumer).

Our setup enables resilience with retries and error handling, as well as scalability through consumer groups and partitions. With this foundation, we’ve covered the basics of event-driven architecture with Kafka while avoiding common pitfalls.


About the Author

Christian Nwamba

Chris Nwamba is a Senior Developer Advocate at AWS focusing on AWS Amplify. He is also a teacher with years of experience building products and communities.
