Un peu sur le calcul parallèle en R

La publication est très courte. Beaucoup de gens pensent que le calcul parallèle dans R est très difficile et ne s'applique pas à leurs tâches actuelles.





Oui et non. Si vous n'allez pas délibérément dans la théorie, le matériel et toutes sortes de détails, vous pouvez dessiner "3 et 1/2" de recettes presque universelles. Ces exemples sont volontairement similaires à des tâches productives, et non à un couple émasculé de lignes synthétiques.





C'est la continuation d'une série de publications précédentes .





Forfaits d'occasion

Chargement des paquets
library(tidyverse)
library(magrittr)
library(stringi)
library(glue)

library(dqrng)

library(iterators)
library(future)
library(foreach)
library(doFuture)

library(tictoc)
library(futile.logger)
library(lgr) #      `lgr`

library(hrbrthemes)
      
      



Modèles de parallélisation

Schéma 1. Parallélisation des calculs tidyverse

Situation. Il existe un script contenant de nombreux pipelines pour tidyverse



.





Exemple de tâche. Calculons la moyenne de la somme des carrés des nombres. Pour améliorer l'efficacité du calcul parallèle, il est important de réduire la quantité de transfert de données entre les threads. Nous utilisons le package furrr .





Pipeline `tidyverse`
registerDoFuture()
# future::plan(multiprocess)
workers <- parallel::detectCores() - 1
future::plan(multisession, workers = workers)

num_row <- 1:10^6

ff_seq <- function(x) x^2

ff_par <- function(x) mean(x^2)

tic(" ")
lst1 <- num_row %>%
  purrr::map_dbl(ff_seq) %>%
  mean()
toc()

tic(" ,  1")
lst2 <- num_row %>%
  furrr::future_map_dbl(ff_seq) %>%
  mean()
toc() 

tic(" ,  2")
lst2 <- num_row %>%
  split(cut(seq_along(.), workers, labels = FALSE)) %>%
  furrr::future_map_dbl(ff_par) %>%
  mean()
toc()
      
      



Naturellement, le résultat dépend de la plate-forme matérielle et du système d'exploitation sur lesquels tout fonctionne. Lors d'un test, j'ai cette disposition:





 : 7.23 sec elapsed
 ,  1: 3.43 sec elapsed
 ,  2: 0.64 sec elapsed
      
      



Windows et Linux sont assez différents dans la façon dont ils se mettent en parallèle. Linux en production est fortement préféré à Windows.





Modèle 2. Parallélisation manuelle locale

. . , . , %<-%



.





#  ,  20   10^5 
nn <- 10^5
tic("Generating sample data.frame")
df <- 100 %>%
  # stri_rand_strings(length = 10, pattern = "[a-z]") %>%
  sample(10^4:10^5, .) %>%
  sample(20 * nn, replace = TRUE) %>%
  matrix(byrow = TRUE, ncol = 20) %>%
  as_tibble(.name_repair = "universal") %>%
  mutate(user_id = as.character(sample(1:as.integer(nn/10), n(), replace = TRUE))) %>%
  #   
  mutate(ver = sample(1:20, n(), replace = TRUE)) %>%
  select(user_id, ver, everything())
toc()

#       
demo_fpath <- here::here("temp", "demo_data.xlsx")
openxlsx::write.xlsx(df, demo_fpath, asTable = TRUE)
      
      



plan(multisession, workers = parallel::detectCores() - 2)
# plan(sequential)
# https://github.com/HenrikBengtsson/future

# ,     2
tic("   ")
tic("  ")
res_lst <- list()
for (j in 1:6) {
  res_lst[[j]] <- { readxl::read_excel(demo_fpath) %>% head(5)}
}
toc()
seq_df <- bind_rows(res_lst)
toc()


tic("   ")
tic("  ")
df1 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df2 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df3 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df4 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df5 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df6 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
toc()
par_df <- bind_rows(df1, df2, df3, df4, df5, df6)
toc()

all_equal(seq_df, par_df)
      
      



