[GitHub] spark pull request #21927: [SPARK-24820][SPARK-24821][Core] Fail fast when s...
Github user felixcheung commented on a diff in the pull request:
https://github.com/apache/spark/pull/21927#discussion_r208123913

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1946,4 +1990,11 @@ private[spark] object DAGScheduler {
   // Number of consecutive stage attempts allowed before a stage is aborted
   val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4
+
+  // Error message when running a barrier stage that have unsupported RDD chain pattern.
+  val ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN =
+    "[SPARK-24820][SPARK-24821]: Barrier execution mode does not allow the following pattern of " +
+      "RDD chain within a barrier stage:\n1. Ancestor RDDs that have different number of " +
+      "partitions from the resulting RDD (eg. union()/coalesce()/first()/PartitionPruningRDD);\n" +
--- End diff --

collect() is expensive though?

---
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21927: [SPARK-24820][SPARK-24821][Core] Fail fast when s...
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/21927
[GitHub] spark pull request #21927: [SPARK-24820][SPARK-24821][Core] Fail fast when s...
Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/21927#discussion_r207249848

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -340,6 +340,22 @@ class DAGScheduler(
     }
   }
+
+  /**
+   * Check to make sure we don't launch a barrier stage with unsupported RDD chain pattern. The
+   * following patterns are not supported:
+   * 1. Ancestor RDDs that have different number of partitions from the resulting RDD (eg.
+   *    union()/coalesce()/first()/PartitionPruningRDD);
--- End diff --

But anyway, I guess it's also fine not to support this case; I was just trying to understand it myself.
[GitHub] spark pull request #21927: [SPARK-24820][SPARK-24821][Core] Fail fast when s...
Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/21927#discussion_r207249569

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -340,6 +340,22 @@ class DAGScheduler(
     }
   }
+
+  /**
+   * Check to make sure we don't launch a barrier stage with unsupported RDD chain pattern. The
+   * following patterns are not supported:
+   * 1. Ancestor RDDs that have different number of partitions from the resulting RDD (eg.
+   *    union()/coalesce()/first()/PartitionPruningRDD);
--- End diff --

OK, I see that it'll be a different number of partitions, but conceptually it should be OK, right? The user just wants all tasks launched together, even if it's a different number of tasks than the number of partitions in the original barrier RDD.
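To make the patterns under discussion concrete, here is a sketch (not from the PR itself) of the two rejected RDD chain shapes. It assumes an existing SparkContext `sc` and a Spark version with barrier execution mode (2.4+); the variable names are illustrative only.

```scala
import org.apache.spark.rdd.RDD

// A 4-partition barrier RDD: all 4 tasks must be launched together.
val barrierRdd: RDD[Int] = sc.parallelize(1 to 100, 4)
  .barrier()
  .mapPartitions(iter => iter)

// Pattern 1: the resulting RDD's partition count differs from the barrier
// ancestor's. union() here yields 8 partitions, while each barrier parent
// expects exactly 4 tasks in its stage -- the job should fail fast.
val unioned = barrierRdd.union(barrierRdd)

// Pattern 2: one RDD depending on multiple barrier RDDs in the same stage
// (eg. via zip) -- also expected to fail fast at job submission.
val zipped = barrierRdd.zip(barrierRdd)
```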
[GitHub] spark pull request #21927: [SPARK-24820][SPARK-24821][Core] Fail fast when s...
Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/21927#discussion_r207108174

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -340,6 +340,22 @@ class DAGScheduler(
     }
   }
+
+  /**
+   * Check to make sure we don't launch a barrier stage with unsupported RDD chain pattern. The
+   * following patterns are not supported:
+   * 1. Ancestor RDDs that have different number of partitions from the resulting RDD (eg.
+   *    union()/coalesce()/first()/PartitionPruningRDD);
+   * 2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2)).
+   */
+  private def checkBarrierStageWithRDDChainPattern(rdd: RDD[_], numPartitions: Int): Unit = {
--- End diff --

It would be nice to rename `numPartitions` to `numTasksInStage` (or a better name).
[GitHub] spark pull request #21927: [SPARK-24820][SPARK-24821][Core] Fail fast when s...
Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/21927#discussion_r207108898

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1946,4 +1990,11 @@ private[spark] object DAGScheduler {
   // Number of consecutive stage attempts allowed before a stage is aborted
   val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4
+
+  // Error message when running a barrier stage that have unsupported RDD chain pattern.
+  val ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN =
+    "[SPARK-24820][SPARK-24821]: Barrier execution mode does not allow the following pattern of " +
+      "RDD chain within a barrier stage:\n1. Ancestor RDDs that have different number of " +
+      "partitions from the resulting RDD (eg. union()/coalesce()/first()/PartitionPruningRDD);\n" +
--- End diff --

Please also list `take()`. It would be nice to provide a workaround for `first()` and `take()`: `barrierRdd.collect().head` (Scala), `barrierRdd.collect()[0]` (Python).
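The workaround suggested above can be sketched as follows (assuming an existing SparkContext `sc`; names are illustrative, not from the PR):

```scala
// first()/take() run a job over only a subset of partitions, which barrier
// mode rejects, so materialize the whole RDD and index the local array instead.
val barrierRdd = sc.parallelize(1 to 100, 4)
  .barrier()
  .mapPartitions(iter => iter)

// Instead of barrierRdd.first() or barrierRdd.take(5):
val firstElem = barrierRdd.collect().head
val firstFive = barrierRdd.collect().take(5)
```

As the thread notes, `collect()` pulls the entire RDD to the driver, so this trades memory on the driver for compatibility with the barrier-stage check.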
[GitHub] spark pull request #21927: [SPARK-24820][SPARK-24821][Core] Fail fast when s...
Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/21927#discussion_r207107889

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -340,6 +340,22 @@ class DAGScheduler(
     }
   }
+
+  /**
+   * Check to make sure we don't launch a barrier stage with unsupported RDD chain pattern. The
+   * following patterns are not supported:
+   * 1. Ancestor RDDs that have different number of partitions from the resulting RDD (eg.
+   *    union()/coalesce()/first()/PartitionPruningRDD);
--- End diff --

`coalesce()` is not safe when shuffle is false, because the number of tasks may not match the number of partitions of the RDD that uses barrier mode.
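A sketch of why the shuffle flag matters here (assuming an existing SparkContext `sc`; illustrative only): without a shuffle, `coalesce()` is a narrow dependency, so the barrier RDD's partitions get computed inside a stage with fewer tasks than the barrier RDD expects; with `shuffle = true`, a shuffle boundary keeps the barrier stage intact.

```scala
val barrierRdd = sc.parallelize(1 to 100, 4)
  .barrier()
  .mapPartitions(iter => iter)

// Narrow coalesce (shuffle = false, the default): the 4 barrier partitions
// would be computed by only 2 tasks, so the 4 barrier tasks can never all
// launch together -- this is the unsafe case the check rejects.
val narrow = barrierRdd.coalesce(2)

// Shuffled coalesce: the barrier stage still runs with 4 tasks, and the
// 2-partition result sits in a separate stage behind the shuffle boundary.
val shuffled = barrierRdd.coalesce(2, shuffle = true)
```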
[GitHub] spark pull request #21927: [SPARK-24820][SPARK-24821][Core] Fail fast when s...
Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/21927#discussion_r206985923

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -340,6 +340,22 @@ class DAGScheduler(
     }
   }
+
+  /**
+   * Check to make sure we don't launch a barrier stage with unsupported RDD chain pattern. The
+   * following patterns are not supported:
+   * 1. Ancestor RDDs that have different number of partitions from the resulting RDD (eg.
+   *    union()/coalesce()/first()/PartitionPruningRDD);
--- End diff --

But coalesce should be OK, right? Is it just too fragile to allow coalesce while excluding the others?