In the following program I use Tokio's mpsc channels. The Sender is moved to a task named input_message and the Receiver is moved to another task named printer. Both tasks are spawned with tokio::spawn() in the main function. The input_message task reads the user's input and sends it through the channel. The printer task calls recv() on the channel to get the user's input and simply prints it to stdout:
use std::error::Error;
use tokio::sync::mpsc;
use std::io::{BufRead, Write};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let (tx, mut rx) = mpsc::unbounded_channel::<String>();

    let printer = tokio::spawn(async move {
        loop {
            let res = rx.recv().await;      // (11) Comment this ..
            // let res = rx.try_recv();     // (12) Uncomment this ,,
            if let Some(m) = res {          // .. and this
            // if let Ok(m) = res {         // ,, and this
                if m.trim() == "q".to_string() {
                    break;
                }
                println!("Received: {}", m.trim());
            }
        }
        println!("Printer exited");
    });

    let input_message = tokio::spawn(async move {
        let stdin = std::io::stdin();
        let mut bufr = std::io::BufReader::new(stdin);
        let mut buf = String::new();
        loop {
            // Let the printer thread print the string before asking the user's input.
            std::thread::sleep(std::time::Duration::from_millis(1));
            print!("Enter input: ");
            std::io::stdout().flush().unwrap();
            bufr.read_line(&mut buf).unwrap();
            if buf.trim() == "q".to_string() {
                tx.send(buf).unwrap();
                break;
            }
            tx.send(buf).unwrap();
            buf = String::new();
        }
        println!("InputMessage exited");
    });

    tokio::join!(input_message, printer);
    Ok(())
}
The expected behavior of the program is to:
- Ask the user for arbitrary input (q to quit)
- Print that same input to stdout
Using rx.recv().await, as in the lines marked (11), the program seems to buffer the Strings representing the user's input: the inputs are not received by the printer task, which therefore does not print them to stdout. Once the quit message (i.e. q) is sent, the input_message task exits, the messages seem to be flushed out of the channel, and the receiver processes them all at once, so the printer task prints all the inputs in one go. Here's an example of the wrong output:
Enter input: Hello
Enter input: World
Enter input: q
InputMessage exited
Received: Hello
Received: World
Printer exited
My question here is, how is it possible that the channel buffers the messages and processes them in one go only when the sending thread exits, instead of receiving them as they are sent?
What I tried is to use the try_recv() function, as in the lines marked (12), and indeed it fixes the problem. The output is printed correctly; here is an example:
Enter input: Hello
Received: Hello
Enter input: World
Received: World
Enter input: q
InputMessage exited
Printer exited
In light of this, I get confused. I get the difference between the recv().await and the try_recv() functions but I think there's something more in this case that I'm ignoring that makes the latter work and the former not work. Is anybody able to shed some light and elaborate on this? Why does try_recv() work and recv().await not, and why should recv().await not work in this scenario? In terms of efficiency is looping on try_recv() bad or "bad practice" at all?
CodePudding user response:
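There are a few things to point out here, but first of all, you are waiting for lines on std::io::stdin(), which blocks the thread until a line arrives on that stream. While the thread is waiting for input, no other future can be executed on that thread. When the input task sends a message, the woken printer task can end up queued on the very worker thread that the input task is blocking, so it does not run until the input task finishes. A try_recv() loop never awaits, so it keeps its own worker thread busy and sees each message right away, but at the cost of spinning a CPU core, which is why polling like this is usually considered bad practice. This blog post is a great resource if you want to dive deeper into why you shouldn't block in async code.
Tokio's io module offers an async handle to stdin(). You can use it as a quick fix, although the documentation explicitly mentions that you should spin up a dedicated (non-async) thread for interactive user input instead of using the async handle.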
Swapping std::io::stdin() for tokio::io::stdin() also entails swapping out the standard library BufReader for tokio's implementation that wraps an R: AsyncRead rather than R: Read.
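The dedicated-thread alternative that the documentation recommends could look roughly like the sketch below. It is only an illustration under a few assumptions: spawn_line_reader is a hypothetical helper name, std::sync::mpsc is used so the example has no dependencies, and a Cursor stands in for stdin so it runs without a terminal. In the Tokio program the thread would hold the tokio UnboundedSender instead, whose send() is not async and can therefore be called from a plain thread.

```rust
use std::io::{BufRead, Cursor};
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical helper: reads lines on a dedicated OS thread and forwards
/// each one over a channel, stopping after "q", on a read error, or when
/// the receiver has been dropped.
fn spawn_line_reader<R>(reader: R) -> Receiver<String>
where
    R: BufRead + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for line in reader.lines() {
            let line = match line {
                Ok(l) => l,
                Err(_) => break,
            };
            let quit = line.trim() == "q";
            if tx.send(line).is_err() || quit {
                break;
            }
        }
        // tx is dropped here, which closes the channel for the receiver.
    });
    rx
}

fn main() {
    // Cursor stands in for stdin (an assumption made for the sketch).
    let rx = spawn_line_reader(Cursor::new("Hello\nWorld\nq\n"));
    for line in rx {
        if line.trim() == "q" {
            break;
        }
        println!("Received: {}", line);
    }
    println!("Printer exited");
}
```

For real interactive input, the reader passed in could be std::io::BufReader::new(std::io::stdin()). Blocking inside this thread is harmless because no async task shares it.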
To prevent interleaved writes between the input task and the output task, you can use a responder channel that signals to the input task when the output has been printed. Instead of sending String over the channel, you could send a Message with these fields:
struct Message {
    done_tx: oneshot::Sender<()>,
    message: String,
}
After reading an input line, send the Message over the channel to the printer task. The printer task prints the String and signals through the done_tx that the input task can print the input prompt and wait for a new line.
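To make the ordering guarantee of this handshake concrete, here is a minimal sketch that uses plain threads and std::sync::mpsc rather than Tokio. The std Sender<()> is a stand-in for oneshot::Sender<()>, and run_handshake plus the shared log are hypothetical names introduced only for illustration. Both sides append to one log, so the interleaving can be inspected.

```rust
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

struct Message {
    message: String,
    // std stand-in for tokio's oneshot::Sender<()> (assumption of this sketch)
    done_tx: Sender<()>,
}

/// Hypothetical driver: sends each input to a printer thread and waits for
/// its acknowledgement before "prompting" again. Returns the shared log.
fn run_handshake(inputs: &[&str]) -> Vec<String> {
    let log = Arc::new(Mutex::new(Vec::new()));
    let (tx, rx) = mpsc::channel::<Message>();

    let printer_log = Arc::clone(&log);
    let printer = thread::spawn(move || {
        for msg in rx {
            printer_log
                .lock()
                .unwrap()
                .push(format!("Received: {}", msg.message));
            // Tell the input side that the output has been written.
            let _ = msg.done_tx.send(());
        }
    });

    for input in inputs {
        log.lock().unwrap().push("Enter input:".to_string());
        let (done_tx, done_rx) = mpsc::channel();
        tx.send(Message {
            message: (*input).to_string(),
            done_tx,
        })
        .unwrap();
        // Block until the printer has handled this message.
        done_rx.recv().unwrap();
    }

    drop(tx); // close the channel so the printer loop ends
    printer.join().unwrap();

    let entries = log.lock().unwrap().clone();
    entries
}

fn main() {
    for line in run_handshake(&["Hello", "World"]) {
        println!("{}", line);
    }
}
```

Because the input side waits on done_rx before logging the next prompt, every prompt entry is followed by its matching "Received" entry. The async version has the same shape, with done.await in place of the blocking recv().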
Putting all that together with some other changes like a while loop to wait for messages, you'd end up with something like this:
use std::error::Error;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tokio::sync::{mpsc, oneshot};

#[derive(Debug)]
struct Message {
    done_tx: oneshot::Sender<()>,
    message: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let (tx, mut rx) = mpsc::unbounded_channel::<Message>();

    let printer = tokio::spawn(async move {
        // recv() returns None once every sender has been dropped.
        while let Some(Message {
            message: m,
            done_tx,
        }) = rx.recv().await
        {
            if m.trim() == "q" {
                break;
            }
            println!("Received: {}", m.trim());
            // Tell the input task that the output has been printed.
            done_tx.send(()).unwrap();
        }
        println!("Printer exited");
    });

    let input_message = tokio::spawn(async move {
        let stdin = tokio::io::stdin();
        let mut stdout = tokio::io::stdout();
        let mut bufr = tokio::io::BufReader::new(stdin);
        let mut buf = String::new();
        loop {
            // write_all makes sure the whole prompt is written before flushing.
            stdout.write_all(b"Enter input: ").await.unwrap();
            stdout.flush().await.unwrap();
            bufr.read_line(&mut buf).await.unwrap();
            let end = buf.trim() == "q";
            let (done_tx, done) = oneshot::channel();
            let message = Message {
                message: buf,
                done_tx,
            };
            tx.send(message).unwrap();
            if end {
                break;
            }
            // Wait until the printer has printed the line before prompting again.
            done.await.unwrap();
            buf = String::new();
        }
        println!("InputMessage exited");
    });

    tokio::join!(input_message, printer);
    Ok(())
}
