mcexit can only be used in a child process

I would like to race two algorithms in separate processes in order to build a tibble with columns p1, p2, algorithm_name, cpu_time, killed. Sometimes algorithm1 takes 10 s while algorithm2 takes 1 s, or vice versa. It is enough to know that the slower one takes more than 2x as long; the remaining 8 s would be better spent on other points in the (p1, p2) space.
It looks like I could use mcfork, mcexit, readChildren and mckill to do it, but I'm having trouble starting even a single child process:

p <- parallel:::mcfork()
if ("childProcess" %in% class(p)) {
        parallel:::mcexit(send = 5)
}
parallel:::readChildren()

On R 4.3.1 on Ubuntu 20.04, the code above fails with: Error in parallel:::mcexit(send = 5) : 'mcexit' can only be used in a child process. As far as I can tell, I am already calling it from the child.
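
A likely cause, going by the ?mcfork help page: mcfork() returns an object of class "childProcess" to the parent and an object of class "masterProcess" to the child, so the class test in the snippet above is true in the parent rather than in the child. Below is a minimal sketch with the test flipped. The 5 second timeout and the variable names msg and value are my own choices for illustration, and these are unexported functions, so treat this as a sketch rather than a supported recipe.

```r
library(parallel)

# mcfork() returns in both processes: the parent receives a "childProcess"
# handle, while the child receives a "masterProcess" handle.
p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
        # We are in the child: send a value to the parent and exit.
        # Values that are not raw vectors are serialized before being sent.
        parallel:::mcexit(0L, send = 5)
}

# Only the parent reaches this point. Wait up to 5 s (an arbitrary choice)
# for the child's message, which arrives as a raw vector.
msg <- parallel:::readChildren(timeout = 5)
value <- if (is.raw(msg)) unserialize(msg) else NULL
print(value)
```

If readChildren() times out it returns TRUE instead of a raw vector, which is why the sketch checks is.raw() before unserializing.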

Edit: I have a solution using mcparallel, mccollect and mckill:

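library(parallel)   # mcparallel, mccollect
library(purrr)      # imap, map, map_lgl, map_dbl, map_dfr, walk, pluck
library(dplyr)      # mutate, bind_rows, %>%
library(tibble)     # tibble
library(tictoc)     # tic, toc
library(assertthat) # assert_that

# race(fs, x) races the functions in fs on the data in x. fs can either be a
# list of functions with names, or a list of strings which must be the names of
# functions. In other words, each entry is run as do.call(fs[[i]], x).
#
# Losers of the race are disqualified (killed) if they take more than
# (1 + disqualification) times the winner's time.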
#
# polling_period is the time in seconds to wait between checking for the winner.
#
# the result is a tibble with columns
#
# >  # A tibble: 2 × 4
# >    f                  elapsed crashed killed
# >    <chr>                <dbl> <lgl>   <lgl> 
# >  1 closest_pairs_hash  0.0490 FALSE   FALSE 
# >  2 closest_pairs_emst  0.0588 FALSE   TRUE
race <- function(fs, x, polling_period = 0.05, disqualification = 0.2) {
 # fs holds function names, or is a named list of functions
 assert_that(is.character(fs) || is.list(fs), is.list(x))
 assert_that(all(imap_lgl(fs, ~ is.function(.x) && is.character(.y) ||
                               is.character(.x) && exists(.x, mode = "function"))))
 # start one forked child per function; when fs is a list of functions,
 # names(fs) is used for the f column
 ps <- imap(fs, ~ {
         f_str <- if (is.character(.x)) .x else .y
         mcparallel({
                 tic()
                 crashed <- tryCatch({ do.call(.x, x); FALSE },
                                     error = function(e) TRUE)
                 list(f = f_str,
                      elapsed = with(toc(quiet = TRUE),
                                     if (!crashed) unname(toc - tic) else NA_real_),
                      crashed = crashed)
         }, name = f_str)
 })
 complete <- list()
 # ps is a list, so test its length: an empty list is never NULL
 while (length(ps) > 0) {
         # check each uncompleted process in ps once every polling_period
         complete1 <- map(ps, ~ mccollect(., wait = FALSE))
         nulls <- map_lgl(complete1, is.null)
         complete <- c(complete, complete1[!nulls])
         ps <- ps[nulls]
         crashes <- map_lgl(complete1[!nulls], ~ .[[1]]$crashed)
         if (any(!crashes)) break
         Sys.sleep(polling_period)
 }
 # either all processes crashed or at least one completed
 dt <- map_dbl(complete, pluck, 1, "elapsed", .default = Inf) %>% min(na.rm = TRUE)
 t_max <- (1 + disqualification) * dt
 # give the remaining processes a grace period of disqualification * dt;
 # with wait = FALSE, mccollect returns only the jobs that are ready, and it
 # returns as soon as the first of them is
 complete2 <- if (length(ps) > 0) {
         mccollect(ps, wait = FALSE, timeout = disqualification * dt)
 } else NULL
 # keep only the jobs that still have not reported, then kill them
 ps_names <- vapply(ps, function(p) p$name, character(1))
 ps <- ps[!ps_names %in% names(complete2)]
 walk(ps, parallel:::mckill)
 bind_rows(map_dfr(complete, pluck, 1) %>% mutate(killed = FALSE),
           bind_rows(complete2) %>% mutate(killed = FALSE),
           map_dfr(ps, ~ tibble(f = .$name,
                                elapsed = NA_real_,
                                crashed = FALSE,
                                killed = TRUE))) %>%
         # replace NA in elapsed with the maximum time
         mutate(elapsed = ifelse(is.na(elapsed), t_max, elapsed))
}
