I'm seeing an issue where the dplyr mutate step is causing a foreach loop withRestarts seems to error without error.
If I run the dplyr mutate out of the foreach %dopar% in %do% it works as designed.
library(magrittr)
library(dplyr)
#>
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#>
#> filter, lag
#> The following objects are masked from 'package:base':
#>
#> intersect, setdiff, setequal, union
library(tibble)
library(foreach)
library(future)
library(stringr)
tmp_tb <- tibble(Id = c(1:5),
Sample_color = c("green", "blue", "yellow", "orange", "grey"),
Sample_text = c("\n Test1", "Test2", "test 3", "test 4", "test 5"))
tmp_fun <- function(loop_n, df) {
print(paste0(loop_n, "before withCallingHndlears\n"))
status_tb <- tibble(Foreach_loop = as.character(),
For_loop = as.character(),
Status = as.character())
for (i in seq_len(nrow(df))) {
withCallingHandlers({
withRestarts({
if (i == 2) {
tmp_status_tb <- tibble(Foreach_loop = loop_n,
For_loop = i,
Status = "Good")
status_tb <- rbind(status_tb, tmp_status_tb)
} else if (i == 3) {
tb_test_df <- df %>%
mutate(across(.cols = all_of(names(df)),
.fns = ~ (str_to_upper(.))))
tmp_status_tb <- tibble(Foreach_loop = loop_n,
For_loop = i,
Status = "Good")
status_tb <- rbind(status_tb, tmp_status_tb)
} else if (i == 4) {
tb_test_df <- df %>%
mutate(across(.cols = all_of(names(df)),
.fns = ~ (str_replace_all(string = .,
pattern = "[[:cntrl:]]",
replacement = " " ))))
tmp_status_tb <- tibble(Foreach_loop = loop_n,
For_loop = i,
Status = "Good")
status_tb <- rbind(status_tb, tmp_status_tb)
} else {
stop("this is an error!")
}
}, muffleStop = function() {
message("'stop' muffled")
tmp_status_tb <- tibble(Foreach_loop = loop_n,
For_loop = i,
Status = "Failure")
status_tb <- rbind(status_tb, tmp_status_tb)
assign(x = "status_tb", value = status_tb, envir = parent.frame(n = 4))
})
},
error = function(cond) {
print(cond$message)
invokeRestart("muffleStop")
}
)
}
print(paste0(loop_n, "after withCallingHndlears\n"))
return(status_tb)
}
doFuture::registerDoFuture()
numWorkers <- 2
future::plan(future::multisession, workers = numWorkers, gc = FALSE, earlySignal = TRUE)
status_ls <- foreach::foreach(out_i = seq_along(1:2), .verbose = FALSE, .errorhandling = "pass") %dopar% {
tmp_fun(loop_n = out_i, df = tmp_tb)
}
#> 'stop' muffled
#> 'stop' muffled
#> 'stop' muffled
#> 'stop' muffled
#> [1] "1before withCallingHndlears\n"
#> [1] "this is an error!"
#> [1] "this is an error!"
#> [1] "1after withCallingHndlears\n"
#> 'stop' muffled
#> 'stop' muffled
#> 'stop' muffled
#> 'stop' muffled
#> [1] "2before withCallingHndlears\n"
#> [1] "this is an error!"
#> [1] "this is an error!"
#> [1] "2after withCallingHndlears\n"
future::plan("default")
output_df <- bind_rows(status_ls)
output_df
#> # A tibble: 10 x 3
#> Foreach_loop For_loop Status
#> <int> <int> <chr>
#> 1 1 1 Failure
#> 2 1 2 Good
#> 3 1 3 Failure
#> 4 1 4 Failure
#> 5 1 5 Failure
#> 6 2 1 Failure
#> 7 2 2 Good
#> 8 2 3 Failure
#> 9 2 4 Failure
#> 10 2 5 Failure
tmp_tb %>%
mutate(across(.cols = all_of(names(tmp_tb)),
.fns = ~ (str_to_upper(.))))
#> # A tibble: 5 x 3
#> Id Sample_color Sample_text
#> <chr> <chr> <chr>
#> 1 1 GREEN "\n TEST1"
#> 2 2 BLUE "TEST2"
#> 3 3 YELLOW "TEST 3"
#> 4 4 ORANGE "TEST 4"
#> 5 5 GREY "TEST 5"
tmp_tb %>%
mutate(across(.cols = all_of(names(tmp_tb)),
.fns = ~ (str_replace_all(string = .,
pattern = "[[:cntrl:]]",
replacement = " " ))))
#> # A tibble: 5 x 3
#> Id Sample_color Sample_text
#> <chr> <chr> <chr>
#> 1 1 green " Test1"
#> 2 2 blue "Test2"
#> 3 3 yellow "test 3"
#> 4 4 orange "test 4"
#> 5 5 grey "test 5"
SesssionInfo:
sessionInfo()
#> R version 4.0.2 (2020-06-22)
#> Platform: x86_64-pc-linux-gnu (64-bit)
#> Running under: Red Hat Enterprise Linux
#>
#> Matrix products: default
#> BLAS: /opt/R/R_4.0.2/lib64/R/lib/libRblas.so
#> LAPACK: /opt/R/R_4.0.2/lib64/R/lib/libRlapack.so
#>
#> locale:
#> [1] LC_CTYPE=en_US.UTF-8 LC_NUMERIC=C
#> [3] LC_TIME=en_US.UTF-8 LC_COLLATE=en_US.UTF-8
#> [5] LC_MONETARY=en_US.UTF-8 LC_MESSAGES=en_US.UTF-8
#> [7] LC_PAPER=en_US.UTF-8 LC_NAME=C
#> [9] LC_ADDRESS=C LC_TELEPHONE=C
#> [11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C
#>
#> attached base packages:
#> [1] stats graphics grDevices utils datasets methods base
#>
#> loaded via a namespace (and not attached):
#> [1] digest_0.6.27 withr_2.4.1 magrittr_2.0.1 reprex_2.0.0
#> [5] evaluate_0.14 highr_0.8 stringi_1.5.3 rlang_0.4.10
#> [9] cli_2.4.0 rstudioapi_0.13 fs_1.5.0 rmarkdown_2.7
#> [13] tools_4.0.2 stringr_1.4.0 glue_1.4.2 xfun_0.22
#> [17] yaml_2.2.1 compiler_4.0.2 htmltools_0.5.1.1 knitr_1.31