I am trying to build an program that performs async http requests and returns their Response body.
This is what the function that returns the Response looks like:
let responses = stream::iter(urls)
.map(|line| {
let client = &client;
async move {
client.get(&line).send().await.map(|resp| {
(line, resp)
})}
})
.buffer_unordered(concurrency_amount);
However, after returning resp, I can't use resp.text(), since resp.text() is of type Future<Output=Result<String>>.
How can I make the function also return resp.text() in the tuple?
CodePudding user response:
Assuming that you are using reqwest, it should be possible to collect the bytes of the response body with Response::chunk(), text() consumes self but chunk() only takes a mutable reference.
Something like the following collects the response body and decodes it to a String in a lossy manner.
use futures_util::StreamExt;
#[tokio::main]
async fn main() {
let cli = reqwest::Client::new();
let urls = vec![
"https://stackoverflow.com".to_string(),
"https://google.com".into(),
"https://tokio.rs".into(),
];
let responses = futures_util::stream::iter(urls.into_iter())
.then(|url| { // note that I replaced `map` with `then` here.
let cli = cli.clone();
async move {
let mut resp = cli.get(url.clone()).send().await.unwrap();
let mut body = Vec::new();
while let Some(chunk) = resp.chunk().await.unwrap() {
body.extend_from_slice(&*chunk);
}
(url, resp, String::from_utf8_lossy(&body).to_string())
}
})
.collect::<Vec<_>>()
.await;
for (url, response, text) in responses {
println!("url: {} status: {} text: {}", url, response.status(), text);
}
}
As the in-line comment notes: I changed the map() call to then() so the stream yields the tuples rather than futures with the tuples as output.
CodePudding user response:
Does this work?
let responses = stream::iter(urls)
.map(|line| {
let client = &client;
(async move || {
let resp = client.get(&line).send().await?;
let text = resp.text().await?;
(line, resp, text)
})()
})
.buffer_unordered(concurrency_amount);
