Patrick Desjardins Blog

Typescript Consuming and Producing Redpanda Messages

Posted on: September 11, 2023

Redpanda

Redpanda is a C++ implementation of Kafka that is very performant and easy to install because it does not require Java or Apache ZooKeeper. Redpanda, like Kafka, is a pub/sub system allowing messages to travel from a producer to one or many consumers.

Simple Goal

The goal is to start an instance of Redpanda, create a script that will consume messages as they go in, and use a second script to push a message that the consumer script will see.

1) Start Redpanda
2) Create a consumer
3) Create a producer

Redpanda Installation

Redpanda Command Line

Redpanda offers a single command-line tool (rpk) to start a Redpanda node or a cluster of nodes.

Step #1: Open WSL2 on Windows
Step #2: Download and install

curl -LO https://github.com/redpanda-data/redpanda/releases/latest/download/rpk-linux-amd64.zip
mkdir -p ~/.local/bin
unzip rpk-linux-amd64.zip -d ~/.local/bin/
rpk container start

If you are using ZSH, edit the .zshrc by adding the path to the installation of Redpanda (rpk).

path+=("$HOME/.local/bin")

Docker Image

The rpk approach above did not work for me. However, using the Redpanda Docker image worked.

docker run -d --pull=always --name=redpanda-1 --rm \
  -p 8081:8081 \
  -p 8082:8082 \
  -p 9092:9092 \
  -p 9644:9644 \
  docker.redpanda.com/redpandadata/redpanda:latest \
  redpanda start \
  --overprovisioned \
  --smp 1 \
  --memory 1G \
  --reserve-memory 0M \
  --node-id 0 \
  --check=false

Then log in to the container:

docker exec -it redpanda-1 bash

Redpanda Topic

Messages need to travel into a topic. The producer will push the message into a topic. The consumer will pull the message from the topic.

rpk topic create my-topic
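To make the push and pull flow concrete, here is a toy in-memory model of a topic. It is only a sketch for illustration: the ToyTopic class and its method names are made up for this post and say nothing about how Redpanda stores data internally.

```typescript
// Toy model of a topic: an append-only log where each message receives the
// next offset. Hypothetical code for illustration, not Redpanda's implementation.
interface StoredMessage {
  offset: number;
  value: string;
}

class ToyTopic {
  private readonly log: StoredMessage[] = [];

  // Producer side: append a value and return the offset it was stored at.
  push(value: string): number {
    const offset = this.log.length;
    this.log.push({ offset, value });
    return offset;
  }

  // Consumer side: pull every message stored at or after the given offset.
  pull(fromOffset: number): StoredMessage[] {
    return this.log.slice(fromOffset);
  }
}

const toyTopic = new ToyTopic();
toyTopic.push("Hello World!");
toyTopic.push("Hello again!");

// A consumer that already read offset 0 asks for offset 1 onward.
const pending = toyTopic.pull(1);
console.log(pending);
```

The point to remember is that the topic keeps the messages and the consumer only remembers a position in it, which is the offset we will look at in the debugging section.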

Node Configurations

Redpanda is compatible with the Kafka API. Thus, the TypeScript code uses the kafkajs library. We also use the uuid library to generate a unique identifier for the consumer.

npm install --save uuid@latest kafkajs@latest
npm install --save-dev @types/node @types/uuid typescript@latest

Consumer TypeScript Code

The first part is the consumer. The consumer runs first so that it is already listening when the producer sends the message. It uses the kafkajs library to connect to Redpanda. The main Kafka object connects to one or many brokers, which are the Redpanda nodes.

Import and Kafka

import { Kafka, logLevel } from "kafkajs";
import { brokers, topicName } from "./shared.js";
import { v4 as uuidv4 } from "uuid";

const redpanda = new Kafka({
  brokers: brokers,
  logLevel: logLevel.DEBUG,
});

The ./shared.js file is shared between the consumer and the producer and contains the brokers and the topic name. Thus, there is a single place to edit, and both parts of the system pick up the change. The content is two lines:

// 9092 is the Kafka API port. Port 9644 is Redpanda's Admin API (HTTP), not a Kafka broker, so it is not listed here.
export const brokers = ["localhost:9092"];
export const topicName = "my-topic";
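If the broker list needs to change between machines, one option is to parse it from a comma-separated string instead of hardcoding it. The sketch below is only a possible variation: parseBrokers is a hypothetical helper written for this post, and neither Redpanda nor kafkajs reads such a string by itself.

```typescript
// Hypothetical helper: turn "host1:9092, host2:9092" into a list of brokers.
// Falls back to the provided default when nothing usable is given.
function parseBrokers(raw: string | undefined, fallback: string[]): string[] {
  if (raw === undefined) {
    return fallback;
  }
  const parsed = raw
    .split(",")
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0);
  return parsed.length > 0 ? parsed : fallback;
}

// The second address is a made-up example of an additional broker.
const parsedBrokers = parseBrokers("localhost:9092, localhost:19092", ["localhost:9092"]);
console.log(parsedBrokers);
```

In the shared file, the first argument could come from an environment variable of your choosing (for example, a REDPANDA_BROKERS variable, a name I am inventing here), with the local broker as the fallback.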

Consumer Code

Using the uuid library, each run of the consumer gets a unique groupId. Redpanda tracks the read position (the offset) per groupId, which is how it knows which messages a group has already consumed. A consumer that reconnects with the same groupId resumes where the group left off and does not receive the messages it has already read. Thus, it's essential to have one id per user or per group of processes that work together to consume messages on a topic.

const consumer = redpanda.consumer({ groupId: uuidv4() });

try {
  await consumer.connect();
  await consumer.subscribe({
    topic: topicName,
  });
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log(
        `Received message: ${message.value} at ${topic}:${partition}, offset ${message.offset} and time ${message.timestamp}`
      );
    },
  });
} catch (error) {
  console.error("Consumer run:", error);
}

With eachMessage, run handles one message at a time. kafkajs also offers an eachBatch handler that can process more than one message at a time. In this example, we display the value, the topic and partition, the offset (the position of the message in the partition), and the time it went into Redpanda.
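As a rough sketch of the batch variant, the handler below loops over the messages of a batch and marks each one as processed. The small interfaces are my own and only describe the fields this sketch touches; I am assuming they match a subset of the payload kafkajs passes to eachBatch, so check them against the kafkajs typings before relying on them.

```typescript
// Minimal structural types, assumed to be a subset of the kafkajs batch payload.
interface BatchMessageLike {
  offset: string;
  timestamp: string;
  value: { toString(): string } | null; // a Buffer satisfies this shape
}

interface BatchLike {
  topic: string;
  partition: number;
  messages: BatchMessageLike[];
}

interface EachBatchPayloadLike {
  batch: BatchLike;
  resolveOffset: (offset: string) => void;
  heartbeat: () => Promise<void>;
  isRunning: () => boolean;
  isStale: () => boolean;
}

// Pure helper: builds the same log line as the eachMessage example.
function describeMessage(batch: BatchLike, message: BatchMessageLike): string {
  const text = message.value === null ? "<null>" : message.value.toString();
  return `Received message: ${text} at ${batch.topic}:${batch.partition}, offset ${message.offset} and time ${message.timestamp}`;
}

// Batch handler: processes the messages in order and stops early if the
// consumer is shutting down or the batch is no longer valid.
async function logEachBatch(payload: EachBatchPayloadLike): Promise<void> {
  const { batch, resolveOffset, heartbeat, isRunning, isStale } = payload;
  for (const message of batch.messages) {
    if (!isRunning() || isStale()) {
      break;
    }
    console.log(describeMessage(batch, message));
    resolveOffset(message.offset); // mark this message as processed
    await heartbeat(); // tell the broker the consumer is still alive
  }
}
```

Assuming the types line up, the handler would replace eachMessage in the call to run, as in consumer.run({ eachBatch: logEachBatch }).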

Consumer Cleaning Code

Finally, we need to disconnect in case of an error or in case the process is closed.

export async function disconnect() {
  try {
    await consumer.disconnect();
  } catch (error) {
    console.error("Consumer disconnect:", error);
  }
}

process.on("SIGINT", async () => {
  try {
    await disconnect();
  } catch (error) {
    console.error("Consumer:", error);
  } finally {
    process.exit(0);
  }
});
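SIGINT covers Ctrl+C, but a process can also be stopped with SIGTERM (for example, by a container runtime). Here is a minimal sketch of a helper that registers the same cleanup for both signals and ignores repeated signals. The onShutdown name and the SignalSource interface are hypothetical and exist only to keep the example self-contained.

```typescript
type SignalName = "SIGINT" | "SIGTERM";

// Anything that can register a signal handler. Node's process object should
// fit this shape, which is an assumption of this sketch.
interface SignalSource {
  on(signal: SignalName, handler: () => void): unknown;
}

function onShutdown(
  source: SignalSource,
  cleanup: () => Promise<void>,
  exit: (code: number) => void
): void {
  let shuttingDown = false;

  const handler = (): void => {
    if (shuttingDown) {
      return; // a second signal must not trigger a second cleanup
    }
    shuttingDown = true;
    cleanup()
      .then(() => exit(0))
      .catch((error: unknown) => {
        console.error("Shutdown:", error);
        exit(1);
      });
  };

  source.on("SIGINT", handler);
  source.on("SIGTERM", handler);
}

// Possible usage in the consumer script, with the disconnect function above:
// onShutdown(process, disconnect, (code) => process.exit(code));
```

Passing the exit function as a parameter is only a design choice to make the helper easy to try without killing the process.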

Producer TypeScript Code

The producer is very similar. The first part is identical.

import { Kafka, logLevel } from "kafkajs";
import { brokers, topicName } from "./shared.js";

const redpanda = new Kafka({
  brokers: brokers,
  logLevel: logLevel.DEBUG,
});

The second part creates a producer instead of a consumer. After connecting, we call send instead of run.

const producer = redpanda.producer();

try {
  await producer.connect();
  await producer.send({
    topic: topicName,
    messages: [{ value: "Hello World!" }],
  });
} catch (error) {
  console.error("Producer send:", error);
}
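The messages array can hold more than one message, and each message can carry a key in addition to its value. As a small sketch, the helper below turns application events into such messages. The GreetingEvent shape and the toMessages function are invented for this example, and I am assuming that objects with a string key and a string value are accepted by send.

```typescript
// Hypothetical event shape, used only for this sketch.
interface GreetingEvent {
  userId: string;
  text: string;
}

// Shape assumed to be compatible with the entries of the messages array.
interface OutgoingMessage {
  key: string;
  value: string;
}

function toMessages(events: GreetingEvent[]): OutgoingMessage[] {
  return events.map((event) => ({
    // With Kafka's default partitioning, messages sharing a key land in the
    // same partition, which keeps their relative order.
    key: event.userId,
    // The value is serialized as JSON so the consumer can parse it back.
    value: JSON.stringify(event),
  }));
}

const outgoing = toMessages([
  { userId: "user-1", text: "Hello World!" },
  { userId: "user-2", text: "Hello again!" },
]);
console.log(outgoing);
```

The result could then be passed as the messages property of the send call in place of the single hardcoded message.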

Finally, we handle the disconnect the same way as in the consumer.

export async function disconnect() {
  try {
    await producer.disconnect();
  } catch (error) {
    console.error("Producer disconnect:", error);
  }
}

process.on("SIGINT", async () => {
  try {
    await disconnect();
  } catch (error) {
    console.error("Disconnect:", error);
  } finally {
    process.exit(0);
  }
});

Executing

Running the system requires two consoles open: one for the consumer and one for the producer.

Console #1:

npm run start:consumer

Console #2:

npm run start:producer

Once the second npm script starts, the first console will display the message.

Debugging

If you have an issue, you can always use the rpk command line to get more information. Here are some commands that can be useful:

Listening to a topic

If you want to debug your producer, you can listen to the topic with rpk. If the messages show up here but not in your consumer script, the producer works and the problem is in the consumer code.

rpk topic consume my-topic

Check the offset of a consumer id

We can use the internal __consumer_offsets topic to read information about the offset.

rpk topic consume __consumer_offsets

The output is a list of consumers with their groupId, and the value contains their offset within a topic. The value is in a binary format, so parts of it are unreadable. For this test, I hardcoded the groupId to patrickgroupid. I know the last offset is 4 in my example since the consumer script displays the offset and I ran the producer script five times (offsets start at 0). We can validate by fetching all the messages using:

rpk topic consume my-topic -o "start"

Which returns:

{
  "topic": "my-topic",
  "value": "Hello World!",
  "timestamp": 1694128832293,
  "partition": 0,
  "offset": 0
}
{
  "topic": "my-topic",
  "value": "Hello World!",
  "timestamp": 1694128870160,
  "partition": 0,
  "offset": 1
}
{
  "topic": "my-topic",
  "value": "Hello World!",
  "timestamp": 1694129115833,
  "partition": 0,
  "offset": 2
}
{
  "topic": "my-topic",
  "value": "Hello World!",
  "timestamp": 1694129182939,
  "partition": 0,
  "offset": 3
}
{
  "topic": "my-topic",
  "value": "Hello World!",
  "timestamp": 1694129515917,
  "partition": 0,
  "offset": 4
}

We can see the internal topic __consumer_offsets has these three entries for the group id patrickgroupid:

{
  "topic": "__consumer_offsets",
  "key": "patrickgroupid",
  "value": "consumerRoundRobinAssigner,kafkajs-a4eb9d4d-370c-4e98-a7df-4c08d94d9a33�q��<,kafkajs-a4eb9d4d-370c-4e98-a7df-4c08d94d9a33��kafkajs%n172.17.0.1�`u0my-topicmy-topic",
  "timestamp": 1694129504832,
  "partition": 0,
  "offset": 0
}
{
  "topic": "__consumer_offsets",
  "key": "patrickgroupidmy-topic",
  "value": "�����q���",
  "timestamp": 1694129515933,
  "partition": 0,
  "offset": 1
}
{
  "topic": "__consumer_offsets",
  "key": "patrickgroupid",
  "value": "consumer�����q�(U",
  "timestamp": 1694129530965,
  "partition": 0,
  "offset": 2
}

The offset field here is not the offset of the group id, but the position of the record within the __consumer_offsets topic. The group's offset is inside the value, for example "value": "consumer�����q�(U", which is in a binary format. Thus, some code is needed to read it, using the kafkajs admin client:

const admin = redpanda.admin();
try {
  await admin.connect();
  const groupPartitionOffset = await admin.fetchOffsets({
    groupId: "patrickgroupid",
    topics: [topicName],
  });
  for (const partitions of groupPartitionOffset) {
    console.log(`Topic ${partitions.topic}`);
    for (const p of partitions.partitions) {
      console.log(`Offset for ${p.offset} at partition ${p.partition}`);
    }
  }
} catch (error) {
  console.error("Admin offset:", error);
}

In this particular case, we are targeting the patrickgroupid group, which outputs:

Topic my-topic
Offset for 5 at partition 0

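The value is 5, meaning the next message the group will read is the one at offset 5. The five existing messages occupy offsets 0 to 4, so the group has consumed everything in the topic.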
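The committed offset becomes more useful when compared with the end of the partition: the difference is how many messages the group still has to read (its lag). Below is a minimal sketch of that arithmetic. The interfaces are my own and assume that offsets arrive as strings, one entry per partition, with a high value holding the next offset to be written; computeLag is a hypothetical helper, not a kafkajs function.

```typescript
// Assumed shapes: one committed offset per partition for the group, and one
// high watermark per partition for the topic, both carried as strings.
interface CommittedOffset {
  partition: number;
  offset: string;
}

interface PartitionHighWatermark {
  partition: number;
  high: string;
}

interface PartitionLag {
  partition: number;
  lag: number;
}

function computeLag(
  committed: CommittedOffset[],
  watermarks: PartitionHighWatermark[]
): PartitionLag[] {
  const result: PartitionLag[] = [];
  for (const entry of committed) {
    const mark = watermarks.find((w) => w.partition === entry.partition);
    const committedOffset = Number(entry.offset);
    // Assumption: a negative offset means the group has not committed anything
    // for this partition yet, so no lag is reported for it.
    if (mark === undefined || committedOffset < 0) {
      continue;
    }
    result.push({
      partition: entry.partition,
      lag: Number(mark.high) - committedOffset,
    });
  }
  return result;
}

// Values from this article: 5 messages (offsets 0 to 4), committed offset 5.
const lagNow = computeLag([{ partition: 0, offset: "5" }], [{ partition: 0, high: "5" }]);
console.log(lagNow);

// After the producer sends one more message, the group is one message behind.
const lagAfterOneMore = computeLag([{ partition: 0, offset: "5" }], [{ partition: 0, high: "6" }]);
console.log(lagAfterOneMore);
```

With the numbers from this article the lag is 0, and it becomes 1 as soon as the producer runs again while the consumer is stopped.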

Conclusion

The road to this simple example was relatively easy. Redpanda has good but minimal documentation, and the kafkajs library is widely used (although it needs more support; the project is looking for a maintainer). Nonetheless, we can produce and consume messages from Redpanda with only a few lines of code.

The source code of this article is available on GitHub.