sunchao commented on code in PR #45267:
URL: https://github.com/apache/spark/pull/45267#discussion_r1529047571


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A 'reducer' for output of user-defined functions.
+ *
+ * A user_defined function f_source(x) is 'reducible' on another user_defined function f_target(x),
+ * if there exists a 'reducer' r(x) such that r(f_source(x)) = f_target(x) for all input x.
+ * @param <I> reducer input type

Review Comment:
   nit: leave a blank line before the first `@param`



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A 'reducer' for output of user-defined functions.
+ *
+ * A user_defined function f_source(x) is 'reducible' on another user_defined function f_target(x),

Review Comment:
   nit: `user_defined` -> `user defined`



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -846,6 +881,20 @@ case class KeyGroupedShuffleSpec(
   }
 }
 
+object KeyGroupedShuffleSpec {
+  def reducePartitionValue(row: InternalRow,
+                           expressions: Seq[Expression],

Review Comment:
   nit: indentation



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+import scala.Option;

Review Comment:
   It's not a good idea to have `scala.Option` in a Java interface.



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala:
##########
@@ -85,6 +85,23 @@ object BucketFunction extends ScalarFunction[Int] {
   override def produceResult(input: InternalRow): Int = {
     (input.getLong(1) % input.getInt(0)).toInt
   }
+
+  override def reducer(func: ReducibleFunction[_, _],
+                       thisNumBuckets: Option[_],

Review Comment:
   nit: indentation



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala:
##########
@@ -505,11 +506,28 @@ case class EnsureRequirements(
           }
         }
 
-        // Now we need to push-down the common partition key to the scan in each child
-        newLeft = populatePartitionValues(left, mergedPartValues, leftSpec.joinKeyPositions,
-          applyPartialClustering, replicateLeftSide)
-        newRight = populatePartitionValues(right, mergedPartValues, rightSpec.joinKeyPositions,
-          applyPartialClustering, replicateRightSide)
+        // in case of compatible but not identical partition expressions, we apply 'reduce'
+        // transforms to group one side's partitions as well as the common partition values
+        val leftReducers = leftSpec.reducers(rightSpec)
+        val rightReducers = rightSpec.reducers(leftSpec)
+
+        if (leftReducers.isDefined || rightReducers.isDefined) {

Review Comment:
   looks like we should be able to support the case where the numbers of buckets are not divisible but have a greatest common divisor?
   such as `bucket(16, x)` vs `bucket(12, y)`? in this case, since the common divisor is 4, we can have reducers for both `x` and `y`, dividing inputs by 4 and 3 respectively.
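A minimal sketch of the common-divisor idea, assuming a modulo-style bucket function `bucket(n, x) = hash(x) % n` (the class and method names below are illustrative, not Spark APIs):

```java
// Sketch only: assumes bucket(n, x) = hash(x) % n. Since gcd(16, 12) = 4 divides
// both bucket counts, (h % 16) % 4 == h % 4 and (h % 12) % 4 == h % 4, so mapping
// each side's bucket id into the gcd-sized space makes the two sides comparable.
class GcdReducerSketch {
    static int gcd(int a, int b) {
        return b == 0 ? a : gcd(b, a % b);
    }

    // One consistent choice of reducer under the modulo assumption: map a bucket id
    // from an n-bucket space into the common gcd-bucket space.
    static int reduce(int bucketId, int commonBuckets) {
        return bucketId % commonBuckets;
    }

    public static void main(String[] args) {
        int common = gcd(16, 12);   // 4
        int hash = 37;
        int left = hash % 16;       // bucket(16, x) = 5
        int right = hash % 12;      // bucket(12, y) = 1
        // Both reduce to the same common bucket: 37 % 4 = 1
        System.out.println(common + " " + reduce(left, common) + " " + reduce(right, common));
    }
}
```

Whether the exact reducer is a division or a modulo depends on how the bucket function is defined; the sketch above only shows the case of plain modulo hashing.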



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala:
##########
@@ -527,25 +545,38 @@ case class EnsureRequirements(
         joinType == LeftAnti || joinType == LeftOuter
   }
 
-  // Populate the common partition values down to the scan nodes
-  private def populatePartitionValues(
+  // Populate the common partition information down to the scan nodes
+  private def populateCommonPartitionInfo(
       plan: SparkPlan,
       values: Seq[(InternalRow, Int)],
       joinKeyPositions: Option[Seq[Int]],
+      reducers: Option[Seq[Option[Reducer[_, _]]]],
       applyPartialClustering: Boolean,
       replicatePartitions: Boolean): SparkPlan = plan match {
     case scan: BatchScanExec =>
       scan.copy(
         spjParams = scan.spjParams.copy(
           commonPartitionValues = Some(values),
           joinKeyPositions = joinKeyPositions,
+          reducers = reducers,
           applyPartialClustering = applyPartialClustering,
           replicatePartitions = replicatePartitions
         )
       )
     case node =>
-      node.mapChildren(child => populatePartitionValues(
-        child, values, joinKeyPositions, applyPartialClustering, replicatePartitions))
+      node.mapChildren(child => populateCommonPartitionInfo(
+        child, values, joinKeyPositions, reducers, applyPartialClustering, replicatePartitions))
+  }
+
+  private def reduceCommonPartValues(commonPartValues: Seq[(InternalRow, Int)],
+                                     expressions: Seq[Expression],

Review Comment:
   nit: indentation



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ReducibleFunction.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+import scala.Option;
+
+/**
+ * Base class for user-defined functions that can be 'reduced' on another function.
+ *
+ * A function f_source(x) is 'reducible' on another function f_target(x) if
+ * there exists a reducer function r(x) such that r(f_source(x)) = f_target(x) for all input x.
+ *
+ * <p>
+ * Examples:
+ * <ul>
+ *    <li>Bucket functions
+ *    <ul>
+ *        <li>f_source(x) = bucket(4, x)</li>
+ *        <li>f_target(x) = bucket(2, x)</li>
+ *        <li>r(x) = x / 2</li>
+ *    </ul>
+ *    <li>Date functions
+ *    <ul>
+ *        <li>f_source(x) = days(x)</li>
+ *        <li>f_target(x) = hours(x)</li>
+ *        <li>r(x) = x / 24</li>
+ *     </ul>
+ * </ul>
+ * @param <I> reducer function input type
+ * @param <O> reducer function output type
+ * @since 4.0.0
+ */
+@Evolving
+public interface ReducibleFunction<I, O> {
+
+    /**
+     * If this function is 'reducible' on another function, return the {@link Reducer} function.
+     * <p>
+     * Example:
+     * <ul>
+     *     <li>this_function = bucket(4, x)
+     *     <li>other function = bucket(2, x)
+     * </ul>
+     * Invoke with arguments
+     * <ul>
+     *     <li>other = bucket</li>
+     *     <li>this param = Int(4)</li>
+     *     <li>other param = Int(2)</li>
+     * </ul>
+     * @param other the other function
+     * @param thisParam param for this function

Review Comment:
   It is unclear to me what this should be beyond the bucketing case. Should we add a separate method just for the special case of bucketing?
   
   ```java
       Option<Reducer<I, O>> reducer(ReducibleFunction<I, O> other);
   
       Option<Reducer<I, O>> bucketReducer(ReducibleFunction<I, O> other, int numBuckets,
                                           int otherNumBuckets);
   ```
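As a rough sketch of how the proposed split could look for a bucket function (hypothetical names, `java.util.Optional` standing in for `scala.Option` given the earlier comment about `scala.Option` in a Java interface, and assuming modulo-based bucketing rather than the actual Spark implementation):

```java
import java.util.Optional;

// Hypothetical shapes, not Spark APIs.
interface IntReducer {
    int reduce(int arg);
}

class BucketFunctionSketch {
    // General case: with no parameters involved, only the identical function
    // reduces, trivially, via the identity reducer.
    Optional<IntReducer> reducer(BucketFunctionSketch other) {
        return this == other ? Optional.of(x -> x) : Optional.empty();
    }

    // Bucket special case: if the other bucket count divides this one, then with
    // modulo bucketing (h % n) % m == h % m, so `% otherNumBuckets` is a valid reducer.
    Optional<IntReducer> bucketReducer(BucketFunctionSketch other,
                                       int numBuckets, int otherNumBuckets) {
        if (otherNumBuckets > 0 && numBuckets % otherNumBuckets == 0) {
            return Optional.of(x -> x % otherNumBuckets);
        }
        return Optional.empty();
    }
}
```

Under these assumptions, `bucketReducer(other, 4, 2)` yields a reducer mapping `bucket(4, x)` ids onto `bucket(2, x)` ids, while `bucketReducer(other, 4, 3)` is empty because 3 does not divide 4.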



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A 'reducer' for output of user-defined functions.

Review Comment:
   Maybe point out where it is used, i.e., `ReducibleFunction`. We can also add a `@see` pointing to it.



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A 'reducer' for output of user-defined functions.
+ *
+ * A user_defined function f_source(x) is 'reducible' on another user_defined function f_target(x),
+ * if there exists a 'reducer' r(x) such that r(f_source(x)) = f_target(x) for all input x.
+ * @param <I> reducer input type
+ * @param <O> reducer output type
+ * @since 4.0.0
+ */
+@Evolving
+public interface Reducer<I, O> {
+    O reduce(I arg1);

Review Comment:
   nit: `arg1` -> `arg`?
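   For context, a minimal implementation of the quoted interface with the `arg` rename applied might look like the following (the `DivideByTwo` name is illustrative, mirroring the r(x) = x / 2 bucket example from the `ReducibleFunction` javadoc):

```java
// Minimal illustration of the Reducer shape from the quoted diff, with the
// suggested `arg` parameter name.
interface Reducer<I, O> {
    O reduce(I arg);
}

// Illustrative reducer: maps bucket(4, x) ids toward a 2-bucket space by halving,
// as in the javadoc example r(x) = x / 2.
class DivideByTwo implements Reducer<Integer, Integer> {
    @Override
    public Integer reduce(Integer arg) {
        return arg / 2;
    }
}

class ReducerDemo {
    public static void main(String[] args) {
        Reducer<Integer, Integer> r = new DivideByTwo();
        System.out.println(r.reduce(6)); // 3
    }
}
```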



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -833,10 +834,44 @@ case class KeyGroupedShuffleSpec(
     (left, right) match {
       case (_: LeafExpression, _: LeafExpression) => true
       case (left: TransformExpression, right: TransformExpression) =>
-        left.isSameFunction(right)
+        if (SQLConf.get.v2BucketingPushPartValuesEnabled &&
+          !SQLConf.get.v2BucketingPartiallyClusteredDistributionEnabled &&
+          SQLConf.get.v2BucketingAllowCompatibleTransforms) {
+          left.isCompatible(right)
+        } else {
+          left.isSameFunction(right)
+        }
       case _ => false
     }
 
+  /**
+   * Return a set of [[Reducer]] for the partition expressions of this shuffle spec,
+   * on the partition expressions of another shuffle spec.
+   * <p>
+   * A [[Reducer]] exists for a partition expression function of this shuffle spec if it is
+   * 'reducible' on the corresponding partition expression function of the other shuffle spec.
+   * <p>
+   * If a value is returned, there must be one Option[[Reducer]] per partition expression.
+   * A None value in the set indicates that the particular partition expression is not reducible
+   * on the corresponding expression on the other shuffle spec.
+   * <p>
+   * Returning none also indicates that none of the partition expressions can be reduced on the
+   * corresponding expression on the other shuffle spec.
+   */
+  def reducers(other: ShuffleSpec): Option[Seq[Option[Reducer[_, _]]]] = {

Review Comment:
   I think the input can only be `KeyGroupedShuffleSpec` so we can just use it instead of `ShuffleSpec`.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -833,10 +834,44 @@ case class KeyGroupedShuffleSpec(
     (left, right) match {
       case (_: LeafExpression, _: LeafExpression) => true
       case (left: TransformExpression, right: TransformExpression) =>
-        left.isSameFunction(right)
+        if (SQLConf.get.v2BucketingPushPartValuesEnabled &&
+          !SQLConf.get.v2BucketingPartiallyClusteredDistributionEnabled &&
+          SQLConf.get.v2BucketingAllowCompatibleTransforms) {
+          left.isCompatible(right)
+        } else {
+          left.isSameFunction(right)
+        }
       case _ => false
     }
 
+  /**
+   * Return a set of [[Reducer]] for the partition expressions of this shuffle spec,
+   * on the partition expressions of another shuffle spec.
+   * <p>
+   * A [[Reducer]] exists for a partition expression function of this shuffle spec if it is
+   * 'reducible' on the corresponding partition expression function of the other shuffle spec.
+   * <p>
+   * If a value is returned, there must be one Option[[Reducer]] per partition expression.

Review Comment:
   nit: `Option[[Reducer]]` is not rendered properly



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

