Python Asynchronous Queue
Posted on: 2025-08-15
Asynchronous Queue
The asyncio library offers a queue. A producer puts data into the queue and a consumer reads data out of it. The queue is first-in, first-out (FIFO) and is useful when incoming data is hard to synchronize with the output process. For example, a system might receive occasional spikes of new data while the processing time varies with the content, so there is no smooth flow between receiving data and producing the outcome. Placing a queue between the two sides decouples them: the producer keeps running as fast as possible without waiting on the consumer, while the consumer reads from the queue asynchronously at its own pace.
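As a minimal sketch of that FIFO behavior, items come back out of an asyncio.Queue in the order they were put in:

```python
import asyncio

async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    # Producer side: push a few items.
    for item in (1, 2, 3):
        await queue.put(item)
    # Consumer side: items come back in insertion order (FIFO).
    return [await queue.get() for _ in range(3)]

print(asyncio.run(demo()))  # prints [1, 2, 3]
```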
Let's create a hypothetical example where the main code initiates two asynchronous functions. The two functions will run in an infinite loop, simulating a system that operates continuously.
import random
from asyncio import Queue, gather, run, sleep

async def main():
    queue = Queue()
    await gather(producer(queue), consumer(queue))

if __name__ == "__main__":
    run(main())
The main function creates a Queue from the asyncio library and passes the instance to the producer and consumer functions. The shared instance allows both parts of the code to interact with the same queue.
Producer
The fake producer sleeps for a random interval between 250 ms and 1500 ms before inserting an item into the queue with put. Here the item is the random delay itself, but it could be anything.
async def producer(queue: Queue):
    while True:
        delay = random.randint(250, 1500) / 1000
        await sleep(delay)
        print(f"Writing {delay}")
        await queue.put(delay)
The code simulates a steady flow of new data in a random fashion. For example, that could be an HTTP request coming into our Python application from an endpoint that requires inserting data into the database.
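If the producer should eventually slow down instead of letting the queue grow without bound, asyncio.Queue also accepts a maxsize argument; a put on a full queue then waits until a slot frees up. A small sketch of this behavior (the maxsize of 2 is an arbitrary value for illustration):

```python
import asyncio

async def demo() -> str:
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    await queue.put("a")
    await queue.put("b")
    # The queue is now full: await queue.put(...) would suspend here,
    # and the non-blocking variant raises QueueFull immediately.
    try:
        queue.put_nowait("c")
    except asyncio.QueueFull:
        return "full"
    return "not full"

print(asyncio.run(demo()))  # prints full
```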
Consumer
The consumer is also a long-lived function that loops forever. However, there is a twist: it consumes slowly or quickly depending on the amount of data in the queue. If there are more than 5 items, the sleep is shorter; otherwise, it is longer. The simulation aims to observe over time whether the consumer eliminates almost everything in the queue or sometimes lets it accumulate.
async def consumer(queue: Queue):
    consume_fast = False  # start slow; avoids reading the flag before it is set
    while True:
        queue_size = queue.qsize()
        if queue_size > 5:
            consume_fast = True
        elif queue_size == 0:
            consume_fast = False
        if consume_fast:
            delay = random.randint(200, 600) / 1000
        else:
            delay = random.randint(1000, 2000) / 1000
        await sleep(delay)
        data = await queue.get()
        print(f"Reading {data} and the queue still has {queue.qsize()} elements")
Besides the logic for the two consumption speeds, the code mostly awaits the queue.get method, which removes the oldest item from the queue, waiting if the queue is empty.
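To see that a get on an empty queue suspends rather than raising an error, one can wrap it in asyncio.wait_for with a short timeout (the 100 ms value here is arbitrary):

```python
import asyncio

async def demo() -> str:
    queue: asyncio.Queue = asyncio.Queue()
    try:
        # get() on an empty queue suspends until an item arrives;
        # nothing ever arrives here, so the wait times out instead.
        await asyncio.wait_for(queue.get(), timeout=0.1)
        return "got an item"
    except asyncio.TimeoutError:
        return "still waiting after 100 ms"

print(asyncio.run(demo()))
```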
The video shows that the producer writes more than the consumer reads. At 13 seconds, the queue jumps from 4 elements to 6, switching the consumer's delay from 1-2 seconds down to 200-600 ms. By the 19th second, the queue is empty. While empty, the consumer's loop does not spin at full speed: it waits on queue.get, letting the event loop run other tasks, like the producer code.
Conclusion
Even with two infinite loops, the Python script continues to run smoothly. When one function has nothing to work on, it does not block or consume any time, allowing other parts of the system to use the resources. The advantage of using a queue is that it facilitates cross-communication between asynchronous functions safely: adding and removing elements does not cause issues because the queue handles the synchronization internally, avoiding potential errors.
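One thing the example above leaves open is how to stop such an infinite consumer. asyncio.Queue provides task_done and join for this: join waits until every queued item has been marked as processed, after which the worker task can be cancelled. A sketch (the worker and demo names are made up for illustration):

```python
import asyncio

async def worker(queue: asyncio.Queue, seen: list) -> None:
    while True:
        item = await queue.get()
        seen.append(item)  # stand-in for real processing
        queue.task_done()  # mark this item as fully handled

async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    seen: list = []
    task = asyncio.create_task(worker(queue, seen))
    for i in range(5):
        await queue.put(i)
    await queue.join()  # wait until task_done() was called for every item
    task.cancel()       # then stop the infinite worker loop
    return seen

print(asyncio.run(demo()))  # prints [0, 1, 2, 3, 4]
```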
Source code available on GitHub.