Re: Spark 2.0 issue with left_outer join

2017-03-04 Thread ayan guha
How about running this:

select * from
(select *, count(*) over (partition by id) c from filteredDS) f
where f.c <= 75000
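
If you want to stay with the Dataset API, roughly the same thing should work
with a window function there too. A minimal sketch (untested, assuming
filteredDS is a Dataset<Row>):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import static org.apache.spark.sql.functions.*;

// Count rows per id with a window, keep ids at or below the threshold,
// then drop the helper column.
Dataset<Row> goodRows = filteredDS
    .withColumn("c", count(lit(1)).over(Window.partitionBy(col("id"))))
    .filter(col("c").leq(75000))
    .drop("c");

Also, judging by your stack trace, the optimizer is failing while it tries to
evaluate the filter condition sitting above the outer join (EliminateOuterJoin
-> canFilterOutNull -> Invoke.eval), so replacing the typed FilterFunction
lambda with a Column predicate such as col("bid").isNull() may sidestep the
error as well.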


On Sun, Mar 5, 2017 at 12:05 PM, Ankur Srivastava <ankur.srivast...@gmail.com> wrote:

> Yes, every time I run this code with production-scale data it fails. A test
> case with a small dataset of 50 records on a local box runs fine.
>
> Thanks
> Ankur
>
> Sent from my iPhone
>
> On Mar 4, 2017, at 12:09 PM, ayan guha wrote:
>
> Just to be sure, can you reproduce the error using the SQL API?

Re: Spark 2.0 issue with left_outer join

2017-03-04 Thread Ankur Srivastava
Yes, every time I run this code with production-scale data it fails. A test
case with a small dataset of 50 records on a local box runs fine.

Thanks
Ankur

Sent from my iPhone

> On Mar 4, 2017, at 12:09 PM, ayan guha wrote:
> 
> Just to be sure, can you reproduce the error using the SQL API?

Re: Spark 2.0 issue with left_outer join

2017-03-04 Thread ayan guha
Just to be sure, can you reproduce the error using the SQL API?
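
Something along these lines, say (a rough sketch using the names from your
snippet, assuming spark is your SparkSession; untested):

// Register both sides and express the same left outer join + null check in plain SQL.
filteredDS.createOrReplaceTempView("filtered");
badIds.createOrReplaceTempView("bad_ids");
Dataset<Row> result = spark.sql(
    "SELECT f.* FROM filtered f LEFT OUTER JOIN bad_ids b ON f.id = b.bid WHERE b.bid IS NULL");

If that version runs cleanly, the problem is more likely in how the typed
lambdas interact with the optimizer than in the join itself.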

On Sat, 4 Mar 2017 at 2:32 pm, Ankur Srivastava wrote:

> Adding DEV.
>
> Or is there any other way to do subtractByKey using Dataset APIs?
>
> Thanks
> Ankur

Re: Spark 2.0 issue with left_outer join

2017-03-03 Thread Ankur Srivastava
Adding DEV.

Or is there any other way to do subtractByKey using Dataset APIs?
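
For example, would Spark 2.0's "left_anti" join type be a reasonable
replacement? A rough sketch of what I have in mind (untested):

// Keep only the rows of filteredDS whose id does NOT appear in badIds ("bid"),
// i.e. subtractByKey semantics without the left_outer join + null filter.
Dataset<Row> filteredRows = filteredDS.join(broadcast(badIds),
    filteredDS.col("id").equalTo(badIds.col("bid")), "left_anti");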

Thanks
Ankur

On Wed, Mar 1, 2017 at 1:28 PM, Ankur Srivastava  wrote:

> Hi Users,
>
> We are facing an issue with a left_outer join using the Spark 2.0 Dataset
> Java API. Below is the code we have:
>
> Dataset<Row> badIds = filteredDS.groupBy(col("id").alias("bid")).count()
>     .filter((FilterFunction<Row>) row -> (Long) row.getAs("count") > 75000);
> _logger.info("Id count with over 75K records that will be filtered: " + badIds.count());
>
> Dataset<DeviceData> filteredRows = filteredDS.join(broadcast(badIds),
>         filteredDS.col("id").equalTo(badIds.col("bid")), "left_outer")
>     .filter((FilterFunction<Row>) row -> row.getAs("bid") == null)
>     .map((MapFunction<Row, DeviceData>) row -> SomeDataFactory.createObjectFromDDRow(row),
>         Encoders.bean(DeviceData.class));
>
>
> We get the counts in the log file, and then the application fails with the
> exception below:
> Exception in thread "main" java.lang.UnsupportedOperationException: Only code-generated evaluation is supported.
> at org.apache.spark.sql.catalyst.expressions.objects.Invoke.eval(objects.scala:118)
> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$canFilterOutNull(joins.scala:109)
> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$7.apply(joins.scala:118)
> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$7.apply(joins.scala:118)
> at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
> at scala.collection.immutable.List.exists(List.scala:84)
> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$buildNewJoinType(joins.scala:118)
> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:133)
> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:131)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
> at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:278)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
> at org.apache.spar