This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0be132c  [SPARK-38124][SS][FOLLOWUP] Document the current challenge on fixing distribution of stateful operator
0be132c is described below

commit 0be132c128e80bc9d866001a64cb3f6331c85b1e
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Tue Feb 15 11:47:42 2022 +0900

    [SPARK-38124][SS][FOLLOWUP] Document the current challenge on fixing distribution of stateful operator
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to add context about the current challenge in fixing the distribution of stateful operators, even though the distribution is, in a sense, "broken" now.
    
    This PR addresses the review comment https://github.com/apache/spark/pull/35419#discussion_r801343068
    
    ### Why are the changes needed?
    
    In SPARK-38124 we identified a long-standing problem in stateful operators, but it is not
    easy to fix: a fix that is not carefully designed may break existing queries. Anyone
    touching the required distribution should also be very careful. We want to document this
    explicitly so that anyone working in this part of the codebase is aware of it.
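
    As a rough illustration (a minimal sketch with simplified stand-ins, not Spark's actual
    Distribution/Partitioning classes), the relaxed requirement accepts any hash partitioning
    over a subset of the grouping keys, while the strict one pins the exact keys and partition
    count that the existing state layout depends on:

    ```scala
    object DistributionSketch {
      // Stand-in for a grouping attribute/expression.
      case class Attr(name: String)

      // Stand-in for HashPartitioning: rows are hashed by `expressions` into `numPartitions`.
      case class HashPartitioning(expressions: Seq[Attr], numPartitions: Int)

      // Relaxed check (ClusteredDistribution-like): any non-empty subset of the clustering
      // keys keeps rows of the same group in the same partition, so it is accepted.
      def satisfiesClustered(p: HashPartitioning, clustering: Seq[Attr]): Boolean =
        p.expressions.nonEmpty && p.expressions.forall(clustering.contains)

      // Strict check (StatefulOpClusteredDistribution-like): exactly the same keys, in the
      // same order, with the expected number of partitions.
      def satisfiesStatefulOpClustered(
          p: HashPartitioning,
          expressions: Seq[Attr],
          numPartitions: Int): Boolean =
        p.expressions == expressions && p.numPartitions == numPartitions

      def main(args: Array[String]): Unit = {
        val keys = Seq(Attr("userId"), Attr("sessionId"))
        val bySubset = HashPartitioning(Seq(Attr("userId")), 200)
        val exact = HashPartitioning(keys, 200)

        println(satisfiesClustered(bySubset, keys))                // true
        println(satisfiesClustered(exact, keys))                   // true
        println(satisfiesStatefulOpClustered(bySubset, keys, 200)) // false
        println(satisfiesStatefulOpClustered(exact, keys, 200))    // true
      }
    }
    ```

    Two plans for the same query could therefore legitimately end up with different physical
    partitionings under the relaxed check, while the state files written by a previous run
    assume one specific layout; naively tightening the requirement could reshuffle keys across
    partitions and break existing checkpoints.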
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Code-comment-only change; no new tests are needed.
    
    Closes #35512 from HeartSaVioR/SPARK-38124-followup.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/catalyst/plans/physical/partitioning.scala     |  8 ++++++++
 .../streaming/FlatMapGroupsWithStateExec.scala         |  3 +++
 .../sql/execution/streaming/statefulOperators.scala    | 18 +++++++++++++++++-
 3 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 4418d32..5342c8e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -101,6 +101,14 @@ case class ClusteredDistribution(
 * Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
  * stateful operator, only [[HashPartitioning]] (and HashPartitioning in
  * [[PartitioningCollection]]) can satisfy this distribution.
+ *
+ * NOTE: This is applied only to stream-stream join as of now. For other stateful operators, we
+ * have been using ClusteredDistribution, which could construct the physical partitioning of the
+ * state in a different way (ClusteredDistribution requires a relaxed condition, and multiple
+ * partitionings can satisfy the requirement). We need to find a way to fix this while
+ * minimizing the possibility of breaking the existing checkpoints.
+ *
+ * TODO(SPARK-38204): address the issue explained in above note.
  */
 case class StatefulOpClusteredDistribution(
     expressions: Seq[Expression],
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index a00a622..93ed591 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -93,6 +93,9 @@ case class FlatMapGroupsWithStateExec(
   * to have the same grouping so that the data are co-located on the same task.
    */
   override def requiredChildDistribution: Seq[Distribution] = {
+    // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution
+    // before making any changes.
+    // TODO(SPARK-38204)
     ClusteredDistribution(groupingAttributes, stateInfo.map(_.numPartitions)) ::
     ClusteredDistribution(initialStateGroupAttrs, stateInfo.map(_.numPartitions)) ::
       Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 3431823..3ab2ad4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -334,6 +334,9 @@ case class StateStoreRestoreExec(
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
   override def requiredChildDistribution: Seq[Distribution] = {
+    // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution
+    // before making any changes.
+    // TODO(SPARK-38204)
     if (keyExpressions.isEmpty) {
       AllTuples :: Nil
     } else {
@@ -493,6 +496,9 @@ case class StateStoreSaveExec(
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
   override def requiredChildDistribution: Seq[Distribution] = {
+    // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution
+    // before making any changes.
+    // TODO(SPARK-38204)
     if (keyExpressions.isEmpty) {
       AllTuples :: Nil
     } else {
@@ -573,6 +579,9 @@ case class SessionWindowStateStoreRestoreExec(
   }
 
   override def requiredChildDistribution: Seq[Distribution] = {
+    // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution
+    // before making any changes.
+    // TODO(SPARK-38204)
     ClusteredDistribution(keyWithoutSessionExpressions, stateInfo.map(_.numPartitions)) :: Nil
   }
 
@@ -684,6 +693,9 @@ case class SessionWindowStateStoreSaveExec(
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
   override def requiredChildDistribution: Seq[Distribution] = {
+    // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution
+    // before making any changes.
+    // TODO(SPARK-38204)
     ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil
   }
 
@@ -741,8 +753,12 @@ case class StreamingDeduplicateExec(
   extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
 
   /** Distribute by grouping attributes */
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
+    // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution
+    // before making any changes.
+    // TODO(SPARK-38204)
     ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil
+  }
 
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
