Patrick Desjardins Blog
Patrick Desjardins picture from a conference

Typescript Consuming and Producing Redpanda Messages

Posted on: 2023-09-11

Redpanda

Redpanda is a C++ implementation of Kafka that is very performance and easy to install because it does not require Java or Apache Zookeeper. Redpanda, like Kafka, is a pub/sub system allowing messages to travel from a producer to one or many consumers.

Simple Goal

The goal is to start an instance of Redpanda, create a script that will consume messages as they go in, and use a second script to push a message that the consumer script will see.

  1. Start Redpanda
  2. Create a consumer
  3. Create a producer

Redpanda Installation

Redpanda Command Line

Redpanda offers a single command line to start a Redpanda node or cluster of nodes).

Step #1: Open WSL2 on Windows Step #2: Download and install

curl -LO https://github.com/redpanda-data/redpanda/releases/latest/download/rpk-linux-amd64.zip
mkdir -p ~/.local/bin
unzip rpk-linux-amd64.zip -d ~/.local/bin/
rpk container start

If you are using ZSH, edit the .zshrc by adding the path to the installation of Redpanda (rpk).

path+=("~/.local/bin")

Docker Image

The former way did not work for me. However, using the docker Redpanda image worked.

docker run -d --pull=always --name=redpanda-1 --rm \
-p 8081:8081 \
-p 8082:8082 \
-p 9092:9092 \
-p 9644:9644 \
docker.redpanda.com/redpandadata/redpanda:latest \
redpanda start \
--overprovisioned \
--smp 1  \
--memory 1G \
--reserve-memory 0M \
--node-id 0 \
--check=false

Then login into the container:

docker exec -it redpanda-1 bash

Redpanda Topic

Messages need to travel into a topic. The producer will push the message into a topic. The consumer will pull the message from the topic.

rpk topic create my-topic

Node Configurations

Redpanda is compatible with Kafka API. Thus, the TypeScript code will be using the kafkajs library. We will use the uuid library to have a unique identifier to consume the message.

npm install --save uuid@latest kafkajs@latest    
npm install --save-dev @types/node @types/uuid typescript@latest  

Consumer TypeScript Code

The first part is the consumer. The consumer will run first to receive the message from the producer. The consumer uses the Kafkajs library to connect to Redpanda. The main Kafka object connects to one or many brokers. They are instances of the Redpanda node.

Import and Kafka

import { Kafka, logLevel } from "kafkajs";
import { brokers, topicName } from "./shared.js";
import { v4 as uuidv4 } from "uuid";

const redpanda = new Kafka({
  brokers: brokers,
  logLevel: logLevel.DEBUG,
});

The ./shared.js is a file shared between the consumer and producer that contains the brokers and topic name. Thus, one place to edit if we change it and both parts of the system are updated. The content is two lines:

export const brokers = ["localhost:9092", "localhost:9644"];
export const topicName = "my-topic";

Consumer Code

Using the uuid library, the consumer is unique, allowing Redpanda to know if a particular consumer read the messages. Using the same groupId would mean that the system would not push previously pushed messages in the case of a reconnect. Thus, it's essential to have a unique id per user or process group that work together to consume messages on a topic.

const consumer = redpanda.consumer({ groupId: uuidv4() });

try {
  await consumer.connect();
  await consumer.subscribe({
    topic: topicName,
  });
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log(
        `Received message: ${message.value} at ${topic}:${partition}, offset ${message.offset} and time ${message.timestamp}`
      );
    },
  });
} catch (error) {
  console.error("Consumer send:", error);
}

The run reads one message at a time. Using the batch function can consume more than one message at a time. In that case, we display the value, the offset (order in the topic), and the time it went into Redpanda.

Consumer Cleaning Code

Finally, we need to disconnect in case of an error or in case the process is closed.

export async function disconnect() {
  try {
    await consumer.disconnect();
  } catch (error) {
    console.error("Consumer disconnect:", error);
  }
}

process.on("SIGINT", async () => {
  try {
    await disconnect();
  } catch (error) {
    console.error("Consumer:", error);
  } finally {
    process.exit(0);
  }
});

Producer TypeScript Code

The producer is very similar. The first part is identical.

import { Kafka, logLevel } from "kafkajs";
import { brokers, topicName } from "./shared.js";

const redpanda = new Kafka({
  brokers: brokers,
  logLevel: logLevel.DEBUG,
});

The second part is to create a producer instead of a consumer. Then, connect, and instead of running, we are sending with send.

