Our sparklyr code on RStudio Server Pro recently started failing after we upgraded our Hadoop (CDH) cluster and R (to 3.5.1).
We have updated many of the referenced R packages but continue to get the error below. Can someone help me understand what it means, or point me in the right direction?
library(dplyr) #Version '1.0.2'
#>
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#>
#> filter, lag
#> The following objects are masked from 'package:base':
#>
#> intersect, setdiff, setequal, union
library(sparklyr) #Version '1.5.0'
library(tibble) #Version '3.0.4'
library(reprex) #Version '0.2.1'
# library(dbplyr) #Version '2.0.0.9000'
#' Start a Spark connection on YARN with sensible cluster defaults.
#'
#' @param spark_version Spark version string passed to `spark_connect()`.
#' @param executor_cores Cores allocated per executor.
#' @param executor_memory Memory per executor, e.g. "32g".
#' @param executor_idle_timeout Seconds before an idle cached executor is
#'   released (spark.dynamicAllocation.cachedExecutorIdleTimeout).
#' @param driver_cores Cores allocated to the driver.
#' @param driver_memory Driver JVM heap, e.g. "16g".
#' @param driver_max_result_size Cap on total serialized results collected
#'   back to the driver (spark.driver.maxResultSize).
#' @param production If `TRUE`, submit to the production YARN queue;
#'   otherwise use the development queue.
#' @param production_queue,development_queue YARN queue names. Parameterized
#'   (with the original hard-coded values as defaults) so the helper can be
#'   reused outside the MSR pools.
#' @param ... Further arguments forwarded to `sparklyr::spark_connect()`.
#' @return Invisibly, the Spark connection object (`sc`).
spark_context_start <- function(spark_version = "2.4.0",
                                executor_cores = 4,
                                executor_memory = "32g",
                                executor_idle_timeout = 600,
                                driver_cores = 10,
                                driver_memory = "16g",
                                driver_max_result_size = "30g",
                                production = FALSE,
                                production_queue = "root.Production.MSR_Pool",
                                development_queue = "root.Development.MSR_Team_Pool",
                                ...) {
  config <- sparklyr::spark_config()
  config$spark.executor.cores <- executor_cores
  config$spark.executor.memory <- executor_memory
  config$spark.driver.cores <- driver_cores
  config$spark.driver.maxResultSize <- driver_max_result_size
  config$spark.dynamicAllocation.cachedExecutorIdleTimeout <- executor_idle_timeout
  # Driver memory must go through the sparklyr shell flag: the driver JVM is
  # launched by spark-submit, before ordinary runtime config is applied.
  config$`sparklyr.shell.driver-memory` <- driver_memory
  # `if` is an expression in R, so assign the queue choice directly rather
  # than duplicating the assignment in two branches.
  config$spark.yarn.queue <- if (production) production_queue else development_queue

  sc <- sparklyr::spark_connect(master = "yarn-client",
                                config = config,
                                version = spark_version, ...)
  invisible(sc)
}
# Open the YARN connection against the production queue; `sc` is the handle
# used by every copy_to()/tbl() call below.
sc <- spark_context_start(production = TRUE)
# Minimal reproducible sample: six item codes, `status` is double with NAs.
# Column-wise tibble() construction — same values and column types as the
# original row-wise tribble() literal.
sample_data_frame <- tibble::tibble(
  item   = c("191630", "19U459", "194639", "113037", "1X9004", "1B3B41"),
  status = c(NA, 1, 3, 999, NA, 0)
)
# Ship the local tibble into Spark as temporary table "my_data".
dplyr::copy_to(sc, sample_data_frame, "my_data")
#> # Source: spark<my_data> [?? x 2]
#> item status
#> <chr> <dbl>
#> 1 191630 NA
#> 2 19U459 1
#> 3 194639 3
#> 4 113037 999
#> 5 1X9004 NA
#> 6 1B3B41 0
# Lazy reference to the remote table (no data pulled yet).
my_data_in_spark <- dplyr::tbl(sc, "my_data")
# This is the failing step: distinct() on one column, materialized with
# compute(). NOTE(review): the analysis error below shows the generated SQL
# still referencing `status` (and an internal __row_num_* window column)
# after `status` has been projected away — this looks like a SQL-translation
# mismatch between the installed dplyr/dbplyr and sparklyr versions rather
# than a problem in this code; verify the dbplyr version against sparklyr's
# supported range.
my_data_in_spark %>% distinct(item) %>% compute()
#> Warning: ORDER BY is ignored in subqueries without LIMIT
#> ℹ Do you need to move arrange() later in the pipeline or use window_order() instead?
#> Warning: ORDER BY is ignored in subqueries without LIMIT
#> ℹ Do you need to move arrange() later in the pipeline or use window_order() instead?
#> Error: org.apache.spark.sql.AnalysisException: cannot resolve '`status`' given input columns: [q05.item]; line 1 pos 15;
#> 'Project [item#72, 'status, '__row_num_4d0196b2_9268_4820_a724_6f3f7e53e565]
#> +- SubqueryAlias `q05`
#> +- Project [item#72]
#> +- SubqueryAlias `q04`
#> +- Aggregate [item#8], [first(__row_num_4d0196b2_9268_4820_a724_6f3f7e53e565#70, false) AS __row_num_4d0196b2_9268_4820_a724_6f3f7e53e565#71, first(item#8, false) AS item#72]
#> +- SubqueryAlias `q03`
#> +- Project [item#8, status#9, __row_num_4d0196b2_9268_4820_a724_6f3f7e53e565#70]
#> +- Project [item#8, status#9, __row_num_4d0196b2_9268_4820_a724_6f3f7e53e565#70, __row_num_4d0196b2_9268_4820_a724_6f3f7e53e565#70]
#> +- Window [row_number() windowspecdefinition(null ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS __row_num_4d0196b2_9268_4820_a724_6f3f7e53e565#70], [null ASC NULLS FIRST]
#> +- Project [item#8, status#9]
#> +- SubqueryAlias `q02`
#> +- Project [item#8, status#9]
#> +- SubqueryAlias `q01`
#> +- Project [item#8, status#9]
#> +- SubqueryAlias `my_data`
#> +- LogicalRDD [item#8, status#9], false
#>
#> at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
#> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:110)
#> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:107)
#> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:278)
#> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:278)
#> at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
#> at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
#> at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:93)
#> at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:93)
#> at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105)
#> at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105)
#> at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
#> at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:104)
#> at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:116)
#> at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$2.apply(QueryPlan.scala:121)
#> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
#> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
#> at scala.collection.immutable.List.foreach(List.scala:392)
#> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
#> at scala.collection.immutable.List.map(List.scala:296)
#> at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:121)
#> at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:126)
#> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
#> at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:126)
#> at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:93)
#> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:107)
#> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
#> at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
#> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:85)
#> at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95)
#> at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:108)
#> at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105)
#> at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
#> at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
#> at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
#> at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
#> at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
#> at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:78)
#> at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:651)
#> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
#> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
#> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
#> at java.lang.reflect.Method.invoke(Method.java:498)
#> at sparklyr.Invoke.invoke(invoke.scala:147)
#> at sparklyr.StreamHandler.handleMethodCall(stream.scala:136)
#> at sparklyr.StreamHandler.read(stream.scala:61)
#> at sparklyr.BackendHandler$$anonfun$channelRead0$1.apply$mcV$sp(handler.scala:58)
#> at scala.util.control.Breaks.breakable(Breaks.scala:38)
#> at sparklyr.BackendHandler.channelRead0(handler.scala:38)
#> at sparklyr.BackendHandler.channelRead0(handler.scala:14)
#> at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
#> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
#> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
#> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
#> at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
#> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
#> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
#> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
#> at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
#> at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
#> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
#> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
#> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
#> at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
#> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
#> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
#> at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
#> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
#> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
#> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
#> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
#> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
#> at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
#> at io.netty.util.concurrent.DefaultThreadFactory$DefaultRu