HeartSaVioR commented on a change in pull request #35574:
URL: https://github.com/apache/spark/pull/35574#discussion_r811472916
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -261,8 +261,16 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
expressions.length == h.expressions.length &&
expressions.zip(h.expressions).forall {
case (l, r) => l.semanticEquals(r)
}
- case ClusteredDistribution(requiredClustering, _) =>
- expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
+ case c @ ClusteredDistribution(requiredClustering, _) =>
+ if (SQLConf.get.requireAllClusterKeysForHashPartition) {
Review comment:
OK, I 100% agree that the class name does not convey the behavior very clearly, especially since there are both ClusteredDistribution and HashClusteredDistribution. On the other hand, the classdocs were very clear for both classes, hence I didn't have to look into HashPartitioning.satisfy when figuring out the problem with the required clustered distribution for the stateful operator. But I agree everyone has a different view, so both can be right.
The concern comes from the fact that the unique functionality in HashClusteredDistribution is lost when unifying the classes. I'm not strongly arguing that we need to bring back HashClusteredDistribution; I'm arguing that requiring the full set of keys is a valid requirement we shouldn't simply drop, at least based on the inputs I've seen.
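For reference, a minimal sketch of what the loose check accepts today (runnable in spark-shell with spark-catalyst on the classpath; the attribute names and partition count are made up for illustration):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashPartitioning}
import org.apache.spark.sql.types.IntegerType

val a = AttributeReference("a", IntegerType)()
val b = AttributeReference("b", IntegerType)()

// Child output hash-partitioned on the single key `a`.
val part = HashPartitioning(Seq(a), numPartitions = 10)

// Loose ClusteredDistribution semantics: every partitioning expression only
// needs to appear somewhere in the required clustering keys, so partitioning
// on `a` alone satisfies a requirement to cluster on (a, b) and no exchange
// is inserted.
part.satisfies(ClusteredDistribution(Seq(a, b)))  // true

// The removed HashClusteredDistribution instead required the partitioning to
// match the full key list exactly, which is the behavior the stateful
// operators (and now this config) need.
```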
> They have to first remember that these two configs exist, understand the
differences between the two distributions and then write code to choose them
accordingly, while in most cases it should just be transparent.
It seems like you'd like to say these are exceptional cases, but I wonder how we can call them "exceptional cases" given that we had to make exceptions twice within a couple of months, and the impact of the latter is quite a bit broader (its scope is not pinpointed). I'd rather say this is no longer an exceptional case.
The need to require the full grouping keys doesn't seem to be trivial. In the comment https://github.com/apache/spark/pull/35552#issuecomment-1045149556, @sigmod pointed out a valid case and noted that AQE can't kick in because the operators are in the same stage. And the reason they are in the same stage is the loose requirement of ClusteredDistribution. We can't simply pray that AQE deals with this nicely; at a minimum we need to split out the stages and ask AQE to handle the proper number of partitions.
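To make that concrete, here is a small self-contained toy example of my own (not the exact query from the linked comment; column names and data are made up) showing how the loose rule keeps the join and the final aggregate in one stage:

```scala
import org.apache.spark.sql.SparkSession

// With broadcast joins disabled, the join output is hash-partitioned on `a`.
// Under the loose rule that already satisfies the aggregate's
// ClusteredDistribution(a, b), so no exchange is inserted, the join and the
// final aggregate land in the same stage, and AQE has no stage boundary at
// which to re-pick the number of partitions for the aggregate.
object LooseClusteringStage {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("loose-clustering-stage")
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")
      .getOrCreate()
    import spark.implicits._

    val left  = Seq((1, "x"), (2, "y")).toDF("a", "b")
    val right = Seq((1, 10), (2, 20)).toDF("a", "c")

    val joined = left.join(right, Seq("a"))        // shuffle join: output partitioned on a
    val agg    = joined.groupBy("a", "b").count()  // requires clustering on (a, b)

    agg.explain()  // note: no extra Exchange between the join and the final aggregate
    spark.stop()
  }
}
```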
The exceptions we make are all about the requirement of ClusteredDistribution. There is no problem with HashPartitioning; the reason we fix HashPartitioning is just the design that we go through `satisfy`. Limiting the change to Partitioning and saying "we change nothing on ClusteredDistribution" doesn't seem right to me. We are technically changing ClusteredDistribution, and we need to make that clear; otherwise this is also going to be undocumented behavior. Instead of just documenting it, I prefer having the flag explicit so that any partitioning knows about the detailed requirement very clearly. You don't need to remember the config when you work on DataSourcePartitioning; ClusteredDistribution will remember it for you.
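To illustrate the shape of what I mean, here is a simplified, self-contained sketch; the class and field names below are illustrative stand-ins, not Spark's actual API:

```scala
// Simplified stand-ins for Catalyst expressions and the physical classes,
// just to show the idea: the distribution carries the "require all cluster
// keys" flag, and every Partitioning sees it through pattern matching
// instead of having to remember a SQLConf.
object RequireAllKeysSketch {
  final case class Col(name: String) {
    def semanticEquals(other: Col): Boolean = name == other.name
  }

  final case class ClusteredDistribution(
      clustering: Seq[Col],
      requireAllClusterKeys: Boolean = false)  // hypothetical flag on the distribution

  final case class HashPartitioning(expressions: Seq[Col], numPartitions: Int) {
    def satisfies(c: ClusteredDistribution): Boolean =
      if (c.requireAllClusterKeys) {
        // Strict: the partitioning must match the full clustering key list,
        // as the old HashClusteredDistribution required.
        expressions.length == c.clustering.length &&
          expressions.zip(c.clustering).forall { case (l, r) => l.semanticEquals(r) }
      } else {
        // Loose: the partitioning expressions only need to be a subset.
        expressions.forall(x => c.clustering.exists(_.semanticEquals(x)))
      }
  }

  def main(args: Array[String]): Unit = {
    val part = HashPartitioning(Seq(Col("a")), numPartitions = 10)
    println(part.satisfies(ClusteredDistribution(Seq(Col("a"), Col("b")))))  // true (loose)
    println(part.satisfies(
      ClusteredDistribution(Seq(Col("a"), Col("b")), requireAllClusterKeys = true)))  // false (strict)
  }
}
```

With this shape, a connector's partitioning learns the strictness from the distribution it is asked to satisfy, with no extra config lookup.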