I have a problem with a loop over a column of a data frame that calls a function I wrote on each iteration. I'm looking for a way to reduce the run time. Is there a way to do that?
Loop
for (i in df_contacts$id) {
tab[nrow(tab) + 1,] <- makeHistoryObservedTroubles(current_clean_followups,"2021-02",i)
}
my function
makeHistoryObservedTroubles <- function(df, months_val,v_id) {
col1 = NULL
col2 = NULL
col1 = df %>%
filter(
substr(date_of_followup, 0, 7) == months_val &
(
clin_cmd_ostep == "oui" |
clin_musc == "oui" |
clin_lomb == "oui"
) & visual_id==v_id
) %>%
count()
col2 = df %>%
filter(
substr(date_of_followup, 0, 7) == months_val &
(
clin_ocul == "oui" |
clin_vis == "oui" |
clin_larm == "oui"
) & visual_id==v_id
) %>%
count()
r <- c(toupper(v_id),col1,col2)
return(r)
}
CodePudding user response:
You could use the foreach package to easily parallelize your code (only if the iterations are independent of each other). The results are returned as a list.
library(foreach)
library(doParallel)
# Prepare for parallelism
n_cores <- detectCores() - 1 # Get the number of cores minus 1
cl <- makeCluster(n_cores)
registerDoParallel(cl)
results <- foreach(i = df_contacts$id) %dopar% {
makeHistoryObservedTroubles(current_clean_followups, "2021-02", i)
}
stopCluster(cl)
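Because the results come back as a list, the per-id vectors still need to be stacked into a table. Here is a minimal sketch of that step, assuming each element is a plain `c(id, count1, count2)` vector. The `results` list and the column names below are made up for illustration and are not taken from the question's data:

```r
# Hypothetical stand-in for the list returned by foreach()
results <- list(
  c("A1", 2, 0),
  c("B2", 0, 1),
  c("C3", 5, 3)
)

# Stack the vectors row-wise into a character matrix, then a data frame
tab <- as.data.frame(do.call(rbind, results), stringsAsFactors = FALSE)
names(tab) <- c("id", "troubles_1", "troubles_2")  # illustrative names

# c() coerced everything to character, so convert the counts back
tab$troubles_1 <- as.integer(tab$troubles_1)
tab$troubles_2 <- as.integer(tab$troubles_2)
```

foreach also accepts a `.combine` argument (for example `.combine = rbind`), which does the stacking for you as results arrive.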
CodePudding user response:
Without modifying your function, this could be one approach. Some modifications may still be needed, since I have no similar database to test the code on.
library(parallel)
If the structure of your dfs allows for easy filtering of these contacts, that would be a smart thing to do first, since it seems you're only interested in the ones showing up during a particular month. This will just consider them all:
total.length.contacts <- length(df_contacts$id)
Then it depends on how many cores your machine has. If you have 20, I'd use 18 so the computer doesn't freeze on the OS's regular background processes. If you have 2, forget about a parallel approach.
available.cores <- 18
With the following you'll just have a list of vectors with splits of your contacts ids.
batch.max.size <- ceiling(total.length.contacts/available.cores)
batch.contacts <- list()
for(hh in 1:available.cores){
if(hh!=available.cores){
batch.size <- batch.max.size
}else{
batch.size <- total.length.contacts-(batch.max.size*(available.cores-1))
}
batch.contacts[[hh]] <- df_contacts$id[(1 + (batch.max.size*(hh-1))):(batch.size + (batch.max.size*(hh-1)))]
}
rm(hh)
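The same split can be sketched more compactly with base R's `split()`, assuming the ids are a plain vector. The `ids` vector and the `n.batches` value below are illustrative stand-ins for `df_contacts$id` and `available.cores`:

```r
ids <- sprintf("id%02d", 1:10)  # stand-in for df_contacts$id
n.batches <- 4                   # stand-in for available.cores

batch.max.size <- ceiling(length(ids) / n.batches)

# Label each id with its batch number (1, 1, 1, 2, 2, 2, ...), then split
batch.index <- ceiling(seq_along(ids) / batch.max.size)
batch.contacts <- unname(split(ids, batch.index))

lengths(batch.contacts)
# 3 3 3 1
```

With some combinations of id count and batch count this yields fewer batches than requested (9 ids and 4 batches give only 3), so it is safer to iterate over `seq_along(batch.contacts)` than over `1:n.batches`.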
Then you create a function which has your function inside.
- It uses an index (batch.number) to access the respective item of the batch.contacts list (a fraction of your contacts' full list).
- Any libraries your function requires need to be loaded inside this function, since each core works in isolation from the global environment you're working in (I was too lazy to work out which ones :)).
- I assume the tab df already has elements when you run your function. I suggest emptying it and keeping just its structure.
- One way to get the resulting dfs from this function when it runs in parallel is to save them in files (with the code below they'll be stored in the working directory, but you can write any specific path on your computer).
makeHistoryObservedTroubles.in.parallel <- function(batch.number){
#### library(???)
contacts.in.batch <- batch.contacts[[batch.number]]
tab2 <- tab[c(),]
for (i in contacts.in.batch) {
tab2[nrow(tab2) + 1,] <- makeHistoryObservedTroubles(current_clean_followups,"2021-02",i)
}
write.csv(tab2, file=paste0("tab.",
ifelse(batch.number>9,batch.number,paste0("0",batch.number)),
".csv"))
}
With the function created, now you run it in parallel:
cl <- makeCluster(available.cores, type="PSOCK")
clusterExport(cl, c("makeHistoryObservedTroubles","current_clean_followups",
"tab","batch.contacts"))
clusterApplyLB(cl, 1:available.cores,makeHistoryObservedTroubles.in.parallel)
stopCluster(cl)
gc()
Note that the vector passed to clusterExport needs to include everything the function uses (dfs, functions, etc.). Don't worry: if something is missing, the process stops with a fairly clear error and you can add it to the list.
Once the whole process is done, you'll just have to read.csv() the files and rbind() them.
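A minimal sketch of that last step, assuming the files follow the `tab.NN.csv` naming used above. The two small files written to a temporary directory here are only stand-ins so the example runs on its own, and their columns are made up:

```r
# Write two stand-in batch files to a temporary directory
out.dir <- tempfile("batches")
dir.create(out.dir)
write.csv(data.frame(id = c("A1", "B2"), n1 = c(2, 0)),
          file.path(out.dir, "tab.01.csv"), row.names = FALSE)
write.csv(data.frame(id = "C3", n1 = 5),
          file.path(out.dir, "tab.02.csv"), row.names = FALSE)

# Find every batch file, read each one, and stack them into one data frame
files <- list.files(out.dir, pattern = "^tab\\.[0-9]+\\.csv$", full.names = TRUE)
tab.all <- do.call(rbind, lapply(files, read.csv))
```

Keep in mind that `write.csv()` writes row names by default, so files written without `row.names = FALSE` come back from `read.csv()` with an extra first column named `X`.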
Bonus tip: if you really have MANY elements to analyze and the process still takes a long time this way, consider splitting into more batches than available cores. The clusterApplyLB function starts with the first batches, and whenever one finishes (freeing a core) it moves on to the next batch in the list, and so on. That way, if the batches are not evenly split in terms of the computation they require, you reduce the time cores sit idle before the whole process is done.
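To illustrate the idea, here is a small sketch with eight batches handed to only two workers. The batch contents and the `process.batch` function are hypothetical placeholders for the real per-batch work:

```r
library(parallel)

# Eight small batches but only two workers (both numbers are illustrative)
batches <- unname(split(1:40, rep(1:8, each = 5)))

# Hypothetical per-batch work: here just the sum of the batch
process.batch <- function(batch) {
  sum(batch)
}

cl <- makeCluster(2)
# Each worker picks up the next batch as soon as it finishes its current one
res <- clusterApplyLB(cl, batches, process.batch)
stopCluster(cl)

unlist(res)
```

Results come back in the order of the input list, regardless of which worker handled which batch.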
