My async calls appear to run serially, which leads me to believe that I'm doing something wrong in the code or perhaps misunderstanding something.
At a high level, I take in 100,000 or so strings and use them to perform queries that are rate limited to 1 per second per connection. I am starting with 4 connections to the service and want to use all 4 concurrently, wait for all their answers to come back, and then combine the results.
Main.cpp
struct DataFetcher {
    DataFetcher(cpool::ConnectionPool& tws_conn_pool): tws_conn_pool(&tws_conn_pool){
        std::cout << "DataFetcher() this->tws_conn_pool == "<< this->tws_conn_pool << std::endl;
    }
    std::vector<std::string> process_data(std::vector<std::string> sub_batch_vector){
        // Placeholder for the rate-limited query: one second per string
        std::for_each(sub_batch_vector.begin(), sub_batch_vector.end(),[](std::string query_str){
            std::cout << query_str << ";";
            std::this_thread::sleep_for (std::chrono::seconds(1));
        });
        cpool::ConnectionPool::ConnectionProxy proxy_conn = this->tws_conn_pool->get_connection();
        proxy_conn->is_healthy();
        // will do more stuff later here
        return sub_batch_vector;
    }
    cpool::ConnectionPool * const tws_conn_pool;
};
What is the proper way to use this async call within a loop? There will be a variable number of ServiceConnections available, so I'd like to do something like this:
std::vector<std::future<std::vector<std::string>>> results_vector;
for (int i=0 ; i < this->tws_conn_pool->size()-1; i++) {
    std::vector<std::string> subvector = {queries_list.begin() + (batch_size*i), queries_list.begin() + (batch_size*(i+1))};
    DataFetcher fetcher = DataFetcher(*this->tws_conn_pool);
    auto fut = std::async(std::launch::async, &DataFetcher::process_data, &fetcher, subvector);
    results_vector.push_back(fut);
}
But this seems very obtuse and doesn't actually compile:
from /Server/cpp_server/src/_server.cpp:1:
/usr/include/c++/9/ext/new_allocator.h: In instantiation of ‘void __gnu_cxx::new_allocator<_Tp>::construct(_Up*, _Args&& ...) [with _Up = std::future<std::vector<std::__cxx11::basic_string<char> > >; _Args = {const std::future<std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > >&}; _Tp = std::future<std::vector<std::__cxx11::basic_string<char> > >]’:
/usr/include/c++/9/bits/alloc_traits.h:482:2: required from ‘static void std::allocator_traits<std::allocator<_CharT> >::construct(std::allocator_traits<std::allocator<_CharT> >::allocator_type&, _Up*, _Args&& ...) [with _Up = std::future<std::vector<std::__cxx11::basic_string<char> > >; _Args = {const std::future<std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > >&}; _Tp = std::future<std::vector<std::__cxx11::basic_string<char> > >; std::allocator_traits<std::allocator<_CharT> >::allocator_type = std::allocator<std::future<std::vector<std::__cxx11::basic_string<char> > > >]’
/usr/include/c++/9/bits/stl_vector.h:1189:30: required from ‘void std::vector<_Tp, _Alloc>::push_back(const value_type&) [with _Tp = std::future<std::vector<std::__cxx11::basic_string<char> > >; _Alloc = std::allocator<std::future<std::vector<std::__cxx11::basic_string<char> > > >; std::vector<_Tp, _Alloc>::value_type = std::future<std::vector<std::__cxx11::basic_string<char> > >]’
/Server/cpp_server/src/_server.cpp:175:35: required from here
/usr/include/c++/9/ext/new_allocator.h:145:20: error: use of deleted function ‘std::future<_Res>::future(const std::future<_Res>&) [with _Res = std::vector<std::__cxx11::basic_string<char> >]’
145 | noexcept(noexcept(::new((void *)__p)
| ^~~~~~~~~~~~~~~~~~
146 | _Up(std::forward<_Args>(__args)...)))
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
In file included from /Server/cpp_server/src/_server.cpp:4:
/usr/include/c++/9/future:782:7: note: declared here
782 | future(const future&) = delete;
| ^~~~~~
make[2]: *** [CMakeFiles/_server.dir/build.make:63: CMakeFiles/_server.dir/src/_server.cpp.o] Error 1
make[1]: *** [CMakeFiles/Makefile2:76: CMakeFiles/_server.dir/all] Error 2
Answer:
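Have DataFetcher take its pool by pointer. Taking a reference and immediately taking its address is silly.
Move fut into the vector with results_vector.push_back(std::move(fut)). Futures cannot be copied, only moved, which is what the "use of deleted function" error is reporting.
Pass fetcher to async by value. Pointers or references to local variables should not be passed to async, because the task can outlive them.
Move the subvector into async with std::move(subvector), for efficiency.
The future produced by std::async (or whatever object it was moved into) blocks in its destructor until the async task finishes. If the future is destroyed at the end of each loop iteration, the calls therefore run one after another.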
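Putting these points together, a minimal sketch might look like the following. It is not the asker's real code: FakePool is a hypothetical stand-in for cpool::ConnectionPool (only a size() method is assumed), fetch_all is an illustrative name, and the short sleep simulates the rate-limited query.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <future>
#include <iterator>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for cpool::ConnectionPool; only size() is assumed.
struct FakePool {
    std::size_t size() const { return 4; }
};

struct DataFetcher {
    // The pool is taken by pointer, so no reference-then-address dance.
    explicit DataFetcher(FakePool* pool) : pool(pool) {}

    std::vector<std::string> process_data(std::vector<std::string> batch) {
        // Simulated query latency; the real code would use a connection here.
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        return batch;
    }

    FakePool* pool;
};

// Splits the queries into one batch per connection, runs the batches
// concurrently, and concatenates the results in batch order.
std::vector<std::string> fetch_all(FakePool& pool,
                                   const std::vector<std::string>& queries) {
    const std::size_t workers = pool.size();
    assert(workers > 0);
    const std::size_t batch_size = (queries.size() + workers - 1) / workers;

    std::vector<std::future<std::vector<std::string>>> futures;
    for (std::size_t i = 0; i < workers; ++i) {
        const std::size_t first = std::min(i * batch_size, queries.size());
        const std::size_t last = std::min(first + batch_size, queries.size());
        std::vector<std::string> sub(queries.begin() + first,
                                     queries.begin() + last);
        DataFetcher fetcher(&pool);
        // fetcher is copied into the task, sub is moved into it, and the
        // returned future is moved straight into the vector.
        futures.push_back(std::async(std::launch::async,
                                     &DataFetcher::process_data,
                                     fetcher, std::move(sub)));
    }

    // All tasks are already running; get() only waits for each in turn.
    std::vector<std::string> combined;
    for (auto& f : futures) {
        std::vector<std::string> part = f.get();
        combined.insert(combined.end(),
                        std::make_move_iterator(part.begin()),
                        std::make_move_iterator(part.end()));
    }
    return combined;
}
```

Because every future stays alive in the vector until the second loop, none of them blocks in its destructor while later tasks are still being launched, so the batches can overlap instead of running back to back.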
