venkata91 commented on a change in pull request #33034:
URL: https://github.com/apache/spark/pull/33034#discussion_r656785629



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -148,6 +153,18 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     }
   }
 
+  def resetShuffleMergeState(): Unit = {
+    _shuffleMergeEnabled = canShuffleMergeBeEnabled()
+    _shuffleMergedFinalized = false
+    mergerLocs = Nil

Review comment:
       Since we are going to recompute the shuffle data anyway in the case of indeterminate stage retries, we might as well fetch the latest merger locations, right?
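A minimal sketch of the reviewer's suggestion. It assumes a hypothetical `ShuffleMergeState` holder and a hypothetical `ensureMergerLocs` hook, neither of which is Spark's API. Resetting clears the cached merger locations, so the first lookup after an indeterminate retry fetches the latest ones instead of reusing the locations chosen for the earlier attempt.

```scala
// Hypothetical, simplified stand-in for the merge-related state the diff keeps
// on ShuffleDependency. This is NOT Spark's actual class or API.
final class ShuffleMergeState(canMergeBeEnabled: () => Boolean) {
  var shuffleMergeEnabled: Boolean = canMergeBeEnabled()
  var shuffleMergeFinalized: Boolean = false
  var mergerLocs: Seq[String] = Nil

  // Called on an indeterminate stage retry. All shuffle output is recomputed,
  // so the cached merger locations are discarded rather than reused.
  def reset(): Unit = {
    shuffleMergeEnabled = canMergeBeEnabled()
    shuffleMergeFinalized = false
    mergerLocs = Nil
  }

  // Hypothetical scheduler-side hook: fetch merger locations only when none
  // are cached, so a reset forces the latest locations to be picked up.
  def ensureMergerLocs(fetchLatest: () => Seq[String]): Seq[String] = {
    if (mergerLocs.isEmpty) mergerLocs = fetchLatest()
    mergerLocs
  }
}
```

The retried stage recomputes its shuffle output anyway, so no merged data tied to the old locations is worth preserving. That is the reviewer's argument for fetching fresh locations rather than keeping the previous ones.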

##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -122,6 +119,14 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    */
   private[this] var _shuffleMergedFinalized: Boolean = false
 
+  /**
+   * shuffleSequenceId is used to give temporal ordering to the executions of a ShuffleDependency.
+   * This is required in order to handle indeterministic stage retries for push-based shuffle.
+   */
+  private[this] var nextShuffleSequenceId: Int = 0
+
+  def shuffleSequenceId: Int = nextShuffleSequenceId

Review comment:
       Yes, will handle it.
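The doc comment on `shuffleSequenceId` says the id gives a temporal ordering to successive executions of a `ShuffleDependency`. The sketch below shows one way such an ordering could be used, under stated assumptions. Each pushed block carries the sequence id it was produced under. A hypothetical merger-side `SequenceAwareMerger` (not Spark's shuffle service API) rejects blocks from older executions and drops data already merged for them once a newer id arrives. `PushedBlock` and its fields are likewise illustrative.

```scala
import scala.collection.mutable

// Hypothetical: a pushed block tagged with the shuffleSequenceId of the
// stage execution that produced it.
final case class PushedBlock(shuffleId: Int, shuffleSequenceId: Int, mapIndex: Int, reduceId: Int)

// Hypothetical merger-side bookkeeping (NOT Spark's API): tracks the latest
// sequence id seen per shuffle and ignores blocks from superseded executions.
final class SequenceAwareMerger {
  private val latestSeq = mutable.Map.empty[Int, Int]
  private val accepted = mutable.ArrayBuffer.empty[PushedBlock]

  /** Returns true if the block was accepted for merging. */
  def receive(block: PushedBlock): Boolean = {
    val latest = latestSeq.getOrElse(block.shuffleId, -1)
    if (block.shuffleSequenceId < latest) {
      false // stale: produced by an earlier, superseded execution of the stage
    } else {
      if (block.shuffleSequenceId > latest) {
        // A newer execution started: discard everything merged for older ones.
        accepted.filterInPlace(_.shuffleId != block.shuffleId)
        latestSeq(block.shuffleId) = block.shuffleSequenceId
      }
      accepted += block
      true
    }
  }

  def mergedBlocks(shuffleId: Int): Seq[PushedBlock] =
    accepted.filter(_.shuffleId == shuffleId).toSeq
}
```

Keeping only the latest id per shuffle means a late push from a superseded attempt is never mixed into the output of the retried one. An indeterminate retry can produce different data for the same partitions, so that mixing is exactly the hazard it introduces.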




-- 
This is an automated message from the Apache Git Service.