Re: Spark 2.0 issue with left_outer join
How about running this:

select * from (
  select *, count(*) over (partition by id) c
  from filteredDS
) f
where f.c <= 75000

On Sun, Mar 5, 2017 at 12:05 PM, Ankur Srivastava <ankur.srivast...@gmail.com> wrote:
> Yes, every time I run this code with production-scale data it fails. A test
> case with a small dataset of 50 records on a local box runs fine.
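For reference, the filtering logic that both the SQL suggestion and the original Dataset code express — keep only the rows whose id occurs at most N times — can be sketched outside Spark in plain Java. The class name and sample data below are made up for illustration; this is a semantics sketch, not the Spark implementation.

```java
import java.util.*;
import java.util.stream.*;

public class FilterHeavyIds {
    // Keep only rows whose id (first column) occurs at most maxCount times.
    static List<String[]> filterHeavyIds(List<String[]> rows, long maxCount) {
        // Count rows per id: the equivalent of count(*) over (partition by id).
        Map<String, Long> counts = rows.stream()
                .collect(Collectors.groupingBy(r -> r[0], Collectors.counting()));
        // Drop every row belonging to an over-represented id.
        return rows.stream()
                .filter(r -> counts.get(r[0]) <= maxCount)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[]{"a", "1"}, new String[]{"a", "2"}, new String[]{"a", "3"},
                new String[]{"b", "4"}, new String[]{"c", "5"});
        // With maxCount = 2, every "a" row is dropped; "b" and "c" survive.
        for (String[] r : filterHeavyIds(rows, 2)) {
            System.out.println(r[0] + "," + r[1]);
        }
    }
}
```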
Re: Spark 2.0 issue with left_outer join
Yes, every time I run this code with production-scale data it fails. A test case with a small dataset of 50 records on a local box runs fine.

Thanks
Ankur

Sent from my iPhone

On Mar 4, 2017, at 12:09 PM, ayan guha wrote:
> Just to be sure, can you reproduce the error using the SQL API?
Re: Spark 2.0 issue with left_outer join
Just to be sure, can you reproduce the error using the SQL API?

On Sat, 4 Mar 2017 at 2:32 pm, Ankur Srivastava <ankur.srivast...@gmail.com> wrote:
> Adding DEV.
>
> Or is there any other way to do subtractByKey using Dataset APIs?
Re: Spark 2.0 issue with left_outer join
Adding DEV.

Or is there any other way to do subtractByKey using Dataset APIs?

Thanks
Ankur

On Wed, Mar 1, 2017 at 1:28 PM, Ankur Srivastava <ankur.srivast...@gmail.com> wrote:
> Hi Users,
>
> We are facing an issue with a left_outer join using the Spark Dataset API
> in the 2.0 Java API. Below is the code we have:
>
> Dataset<Row> badIds = filteredDS.groupBy(col("id").alias("bid")).count()
>         .filter((FilterFunction<Row>) row -> (Long) row.getAs("count") > 75000);
> _logger.info("Id count with over 75K records that will be filtered: " + badIds.count());
>
> Dataset<DeviceData> filteredRows = filteredDS.join(broadcast(badIds),
>             filteredDS.col("id").equalTo(badIds.col("bid")), "left_outer")
>         .filter((FilterFunction<Row>) row -> row.getAs("bid") == null)
>         .map((MapFunction<Row, DeviceData>) row -> SomeDataFactory.createObjectFromDDRow(row),
>             Encoders.bean(DeviceData.class));
>
> We get the counts in the log file, and then the application fails with the
> exception below:
>
> Exception in thread "main" java.lang.UnsupportedOperationException: Only code-generated evaluation is supported.
>     at org.apache.spark.sql.catalyst.expressions.objects.Invoke.eval(objects.scala:118)
>     at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$canFilterOutNull(joins.scala:109)
>     at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$7.apply(joins.scala:118)
>     at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$7.apply(joins.scala:118)
>     at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
>     at scala.collection.immutable.List.exists(List.scala:84)
>     at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$buildNewJoinType(joins.scala:118)
>     at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:133)
>     at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:131)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
>     at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:278)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
>     ... [remaining transformDown/transformChildren frames repeat; trace truncated in the archive]
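On the subtractByKey question: one commonly suggested alternative (an assumption here, not confirmed in this thread) is the "leftanti" join type that the Dataset API accepts in Spark 2.0, e.g. filteredDS.join(broadcast(badIds), filteredDS.col("id").equalTo(badIds.col("bid")), "leftanti"), which removes both the outer join and the null-check lambda that the optimizer's EliminateOuterJoin rule appears to trip over; another is replacing the typed lambda with a Column expression, .filter(col("bid").isNull()), so the filter stays evaluable by Catalyst. The semantics of subtractByKey itself — keep left entries whose key does not appear on the right — can be sketched in plain Java; the class name and sample data are illustrative only.

```java
import java.util.*;
import java.util.stream.*;

public class SubtractByKeyDemo {
    // subtractByKey semantics: keep left entries whose key does not appear
    // in rightKeys (this is what a left-anti join computes).
    static <K, V> Map<K, V> subtractByKey(Map<K, V> left, Set<K> rightKeys) {
        return left.entrySet().stream()
                .filter(e -> !rightKeys.contains(e.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, Integer> left = new HashMap<>();
        left.put("a", 1);
        left.put("b", 2);
        left.put("c", 3);
        // "a" and "c" are the over-represented ids to subtract away.
        Set<String> badKeys = new HashSet<>(Arrays.asList("a", "c"));
        System.out.println(subtractByKey(left, badKeys));  // only "b" survives
    }
}
```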