How to use mpsc::unbounded to share data between Rust threads
Posted on: November 26, 2022
Moving information between threads is always a delicate task. This is because so many issues can occur, and Rust is built around safety; hence it has a safe way to send information between two threads.
One way is to rely on the futures
crate and use a unbounded
. The documentation is fuzzy around how to use the future::mpsc::unbounded,
and I'll try to provide an easy example to grasp the whole idea.
First, before getting too deep with future::mpsc::unbounded
, know that there is also future::mpsc::channel
. Channel is similar but is for bounded communication. The bounded has a single channel per sender, while the future::mpsc::unbounded
has a single channel for every sender. The example uses the future::mpsc::unbounded
with an internal buffer in case the receiver is not fetching the data fast enough.
Main Loop Function
The example consists of three functions. The first one, the main one, is the entry one that will spawn two threads.
1fn main() {2 let (broker_sender, broker_receiver) = mpsc::unbounded::<String>();3 task::spawn(receive_loop(broker_receiver));4 task::block_on(send_loop(broker_sender));5}
The loop uses task::spawn
that starts a thread without blocking the execution of the main
function and starting the receive_loop
function into a separate thread. It allows getting into the second line that blocks the main
function to keep the program running while also running a second function, send_loop
to run into another thread.
The mpsc::unbounded::<String>
create a sender and receiver that can be shared between the two threads.
send_loop Function
The send loop is a function that takes the user input from the command line and adds the value into the broker_sender
that gets the string into the mpsc::unbounded
to have the receive_loop
read it later.
1async fn send_loop(mut broker: mpsc::UnboundedSender<String>) -> () {2 loop {3 let mut user_input = String::new();45 print!("> ");6 io::stdout().flush().unwrap();7 io::stdin().read_line(&mut user_input).unwrap();8 let user_input_clean = user_input.trim();9 broker.send(user_input_clean.to_string()).await.unwrap();10 }11}
The function has an infinite loop, waiting for the user's inputs. Then, it waits to read the stdin
, which takes the user line that is all the characters until the enter key is pressed. Finally, the last line gets the mpsc::UnboundedSender<String>
and send
the String
using await.unwrap
, which asynchronously puts the value into the communication pipeline. It still needs to be consumed.
receive_loop Function
The last step is the receiving function.
1async fn receive_loop(mut broker: mpsc::UnboundedReceiver<String>) -> () {2 let ten_millis = time::Duration::from_millis(500);3 loop {4 if let Ok(wrapper_msg) = broker.try_next() {5 println!("New message: {}", wrapper_msg.unwrap());6 }7 thread::sleep(ten_millis)8 }9}
The function also has an infinite loop. It reads the mpsc::UnboundedReceiver<String>
where the value is sitting from the previous function. The code above takes the value by calling try_next
, which returns a Result
. The function does not look for the Err
path because the try_next
spams an error anytime the mpsc::UnboundedReceiver
is empty. However, when data is ready to be read, the value goes into the wrapper_msg
, an Option
type. Hence, the unwrap
is called to read the string.
Conclusion
A final observation is that the mpsc::UnboundedSender
and mpsc::UnboundedReceiver
shared between the two functions required to be mutable. You can see the mut
in both parameters. Without the mut
, the code throws a compilation error:
1cannot borrow `broker` as mutable, as it is not declared as mutable2cannot borrow as mutable
The reason seems to be that the try_next
has a &mut self
in its definition. From my limited knowledge of Rust, the reason of why the try_next
seems to be that at the end, it has a queue where it needs to change its value (mutate) with the unpark_one.
There is a very low amount of examples on the Internet using the mpsc::unbounded
. I hope these small examples gave you help on how to use Rust Future mpsc to pass information between threads.
The code in this article relies on these two dependencies:
1futures = "0.3.25"2async-std = "1.12.0"