HeartSaVioR commented on a change in pull request #35552:
URL: https://github.com/apache/spark/pull/35552#discussion_r809736392
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -91,42 +91,42 @@ case class ClusteredDistribution(
}
/**
- * Represents the requirement of distribution on the stateful operator in Structured Streaming.
+ * Represents data where tuples have been clustered according to the hash of the given
+ * `expressions`. Since this distribution relies on [[HashPartitioning]] on the physical
+ * partitioning, only [[HashPartitioning]] (and HashPartitioning in [[PartitioningCollection]])
+ * can satisfy this distribution. When `requiredNumPartitions` is Some(1), [[SinglePartition]]
+ * is essentially same as [[HashPartitioning]], so it can satisfy this distribution as well.
*
- * Each partition in stateful operator initializes state store(s), which are independent with state
- * store(s) in other partitions. Since it is not possible to repartition the data in state store,
- * Spark should make sure the physical partitioning of the stateful operator is unchanged across
- * Spark versions. Violation of this requirement may bring silent correctness issue.
+ * This distribution is used majorly to represent the requirement of distribution on the stateful
+ * operator in Structured Streaming, but this can be used for other cases as well.
*
- * Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
- * stateful operator, only [[HashPartitioning]] (and HashPartitioning in
- * [[PartitioningCollection]]) can satisfy this distribution.
- * When `_requiredNumPartitions` is 1, [[SinglePartition]] is essentially same as
- * [[HashPartitioning]], so it can satisfy this distribution as well.
+ * NOTE 1: Each partition in stateful operator initializes state store(s), which are independent
+ * with state store(s) in other partitions. Since it is not possible to repartition the data in
+ * state store, Spark should make sure the physical partitioning of the stateful operator is
+ * unchanged across Spark versions. Violation of this requirement may bring silent correctness
+ * issue.
*
- * NOTE: This is applied only to stream-stream join as of now. For other stateful operators, we
- * have been using ClusteredDistribution, which could construct the physical partitioning of the
- * state in different way (ClusteredDistribution requires relaxed condition and multiple
- * partitionings can satisfy the requirement.) We need to construct the way to fix this with
- * minimizing possibility to break the existing checkpoints.
+ * NOTE 2: This is applied only to stream-stream join for stateful operators as of now. For other
+ * stateful operators, we have been using ClusteredDistribution, which could construct the physical
+ * partitioning of the state in different way (ClusteredDistribution requires relaxed condition
+ * and multiple partitionings can satisfy the requirement.) We need to construct the way to fix
+ * this with minimizing possibility to break the existing checkpoints.
*
- * TODO(SPARK-38204): address the issue explained in above note.
+ * TODO(SPARK-38204): address the issue explained in note 2.
*/
-case class StatefulOpClusteredDistribution(
+case class HashClusteredDistribution(
Review comment:
This line and the lines below essentially restore the previous implementation of
HashClusteredDistribution.
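
For readers skimming the hunk, the satisfaction rule described in the new Scaladoc can be sketched roughly as below. This is a toy model, not Spark's code: every type here is a simplified stand-in defined only for illustration, expressions are plain strings compared by sequence equality (an assumption), and the `SatisfiesSketch.satisfies` helper is hypothetical.

```scala
// Toy model only: simplified stand-ins, NOT Spark's actual classes or signatures.
sealed trait Partitioning
case class HashPartitioning(expressions: Seq[String], numPartitions: Int) extends Partitioning
case object SinglePartition extends Partitioning
case class RangePartitioning(ordering: Seq[String], numPartitions: Int) extends Partitioning
case class PartitioningCollection(partitionings: Seq[Partitioning]) extends Partitioning

// Stand-in for the distribution described in the Scaladoc above.
case class HashClusteredDistribution(
    expressions: Seq[String],
    requiredNumPartitions: Option[Int] = None)

object SatisfiesSketch {
  // Hypothetical helper mirroring the rule stated in the Scaladoc.
  def satisfies(p: Partitioning, d: HashClusteredDistribution): Boolean = p match {
    // Hash partitioning on the same expressions, with a matching partition count if one is required.
    case HashPartitioning(exprs, n) =>
      exprs == d.expressions && d.requiredNumPartitions.forall(_ == n)
    // Per the Scaladoc: a single partition qualifies only when exactly one partition is required.
    case SinglePartition =>
      d.requiredNumPartitions.contains(1)
    // A collection qualifies if any member partitioning does.
    case PartitioningCollection(ps) =>
      ps.exists(satisfies(_, d))
    // Anything else (for example range partitioning) does not qualify.
    case _ => false
  }
}
```

Under these assumptions, a range partitioning on the same keys is rejected even though the data is clustered, which is the stricter behavior that distinguishes this distribution from ClusteredDistribution.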
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]