Patrick Desjardins Blog
Patrick Desjardins picture from a conference

How to use mpsc::unbounded to share data between Rust threads

Posted on: 2022-11-26

Moving information between threads is always a delicate task. This is because so many issues can occur, and Rust is built around safety; hence it has a safe way to send information between two threads.

One way is to rely on the futures crate and use a unbounded. The documentation is fuzzy around how to use the future::mpsc::unbounded, and I'll try to provide an easy example to grasp the whole idea.

First, before getting too deep with future::mpsc::unbounded, know that there is also future::mpsc::channel. Channel is similar but is for bounded communication. The bounded has a single channel per sender, while the future::mpsc::unbounded has a single channel for every sender. The example uses the future::mpsc::unbounded with an internal buffer in case the receiver is not fetching the data fast enough.

Main Loop Function

The example consists of three functions. The first one, the main one, is the entry one that will spawn two threads.

fn main() {
    let (broker_sender, broker_receiver) = mpsc::unbounded::<String>();
    task::spawn(receive_loop(broker_receiver));
    task::block_on(send_loop(broker_sender));
}
```

The loop uses `task::spawn` that starts a thread without blocking the execution of the `main` function and starting the `receive_loop` function into a separate thread. It allows getting into the second line that blocks the `main` function to keep the program running while also running a second function, `send_loop` to run into another thread. 

The `mpsc::unbounded::<String>` create a sender and receiver that can be shared between the two threads.

# send_loop Function

The send loop is a function that takes the user input from the command line and adds the value into the `broker_sender` that gets the string into the `mpsc::unbounded` to have the `receive_loop` read it later. 

````rust
async fn send_loop(mut broker: mpsc::UnboundedSender<String>) -> () {
    loop {
        let mut user_input = String::new();

        print!("> ");
        io::stdout().flush().unwrap();
        io::stdin().read_line(&mut user_input).unwrap();
        let user_input_clean = user_input.trim();
        broker.send(user_input_clean.to_string()).await.unwrap();
    }
}
```

The function has an infinite loop, waiting for the user's inputs. Then, it waits to read the `stdin`, which takes the user line that is all the characters until the enter key is pressed. Finally, the last line gets the `mpsc::UnboundedSender<String>` and `send` the `String` using `await.unwrap`, which asynchronously puts the value into the communication pipeline. It still needs to be consumed.

# receive_loop Function
The last step is the receiving function.

````rust
async fn receive_loop(mut broker: mpsc::UnboundedReceiver<String>) -> () {
    let ten_millis = time::Duration::from_millis(500);
    loop {
        if let Ok(wrapper_msg) = broker.try_next() {
            println!("New message: {}", wrapper_msg.unwrap());
        }
        thread::sleep(ten_millis)
    }
}
```

The function also has an infinite loop. It reads the `mpsc::UnboundedReceiver<String>` where the value is sitting from the previous function. The code above takes the value by calling `try_next`, which returns a `Result`. The function does not look for the `Err` path because the `try_next` spams an error anytime the `mpsc::UnboundedReceiver` is empty. However, when data is ready to be read, the value goes into the `wrapper_msg`, an `Option` type. Hence, the `unwrap` is called to read the string.

# Conclusion

A final observation is that the `mpsc::UnboundedSender` and `mpsc::UnboundedReceiver` shared between the two functions required to be mutable. You can see the `mut` in both parameters. Without the `mut`, the code throws a compilation error:

```
cannot borrow `broker` as mutable, as it is not declared as mutable
cannot borrow as mutable
```

The reason seems to be that the `try_next` has a `&mut self`  in its [definition](https://docs.rs/futures-channel/0.3.25/src/futures_channel/mpsc/mod.rs.html#1131). From my limited knowledge of Rust, the reason of why the `try_next` seems to be that at the end, it has a queue where it needs to change its value (mutate) with the [unpark_one](https://docs.rs/futures-channel/0.3.25/src/futures_channel/mpsc/mod.rs.html#1031). 


There is a very low amount of examples on the Internet using the `mpsc::unbounded`. I hope these small examples gave you help on how to use Rust Future mpsc to pass information between threads.

The code in this article relies on these two dependencies:

```
futures = "0.3.25"
async-std = "1.12.0"
```