Error: org.apache.spark.sql.AnalysisException: cannot resolve '`column`' given input columns

Our sparklyr code on RStudio Server Pro recently started failing after we upgraded our Hadoop (CDH) cluster and R (to 3.5.1).

We've updated many of the R packages involved and still get this error. Can someone please help me understand what it means, or point me in the right direction?

library(dplyr)    #Version '1.0.2'
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
library(sparklyr) #Version '1.5.0'
library(tibble)   #Version '3.0.4'
library(reprex)   #Version '0.2.1'
# library(dbplyr)   #Version '2.0.0.9000'
# Helper that opens a Spark connection on YARN with our standard resource
# settings; production = TRUE routes the job to the production YARN queue.
spark_context_start <- function(spark_version = "2.4.0",
                                executor_cores = 4, 
                                executor_memory = "32g", 
                                executor_idle_timeout = 600, 
                                driver_cores = 10, 
                                driver_memory = "16g", 
                                driver_max_result_size = "30g",
                                production = FALSE, ...)
{
  config <- sparklyr::spark_config()
  config$spark.executor.cores <- executor_cores
  config$spark.executor.memory <- executor_memory
  config$spark.driver.cores <- driver_cores
  config$spark.driver.maxResultSize <- driver_max_result_size
  config$spark.dynamicAllocation.cachedExecutorIdleTimeout <- executor_idle_timeout
  config$"sparklyr.shell.driver-memory" <- driver_memory
  if (production) {
    config$spark.yarn.queue <- "root.Production.MSR_Pool"
  } else {
    config$spark.yarn.queue <- "root.Development.MSR_Team_Pool"
  }
  sc <- sparklyr::spark_connect(master = "yarn-client", 
                                config = config, 
                                version = spark_version, ...)
  invisible(sc)
}
sc <- spark_context_start(production = TRUE)
sample_data_frame <- tibble::tribble(
  ~item, ~status,
  '191630', NA,    
  '19U459', 1,    
  '194639', 3,    
  '113037', 999,    
  '1X9004', NA,    
  '1B3B41', 0 )
dplyr::copy_to(sc, sample_data_frame, "my_data")
#> # Source: spark<my_data> [?? x 2]
#>   item   status
#>   <chr>   <dbl>
#> 1 191630     NA
#> 2 19U459      1
#> 3 194639      3
#> 4 113037    999
#> 5 1X9004     NA
#> 6 1B3B41      0
my_data_in_spark <- dplyr::tbl(sc, "my_data")
my_data_in_spark %>% distinct(item) %>% compute()
#> Warning: ORDER BY is ignored in subqueries without LIMIT
#> ℹ Do you need to move arrange() later in the pipeline or use window_order() instead?

#> Warning: ORDER BY is ignored in subqueries without LIMIT
#> ℹ Do you need to move arrange() later in the pipeline or use window_order() instead?
#> Error: org.apache.spark.sql.AnalysisException: cannot resolve '`status`' given input columns: [q05.item]; line 1 pos 15;
#> 'Project [item#72, 'status, '__row_num_4d0196b2_9268_4820_a724_6f3f7e53e565]
#> +- SubqueryAlias `q05`
#>    +- Project [item#72]
#>       +- SubqueryAlias `q04`
#>          +- Aggregate [item#8], [first(__row_num_4d0196b2_9268_4820_a724_6f3f7e53e565#70, false) AS __row_num_4d0196b2_9268_4820_a724_6f3f7e53e565#71, first(item#8, false) AS item#72]
#>             +- SubqueryAlias `q03`
#>                +- Project [item#8, status#9, __row_num_4d0196b2_9268_4820_a724_6f3f7e53e565#70]
#>                   +- Project [item#8, status#9, __row_num_4d0196b2_9268_4820_a724_6f3f7e53e565#70, __row_num_4d0196b2_9268_4820_a724_6f3f7e53e565#70]
#>                      +- Window [row_number() windowspecdefinition(null ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS __row_num_4d0196b2_9268_4820_a724_6f3f7e53e565#70], [null ASC NULLS FIRST]
#>                         +- Project [item#8, status#9]
#>                            +- SubqueryAlias `q02`
#>                               +- Project [item#8, status#9]
#>                                  +- SubqueryAlias `q01`
#>                                     +- Project [item#8, status#9]
#>                                        +- SubqueryAlias `my_data`
#>                                           +- LogicalRDD [item#8, status#9], false
#> 
#>  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:110)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:107)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:278)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:278)
#>  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:93)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:93)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105)
#>  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:104)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:116)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$2.apply(QueryPlan.scala:121)
#>  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
#>  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
#>  at scala.collection.immutable.List.foreach(List.scala:392)
#>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
#>  at scala.collection.immutable.List.map(List.scala:296)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:121)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:126)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:126)
#>  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:93)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:107)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
#>  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
#>  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:85)
#>  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95)
#>  at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:108)
#>  at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105)
#>  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
#>  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
#>  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
#>  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
#>  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
#>  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:78)
#>  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:651)
#>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
#>  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
#>  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
#>  at java.lang.reflect.Method.invoke(Method.java:498)
#>  at sparklyr.Invoke.invoke(invoke.scala:147)
#>  at sparklyr.StreamHandler.handleMethodCall(stream.scala:136)
#>  at sparklyr.StreamHandler.read(stream.scala:61)
#>  at sparklyr.BackendHandler$$anonfun$channelRead0$1.apply$mcV$sp(handler.scala:58)
#>  at scala.util.control.Breaks.breakable(Breaks.scala:38)
#>  at sparklyr.BackendHandler.channelRead0(handler.scala:38)
#>  at sparklyr.BackendHandler.channelRead0(handler.scala:14)
#>  at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
#>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
#>  at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
#>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
#>  at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
#>  at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
#>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
#>  at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
#>  at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
#>  at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
#>  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
#>  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
#>  at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
#>  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
#>  at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
#>  at io.netty.util.concurrent.DefaultThreadFactory$DefaultRu

It appears that the dplyr code is being translated to Spark SQL incorrectly:

> my_data_in_spark %>% distinct(item) %>% show_query()
#> <SQL>
#> SELECT `item`
#> FROM (SELECT FIRST(`__row_num_1aa94d87_1e8e_4368_a472_08a6ae5a54bc`, FALSE) AS `__row_num_1aa94d87_1e8e_4368_a472_08a6ae5a54bc`, FIRST(`item`, FALSE) AS `item`
#> FROM (SELECT `item`, `status`, ROW_NUMBER() OVER (ORDER BY NULL) AS `__row_num_1aa94d87_1e8e_4368_a472_08a6ae5a54bc`
#> FROM (SELECT `item`, `status`
#> FROM (SELECT `item`, `status`
#> FROM `my_data`) `q01`) `q02`) `q03`
#> GROUP BY `item`) `q04`
#> ORDER BY `__row_num_1aa94d87_1e8e_4368_a472_08a6ae5a54bc`
#> Warning message:
#> ORDER BY is ignored in subqueries without LIMIT
#> ℹ Do you need to move arrange() later in the pipeline or use window_order() instead?
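
In case it helps others reproduce or sidestep this while we wait for an answer, below is a minimal sketch of the workarounds we plan to try. It assumes the generated row-number subquery from distinct() is the problem (not the cluster itself); sdf_drop_duplicates() is sparklyr's wrapper around Spark's dropDuplicates() and bypasses the dplyr-to-SQL translation for this step entirely.

library(dplyr)
library(sparklyr)

# 1. Drop the unused `status` column before distinct(), so the translated
#    query should never need to reference it (may avoid the failing subquery):
my_data_in_spark %>%
  select(item) %>%
  distinct() %>%
  compute()

# 2. Do the de-duplication on the Spark side instead of via SQL translation:
my_data_in_spark %>%
  select(item) %>%
  sdf_drop_duplicates()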
