I have an analysis where I've used purrr::map* to repeat the following pattern for a series of dates:
* Well, really furrr::map, but I want to discuss basic principles here.
1. Query the database for the data occurring on Day X.
2. Crunch some stats for Day X.
3. Repeat for all dates and return the calculations by day.
Since each day's data is quite large and I'm doing this for a full year's worth of dates, the job takes a long while (8-10 hours, depending on EC2 size). I'd like to use sparklyr to leverage Spark and speed up the computations. I'll explain my approach below with code, but as I wrote it out I started to wonder whether there is a better practice for this pattern.
library(tidyverse)
library(sparklyr)

# analysis_start_date, analysis_end_date, and creds are defined earlier in my script
analysis_calendar <- analysis_start_date %>%
  seq.Date(analysis_end_date, by = "day")

config <- spark_config()
config$`sparklyr.shell.driver-class-path` <- "path/to/driver/RedshiftJDBC42-no-awssdk-1.2.41.1065.jar"

sc <- spark_connect(master = "local", config = config)

jdbc_read_url <- config %>%
  glue::glue_data("jdbc:redshift://{host_url}:{port}/{db_name}") %>%
  as.character()

calculations <- analysis_calendar %>%
  map(~{
    # Build the day's query; the ANSI connection gives proper date quoting
    ith_query <- glue::glue_sql(
      "SELECT id, date, col1, col2, col3, col4
       FROM {`schema_code`}.table_5min_data
       WHERE date = {.x}",
      .con = DBI::ANSI()) %>%
      as.character()

    # Register the day's data as a Spark table without caching it into memory
    data_sdf <- sc %>%
      spark_read_jdbc(name = "5min_data",
                      options = list(url = jdbc_read_url,
                                     user = creds$user_name,
                                     password = creds$password,
                                     query = ith_query,
                                     driver = "com.amazon.redshift.jdbc42.Driver"),
                      memory = FALSE,
                      overwrite = TRUE)

    # Just a sample computation
    summarise(data_sdf, average = mean(col1, na.rm = TRUE)) %>%
      collect()
  })
The code above really boils down to purrr::map(R list, ~Spark calls). It seems to work fine, but something about this construction just feels wrong to me. Is there a better way to use sparklyr/Spark to handle the repeated database querying and performing tasks on all of those data frames?
I'm still looking for general advice here, but I've since discovered that sparklyr can serve as a backend for foreach, which is another option for iterating my code over the dates. I'm going to try it and see how that works out.
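For reference, here's roughly what I have in mind, as a minimal sketch (assuming sparklyr >= 1.2, which provides registerDoSpark(); the loop body is only a placeholder, not my real query-and-crunch code):

library(foreach)
library(sparklyr)

sc <- spark_connect(master = "local")
registerDoSpark(sc)  # register Spark as the foreach parallel backend

# one foreach task per date, dispatched to Spark
daily_results <- foreach(d = as.list(analysis_calendar), .combine = rbind) %dopar% {
  # placeholder body; the real version would query day `d` and crunch its stats
  data.frame(date = d, average = NA_real_)
}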
@nirgrahamuk in this case, the data frame needs to be pulled over a JDBC connection, so spark_apply() won't work.
@Col Thanks, and forgive the simple example; what I'm really doing is more complex, but I kept the example basic to focus attention on parallelizing the queries.
If you are able to refactor this into pure SQL, there are a lot of benefits. For a given year, you would be scanning through your data 365 times just to filter it. Even if you have an index on your date field, that's still a lot of filtering instead of addressing what it really is: a grouping problem.
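Concretely, that means one query for the whole range with the grouping pushed into the database, instead of one query per day. A rough sketch reusing the connection objects from your question (the table/column names are borrowed from your code, and the AVG is just standing in for whatever your real stats are):

# one grouped query over the whole range instead of 365 filtered scans
grouped_query <- glue::glue_sql(
  "SELECT date, AVG(col1) AS average
   FROM {`schema_code`}.table_5min_data
   WHERE date BETWEEN {analysis_start_date} AND {analysis_end_date}
   GROUP BY date",
  .con = DBI::ANSI()) %>%
  as.character()

daily_stats <- spark_read_jdbc(sc, name = "daily_stats",
                               options = list(url = jdbc_read_url,
                                              user = creds$user_name,
                                              password = creds$password,
                                              query = grouped_query,
                                              driver = "com.amazon.redshift.jdbc42.Driver"),
                               memory = FALSE,
                               overwrite = TRUE) %>%
  collect()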
FWIW, here's a simple example. Grouping is about 30x faster than filtering and building an answer.
library(dplyr, warn.conflicts = FALSE)

set.seed(123L)
n = 1e7L
n_days = 365L
DF = data.frame(ID = sample(n_days, n, replace = TRUE),
                V1 = seq(n))
head(DF)
#>    ID V1
#> 1 179  1
#> 2  14  2
#> 3 195  3
#> 4 306  4
#> 5 118  5
#> 6 299  6

system.time(
  dplyr::summarize(dplyr::group_by(DF, ID), res = mean(V1))
)
#> `summarise()` ungrouping output (override with `.groups` argument)
#>    user  system elapsed
#>    0.67    0.05    0.72
ID_vec = DF[["ID"]]
V1_vec = DF[["V1"]]

system.time({
  uni_ID = unique(ID_vec)
  ans = numeric(length(uni_ID))
  # filter the full vector once per ID, mimicking the one-query-per-day approach
  for (i in seq_along(uni_ID)) {
    ans[i] = mean(V1_vec[ID_vec == uni_ID[i]])
  }
})
#>    user  system elapsed
#>   19.88    1.34   21.22