How to use mpsc::unbounded to share data between Rust threads<!-- --> | <!-- -->Patrick Desjardins Blog
Patrick Desjardins Blog
Patrick Desjardins picture from a conference

How to use mpsc::unbounded to share data between Rust threads

Posted on: November 26, 2022

Moving information between threads is always a delicate task. This is because so many issues can occur, and Rust is built around safety; hence it has a safe way to send information between two threads.

One way is to rely on the futures crate and use a unbounded. The documentation is fuzzy around how to use the future::mpsc::unbounded, and I'll try to provide an easy example to grasp the whole idea.

First, before getting too deep with future::mpsc::unbounded, know that there is also future::mpsc::channel. Channel is similar but is for bounded communication. The bounded has a single channel per sender, while the future::mpsc::unbounded has a single channel for every sender. The example uses the future::mpsc::unbounded with an internal buffer in case the receiver is not fetching the data fast enough.

Main Loop Function

The example consists of three functions. The first one, the main one, is the entry one that will spawn two threads.

1fn main() {
2 let (broker_sender, broker_receiver) = mpsc::unbounded::<String>();
3 task::spawn(receive_loop(broker_receiver));
4 task::block_on(send_loop(broker_sender));
5}

The loop uses task::spawn that starts a thread without blocking the execution of the main function and starting the receive_loop function into a separate thread. It allows getting into the second line that blocks the main function to keep the program running while also running a second function, send_loop to run into another thread.

The mpsc::unbounded::<String> create a sender and receiver that can be shared between the two threads.

send_loop Function

The send loop is a function that takes the user input from the command line and adds the value into the broker_sender that gets the string into the mpsc::unbounded to have the receive_loop read it later.

1async fn send_loop(mut broker: mpsc::UnboundedSender<String>) -> () {
2 loop {
3 let mut user_input = String::new();
4
5 print!("> ");
6 io::stdout().flush().unwrap();
7 io::stdin().read_line(&mut user_input).unwrap();
8 let user_input_clean = user_input.trim();
9 broker.send(user_input_clean.to_string()).await.unwrap();
10 }
11}

The function has an infinite loop, waiting for the user's inputs. Then, it waits to read the stdin, which takes the user line that is all the characters until the enter key is pressed. Finally, the last line gets the mpsc::UnboundedSender<String> and send the String using await.unwrap, which asynchronously puts the value into the communication pipeline. It still needs to be consumed.

receive_loop Function

The last step is the receiving function.

1async fn receive_loop(mut broker: mpsc::UnboundedReceiver<String>) -> () {
2 let ten_millis = time::Duration::from_millis(500);
3 loop {
4 if let Ok(wrapper_msg) = broker.try_next() {
5 println!("New message: {}", wrapper_msg.unwrap());
6 }
7 thread::sleep(ten_millis)
8 }
9}

The function also has an infinite loop. It reads the mpsc::UnboundedReceiver<String> where the value is sitting from the previous function. The code above takes the value by calling try_next, which returns a Result. The function does not look for the Err path because the try_next spams an error anytime the mpsc::UnboundedReceiver is empty. However, when data is ready to be read, the value goes into the wrapper_msg, an Option type. Hence, the unwrap is called to read the string.

Conclusion

A final observation is that the mpsc::UnboundedSender and mpsc::UnboundedReceiver shared between the two functions required to be mutable. You can see the mut in both parameters. Without the mut, the code throws a compilation error:

1cannot borrow `broker` as mutable, as it is not declared as mutable
2cannot borrow as mutable

The reason seems to be that the try_next has a &mut self in its definition. From my limited knowledge of Rust, the reason of why the try_next seems to be that at the end, it has a queue where it needs to change its value (mutate) with the unpark_one.

There is a very low amount of examples on the Internet using the mpsc::unbounded. I hope these small examples gave you help on how to use Rust Future mpsc to pass information between threads.

The code in this article relies on these two dependencies:

1futures = "0.3.25"
2async-std = "1.12.0"