Yes every time I run this code with production scale data it fails. Test case with small dataset of 50 records on local box runs fine.
Thanks Ankur Sent from my iPhone > On Mar 4, 2017, at 12:09 PM, ayan guha <guha.a...@gmail.com> wrote: > > Just to be sure, can you reproduce the error using sql api? > >> On Sat, 4 Mar 2017 at 2:32 pm, Ankur Srivastava <ankur.srivast...@gmail.com> >> wrote: >> Adding DEV. >> >> Or is there any other way to do subtractByKey using Dataset APIs? >> >> Thanks >> Ankur >> >> On Wed, Mar 1, 2017 at 1:28 PM, Ankur Srivastava >> <ankur.srivast...@gmail.com> wrote: >> Hi Users, >> >> We are facing an issue with left_outer join using Spark Dataset api in 2.0 >> Java API. Below is the code we have >> >> Dataset<Row> badIds = filteredDS.groupBy(col("id").alias("bid")).count() >> .filter((FilterFunction<Row>) row -> (Long) row.getAs("count") > >> 75000); >> _logger.info("Id count with over 75K records that will be filtered: " + >> badIds.count()); >> >> Dataset<SomeData> fiteredRows = filteredDS.join(broadcast(badIds), >> filteredDS.col("id").equalTo(badDevices.col("bid")), "left_outer") >> .filter((FilterFunction<Row>) row -> row.getAs("bid") == null) >> .map((MapFunction<Row, SomeData>) row -> >> SomeDataFactory.createObjectFromDDRow(row), Encoders.bean(DeviceData.class)); >> >> We get the counts in the log file and then the application fils with below >> exception >> Exception in thread "main" java.lang.UnsupportedOperationException: Only >> code-generated evaluation is supported. >> at >> org.apache.spark.sql.catalyst.expressions.objects.Invoke.eval(objects.scala:118) >> at >> org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$canFilterOutNull(joins.scala:109) >> at >> org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$7.apply(joins.scala:118) >> at >> org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$7.apply(joins.scala:118) >> at >> scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93) >> at scala.collection.immutable.List.exists(List.scala:84) >> at >> org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$buildNewJoinType(joins.scala:118) >> at >> org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:133) >> at >> org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:131) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279) >> at >> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:278) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284) >> at >> org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:268) >> at >> org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.apply(joins.scala:131) >> at >> org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.apply(joins.scala:98) >> at >> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85) >> at >> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82) >> at >> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57) >> at >> scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66) >> at >> scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35) >> at >> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82) >> at >> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74) >> at scala.collection.immutable.List.foreach(List.scala:381) >> at >> org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74) >> at >> org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:74) >> at >> org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:74) >> at >> org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:78) >> at >> org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:76) >> at >> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83) >> at >> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83) >> at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2555) >> at org.apache.spark.sql.Dataset.count(Dataset.scala:2226) >> at test.Driver.main(Driver.java:106) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at >> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736) >> at >> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185) >> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210) >> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124) >> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) >> >> Thanks >> Ankur >> > > -- > Best Regards, > Ayan Guha