I would like to run a script using RStudio's jobs feature.
Here's my script, run.R:
library(tidyverse)
library(DBI)
library(scales)
# globals
## kept in global memory and used by all scripts that reference them
game_name <- "fungame"
day_from <- 7 # train from
day_to <- 60 # predict to
source("globals.R") # format the globals based on above inputs
# S3 send loop
dates <- seq(as.Date("2020-01-01"), as.Date("2020-01-31"), by = 1)
for(d in seq_along(dates)) {
  # populate S3
  cohort_date <- dates[d] %>% toString()
  source("predict.R") # will return prediction_df with predictions for the above cohort
  source("send_to_s3.R") # send the predictions to s3
}
When I run this as a job, it fails with:
Error in paste0("spend_day_", day_from) : object 'day_from' not found
Calls: sourceWithProgress ... eval -> source -> withVisible -> eval -> eval -> paste0
Execution halted
I think it's to do with scoping and the fact that I'm sourcing files. The code snippet called out in the error message, paste0("spend_day_", day_from), comes from the first sourced file, globals.R. The script works when I run it directly in the console, but not when I run it as a job.
After some searching I added local = T to every source() call, and that got me further. Some of the sourced scripts in turn source another script, so I added local = T to those too.
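To illustrate what I think is going on, here is a minimal sketch of how source() scoping behaves. It assumes the job evaluates run.R somewhere other than the global environment, which I have not confirmed. The temp file is a hypothetical stand-in for globals.R, and run_in_own_env and spend_col are made-up names for the illustration.

```r
# Hypothetical stand-in for globals.R, written to a temp file.
globals_file <- tempfile(fileext = ".R")
writeLines('spend_col <- paste0("spend_day_", day_from)', globals_file)

# day_from exists only inside this function, not in the global environment,
# which is my assumption about what happens inside a job.
run_in_own_env <- function(use_local) {
  day_from <- 7
  source(globals_file, local = use_local)
  spend_col
}

# local = FALSE (the default): the file is evaluated in the global
# environment, where day_from is not defined, so this captures an error.
default_result <- tryCatch(
  run_in_own_env(FALSE),
  error = function(e) conditionMessage(e)
)

# local = TRUE: the file is evaluated in the calling environment,
# so day_from is found.
local_result <- run_in_own_env(TRUE)

print(default_result)
print(local_result)
```

If that assumption holds, it would explain why the first error went away after adding local = T, but not why the second one appears.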
But now I've hit a wall with the following error:
Error: Unknown column `from_day_7_to_day_60`
<error/rlang_error>
Unknown column `from_day_7_to_day_60`
Backtrace:
█
1. ├─global::sourceWithProgress(...)
2. │ └─base::eval(statements[[idx]], envir = sourceEnv)
3. │ └─base::eval(statements[[idx]], envir = sourceEnv)
4. │ ├─base::source("send_to_s3.R", local = T) run.R:20:2
5. │ │ ├─base::withVisible(eval(ei, envir))
6. │ │ └─base::eval(ei, envir)
7. │ │ └─base::eval(ei, envir)
8. │ └─`%>%`(...)
9. │ ├─base::withVisible(eval(quote(`_fseq`(`_lhs`)), env, env))
10. │ └─base::eval(quote(`_fseq`(`_lhs`)), env, env)
11. │ └─base::eval(quote(`_fseq`(`_lhs`)), env, env)
12. │ └─`_fseq`(`_lhs`)
13. │ └─ma
Execution halted
I tried commenting out parts of the loop to identify the specific file causing the problem. Commenting out source("send_to_s3.R") lets the loop keep running (but without the important step that send_to_s3.R performs).
Here's what send_to_s3.R looks like:
con_s3 <- DBI::dbConnect(noctua::athena(), s3_staging_dir = paste0("s3://ourco-emr/tables/revenue_predictions.", game_name))
con_athena <- DBI::dbConnect(odbc(), "Athena")
yr <- year(cohort_date)
mt <- format(cohort_date %>% as.Date(), '%m')
d <- format(cohort_date %>% as.Date(), '%d')
if(day_to == 30) {
  # send data
  Sys.sleep(2)
  dbWriteTable(conn = con_s3,
               name = paste0("revenue_predictions.", game_name),
               value = prediction_df,
               append = T,
               file.type = "parquet",
               partition = c(year = yr, month = mt, day = d),
               s3.location = paste0("s3://ourco-emr/tables/revenue_predictions.", game_name)
  )
} else {
  ## download existing data
  cohort_query <- read_lines("sql_queries/cohorts_data.sql") %>%
    glue_collapse(sep = "\n") %>%
    glue_sql(.con = con_athena)
  cohort_data <- dbGetQuery(con_athena, cohort_query)
  ## join onto local prediction_df
  cohort_data <- cohort_data %>%
    select_at(vars(-c(paste0("from_day_", day_from, "_to_day_", day_to)))) %>%
    inner_join(prediction_df, by = "s") %>% # should be exact same s'
    select_at(vars(s, from_day_7_to_day_30, from_day_7_to_day_60, from_day_7_to_day_90, from_day_7_to_day_120)) # remove partitions, those will be re added below
  ## push the new df (even though setting says append, this seems to update as desired)
  Sys.sleep(2)
  dbWriteTable(conn = con_s3,
               name = paste0("revenue_predictions.", game_name),
               value = cohort_data,
               append = T,
               overwrite = F,
               file.type = "parquet",
               partition = c(year = yr, month = mt, day = d),
               s3.location = paste0("s3://ourco-emr/tables/revenue_predictions.", game_name))
}
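To narrow this down, a check like the one below could go just before the final select in the else branch, to report which expected columns are missing after the join. This is only a sketch with made-up data: the two data frames are hypothetical stand-ins for the real cohort_data and prediction_df, the column called predicted is invented, and I use base merge() instead of inner_join() so that it runs without dplyr.

```r
# Hypothetical stand-ins for the real data frames.
cohort_data <- data.frame(
  s = 1:3,
  from_day_7_to_day_30 = c(1.0, 2.0, 3.0),
  from_day_7_to_day_60 = c(1.5, 2.5, 3.5)
)
# Assumed case: prediction_df does not carry the expected column name.
prediction_df <- data.frame(s = 1:3, predicted = c(2, 3, 4))

day_from <- 7
day_to <- 60
target_col <- paste0("from_day_", day_from, "_to_day_", day_to)

# Same shape as the else branch: drop the stale column, then inner join on s.
kept_cols <- setdiff(names(cohort_data), target_col)
joined <- merge(cohort_data[, kept_cols, drop = FALSE], prediction_df, by = "s")

# Compare the columns the final select expects with what is actually there.
expected_cols <- c("s", "from_day_7_to_day_30", target_col)
missing_cols <- setdiff(expected_cols, names(joined))
if (length(missing_cols) > 0) {
  message("Missing after join: ", paste(missing_cols, collapse = ", "))
}
```

Running the same check both in the console and in the job should show whether prediction_df actually differs between the two.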
Why would I hit this error when running in a job, when it doesn't happen in my regular console?
In case it's relevant, here's my directory structure:
The only file referenced in the problem script, send_to_s3.R, is the SQL file read by read_lines("sql_queries/cohorts_data.sql").
Any ideas about why I'm hitting an error while trying to run run.R in a job?