sunchao commented on code in PR #42306:
URL: https://github.com/apache/spark/pull/42306#discussion_r1305837139


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -1510,6 +1510,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS =
+    
buildConf("spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled")

Review Comment:
   Maybe we should also mention that this requires 
`spark.sql.requireAllClusterKeysForDistribution` to be `false`, even though 
that is already the default.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -674,7 +711,8 @@ case class HashShuffleSpec(
 
 case class KeyGroupedShuffleSpec(
     partitioning: KeyGroupedPartitioning,
-    distribution: ClusteredDistribution) extends ShuffleSpec {
+    distribution: ClusteredDistribution,
+    joinKeyPositions: Option[Seq[Int]] = None) extends ShuffleSpec {

Review Comment:
   We can add some comments for `KeyGroupedShuffleSpec` to explain what this is 
for; otherwise it's a bit hard to understand.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -344,7 +344,14 @@ case class KeyGroupedPartitioning(
           } else {
             // We'll need to find leaf attributes from the partition 
expressions first.
             val attributes = expressions.flatMap(_.collectLeaves())
-            attributes.forall(x => 
requiredClustering.exists(_.semanticEquals(x)))
+
+            if (SQLConf.get.getConf(
+              SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS)) {
+              requiredClustering.exists(x => 
attributes.exists(_.semanticEquals(x))) &&
+                expressions.forall(_.collectLeaves().size == 1)

Review Comment:
   This deserves a comment; otherwise it's a bit confusing why we need 
it.



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala:
##########
@@ -1276,4 +1279,266 @@ class KeyGroupedPartitioningSuite extends 
DistributionAndOrderingSuiteBase {
       }
     }
   }
+
+  test("SPARK-44647: test join key is subset of cluster key " +
+      "with push values and partially-clustered") {
+    val table1 = "tab1e1"
+    val table2 = "table2"
+    val partition = Array(identity("id"), identity("data"))
+    createTable(table1, schema, partition)
+    sql(s"INSERT INTO testcat.ns.$table1 VALUES " +
+        "(1, 'aa', cast('2020-01-01' as timestamp)), " +
+        "(2, 'bb', cast('2020-01-01' as timestamp)), " +
+        "(2, 'cc', cast('2020-01-01' as timestamp)), " +
+        "(3, 'dd', cast('2020-01-01' as timestamp)), " +
+        "(3, 'dd', cast('2020-01-01' as timestamp)), " +
+        "(3, 'ee', cast('2020-01-01' as timestamp)), " +
+        "(3, 'ee', cast('2020-01-01' as timestamp))")
+
+    createTable(table2, schema, partition)
+    sql(s"INSERT INTO testcat.ns.$table2 VALUES " +
+        "(4, 'zz', cast('2020-01-01' as timestamp)), " +
+        "(4, 'zz', cast('2020-01-01' as timestamp)), " +
+        "(3, 'yy', cast('2020-01-01' as timestamp)), " +
+        "(3, 'yy', cast('2020-01-01' as timestamp)), " +
+        "(3, 'xx', cast('2020-01-01' as timestamp)), " +
+        "(3, 'xx', cast('2020-01-01' as timestamp)), " +
+        "(2, 'ww', cast('2020-01-01' as timestamp))")
+
+    Seq(true, false).foreach { pushDownValues =>
+      Seq(true, false).foreach { partiallyClustered =>
+        Seq(true, false).foreach { allowJoinKeysSubsetOfPartitionKeys =>
+
+          withSQLConf(
+            SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
+            SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> 
pushDownValues.toString,
+            SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key 
->
+                partiallyClustered.toString,
+            SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key 
->
+                allowJoinKeysSubsetOfPartitionKeys.toString) {
+
+            val df = sql("SELECT t1.id AS id, t1.data AS t1data, t2.data AS 
t2data " +
+                s"FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2 " +
+                "ON t1.id = t2.id ORDER BY t1.id, t1data, t2data")
+
+            // Currently SPJ for case where join key not same as partition key
+            // only supported when push-part-values enabled

Review Comment:
   nit: the comment is outdated



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -355,7 +355,14 @@ case class KeyGroupedPartitioning(
           } else {
             // We'll need to find leaf attributes from the partition 
expressions first.
             val attributes = expressions.flatMap(_.collectLeaves())
-            attributes.forall(x => 
requiredClustering.exists(_.semanticEquals(x)))
+
+            if (SQLConf.get.getConf(
+              SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS)) {
+              requiredClustering.forall(x => 
attributes.exists(_.semanticEquals(x))) &&
+                  expressions.forall(_.collectLeaves().size == 1)

Review Comment:
   this should be guaranteed currently - it might be better to have this 
invariant check somewhere else like when constructing a 
`KeyGroupedPartitioning`, but OK to leave it here for now



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -355,7 +355,14 @@ case class KeyGroupedPartitioning(
           } else {
             // We'll need to find leaf attributes from the partition 
expressions first.
             val attributes = expressions.flatMap(_.collectLeaves())
-            attributes.forall(x => 
requiredClustering.exists(_.semanticEquals(x)))
+
+            if (SQLConf.get.getConf(

Review Comment:
   nit: we can just use 
`SQLConf.get.v2BucketingAllowJoinKeysSubsetOfPartitionKeys` - it's shorter



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -1530,6 +1530,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS =
+    
buildConf("spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled")
+      .doc("Whether to allow storage-partition join in the case where join 
keys are" +
+        "a subset of the partition keys of the source tables.  At planning 
time, " +
+        "Spark will group the partitions by only those keys that are in the 
join keys." +
+        "This is currently enabled only if 
spark.sql.sources.v2.bucketing.pushPartValues.enabled " +

Review Comment:
   nit: this is outdated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to