I would like to race two threads in order to build a tibble with the columns p1, p2, algorithm_name, cpu_time and killed. Sometimes algorithm1 takes 10 s while algorithm2 takes 1 s, or vice versa. It is enough to know that the slower one takes more than twice as long as the faster one; the remaining 8 s would be better spent on other points in the (p1, p2) space.
It looks like I could use mcfork, mcexit, readChildren and mckill to do it. But I'm having trouble starting a single thread:
p <- parallel:::mcfork()
if ("childProcess" %in% class(p)) {
parallel:::mcexit(send = 5)
}
parallel:::readChildren()
On R 4.3.1 (Ubuntu 20.04) the code above fails with:
Error in parallel:::mcexit(send = 5) : 'mcexit' can only be used in a child process
But calling it from the child process is exactly what I am trying to do.
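A possible explanation, going by the documented return value of mcfork(): the parent receives an object of class "childProcess" (a handle describing the child), while the child itself receives an object of class "masterProcess". If that is right, the check above runs mcexit() in the parent. The sketch below inverts the test. It assumes a Unix-alike system, uses unexported functions from parallel, and assumes readChild() waits until the child has sent its data.

```r
library(parallel)

p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  # We are in the child: send a value to the parent, then exit.
  parallel:::mcexit(0L, send = 5)
}

# We are in the parent: p is a "childProcess" handle for the forked child.
# readChild() is assumed to wait for the child's data here; non-raw objects
# are serialized by the child, so the parent has to unserialize them.
res <- parallel:::readChild(p)
value <- if (is.raw(res)) unserialize(res) else NULL
print(value)
```

These are internal functions without a stable interface, so the documented mcparallel() and mccollect() are probably the safer choice for real code.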
Edit: I have a solution using mcparallel, mccollect and mckill:
# Packages used by race() below.
library(parallel)
library(purrr)
library(dplyr)
library(tibble)
library(assertthat)
library(tictoc)

# race(fs, x) races the functions in fs on the data in x. fs can either be a
# named list of functions, or a list of strings which must be the names of
# functions. In other words, each entry is run as do.call(fs[[i]], x).
#
# Losers of the race are disqualified if they take more than
# (1 + disqualification) times the winner's time.
#
# polling_period is the time in seconds to wait between checks for a winner.
#
# the result is a tibble with columns
#
# > # A tibble: 2 × 4
# > f elapsed crashed killed
# > <chr> <dbl> <lgl> <lgl>
# > 1 closest_pairs_hash 0.0490 FALSE FALSE
# > 2 closest_pairs_emst 0.0588 FALSE TRUE
race <- function(fs, x, polling_period = 0.05, disqualification = 0.2) {
  do.call(assert_that, c(is.character(fs) || is.list(fs),
                         is.list(x),
                         imap(fs, ~ { is.character(.y) && (is.function(.x) || exists(.x, mode = "function")) ||
                                      is.character(.x) && exists(.x, mode = "function") })))
  # fs can also be a list of functions, in which case names(fs) is used for the f column
  ps <- imap(fs, ~ {
    f_str <- if (is.character(.x)) .x else .y
    # run each function in its own forked process and time it there
    mcparallel({
      tic()
      crashed <- tryCatch({ do.call(.x, x); FALSE },
                          error = function(e) TRUE)
      list(f = f_str,
           elapsed = with(toc(), if (!crashed) toc - tic else NA),
           crashed = crashed)
    }, f_str)
  })
  complete <- list()
  while (TRUE) {
    # check each uncompleted process in ps once every polling_period
    # (ps[nulls] yields an empty list, never NULL, so test its length)
    if (length(ps) == 0) break
    complete1 <- map(ps, ~ mccollect(., wait = FALSE))
    nulls <- map_lgl(complete1, is.null)
    complete <- c(complete, complete1[!nulls])
    ps <- ps[nulls]
    crashes <- complete1[!nulls] %>% map_lgl(~ .[[1]]$crashed)
    if (any(!crashes)) break
    Sys.sleep(polling_period)
  }
  # either all processes crashed or one completed
  dt <- map_dbl(complete, pluck, 1, "elapsed", .default = Inf) %>% min(na.rm = TRUE)
  # give the remaining processes a grace period, then kill whatever is left
  complete2 <- mccollect(ps, wait = FALSE, timeout = dt)
  ps <- if (is.null(complete2)) ps else ps[map_lgl(complete2, is.null)]
  walk(ps, parallel:::mckill)
  t_max <- (1 + disqualification) * dt
  rbind(map_dfr(complete, pluck, 1) %>% mutate(killed = FALSE),
        bind_rows(complete2) %>% mutate(killed = FALSE),
        map_dfr(ps, ~ tibble(f = .$name,
                             elapsed = NA,
                             crashed = FALSE,
                             killed = TRUE))) %>%
    # replace NA in elapsed with the maximum time
    mutate_at("elapsed", ~ ifelse(is.na(.), t_max, .))
}
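The core of the polling loop can be looked at in isolation. The following is a minimal sketch rather than a replacement for race(): fast_algo and slow_algo are hypothetical stand-ins for the two algorithms, the loser is stopped with the documented tools::pskill() instead of the unexported parallel:::mckill(), and a Unix-alike system is assumed because mcparallel() relies on forking.

```r
library(parallel)

# Hypothetical stand-ins for the two algorithms being raced.
fast_algo <- function() { Sys.sleep(0.2); "fast" }
slow_algo <- function() { Sys.sleep(5); "slow" }

# Start both candidates in forked child processes.
jobs <- list(fast = mcparallel(fast_algo()),
             slow = mcparallel(slow_algo()))

started <- Sys.time()
repeat {
  # Poll each job without blocking; a job with no result yet gives NULL.
  res  <- lapply(jobs, mccollect, wait = FALSE)
  done <- !vapply(res, is.null, logical(1))
  if (any(done)) break
  Sys.sleep(0.05)
}

# Stop every job that has not finished yet.
for (job in jobs[!done]) tools::pskill(job$pid)

winner  <- names(jobs)[done]
value   <- res[[which(done)[1]]][[1]]  # mccollect() wraps each result in a list
elapsed <- as.numeric(difftime(Sys.time(), started, units = "secs"))
print(winner)
```

Polling each job separately keeps the bookkeeping simple, since the result of every mccollect() call lines up with exactly one entry of jobs.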