I am running a plumber API on Google Cloud Run that is triggered by a GitHub webhook event. I want the API to run once, do the work, and then discard the concurrent calls that arrived and were waiting in the queue while the computation was still running.
I receive a lot of POST requests in a short amount of time and then nothing for a long period. My main aim is to run the calculation once per batch of calls.
Is there a way to delete the queue of concurrent requests in R? Or do you have any other suggestions for a possible solution?
Is there a way to delete the queue of concurrent requests in R?
No. They have to be managed by your own code.
Do you know how long the burst lasts? For example, does everything happen within ~5 seconds?
Without getting into shared memory and forked processes, we can set a flag that causes any further requests arriving within X seconds to be ignored or to fail.
I believe we could lower the window to 1 second, which would mean that "any request processed within 1 second after the first successful processing will fail." This includes all requests currently in the queue and any new requests within that period, because the rejected requests are flushed through the queue almost instantly.
That said, it doesn't hurt to make the window larger, like 60 seconds.
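The claim that queued requests "flush through" quickly can be checked with a small simulation. This is a sketch under stated assumptions, not plumber itself: `simulate_queue()` is a hypothetical helper that assumes a single-threaded server handling requests one at a time in arrival order, where a processed request takes `work` seconds and a rejected one takes essentially no time. A simulated clock replaces real sleeps.

```r
# Hypothetical simulation of a single-threaded API that handles queued
# requests one at a time. Assumes `arrivals` is sorted in ascending order.
# A processed request blocks new work for `window` seconds after it finishes;
# a rejected request is answered immediately (takes ~0 seconds).
simulate_queue <- function(arrivals, work, window) {
  clock <- 0
  blocked_until <- -Inf
  processed <- logical(length(arrivals))
  for (i in seq_along(arrivals)) {
    # A queued request starts when it arrives or when the previous one ends
    clock <- max(clock, arrivals[i])
    if (clock < blocked_until) {
      processed[i] <- FALSE            # rejected right away
    } else {
      clock <- clock + work            # do the (simulated) work
      blocked_until <- clock + window  # start the blocking window
      processed[i] <- TRUE
    }
  }
  processed
}

# A burst of four webhooks within 0.3 s, plus one much later
simulate_queue(c(0, 0.1, 0.2, 0.3, 30), work = 10, window = 1)
# returns TRUE FALSE FALSE FALSE TRUE
```

Even with a 1-second window, the three queued requests are all rejected because they are only examined after the 10-second job ends. With `window = 0`, a request that arrived during the job would be processed as a second run, which is why some window is needed.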
# plumber.R
# Per-id markers: a non-NULL entry means the id is currently blocked
cache <- list()

# Simulated expensive computation
do_work <- function(id) {
  cat("processing id: ", id, "\n")
  str(cache)
  Sys.sleep(10)
  ans <- list(data = runif(10))
  cat("done with id: ", id, "\n")
  ans
}

#' @param id identification value to use when processing
#' @param window number of seconds after the first request finishes to not accept any more requests for a given id
#' @get /frontheavy
function(id = "abc", window = 10) {
  # Query parameters arrive as strings, so convert before passing to later()
  window <- as.numeric(window)
  if (!is.null(cache[[id]])) {
    stop("already processing id: ", id, call. = FALSE)
  }
  # do the processing
  ans <- do_work(id)
  # like a semaphore, block the id from processing for a minimum amount of time
  cache[[id]] <<- later::later(function() {
    # Unblock the id
    cache[[id]] <<- NULL
  }, delay = window)
  ans
}
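If you would rather not schedule a `later::later()` callback for every id, a variation is to store, per id, the time until which requests are rejected and compare against the current time on each request. The sketch below is one possible approach, not part of plumber; `blocked_until`, `is_blocked()` and `block_id()` are hypothetical names, and `now` is exposed only to make the logic easy to test.

```r
# Hypothetical per-id blocking based on expiry timestamps instead of later().
# An environment gives reference semantics, so no <<- is needed.
blocked_until <- new.env()

# TRUE while `id` is inside its blocking window
is_blocked <- function(id, now = as.numeric(Sys.time())) {
  until <- get0(id, envir = blocked_until, inherits = FALSE)
  !is.null(until) && now < until
}

# Block `id` for `window` seconds starting at `now`
block_id <- function(id, window, now = as.numeric(Sys.time())) {
  assign(id, now + as.numeric(window), envir = blocked_until)
  invisible(NULL)
}

# Inside the route this would replace the cache/later logic:
#   if (is_blocked(id)) stop("already processing id: ", id, call. = FALSE)
#   ans <- do_work(id)
#   block_id(id, window)
#   ans

block_id("abc", window = 10, now = 100)
is_blocked("abc", now = 105)  # TRUE
is_blocked("abc", now = 111)  # FALSE
```

Expired entries simply stay in the environment until overwritten, which is harmless for a small number of ids.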
Thank you for the detailed response. The processing does not take long; the duration varies with the number of new data points. However, I can add a sleep of up to 5 minutes, after which the API should listen for further calls. I am not familiar with your proposed solution. How is it different from using a filter function that compares the time of the last execution with the time of the call?
# The object last_run is defined as 0 in the entrypoint file containing plumb()
#* @filter checkTime
function(req, res){
  webhook_payload <- jsonlite::fromJSON(req$postBody)
  push_time <- webhook_payload$repository$pushed_at
  # Reject pushes that happened before the last completed run
  if (last_run != 0 && push_time <= last_run) {
    res$status <- 503
    return(
      list(
        error = "Service unavailable until previous calculation is finished"
      )
    )
  } else {
    # req is shared with the route, so pass push_time along on it
    req$push_time <- push_time
    plumber::forward()
  }
}

#* Receive pub/sub message
#* @post /pubsub
function(req){
  Sys.sleep(120)
  # Do the work here
  # <<- updates the outer last_run; <- would only create a local copy
  last_run <<- as.numeric(Sys.time())
  return(
    list(
      last_run = last_run, push_time = req$push_time
    )
  )
}
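The filter's comparison can be pulled out into plain functions so it can be tested without running the API. This sketch assumes that `repository$pushed_at` is a Unix timestamp in push-event payloads but may be an ISO 8601 string in other payloads; check this against the events you actually receive. `as_epoch()` and `is_stale_push()` are hypothetical helper names.

```r
# Hypothetical helper: normalise a GitHub timestamp to seconds since the epoch.
# Assumption: either a number (Unix time) or a string like "2024-01-01T12:00:00Z".
as_epoch <- function(x) {
  if (is.character(x)) {
    as.numeric(as.POSIXct(x, format = "%Y-%m-%dT%H:%M:%SZ", tz = "UTC"))
  } else {
    as.numeric(x)
  }
}

# TRUE if the push happened at or before the last run, i.e. it is already covered.
# && (not &) because both arguments are single values.
is_stale_push <- function(push_time, last_run) {
  last_run != 0 && as_epoch(push_time) <= last_run
}

is_stale_push(50, last_run = 0)                        # FALSE: nothing has run yet
is_stale_push(50, last_run = 60)                       # TRUE
is_stale_push("1970-01-01T00:01:10Z", last_run = 60)   # FALSE: pushed after the run
```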
Your solution would work as well! Yours is cleaner in that it only stops requests that have queued up. (The only downside is that it cannot distinguish between different ID values, but that doesn't seem to be necessary.)
The filter logic could be moved into the route if you don't want every route to run this check.
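Moving the check into the route could look roughly like the sketch below. `handle_push()` and `state` are hypothetical names. It assumes plumber has already parsed the JSON body into `req$body` (as plumber 1.0+ does for JSON requests) and that `res` has reference semantics like plumber's response object. Keeping `last_run` in an environment sidesteps the `<-` versus `<<-` pitfall.

```r
# Hypothetical route-level version of the filter check.
# State lives in an environment so updates persist between requests.
state <- new.env()
state$last_run <- 0

handle_push <- function(req, res, work) {
  push_time <- as.numeric(req$body$repository$pushed_at)
  if (state$last_run != 0 && push_time <= state$last_run) {
    res$status <- 503
    return(list(error = "Service unavailable until previous calculation is finished"))
  }
  result <- work()                          # the actual calculation
  state$last_run <- as.numeric(Sys.time())  # persists: modifies the environment
  list(last_run = state$last_run, push_time = push_time, result = result)
}

# In plumber.R only the route that needs the check would call it, e.g.:
# #* @post /pubsub
# function(req, res) handle_push(req, res, work = function() "done")
```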
When setting the last_run value, feel free to add some extra time (10 seconds? 5 minutes?) so that a single burst doesn't end up producing two or more runs instead of one. (This is what happens when you try to debounce requests but the timeout is too short, so the result looks more like throttling.)
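Here is a small sketch of the margin idea, assuming the filter's `push_time <= last_run` test. `mark_run_finished()` is a hypothetical helper that returns "end of run plus margin", so late pushes from the same burst still compare as stale.

```r
# Hypothetical helper: value to store in last_run once a run finishes.
# Any push with pushed_at <= this value is treated as part of the handled burst.
mark_run_finished <- function(margin = 300, now = as.numeric(Sys.time())) {
  now + margin
}

run_end <- 1000                         # simulated finish time (seconds)
late_pushes <- c(995, 1010, 1200, 1400) # pushed_at values arriving afterwards

late_pushes <= mark_run_finished(margin = 300, now = run_end)
# TRUE TRUE TRUE FALSE  -> only the push at 1400 starts a new run
late_pushes <= mark_run_finished(margin = 0, now = run_end)
# TRUE FALSE FALSE FALSE -> three more runs could be triggered
```

The trade-off is that a genuinely new push inside the margin is also rejected, so its data is only picked up by the next accepted run.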
Note: be sure to use <<- when setting last_run in your route. Otherwise the assignment only creates a local variable, and the new value is lost when the function returns.
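The difference can be seen in plain R, independent of plumber. The function names below are purely illustrative.

```r
last_run <- 0

set_local <- function() {
  last_run <- 123   # creates a new local variable; the outer last_run is unchanged
  invisible(last_run)
}

set_global <- function() {
  last_run <<- 123  # searches the enclosing environments and updates last_run there
  invisible(last_run)
}

set_local()
last_run   # 0
set_global()
last_run   # 123
```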