I have a situation where I have fit multiple models, each with a different outcome variable, and I need to score (generate predictions for) anywhere between 5M and 180M additional records at a time. With 20+ models and that many predictions, the scoring stage is slow, so I'm interested in running either predict or augment in parallel to speed things up. Here's a similar, reproducible example:
Skip the setup!
library(tidyverse)
library(nycflights13)
library(tidymodels)
library(workflows)
flight_data <- flights %>%
  mutate(
    # Convert the arrival delay to a factor
    arr_delay = ifelse(arr_delay >= 30, "late", "on_time"),
    arr_delay = factor(arr_delay),
    is_delta = ifelse(carrier %in% "DL", "delta", "not_delta"),
    is_delta = factor(is_delta),
    # We will use the date (not date-time) in the recipe below
    date = lubridate::as_date(time_hour)
  ) %>%
  # Include the weather data
  inner_join(weather, by = c("origin", "time_hour")) %>%
  # Only retain the specific columns we will use
  select(arr_delay, is_delta, dep_time, flight, origin, dest, air_time, distance,
         carrier, date, time_hour, wind_dir, wind_speed) %>%
  # Exclude missing data
  na.omit()
set.seed(222)
# Skim off two ~6,000-row samples for training two models
data_splits <- flight_data %>%
  bind_rows(flight_data, .id = "group_id") %>%
  group_by(group_id) %>%
  group_split() %>%
  map2(.y = list(c("arr_delay", "distance", "carrier", "wind_speed", "flight"),
                 c("is_delta", "distance", "dest", "wind_speed", "flight")),
       .f = ~select(.x, all_of(.y))) %>%
  map2(.y = c("arr_delay", "is_delta"),
       .f = ~initial_split(.x, prop = 0.02, strata = all_of(.y)))
train_data <- map(data_splits, ~training(.x))
recipe_lst <- train_data %>%
  map2(.y = c("arr_delay", "is_delta"),
       .f = ~recipes::recipe(.x) %>%
         recipes::update_role(everything(), new_role = "predictor") %>%
         # Specify which column(s) is/are the targets/outcomes
         recipes::update_role(all_of(.y), new_role = "outcome") %>%
         recipes::update_role(flight, new_role = "id") %>%
         # Turn strings into factor variables
         recipes::step_string2factor(recipes::all_nominal_predictors(), -recipes::has_role(match = "id")) %>%
         recipes::step_novel(recipes::all_nominal_predictors()) %>%
         recipes::step_other(recipes::all_nominal_predictors(), threshold = 0.03)
  )
rf_spec <- rand_forest(trees = 500) %>%
  set_engine("ranger", importance = "impurity", num.threads = 4) %>%
  set_mode("classification")
workflow_fits <- recipe_lst %>%
  map(~workflows::workflow(spec = rf_spec) %>% add_recipe(.x)) %>%
  map2(train_data, ~fit(.x, data = .y))
# Stack the original flights data to simulate scoring millions of rows
flight_data_larger <- flight_data %>%
  bind_rows(flight_data, flight_data, flight_data, flight_data, flight_data)
# Score models sequentially (my current process)
tictoc::tic()
scores <- workflow_fits %>%
  map(~extract_fit_parsnip(.x)) %>%
  map(~generics::augment(.x, new_data = flight_data_larger, type = "prob")) %>%
  map(~select(.x, flight, .pred_class))
tictoc::toc() # ~372 seconds
# Try scoring in parallel over the models with one data frame
library(furrr)
plan(multisession, workers = 3)
tictoc::tic()
scores_parallel <- workflow_fits %>%
  furrr::future_map(
    ~workflows::extract_fit_parsnip(.x) %>%
      generics::augment(new_data = flight_data_larger, type = "prob"),
    .options = furrr_options(seed = TRUE))
tictoc::toc()
# Attempt 2, break the data frame into chunks
large_data_by_group <- flight_data_larger %>%
  group_by(origin) %>%
  group_split() %>%
  purrr::cross2(.y = workflow_fits)
tictoc::tic()
scores_parallel <- large_data_by_group %>%
  furrr::future_map(.f = function(x){
    x %>%
      magrittr::extract2(2) %>%
      workflows::extract_fit_parsnip() %>%
      generics::augment(new_data = magrittr::extract2(x, 1),
                        type = "prob")
  }, .options = furrr_options(seed = TRUE))
tictoc::toc() # ~404 seconds
I'm not sure why neither attempt is faster here. Maybe someone with more future/parallel experience can suggest a better way to frame the solution. On my real data I do see a speed increase with the first parallel construction, but it's only about a third faster than the sequential version, which is what prompted me to try splitting the data into chunks. The chunked approach on my real data was actually slower.
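For what it's worth, here's the next variant I was considering, in case it helps frame an answer: trimming flight_data_larger down to only the columns each model actually uses before handing it to furrr, so each worker only receives its own slimmed-down data frame rather than the whole thing. The cols_by_model object below is just a hypothetical helper built from the column lists in the setup above, and I haven't verified that this is actually any faster:
# Hypothetical: keep only the columns each model needs (plus the flight id),
# so less data has to be shipped to each worker
cols_by_model <- list(
  c("flight", "distance", "carrier", "wind_speed"),
  c("flight", "distance", "dest", "wind_speed")
)
new_data_by_model <- map(cols_by_model, ~select(flight_data_larger, all_of(.x)))
# Re-using the multisession plan set up above
tictoc::tic()
scores_trimmed <- furrr::future_map2(
  workflow_fits,
  new_data_by_model,
  ~.x %>%
    workflows::extract_fit_parsnip() %>%
    generics::augment(new_data = .y) %>%
    select(flight, .pred_class),
  .options = furrr_options(seed = TRUE))
tictoc::toc()
Even trimmed down, though, each data frame is still the full ~2M rows, so I'm not sure how much this really buys me.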
Thoughts anyone?
(tagging @davis in hopes he sees this too)