[jira] [Updated] (SPARK-6012) Deadlock when asking for partitions from CoalescedRDD on top of a TakeOrdered operator

2015-05-26 Thread Yin Huai (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-6012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yin Huai updated SPARK-6012:

Target Version/s:   (was: 1.4.0)

 Deadlock when asking for partitions from CoalescedRDD on top of a TakeOrdered operator
 ---------------------------------------------------------------------------------------

 Key: SPARK-6012
 URL: https://issues.apache.org/jira/browse/SPARK-6012
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.2.1
Reporter: Max Seiden
Priority: Critical

 h3. Summary
 I've found that a deadlock occurs when asking for the partitions of a
 SchemaRDD that has a TakeOrdered as its terminal operator. The problem occurs
 when a child RDD asks the DAGScheduler for preferred partition locations
 (which locks the scheduler) and eventually hits the TakeOrdered operator's
 #execute(), which submits a job and then blocks, because the scheduler (on a
 separate thread) also tries to get preferred locations but cannot acquire the
 lock. It seems like the TakeOrdered op's #execute() method should not actually
 submit a job (it currently calls #executeCollect() and wraps the result in a
 new RDD) and should instead stay more true to its comment and logically apply
 a Limit on top of a Sort.
 In my particular case, I am forcing a repartition of a SchemaRDD with a
 terminal Limit(..., Sort(...)), which is where the CoalescedRDD comes into
 play.
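 For context, here is a stripped-down sketch of the kind of driver code that hits this for me. The table and column names are placeholders rather than my actual workload; the essential ingredients are a query whose physical plan ends in TakeOrdered (i.e. ORDER BY ... LIMIT ...) and a forced repartition on top of it.
 {noformat}
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.hive.HiveContext

 val sc = new SparkContext(new SparkConf().setAppName("spark-6012-repro"))
 val hive = new HiveContext(sc)

 // "events" and "ts" are placeholder names; any ORDER BY ... LIMIT query whose
 // physical plan terminates in a TakeOrdered operator should behave the same way.
 val ordered = hive.sql("SELECT * FROM events ORDER BY ts LIMIT 100")

 // Forcing a repartition puts a CoalescedRDD (over a ShuffledRDD) on top of the
 // TakeOrdered plan.
 val repartitioned = ordered.repartition(4)

 // Asking for the CoalescedRDD's partitions drives the PartitionCoalescer into
 // DAGScheduler.getPreferredLocs (which locks the scheduler) and from there into
 // TakeOrdered#execute(), which submits a job and blocks. Any action on
 // `repartitioned` walks the same path.
 repartitioned.partitions
 {noformat}
 To illustrate what I mean by logically applying a Limit on top of a Sort, here is a rough sketch using only public RDD transformations. It is not a proposed patch and not the shape of the real operator; it only shows that the top-k can stay lazy, so no job is submitted until an action runs on the result.
 {noformat}
 import scala.reflect.ClassTag
 import org.apache.spark.rdd.RDD

 // Rough sketch: local top-k per partition, then a global top-k over the
 // (at most limit * numPartitions) survivors gathered into a single partition.
 // Every step is a lazy transformation; in practice a bounded priority queue
 // would replace the naive sort-and-take.
 def lazyTakeOrdered[T: ClassTag](child: RDD[T], limit: Int)(implicit ord: Ordering[T]): RDD[T] = {
   child
     .mapPartitions(it => it.toArray.sorted(ord).take(limit).iterator)  // per-partition top-k
     .coalesce(1, shuffle = true)                                       // gather the survivors
     .mapPartitions(it => it.toArray.sorted(ord).take(limit).iterator)  // global top-k
 }
 {noformat}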
 h3. Stack Traces
 h4. Task Submission
 {noformat}
 "main" prio=5 tid=0x7f8e7280 nid=0x1303 in Object.wait() [0x00010ed5e000]
    java.lang.Thread.State: WAITING (on object monitor)
     at java.lang.Object.wait(Native Method)
     - waiting on <0x0007c4c239b8> (a org.apache.spark.scheduler.JobWaiter)
     at java.lang.Object.wait(Object.java:503)
     at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
     - locked <0x0007c4c239b8> (a org.apache.spark.scheduler.JobWaiter)
     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:514)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1321)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1390)
     at org.apache.spark.rdd.RDD.reduce(RDD.scala:884)
     at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1161)
     at org.apache.spark.sql.execution.TakeOrdered.executeCollect(basicOperators.scala:183)
     at org.apache.spark.sql.execution.TakeOrdered.execute(basicOperators.scala:188)
     at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
     - locked <0x0007c36ce038> (a org.apache.spark.sql.hive.HiveContext$$anon$7)
     at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
     at org.apache.spark.sql.SchemaRDD.getDependencies(SchemaRDD.scala:127)
     at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:209)
     at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:207)
     at scala.Option.getOrElse(Option.scala:120)
     at org.apache.spark.rdd.RDD.dependencies(RDD.scala:207)
     at org.apache.spark.rdd.RDD.firstParent(RDD.scala:1278)
     at org.apache.spark.sql.SchemaRDD.getPartitions(SchemaRDD.scala:122)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
     at scala.Option.getOrElse(Option.scala:120)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
     at scala.Option.getOrElse(Option.scala:120)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
     at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:79)
     at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:80)
     at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:209)
     at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:207)
     at scala.Option.getOrElse(Option.scala:120)
     at org.apache.spark.rdd.RDD.dependencies(RDD.scala:207)
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1333)
     at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1304)
     - locked <0x0007f55c2238> (a org.apache.spark.scheduler.DAGScheduler)
     at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1148)
     at org.apache.spark.rdd.PartitionCoalescer.currPrefLocs(CoalescedRDD.scala:175)
 

[jira] [Updated] (SPARK-6012) Deadlock when asking for partitions from CoalescedRDD on top of a TakeOrdered operator

2015-04-21 Thread Yin Huai (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-6012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yin Huai updated SPARK-6012:

Assignee: (was: Yin Huai)


[jira] [Updated] (SPARK-6012) Deadlock when asking for partitions from CoalescedRDD on top of a TakeOrdered operator

2015-03-22 Thread Patrick Wendell (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-6012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Patrick Wendell updated SPARK-6012:
---
Target Version/s: 1.4.0  (was: 1.3.1, 1.4.0)

 

[jira] [Updated] (SPARK-6012) Deadlock when asking for partitions from CoalescedRDD on top of a TakeOrdered operator

2015-03-22 Thread Patrick Wendell (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-6012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Patrick Wendell updated SPARK-6012:
---
Target Version/s: 1.3.1, 1.4.0  (was: 1.4.0)

 

[jira] [Updated] (SPARK-6012) Deadlock when asking for partitions from CoalescedRDD on top of a TakeOrdered operator

2015-03-21 Thread Michael Armbrust (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-6012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Armbrust updated SPARK-6012:

Target Version/s: 1.4.0

   

[jira] [Updated] (SPARK-6012) Deadlock when asking for partitions from CoalescedRDD on top of a TakeOrdered operator

2015-02-25 Thread Max Seiden (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-6012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Max Seiden updated SPARK-6012:
--
Summary: Deadlock when asking for partitions from CoalescedRDD on top of a TakeOrdered operator
         (was: Deadlock when asking for SchemaRDD partitions with TakeOrdered operator)
