TypeScript: Consuming and Producing Redpanda Messages
Posted on: 2023-09-11
Redpanda
Redpanda is a Kafka-compatible streaming platform written in C++. It is very performant and easy to install because it requires neither Java nor Apache ZooKeeper. Like Kafka, Redpanda is a pub/sub system that lets messages travel from a producer to one or many consumers.
Simple Goal
The goal is to start an instance of Redpanda, create a script that will consume messages as they go in, and use a second script to push a message that the consumer script will see.
- Start Redpanda
- Create a consumer
- Create a producer
Redpanda Installation
Redpanda Command Line
Redpanda offers a single command-line tool, rpk, to start a Redpanda node or a cluster of nodes.
Step #1: Open WSL2 on Windows
Step #2: Download and install
curl -LO https://github.com/redpanda-data/redpanda/releases/latest/download/rpk-linux-amd64.zip
mkdir -p ~/.local/bin
unzip rpk-linux-amd64.zip -d ~/.local/bin/
rpk container start
If you are using ZSH, edit the .zshrc by adding the path to the installation of Redpanda (rpk). Use $HOME rather than a quoted ~, because ZSH does not expand a tilde inside double quotes.
path+=("$HOME/.local/bin")
Docker Image
The former way did not work for me. However, using the Redpanda Docker image worked.
docker run -d --pull=always --name=redpanda-1 --rm \
-p 8081:8081 \
-p 8082:8082 \
-p 9092:9092 \
-p 9644:9644 \
docker.redpanda.com/redpandadata/redpanda:latest \
redpanda start \
--overprovisioned \
--smp 1 \
--memory 1G \
--reserve-memory 0M \
--node-id 0 \
--check=false
Then log in to the container:
docker exec -it redpanda-1 bash
Redpanda Topic
Messages travel through a topic. The producer pushes a message into the topic, and the consumer pulls the message from the topic. Create the topic with rpk:
rpk topic create my-topic
Node Configurations
Redpanda is compatible with the Kafka API. Thus, the TypeScript code will use the kafkajs library. We will use the uuid library to give the consumer a unique identifier.
npm install --save uuid@latest kafkajs@latest
npm install --save-dev @types/node @types/uuid typescript@latest
Consumer TypeScript Code
The first part is the consumer. The consumer runs first so that it can receive the message from the producer. The consumer uses the KafkaJS library to connect to Redpanda. The main Kafka object connects to one or many brokers. Each broker is a Redpanda node.
Import and Kafka
import { Kafka, logLevel } from "kafkajs";
import { brokers, topicName } from "./shared.js";
import { v4 as uuidv4 } from "uuid";
const redpanda = new Kafka({
brokers: brokers,
logLevel: logLevel.DEBUG,
});
The ./shared.js file is shared between the consumer and the producer and contains the brokers and the topic name. Thus, there is one place to edit, and both parts of the system are updated. Only port 9092, the Kafka API port, is listed as a broker. Port 9644 is the Redpanda Admin API, which does not speak the Kafka protocol. The content is two lines:
export const brokers = ["localhost:9092"];
export const topicName = "my-topic";
Consumer Code
Using the uuid library, each run of the consumer gets a unique groupId. Redpanda tracks the read position (the offset) per group ID, not per process. Reusing the same groupId means that, after a reconnect, the consumer resumes where the group left off and does not receive the messages the group already consumed. Thus, it's essential to have one ID per user, or per group of processes that work together to consume messages on a topic.
const consumer = redpanda.consumer({ groupId: uuidv4() });
try {
await consumer.connect();
await consumer.subscribe({
topic: topicName,
});
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log(
`Received message: ${message.value} at ${topic}:${partition}, offset ${message.offset} and time ${message.timestamp}`
);
},
});
} catch (error) {
console.error("Consumer run:", error);
}
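On the choice of group ID, here is a minimal sketch of one way to switch between a stable group (which resumes from its last position) and a throwaway group (new on every run). The helper name resolveGroupId and the environment variable CONSUMER_GROUP_ID are assumptions made for illustration, and Node's built-in randomUUID stands in for the uuid library so the snippet has no dependencies.

```typescript
import { randomUUID } from "node:crypto";

// Hypothetical helper: returns a fixed group ID when one is configured,
// otherwise a fresh random one.
// CONSUMER_GROUP_ID is an assumed variable name, not part of KafkaJS or Redpanda.
function resolveGroupId(env: Record<string, string | undefined>): string {
  const fixed = env["CONSUMER_GROUP_ID"];
  if (fixed !== undefined && fixed.length > 0) {
    // Stable ID: the group keeps its read position across restarts.
    return fixed;
  }
  // Random ID: every run is a brand-new group with no stored position.
  return `consumer-${randomUUID()}`;
}

console.log(resolveGroupId({ CONSUMER_GROUP_ID: "patrickgroupid" }));
console.log(resolveGroupId({}));
```

In the consumer, the result could be passed as redpanda.consumer({ groupId: resolveGroupId(process.env) }).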
The run function with the eachMessage handler processes one message at a time. KafkaJS also offers an eachBatch handler that receives several messages at once. Here, we display the value, the offset (the position of the message in its partition), and the time the message went into Redpanda.
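The timestamp printed by the consumer is a string of milliseconds since the Unix epoch, which is hard to read. The sketch below shows one way to format the same fields with a readable date. The ReceivedMessage interface is a local stand-in for the fields used in this article (in KafkaJS the value is a Buffer or null, and the offset and timestamp are strings), and describeMessage is a hypothetical helper name.

```typescript
// Local stand-in for the message fields used in this article.
interface ReceivedMessage {
  value: { toString(): string } | null; // a Buffer in KafkaJS, or null
  offset: string;
  timestamp: string; // milliseconds since the Unix epoch, as a string
}

// Hypothetical helper: builds a readable log line for one message.
function describeMessage(
  topic: string,
  partition: number,
  message: ReceivedMessage
): string {
  // Guard against a null value instead of printing the word "null".
  const text = message.value === null ? "<null>" : message.value.toString();
  // Convert the epoch-millisecond string to an ISO 8601 date in UTC.
  const time = new Date(Number(message.timestamp)).toISOString();
  return `Received "${text}" at ${topic}:${partition}, offset ${message.offset}, time ${time}`;
}

console.log(
  describeMessage("my-topic", 0, {
    value: "Hello World!",
    offset: "0",
    timestamp: "1694128832293",
  })
);
```

Inside eachMessage, the call would be describeMessage(topic, partition, message).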
Consumer Cleaning Code
Finally, we need to disconnect in case of an error or in case the process is closed.
export async function disconnect() {
try {
await consumer.disconnect();
} catch (error) {
console.error("Consumer disconnect:", error);
}
}
process.on("SIGINT", async () => {
try {
await disconnect();
} catch (error) {
console.error("Consumer:", error);
} finally {
process.exit(0);
}
});
Producer TypeScript Code
The producer is very similar. The first part is identical.
import { Kafka, logLevel } from "kafkajs";
import { brokers, topicName } from "./shared.js";
const redpanda = new Kafka({
brokers: brokers,
logLevel: logLevel.DEBUG,
});
The second part creates a producer instead of a consumer. Then we connect and, instead of calling run, we send a message with send.
const producer = redpanda.producer();
try {
await producer.connect();
await producer.send({
topic: topicName,
messages: [{ value: "Hello World!" }],
});
} catch (error) {
console.error("Producer send:", error);
}
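The messages array passed to send can hold more than one entry, and each value must already be a string (or a Buffer). As a minimal sketch, the hypothetical helper toMessages below turns a mixed list of values into that array, serializing non-string values as JSON. The OutgoingMessage interface is a local stand-in that mirrors only the value field used in this article.

```typescript
// Local stand-in for the part of the message shape used in this article.
interface OutgoingMessage {
  value: string;
}

// Hypothetical helper: strings pass through unchanged,
// anything else is serialized as JSON.
function toMessages(values: unknown[]): OutgoingMessage[] {
  return values.map((value) => ({
    value: typeof value === "string" ? value : JSON.stringify(value),
  }));
}

const messages = toMessages(["Hello World!", { id: 1, status: "created" }]);
console.log(messages);
```

The result could then be sent in one call with producer.send({ topic: topicName, messages }).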
Finally, similarly, we are handling the disconnect.
export async function disconnect() {
try {
await producer.disconnect();
} catch (error) {
console.error("Producer disconnect:", error);
}
}
process.on("SIGINT", async () => {
try {
await disconnect();
} catch (error) {
console.error("Disconnect:", error);
} finally {
process.exit(0);
}
});
Executing
Running the system requires two consoles open: one for the consumer and one for the producer.
Console #1:
npm run start:consumer
Console #2:
npm run start:producer
Once the second npm script starts, the first console will display the message.
Debugging
If you have an issue, you can always use the rpk command line to get more information. Here are some commands that can be useful:
Listening to a topic
If you want to debug your producer, you can listen to the topic directly. This isolates the problem: if the message shows up here but not in your consumer, the producer works and the consumption code is at fault.
rpk topic consume my-topic
Check the offset of a consumer id
We can use the internal __consumer_offsets topic to read information about the offset.
rpk topic consume __consumer_offsets
The output is a list of consumers keyed by their groupId, and the value holds their offset within a topic. The value is binary-encoded, so it shows up as unreadable characters. I hardcoded the groupId to patrickgroupid. I know the last offset read is 4 in my example since the consumer script displays the offset and I ran the producer script five times (offsets start at 0). We can validate by fetching all the messages using:
rpk topic consume my-topic -o "start"
Which returns:
{
"topic": "my-topic",
"value": "Hello World!",
"timestamp": 1694128832293,
"partition": 0,
"offset": 0
}
{
"topic": "my-topic",
"value": "Hello World!",
"timestamp": 1694128870160,
"partition": 0,
"offset": 1
}
{
"topic": "my-topic",
"value": "Hello World!",
"timestamp": 1694129115833,
"partition": 0,
"offset": 2
}
{
"topic": "my-topic",
"value": "Hello World!",
"timestamp": 1694129182939,
"partition": 0,
"offset": 3
}
{
"topic": "my-topic",
"value": "Hello World!",
"timestamp": 1694129515917,
"partition": 0,
"offset": 4
}
We can see that the internal __consumer_offsets topic has these three entries for the group ID patrickgroupid:
{
"topic": "__consumer_offsets",
"key": "patrickgroupid",
"value": "consumerRoundRobinAssigner,kafkajs-a4eb9d4d-370c-4e98-a7df-4c08d94d9a33�q��<,kafkajs-a4eb9d4d-370c-4e98-a7df-4c08d94d9a33��kafkajs%n172.17.0.1�`u0my-topicmy-topic",
"timestamp": 1694129504832,
"partition": 0,
"offset": 0
}
{
"topic": "__consumer_offsets",
"key": "patrickgroupidmy-topic",
"value": "�����q���",
"timestamp": 1694129515933,
"partition": 0,
"offset": 1
}
{
"topic": "__consumer_offsets",
"key": "patrickgroupid",
"value": "consumer�����q�(U",
"timestamp": 1694129530965,
"partition": 0,
"offset": 2
}
The offset shown here is not the offset of the group ID, but the offset of the entry within the __consumer_offsets topic. The group's offset is inside the value, "value": "consumer�����q�(U", which is in a binary format.
Thus, some code is needed to read it:
const admin = redpanda.admin();
try {
await admin.connect();
const groupPartitionOffset = await admin.fetchOffsets({
groupId: "patrickgroupid",
topics: [topicName],
});
for (const partitions of groupPartitionOffset) {
console.log(`Topic ${partitions.topic}`);
for (const p of partitions.partitions) {
console.log(`Offset for ${p.offset} at partition ${p.partition}`);
}
}
} catch (error) {
console.error("Admin offset:", error);
}
In this particular case, we are targeting the patrickgroupid group, which outputs:
Topic my-topic
Offset for 5 at partition 0
The value is 5, meaning the next message to read is the one at offset 5. Offsets start at 0, so offsets 0 to 4 (five messages) have already been consumed.
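To make the relationship between the committed offset and the last message read explicit, here is a minimal sketch that turns a result shaped like the one from fetchOffsets into readable lines. The interfaces are local stand-ins for that shape, describeCommittedOffsets is a hypothetical helper name, and treating a negative offset as "nothing committed yet" is an assumption about how an unused group is reported.

```typescript
// Local stand-ins for the shape printed by the admin code above.
interface PartitionOffset {
  partition: number;
  offset: string; // offsets arrive as strings
}
interface TopicOffsets {
  topic: string;
  partitions: PartitionOffset[];
}

// Hypothetical helper: explains each committed offset in words.
function describeCommittedOffsets(topics: TopicOffsets[]): string[] {
  const lines: string[] = [];
  for (const { topic, partitions } of topics) {
    for (const { partition, offset } of partitions) {
      const next = Number(offset);
      if (next < 0) {
        // Assumption: a negative value means the group never committed.
        lines.push(`${topic}[${partition}]: no committed offset`);
      } else if (next === 0) {
        lines.push(`${topic}[${partition}]: nothing processed yet, next read at offset 0`);
      } else {
        // The committed offset points at the NEXT message to read,
        // so the last processed message is one before it.
        lines.push(
          `${topic}[${partition}]: last processed offset ${next - 1}, next read at offset ${next}`
        );
      }
    }
  }
  return lines;
}

console.log(
  describeCommittedOffsets([
    { topic: "my-topic", partitions: [{ partition: 0, offset: "5" }] },
  ])
);
```

With the value 5 from the example, the helper reports offset 4 as the last processed message, which matches what the consumer script displayed.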
Conclusion
Getting to this simple example was relatively easy. Redpanda has good but minimal documentation, and the KafkaJS library is widely used (although it needs more support; the project is looking for a maintainer). Nonetheless, we can produce and consume messages from Redpanda with a minimal amount of code.
The source code of this article is available on GitHub.