const producer = redpanda.producer();

try {
  await producer.connect();
  await producer.send({
    topic: topicName,
    messages: [{ value: "Hello World!" }],
  });
} catch (error) {
  console.error("Producer send:", error);
}

Finally, similarly, we are handling the disconnect.

export async function disconnect() {
  try {
    await producer.disconnect();
  } catch (error) {
    console.error("Producer disconnect:", error);
  }
}

process.on("SIGINT", async () => {
  try {
    await disconnect();
  } catch (error) {
    console.error("Disconnect:", error);
  } finally {
    process.exit(0);
  }
});

Executing

Running the system requires two consoles open: one for the consumer and one for the producer.

Console #1:

npm run start:consumer

Console #2:

npm run start:producer

Once the second npm script starts, the first console will display the message.

Debugging

If you have an issue, you can always use the rpk command line to get more information. Here are some commands that can be useful:

Listening to a topic

If you want to debug your producer, you can listen to the topic. The goal is to isolate the potential issue of not receiving from the consumer by removing from the equation that the consumption code is failing.

rpk topic consume my-topic

Check the offset of a consumer id

We can use the private __consumer_offsets topic to read information about the offset.

rpk topic consume __consumer_offsets

The output is a list of consumer with they groupId and the value is their offset within a topic. It's encoded in Unicode. I hardcoded the groupid to patrickgroupid. I know the offset is 4 in my example since the consumer script displays the offset and I ran the producer script four times. We can validate by fetching all the messages using:

rpk topic consume my-topic -o "start"

Which returns:

{
  "topic": "my-topic",
  "value": "Hello World!",
  "timestamp": 1694128832293,
  "partition": 0,
  "offset": 0
}
{
  "topic": "my-topic",
  "value": "Hello World!",
  "timestamp": 1694128870160,
  "partition": 0,
  "offset": 1
}
{
  "topic": "my-topic",
  "value": "Hello World!",
  "timestamp": 1694129115833,
  "partition": 0,
  "offset": 2
}
{
  "topic": "my-topic",
  "value": "Hello World!",
  "timestamp": 1694129182939,
  "partition": 0,
  "offset": 3
}
{
  "topic": "my-topic",
  "value": "Hello World!",
  "timestamp": 1694129515917,
  "partition": 0,
  "offset": 4
}

We can see the private topic __consumer_offsets has these three entries for the group id patrickgroupdid:

{
  "topic": "__consumer_offsets",
  "key": "patrickgroupid",
  "value": "consumerRoundRobinAssigner,kafkajs-a4eb9d4d-370c-4e98-a7df-4c08d94d9a33�q��<,kafkajs-a4eb9d4d-370c-4e98-a7df-4c08d94d9a33��kafkajs%n172.17.0.1�`u0my-topicmy-topic",
  "timestamp": 1694129504832,
  "partition": 0,
  "offset": 0
}
{
  "topic": "__consumer_offsets",
  "key": "patrickgroupidmy-topic",
  "value": "�����q���",
  "timestamp": 1694129515933,
  "partition": 0,
  "offset": 1
}
{
  "topic": "__consumer_offsets",
  "key": "patrickgroupid",
  "value": "consumer�����q�(U",
  "timestamp": 1694129530965,
  "partition": 0,
  "offset": 2
}

The offset is not the offset of the group id, but the offset of the message into the __consumer_offsets topic. In that case, the value is "value": "consumer�����q�(U", which is in a binary format. Thus, some code is needed:

const admin = redpanda.admin();
try {
  await admin.connect();
  const groupPartitionOffset = await admin.fetchOffsets({
    groupId: "patrickgroupid",
    topics: [topicName],
  });
  for (const partitions of groupPartitionOffset) {
    console.log(`Topic ${partitions.topic}`);
    for (const p of partitions.partitions) {
      console.log(`Offset for ${p.offset} at partition ${p.partition}`);
    }
  }
} catch (error) {
  console.error("Admin offset:", error);
}

In this particular case, we are targetting the patrickgroupid which output:

Topic my-topic
Offset for 5 at partition 0

The value is 5, meaning the following value to read would be the fifth offset.

Conclusion

The road to this simple example was relatively easy to achieve. Redpanda has good but minimal documentation, and Kafkajs library is prevalent (but needs to be more supported; they are looking for a maintainer). Nonetheless, we can produce and consume messages from Redpanda with minimal lines.

The source code of this article is available in Github.