. . :





   : 46.23 sec elapsed
   : 37.82 sec elapsed
      
      



3.

. , , .





.

. $C_n^k$



. .





.




#  
flog_logname <- here::here("log", "job_futile.log")
lgr_logname <- here::here("log", "job_lgr.log")

initLogging <- function(log_file){

  lgr <- get_logger_glue("logger")

  lgr$set_propagate(FALSE)
  lgr$set_threshold("all")
  lgr$set_appenders(list(
    console = AppenderConsole$new(
      threshold = "info"
    ),
    file = AppenderFile$new(
      file = log_file,
      threshold = "all"
    )
  ))

  lgr  
}

invisible(flog.appender(appender.tee(flog_logname)))
invisible(flog.threshold(INFO))
lgr <- initLogging(lgr_logname)
      
      



"Start batch processing" %T>%
  flog.info() %T>%
  lgr$info()

#   
# https://github.com/HenrikBengtsson/doFuture
# https://cran.r-project.org/web/packages/future/vignettes/future-1-overview.html
registerDoFuture()
# future::plan(multiprocess)
future::plan(multisession, workers = parallel::detectCores())
# future::plan(sequential)
# plan(future.callr::callr)

tic("Batch processing")
start_time <- Sys.time()

foreach(it = iter(jobs_tbl, by = "row"), .export = c("start_time"), 
        # .packages = 'futile.logger',
        .verbose = FALSE, .inorder = FALSE, .errorhandling = "remove") %dopar% {

          start <- Sys.time() - start_time

          #    
          flog.appender(appender.tee(flog_logname))
          lgr <- initLogging(lgr_logname)

          res <- arrangements::npermutations(k = it$k, n = it$n, bigz = TRUE)

          # https://www.jottr.org/2020/11/06/future-1.20.1-the-future-just-got-a-bit-brighter/
          message("     message from thread")

          glue("Step {it$idx_str} finished. RAM used {capture.output(pryr::mem_used())}.",
               "PID: {Sys.getpid()}",
               "Elapsed {round(difftime(Sys.time(), start_time, units = 'mins'), digits = 2)} min(s) ----------->",
               .sep = " ") %T>%
            flog.info() %T>%
            lgr$info()

          #   
          return(list(pid = Sys.getpid(), start = start, finish = Sys.time() - start_time))
        } -> output_lst
flog.info("Foreach finished")

checkmate::assertList(output_lst, any.missing = FALSE, null.ok = FALSE, min.len = 1)
output_tbl <- dplyr::bind_rows(output_lst)
# rm(output_lst)

#    --------------
future::plan(sequential)
gc(reset = TRUE, full = TRUE)
flog.info(capture.output(toc()))
      
      



() () . windows.





.
#      
output_tbl %>%
  mutate_at("pid", as.factor) %>%
  mutate_at(vars(start, finish), as.numeric) %>%
  ggplot(aes(start, pid, colour = pid)) +
  geom_point(size = 3, alpha = .7) +
  geom_point(aes(x=finish), shape = 4, size = 3, colour = "black") +
  geom_vline(aes(xintercept = start, colour = pid), lty = "dashed", alpha = 0.7) +
  ggthemes::scale_fill_tableau("Tableau 10") +
  theme_ipsum_rc() +
  xlim(c(0, 5))
      
      



, , , . , « »:





  1. (worker) . (, , …), . () .





  2. , core - 1, . , reduce , . .





  3. .





  4. , . , , ( , , API ..). .





  5. , . .





  6. Pour un certain nombre de tâches liées à de longues requêtes synchrones provenant de systèmes externes (les représentants typiques sont l'API REST / la mise au rebut Web), vous pouvez créer beaucoup plus de calculatrices que les cœurs disponibles. Ils se bloquent encore la plupart du temps en mode veille. Peut être exécuté en tant que processus OS séparés en configurant le backend approprié. registerDoFuture();



    plan(future.callr::callr).



    Il s'agit de la moitié restante de la recette.





Publication précédente - "Nuances d'exploitation des solutions R dans un environnement d'entreprise?" ...








All Articles