Running a job while sourcing files leads to variables not being found (scoping issue?)

I would like to run a script using RStudio's jobs feature.

Here's my script, run.R:

library(tidyverse)
library(DBI)
library(scales)


# globals
## kept in global memory and used by all scripts that reference them
game_name <- "fungame"
day_from <- 7 # train from
day_to <- 60 # predict to
source("globals.R") # format the globals based on above inputs


# S3 send loop
dates <- seq(as.Date("2020-01-01"), as.Date("2020-01-31"), by = 1)
for(d in seq_along(dates)) {
  
  # populate S3
  cohort_date <- dates[d] %>% toString()
  source("predict.R") # will return prediction_df with predictions for the above cohort
  source("send_to_s3.R") # send the predictions to s3
}

Error in paste0("spend_day_", day_from) : object 'day_from' not found
Calls: sourceWithProgress ... eval -> source -> withVisible -> eval -> eval -> paste0
Execution halted

I think it's to do with scoping and the fact that I'm sourcing files. The code snippet called out in the error message, paste0("spend_day_", day_from), comes from the top sourced file, globals.R. The script runs fine when I run it directly in the console, as opposed to running it as a job.
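For context, globals.R just turns those inputs into the column name strings the other scripts use, roughly like this (trimmed for this post):

# globals.R (roughly, trimmed)
spend_day_from <- paste0("spend_day_", day_from)
spend_day_to <- paste0("spend_day_", day_to)
sessions_day_from <- paste0("sessions_day_", day_from)
sum_session_time_day_from <- paste0("sum_session_time_day_", day_from)
utility_day_from <- paste0("utility_day_", day_from)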

After some searching I added local = T to all of my source() calls, and that took me further. Some of the sourced scripts in turn source another script; I added local = T to those too.
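For what it's worth, here's my rough understanding of why local = T helps, going by the sourceEnv that shows up in the job's backtrace: the job evaluates run.R's statements in its own environment rather than the global one, while source() with the default local = FALSE evaluates the sourced file in globalenv(), where day_from doesn't exist. A tiny standalone illustration with a throwaway temp file (not my real globals.R):

# hypothetical stand-in for globals.R, written to a temp file
tmp <- tempfile(fileext = ".R")
writeLines('spend_day_from <- paste0("spend_day_", day_from)', tmp)

# stand-in for the job: statements run in their own environment (a function
# body here), not in the global environment
run_like_a_job <- function() {
  day_from <- 7                 # like `day_from <- 7` in run.R
  try(source(tmp))              # default local = FALSE: evaluated in globalenv(),
                                # so day_from isn't found (assuming it's not
                                # defined in your global environment)
  source(tmp, local = TRUE)     # evaluated in this calling environment instead
  spend_day_from                # "spend_day_7"
}
run_like_a_job()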

But now I've hit a wall with the following error:

Error: Unknown column `from_day_7_to_day_60` 
<error/rlang_error>
Unknown column `from_day_7_to_day_60` 
Backtrace:
     █
  1. ├─global::sourceWithProgress(...)
  2. │ └─base::eval(statements[[idx]], envir = sourceEnv)
  3. │   └─base::eval(statements[[idx]], envir = sourceEnv)
  4. │     ├─base::source("send_to_s3.R", local = T) run.R:20:2
  5. │     │ ├─base::withVisible(eval(ei, envir))
  6. │     │ └─base::eval(ei, envir)
  7. │     │   └─base::eval(ei, envir)
  8. │     └─`%>%`(...)
  9. │       ├─base::withVisible(eval(quote(`_fseq`(`_lhs`)), env, env))
 10. │       └─base::eval(quote(`_fseq`(`_lhs`)), env, env)
 11. │         └─base::eval(quote(`_fseq`(`_lhs`)), env, env)
 12. │           └─`_fseq`(`_lhs`)
 13. │             └─ma
Execution halted

I tried commenting out parts of the loop to identify the specific file causing the problem. Commenting out source("send_to_s3.R") lets the loop keep working (but then the important processing inside send_to_s3.R is skipped).

Here's what send_to_s3.R looks like:


con_s3 <- DBI::dbConnect(noctua::athena(), s3_staging_dir = paste0("s3://ourco-emr/tables/revenue_predictions.", game_name))
con_athena <- DBI::dbConnect(odbc(), "Athena")

yr <- year(cohort_date)
mt <- format(cohort_date %>% as.Date(), '%m')
d <- format(cohort_date %>% as.Date(), '%d')

if(day_to == 30) {

  # send data
  Sys.sleep(2)
  dbWriteTable(conn = con_s3,
               name = paste0("revenue_predictions.", game_name),
               value = prediction_df,
               append = T,
               file.type = "parquet",
               partition = c(year = yr, month = mt, day = d),
               s3.location = paste0("s3://ourco-emr/tables/revenue_predictions.", game_name))

} else {
  
  ## download existing data
  cohort_query <- read_lines("sql_queries/cohorts_data.sql") %>% 
    glue_collapse(sep = "\n") %>% 
    glue_sql(.con = con_athena)
  cohort_data <- dbGetQuery(con_athena, cohort_query)
  
  ## join onto local prediction_df
  cohort_data <- cohort_data %>% 
    select_at(vars(-c(paste0("from_day_", day_from, "_to_day_", day_to)))) %>% 
    inner_join(prediction_df, by = "s") %>% # should be the exact same s values
    select_at(vars(s, from_day_7_to_day_30, from_day_7_to_day_60, from_day_7_to_day_90, from_day_7_to_day_120)) # remove partitions, those will be re added below
  
  ## push the new df (even though setting says append, this seems to update as desired)
  Sys.sleep(2)
  dbWriteTable(conn = con_s3,
               name = paste0("revenue_predictions.", game_name),
               value = cohort_data,
               append = T,
               overwrite = F,
               file.type = "parquet",
               partition = c(year = yr, month = mt, day = d),
               s3.location = paste0("s3://ourco-emr/tables/revenue_predictions.", game_name))

}

Why would I hit this error when running as a job, when it doesn't happen when running in my regular console?

In case it's relevant, the only file referenced from the problem script send_to_s3.R is sql_queries/cohorts_data.sql, read via read_lines("sql_queries/cohorts_data.sql").

Any ideas about why I'm hitting an error while trying to run run.R in a job?

Hi @dougfir, could you boil down the logic of your inter-file calls so they don't refer to files and objects folks don't have access to?

Hi Dromano, it looks like the problem file is send_to_s3.R, which I've pasted above. That file in turn reads sql_queries/cohorts_data.sql with read_lines(), and the query looks like this:

-- !preview conn=con_athena

/*gets data for a given game cohort*/
select * 
from {rlang::parse_exprs(glue('revenue_predictions.{game_name}'))}
where year || '-' || month || '-' || day = {cohort_date}

I'm really not sure what I'm missing. I've tried to include all the files that might be relevant. Since I suspect a scoping issue, I was hoping this might sound familiar to someone who could offer some advice.

From your code, it looks like the error is triggered by the select_at() call following the inner join with prediction_df. My guess is that prediction_df isn't visible, which is why I was curious about your inter-file calling structure: setting local = T may not be enough if there's a source() call within a sourced file.
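Here's a toy illustration of what I mean, with throwaway temp files standing in for your scripts (the names are hypothetical): even if the outer file is sourced with local = T, a nested source() call that still uses the default goes back to evaluating in the global environment and loses sight of the objects.

# outer.R stands in for predict.R, inner.R for a script it sources
inner <- file.path(tempdir(), "inner.R")
outer <- file.path(tempdir(), "outer.R")
writeLines('message("inner sees day_from? ", exists("day_from"))', inner)
writeLines('source(file.path(tempdir(), "inner.R"))  # no local = TRUE here', outer)

fake_job <- function() {
  day_from <- 7                  # set in the job's environment, as in run.R
  source(outer, local = TRUE)    # outer itself is fine, but its nested
                                 # source() defaults back to globalenv()
}
fake_job()
#> inner sees day_from? FALSE    (assuming day_from isn't defined globally)

If the nested call inside outer.R also used local = TRUE, it would print TRUE instead.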

Hi Dromano. Yes, indeed one of the files being sourced in turn sources another file. In the loop I source predict.R which looks like this:

# libraries & source
library(tidyverse)
library(lubridate)
library(foreach)
library(doParallel) # also loads the parallel package
library(scales)
library(kableExtra)
library(rmarkdown)
library(dbplyr)
library(DBI)
library(odbc)
library(rlang)
library(glue)
source("functions/prediction_functions.R", local = T)

# source query and get dataframe
con <- dbConnect(odbc(), "Athena")
select <- dplyr::select

# globals set in run.R
directory <- paste0("/home/rstudio-doug/analysis/radhoc/revenue_model/models/", game_name)
files <- list.files(directory, pattern = paste0("day_", day_from, "_to_day_", day_to, ".rds$"), full.names = TRUE)
model <- files[which.max(file.info(files)$ctime)] %>% readRDS()

# pull sql data for prediction
prediction_query <- read_lines("sql_queries/prediction_data_query.sql") %>% 
  glue_collapse(sep = "\n") %>% 
  glue_sql(.con = con)
prediction_data_raw <- dbGetQuery(con, prediction_query)

# preprocessing
prediction_data <- prediction_data_raw %>% 
  mutate(install_dt = ymd(install_dt),
         publisher_name = str_replace_na(publisher_name, "Unknown"),
         usa = as.numeric(usa)) %>%
  mutate(!!sessions_day_from := as.numeric(!! sym(sessions_day_from)),
         !!sum_session_time_day_from := as.numeric(!! sym(sum_session_time_day_from)),
         !!utility_day_from := as.numeric(!! sym(utility_day_from)),
         !!spend_day_from := as.numeric(!! sym(spend_day_from)),
         !!spend_day_to := as.numeric(!! sym(spend_day_to))) %>% 
  mutate(ios = if_else(platform == 'IOS', 1, 0)) %>% 
  mutate(is_publisher_organic = if_else(publisher_name == "organic", 1, 0),
         is_publisher_facebook = if_else(publisher_name == 'facebook', 1, 0)) %>%  # all 0's will indicate 'other'
  select_at(vars(s, ios, usa, is_publisher_organic, is_publisher_facebook, !!sessions_day_from, !!utility_day_from, recent_utility_ratio, !!spend_day_from, !!spend_day_to)) %>% 
  mutate(spender = if_else(!! sym(spend_day_to) == 0, FALSE, TRUE))

# predict
prediction_df <- prediction_data %>% 
  select(s) %>% 
  mutate(!! paste0("day_", day_from, "_day_", day_to) := revenue_predictions(model, new_data = prediction_data))

There's another read_lines call there but I'm not sure it's relevant.

it looks like the error is triggered by the select_at() call following the inner join with prediction_df

Yes, I think this is it. predict.R ends with a data frame called prediction_df, and it sounds like it's not available in the environment that send_to_s3.R is running in. Is that what you're thinking?

Yes, so maybe sourcing 'send_to_s3.R' from inside 'predict.R' as the last line might work, but I'm still not sure how to explicitly manage visibility between the environments passed to the files being sourced.
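One thing that might be worth a try, though it's only a sketch and I haven't tested it against your files: source() also accepts an environment as its local argument, so run.R could create the shared environment explicitly and pass the same one to every source() call, instead of relying on wherever the job happens to evaluate things. Roughly:

# sketch only, reusing the names from your run.R; untested against your setup
shared <- new.env(parent = globalenv())

shared$game_name <- "fungame"
shared$day_from <- 7
shared$day_to <- 60
source("globals.R", local = shared)

dates <- seq(as.Date("2020-01-01"), as.Date("2020-01-31"), by = 1)
for (d in seq_along(dates)) {
  shared$cohort_date <- toString(dates[d])
  source("predict.R", local = shared)     # a nested source(..., local = T) inside
                                          # predict.R would land in `shared` too
  source("send_to_s3.R", local = shared)
}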

Thanks for the suggestions. I tried sourcing send_to_s3.R at the end of predict.R but got the same error as before. Any other suggestions most welcome.

And just to make sure --- you used the local = T argument when you sourced at the end of 'predict.R'?

Yes, confirmed. I set local = T within source.

Oh, and what if you insert a print() command, or something like that, which stores the output of str(prediction_df) in an external text file you can inspect, @dougfir? (Immediately after it's been created?)
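Something like this, for instance, right after prediction_df is created at the end of predict.R (the file name is just a placeholder):

# write the structure of prediction_df to a text file you can open afterwards
capture.output(str(prediction_df), file = "prediction_df_str.txt")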

I'm going to delete this question; thank you for trying to help out. I was able to make a more reproducible example over here: source() with 2 levels of nesting with a job

