I am trying to write a program that takes a list of URLs and uses async requests to retrieve the status code and body of each one. This is what it currently looks like, but the requests don't seem to be sent concurrently; they are sent one by one.
extern crate futures;
use futures::{stream, StreamExt};
use reqwest::{Client as http, StatusCode};
use std::time::Duration;
#[tokio::main]
async fn main() {
    let client_builder = http::builder()
        .connect_timeout(Duration::from_secs(5))
        .danger_accept_invalid_certs(true)
        .redirect(reqwest::redirect::Policy::none())
        .timeout(Duration::from_secs(5))
        .user_agent("Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36");
    let client = client_builder.build().unwrap();
    let mut urls = vec![];
    for _ in 0..100 {
        urls.push("https://google.com:443".to_string());
    }
    let results = stream::iter(urls)
        .filter_map(|url| async {
            let response = (&client)
                .get(&url)
                .send()
                .await
                .ok()?; // Result<T, E> => Option<T> => T
            Some((url, response))
        })
        .filter_map(|(url, response)| async {
            let status = response.status();
            let body = response
                .text()
                .await
                .ok()?; // Result<T, E> => Option<T> => T
            println!("{}", url);
            Some((url, status, body))
        })
        .map(|elem| futures::future::ready(elem))
        .buffer_unordered(40)
        .collect::<Vec<(String, StatusCode, String)>>()
        .await;
}
Changing .buffer_unordered(40) to .buffer_unordered(1) makes no difference in speed, which suggests that the requests are not being sent concurrently. The print output also makes it clear that the program waits for each response before moving on to the next request.
What do I need to modify in order for my requests to be sent async?
It is also important that I can control concurrency using .buffer_unordered() and that the results are collected into a Vec rather than printed, so removing the .collect() at the end won't work.
Answer:
Firstly, this line is the problem:
.map(|elem| futures::future::ready(elem))
By the time an item reaches it, the item is already a finished (url, status, body) tuple: the request and the body download were awaited inside the two filter_map closures. A stream's filter_map drives the future for one item to completion before it pulls the next item, so the requests are sent strictly one after another. The .map() then just wraps each finished result in an already-completed future, so buffer_unordered() has nothing left to run concurrently; it can't undo the sequential work that happened before it.
Secondly, once you remove this line, you'll see that buffer_unordered expects a stream of futures (so it can keep several of them in flight and poll them together), but filter_map gives it plain (url, status, body) tuples.
So instead of filter_map, use a plain map to turn each URL into an async block (which is a future):
let results = stream::iter(urls)
    .map(|url| async { // nothing runs yet at this point;
                       // this just converts url -> Future
        let response = (&client)
            .get(&url)
            .send()
            .await
            .ok()?; // Result<T, E> => Option<T> => T
        let status = response.status();
        let body = response
            .text()
            .await
            .ok()?; // Result<T, E> => Option<T> => T
        println!("{} ready", url);
        Some((url, status, body))
    })
    .buffer_unordered(40) // take up to 40 futures (async blocks)
                          // and poll them concurrently
...
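You could make this more explicit by building the futures first and then running them through a stream. Building them does not start any requests, because an async block does nothing until it is polled; buffer_unordered is what polls them, up to 40 at a time: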
let futures = urls.into_iter().map(|url| async { ... });
let results = stream::iter(futures)
.buffer_unordered(40)
...
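To check that building the futures up front (even collecting them into a Vec) does not start any work, here is a std-only sketch. The `block_on` below is a deliberately minimal spin-loop executor with a no-op waker, written only for this illustration; it is valid here because these futures never return `Pending`, and it is no substitute for tokio. `count_runs` is a hypothetical helper.

```rust
use std::cell::Cell;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough for futures that never return Pending.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal spin-loop executor for this sketch only (not a real runtime).
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Builds one async block per URL, collects them eagerly into a Vec, and
/// reports how many bodies had executed before and after they were polled.
fn count_runs(urls: Vec<String>) -> (usize, usize, Vec<String>) {
    let runs = Cell::new(0);
    let futures: Vec<_> = urls
        .into_iter()
        .map(|url| {
            let runs = &runs;
            async move {
                runs.set(runs.get() + 1); // only happens when polled
                url
            }
        })
        .collect();
    let before = runs.get(); // every future exists here, none has run yet
    let outputs: Vec<String> = futures.into_iter().map(|f| block_on(f)).collect();
    (before, runs.get(), outputs)
}

fn main() {
    let urls: Vec<String> = (0..3).map(|i| format!("https://example.com/{}", i)).collect();
    let (before, after, _) = count_runs(urls);
    println!("bodies run before polling: {}, after polling: {}", before, after);
}
```

`before` is 0 because creating an async block only stores its captured variables; the body runs when the future is polled.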
If you want to filter out the failed requests (the None results), do it after buffer_unordered:
.buffer_unordered(40)
.filter_map(|result| futures::future::ready(result)) // StreamExt::filter_map expects a future, so wrap the Option in ready()
...
