[jira] [Commented] (SPARK-15390) Memory management issue in complex DataFrame join and filter

2016-10-05 Thread Iulian Dragos (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-15390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15547955#comment-15547955 ]

Iulian Dragos commented on SPARK-15390:
---

Thanks [~davies]!

> Memory management issue in complex DataFrame join and filter
> ------------------------------------------------------------
>
>                 Key: SPARK-15390
>                 URL: https://issues.apache.org/jira/browse/SPARK-15390
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0
>         Environment: branch-2.0, 16 workers
>            Reporter: Joseph K. Bradley
>            Assignee: Davies Liu
>             Fix For: 2.0.1
>
>
> See [SPARK-15389] for a description of the code which produces this bug. I am filing this as a separate JIRA since the bug in 2.0 is different.
> In 2.0, the code fails with some memory management error. Here is the stacktrace:
> {code}
> OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512m; support was removed in 8.0
> 16/05/18 19:23:16 ERROR Uncaught throwable from user code: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
> Exchange SinglePartition, None
> +- WholeStageCodegen
>    :  +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#170L])
>    :     +- Project
>    :        +- BroadcastHashJoin [id#70L], [id#110L], Inner, BuildLeft, None
>    :           :- INPUT
>    :           +- Project [id#110L]
>    :              +- Filter (degree#115 > 200)
>    :                 +- TungstenAggregate(key=[id#110L], functions=[(count(1),mode=Final,isDistinct=false)], output=[id#110L,degree#115])
>    :                    +- INPUT
>    :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint]))
>    :  +- WholeStageCodegen
>    :     :  +- Project [row#66.id AS id#70L]
>    :     :     +- Filter isnotnull(row#66.id)
>    :     :        +- INPUT
>    :     +- Scan ExistingRDD[row#66,uniq_id#67]
>    +- Exchange hashpartitioning(id#110L, 200), None
>       +- WholeStageCodegen
>          :  +- TungstenAggregate(key=[id#110L], functions=[(count(1),mode=Partial,isDistinct=false)], output=[id#110L,count#136L])
>          :     +- Filter isnotnull(id#110L)
>          :        +- INPUT
>          +- Generate explode(array(src#2L, dst#3L)), false, false, [id#110L]
>             +- WholeStageCodegen
>                :  +- Filter ((isnotnull(src#2L) && isnotnull(dst#3L)) && NOT (src#2L = dst#3L))
>                :     +- INPUT
>                +- InMemoryTableScan [src#2L,dst#3L], [isnotnull(src#2L),isnotnull(dst#3L),NOT (src#2L = dst#3L)], InMemoryRelation [src#2L,dst#3L], true, 1, StorageLevel(disk=true, memory=true, offheap=false, deserialized=true, replication=1), WholeStageCodegen, None
>   at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:50)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:113)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>   at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:233)
>   at org.apache.spark.sql.execution.aggregate.TungstenAggregate.inputRDDs(TungstenAggregate.scala:134)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:348)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>   at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
>   at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:287)
>   at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2122)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
> {code}
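
For context, the plan above corresponds to a query of roughly the following shape (a sketch reconstructed from the operator tree only, not the actual SPARK-15389 code; the table names are assumptions, while the columns, the explode over src/dst, and the degree > 200 filter are read off the plan):

{code}
// Hedged reconstruction from the plan tree above. Only the column names
// and the 200 threshold come from the plan; everything else is assumed.
import org.apache.spark.sql.functions._
import spark.implicits._

val edges = spark.table("edges")                 // src#2L, dst#3L (InMemoryTableScan)
  .filter($"src" =!= $"dst")                     // Filter NOT (src = dst)

// Generate explode(array(src, dst)) -> TungstenAggregate count(1) AS degree
val degrees = edges
  .select(explode(array($"src", $"dst")).as("id"))
  .groupBy("id")
  .agg(count(lit(1)).as("degree"))

// Filter (degree > 200), then BroadcastHashJoin back on id
val vertices = spark.table("vertices")           // row#66, uniq_id#67
  .filter($"row.id".isNotNull)                   // Filter isnotnull(row.id)
  .select($"row.id".as("id"))                    // Project [row.id AS id]

val result = vertices.join(degrees.filter($"degree" > 200).select("id"), "id")
result.count()                                   // fails on 2.0.0 with the error above
{code}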

[jira] [Commented] (SPARK-15390) Memory management issue in complex DataFrame join and filter

2016-10-04 Thread Davies Liu (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-15390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15546675#comment-15546675 ]

Davies Liu commented on SPARK-15390:


@Iulian Dragos I think this is a different issue, fixed by https://github.com/apache/spark/pull/14373 and https://github.com/apache/spark/pull/14464/files in 2.0.1.
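
Until 2.0.1 is available, one possible mitigation (an assumption based on the BroadcastExchange frames in the traces above, not a confirmed workaround for this ticket) is to disable broadcast joins so the planner falls back to a sort-merge join:

{code}
// Assumed mitigation, not verified against this issue: a negative
// threshold disables BroadcastHashJoin in favor of SortMergeJoin.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
{code}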


[jira] [Commented] (SPARK-15390) Memory management issue in complex DataFrame join and filter

2016-09-22 Thread Iulian Dragos (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-15390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15513206#comment-15513206 ]

Iulian Dragos commented on SPARK-15390:
---

I'm still seeing a similar stack trace with the 2.0 release.

{code}
scala> res.count
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition
+- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#286L])
   +- *Project
      +- *BroadcastHashJoin [userId#0L], [selUserId#169L], Inner, BuildRight
         :- *Project [userId#0L]
         :  +- *Filter isnotnull(userId#0L)
         :     +- *Scan avro [userId#0L] Format: com.databricks.spark.avro.DefaultSource@451b7faf, InputPaths: file:/Users/dragos/workspace/consulting/teralytics/11-000.avro, PushedFilters: [IsNotNull(userId)], ReadSchema: struct
         +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
            +- *GlobalLimit 1
               +- Exchange SinglePartition
                  +- *LocalLimit 1
                     +- *Project [userId#0L AS selUserId#169L]
                        +- *Filter isnotnull(userId#0L)
                           +- *Scan avro [userId#0L] Format: com.databricks.spark.avro.DefaultSource@451b7faf, InputPaths: file:/Users/dragos/workspace/consulting/teralytics/11-000.avro, PushedFilters: [IsNotNull(userId)], ReadSchema: struct

  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:50)
  at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:113)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:233)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:138)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:361)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:287)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2189)
  at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2217)
  at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2216)
  at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2545)
  at org.apache.spark.sql.Dataset.count(Dataset.scala:2216)
  ... 50 elided
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
  at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
  at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:120)
  at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
  at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124)
  at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
  at 
{code}

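From the plan, the failing query appears to have had roughly this shape (a sketch inferred from the operator tree alone; the avro path is abbreviated and the value names are assumptions, while the userId/selUserId columns, the limit-1 broadcast side, and the BuildRight join come from the plan):

{code}
// Sketch inferred from the plan above, not the actual user code.
// Requires the spark-avro package; names and path are assumed.
import com.databricks.spark.avro._
import spark.implicits._

val df = spark.read.avro("11-000.avro")                  // *Scan avro [userId#0L]
val sel = df.select($"userId".as("selUserId")).limit(1)  // *GlobalLimit 1 (broadcast side)
val res = df.join(sel, $"userId" === $"selUserId")       // *BroadcastHashJoin ... BuildRight
res.count()                                              // throws the TreeNodeException above
{code}
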
[jira] [Commented] (SPARK-15390) Memory management issue in complex DataFrame join and filter

2016-05-18 Thread Apache Spark (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-15390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15290020#comment-15290020 ]

Apache Spark commented on SPARK-15390:
--

User 'davies' has created a pull request for this issue:
https://github.com/apache/spark/pull/13182
