[jira] [Commented] (SPARK-15390) Memory management issue in complex DataFrame join and filter
[ https://issues.apache.org/jira/browse/SPARK-15390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15547955#comment-15547955 ]

Iulian Dragos commented on SPARK-15390:
---------------------------------------

Thanks [~davies]!

> Memory management issue in complex DataFrame join and filter
> ------------------------------------------------------------
>
>                 Key: SPARK-15390
>                 URL: https://issues.apache.org/jira/browse/SPARK-15390
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0
>        Environment: branch-2.0, 16 workers
>            Reporter: Joseph K. Bradley
>            Assignee: Davies Liu
>             Fix For: 2.0.1
>
>
> See [SPARK-15389] for a description of the code which produces this bug. I am filing this as a separate JIRA since the bug in 2.0 is different.
> In 2.0, the code fails with a memory management error. Here is the stacktrace:
> {code}
> OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512m; support was removed in 8.0
> 16/05/18 19:23:16 ERROR Uncaught throwable from user code: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
> Exchange SinglePartition, None
> +- WholeStageCodegen
>    :  +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#170L])
>    :     +- Project
>    :        +- BroadcastHashJoin [id#70L], [id#110L], Inner, BuildLeft, None
>    :           :- INPUT
>    :           +- Project [id#110L]
>    :              +- Filter (degree#115 > 200)
>    :                 +- TungstenAggregate(key=[id#110L], functions=[(count(1),mode=Final,isDistinct=false)], output=[id#110L,degree#115])
>    :                    +- INPUT
>    :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint]))
>    :  +- WholeStageCodegen
>    :     :  +- Project [row#66.id AS id#70L]
>    :     :     +- Filter isnotnull(row#66.id)
>    :     :        +- INPUT
>    :     +- Scan ExistingRDD[row#66,uniq_id#67]
>    +- Exchange hashpartitioning(id#110L, 200), None
>       +- WholeStageCodegen
>          :  +- TungstenAggregate(key=[id#110L], functions=[(count(1),mode=Partial,isDistinct=false)], output=[id#110L,count#136L])
>          :     +- Filter isnotnull(id#110L)
>          :        +- INPUT
>          +- Generate explode(array(src#2L, dst#3L)), false, false, [id#110L]
>             +- WholeStageCodegen
>                :  +- Filter ((isnotnull(src#2L) && isnotnull(dst#3L)) && NOT (src#2L = dst#3L))
>                :     +- INPUT
>                +- InMemoryTableScan [src#2L,dst#3L], [isnotnull(src#2L),isnotnull(dst#3L),NOT (src#2L = dst#3L)], InMemoryRelation [src#2L,dst#3L], true, 1, StorageLevel(disk=true, memory=true, offheap=false, deserialized=true, replication=1), WholeStageCodegen, None
> 	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:50)
> 	at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:113)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
> 	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:233)
> 	at org.apache.spark.sql.execution.aggregate.TungstenAggregate.inputRDDs(TungstenAggregate.scala:134)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:348)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
> 	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
> 	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:287)
> 	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2122)
> 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecutio
> {code}
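The exact code that triggers this lives in SPARK-15389. Purely to make the quoted plan easier to read, a query of roughly the following shape would produce it. This is a hypothetical reconstruction: the DataFrame names {{edges}} and {{vertices}} are invented, while the columns ({{src}}, {{dst}}, {{row.id}}), the {{explode}}, the {{degree > 200}} filter, and the final count are read off the plan.

{code}
import org.apache.spark.sql.functions._
import spark.implicits._

// Hypothetical reconstruction of the query shape implied by the plan above;
// the real code is in SPARK-15389. `edges(src, dst)` is a cached DataFrame
// (the InMemoryRelation); `vertices(row, uniq_id)` has a struct column `row`
// with an `id` field (the Scan ExistingRDD).
val degrees = edges
  .where($"src" =!= $"dst")                         // Filter NOT (src = dst)
  .select(explode(array($"src", $"dst")).as("id"))  // Generate explode(array(src, dst))
  .groupBy("id")
  .agg(count("*").as("degree"))                     // TungstenAggregate keyed by id
  .where($"degree" > 200)                           // Filter (degree > 200)

val res = vertices
  .select($"row.id".as("id"))                       // Project [row.id AS id]
  .join(degrees, "id")                              // BroadcastHashJoin on id

res.count()                                         // final TungstenAggregate(count(1)) that fails
{code}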
[jira] [Commented] (SPARK-15390) Memory management issue in complex DataFrame join and filter
[ https://issues.apache.org/jira/browse/SPARK-15390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15546675#comment-15546675 ]

Davies Liu commented on SPARK-15390:
------------------------------------

[Iulian Dragos] I think this is a different issue, fixed by https://github.com/apache/spark/pull/14373 and https://github.com/apache/spark/pull/14464/files in 2.0.1.
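For anyone stuck on 2.0.0 until that release, one general mitigation worth trying (it is not mentioned in this thread, so treat it as an untested suggestion) is to disable automatic broadcast joins, since both stack traces fail under a {{BroadcastExchange}}:

{code}
// Untested suggestion, not from this thread: turn off automatic broadcast
// joins so the planner falls back to a sort-merge join instead of the
// BroadcastExchange that both stack traces fail in.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
{code}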
[jira] [Commented] (SPARK-15390) Memory management issue in complex DataFrame join and filter
[ https://issues.apache.org/jira/browse/SPARK-15390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15513206#comment-15513206 ]

Iulian Dragos commented on SPARK-15390:
---------------------------------------

I'm still seeing a similar stack trace with the 2.0 release.

{code}
scala> res.count
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition
+- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#286L])
   +- *Project
      +- *BroadcastHashJoin [userId#0L], [selUserId#169L], Inner, BuildRight
         :- *Project [userId#0L]
         :  +- *Filter isnotnull(userId#0L)
         :     +- *Scan avro [userId#0L] Format: com.databricks.spark.avro.DefaultSource@451b7faf, InputPaths: file:/Users/dragos/workspace/consulting/teralytics/11-000.avro, PushedFilters: [IsNotNull(userId)], ReadSchema: struct
         +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
            +- *GlobalLimit 1
               +- Exchange SinglePartition
                  +- *LocalLimit 1
                     +- *Project [userId#0L AS selUserId#169L]
                        +- *Filter isnotnull(userId#0L)
                           +- *Scan avro [userId#0L] Format: com.databricks.spark.avro.DefaultSource@451b7faf, InputPaths: file:/Users/dragos/workspace/consulting/teralytics/11-000.avro, PushedFilters: [IsNotNull(userId)], ReadSchema: struct

  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:50)
  at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:113)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:233)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:138)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:361)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:287)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2189)
  at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2217)
  at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2216)
  at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2545)
  at org.apache.spark.sql.Dataset.count(Dataset.scala:2216)
  ... 50 elided
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
  at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
  at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:120)
  at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
  at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124)
  at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
  at
{code}
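For context, the plan above corresponds to a query of roughly this shape (a hypothetical reconstruction: the {{df}}/{{selected}} names are invented, while {{userId}}, the {{selUserId}} alias, the Avro source, and the {{limit(1)}} are read off the plan):

{code}
import spark.implicits._

// Hypothetical reconstruction of the query shape in the plan above.
// Assumes the spark-avro package is on the classpath; path shortened.
val df = spark.read
  .format("com.databricks.spark.avro")
  .load("11-000.avro")

// Pick a single user id (*LocalLimit 1 / *GlobalLimit 1 in the plan) ...
val selected = df.select($"userId".as("selUserId")).limit(1)

// ... and join it back against the full data (BroadcastHashJoin, BuildRight).
val res = df.select($"userId").join(selected, $"userId" === $"selUserId")

res.count()   // fails with the TreeNodeException quoted above
{code}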
[jira] [Commented] (SPARK-15390) Memory management issue in complex DataFrame join and filter
[ https://issues.apache.org/jira/browse/SPARK-15390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15290020#comment-15290020 ]

Apache Spark commented on SPARK-15390:
--------------------------------------

User 'davies' has created a pull request for this issue:
https://github.com/apache/spark/pull/13182