Using spark_apply throws a SparkException: Process List(tar, -xf, packages.75547.tar) exited with code 2

I'm new to Spark and trying to use spark_apply to run a custom function, but I keep getting an error. I boiled it down to the simple example below and still get the error. The same code works on another computer with a local Spark install, so I'm guessing something is misconfigured on the cluster. This is on Cloudera Machine Learning, which offers either Spark 2.4.7 or 3.2.1, and I see the same error on both. I'm not sure what other info would be useful, so please let me know.

Here's my MRE:

library(tidyverse)
library(sparklyr)

conf <- spark_config()
sc <- spark_connect(master = 'yarn-client', config = conf)

# Trivial function that just returns its input unchanged
df_returner <- function(input_df) {
  return(input_df)
}

iris_sc <- copy_to(sc, iris)

iris_sc |> 
  spark_apply(df_returner)

And here's the error output from that last pipeline:

Error:
! org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
  stage 4.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4.0 (TID 6,
  10.42.5.147, executor 1): org.apache.spark.SparkException: Process List(tar, -xf,
  packages.75547.tar) exited with code 2
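
In case versions matter, this is how I'm checking them in the same session (output omitted; spark_version() needs the open sc connection from the MRE above):

packageVersion("sparklyr")  # version of sparklyr installed in the session
spark_version(sc)           # Spark version the connection is actually running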

I also tried the even simpler version from the sparklyr article "Distributing R Computations" and got what may be a more informative error:

sdf_len(sc, 5, repartition = 1) %>%
  spark_apply(function(e) I(e))
Error in `db_query_fields.DBIConnection()`:
! Can't query fields.
Caused by error in `value[[3L]]()`:
! Failed to fetch data: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 588.0 failed 4 times, most recent failure: Lost task 0.3 in stage 588.0 (TID 57382) (10.42.5.51 executor 112): org.apache.spark.SparkException: Process List(tar, -xf, packages.51962.tar) exited with code 2
Backtrace:
  1. sdf_len(sc, 5, repartition = 1) %>% ...
  4. sparklyr::sdf_len(sc, 5, repartition = 1)
  5. sparklyr::sdf_seq(sc, 1, length, repartition = repartition, type = type)
  7. sparklyr:::sdf_register.spark_jobj(sdf)
  9. sparklyr:::tbl.spark_connection(sc, name)
 10. sparklyr:::spark_tbl_sql(src = src, from)
 11. dbplyr::tbl_sql(...)
 13. dbplyr:::dbplyr_query_fields(src$con, from)
 14. dbplyr:::dbplyr_fallback(con, "db_query_fields", ...)
 16. dbplyr:::db_query_fields.DBIConnection(con, ...)
For reference, here's the dbplyr function that the error surfaces from:

> dbplyr:::db_query_fields.DBIConnection
function (con, sql, ...) 
{
    sql <- sql_query_fields(con, sql, ...)
    tryCatch({
        df <- DBI::dbGetQuery(con, sql)
    }, error = function(cnd) {
        cli_abort("Can't query fields.", parent = cnd)
    })
    names(df)
}
<bytecode: 0x555f6f0043a0>
<environment: namespace:dbplyr>

Full stack trace:

Run `sparklyr::spark_last_error()` to see the full Spark error (multiple lines)
To use the previous style of error message set `options("sparklyr.simple.errors" = TRUE)`
Backtrace:
  1. sparklyr::spark_apply(iris_sc, df_returner)
  3. sparklyr::sdf_collect(.)
  4. sparklyr:::sdf_collect_static(object, impl, ...)
  6. sparklyr:::invoke_static.spark_shell_connection(...)
  8. sparklyr:::invoke_method.spark_shell_connection(...)
  9. sparklyr:::core_invoke_method(...)
 10. sparklyr:::core_invoke_method_impl(...)
 11. sparklyr:::spark_error(msg)
> sparklyr::spark_last_error()
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 586.0 failed 4 times, most recent failure: Lost task 0.3 in stage 586.0 (TID 57374) (10.42.3.5 executor 114): org.apache.spark.SparkException: Process List(tar, -xf, packages.51962.tar) exited with code 2
	at org.apache.spark.util.Utils$.executeAndGetOutput(Utils.scala:1355)
	at org.apache.spark.util.Utils$.fetchFile(Utils.scala:567)
	at org.apache.spark.executor.Executor.$anonfun$updateDependencies$4(Executor.scala:935)
	at org.apache.spark.executor.Executor.$anonfun$updateDependencies$4$adapted(Executor.scala:932)
	at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
	at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
	at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
	at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
	at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
	at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:932)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkException: Process List(tar, -xf, packages.51962.tar) exited with code 2
	at org.apache.spark.util.Utils$.executeAndGetOutput(Utils.scala:1355)
	at org.apache.spark.util.Utils$.fetchFile(Utils.scala:567)
	at org.apache.spark.executor.Executor.$anonfun$updateDependencies$4(Executor.scala:935)
	at org.apache.spark.executor.Executor.$anonfun$updateDependencies$4$adapted(Executor.scala:932)
	at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
	at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
	at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
	at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
	at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
	at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:932)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
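
Since the tar that fails to extract is the bundle of R packages that spark_apply ships to each executor, my next step is to try skipping package distribution entirely (untested so far; per ?spark_apply, passing packages = FALSE stops sparklyr from copying the local library to the workers, which should be fine here since df_returner only uses base R):

iris_sc |>
  # packages = FALSE: don't ship the local package bundle to the executors
  spark_apply(df_returner, packages = FALSE)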
