sunchao commented on a change in pull request #24559:
URL: https://github.com/apache/spark/pull/24559#discussion_r596497034



##########
File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/AggregateFunction.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog.functions;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * Interface for a function that produces a result value by aggregating over multiple input rows.
+ * <p>
+ * The JVM type of result values produced by this function must be the type used by Spark's
+ * InternalRow API for the {@link DataType SQL data type} returned by {@link #resultType()}.
+ * <p>
+ * Most implementations should also implement {@link PartialAggregateFunction} so that Spark can
+ * partially aggregate and shuffle intermediate results, instead of shuffling all rows for an
+ * aggregate. This reduces the impact of data skew and the amount of data shuffled to produce the
+ * result.
+ *
+ * @param <S> the JVM type for the aggregation's intermediate state
+ * @param <R> the JVM type of result values
+ */
+public interface AggregateFunction<S, R> extends BoundFunction {
+
+  /**
+   * Initialize state for an aggregation.
+   * <p>
+   * This method is called one or more times for every group of values to initialize intermediate
+   * aggregation state. More than one intermediate aggregation state variable may be used when the
+   * aggregation is run in parallel tasks.
+   * <p>
+   * The object returned may be passed to {@link #update(Object, InternalRow)},
+   * and {@link #produceResult(Object)}. Implementations that return null must support null state
+   * passed into all other methods.
+   *
+   * @return a state instance or null
+   */
+  S newAggregationState();
+
+  /**
+   * Update the aggregation state with a new row.
+   * <p>
+   * This is called for each row in a group to update an intermediate aggregation state.
+   *
+   * @param state intermediate aggregation state
+   * @param input an input row
+   * @return updated aggregation state
+   */
+  S update(S state, InternalRow input);
+
+  /**
+   * Produce the aggregation result based on intermediate state.
+   *
+   * @param state intermediate aggregation state
+   * @return a result value
+   */
+  R produceResult(S state);
+

Review comment:
   One issue I found with the `Serializable` approach is that in Spark today both `SerializerInstance` and `ExpressionEncoder` require a `ClassTag`, which is not available from Java. This makes it hard to reuse the existing machinery in Spark for the serialization/deserialization work. Another issue, reflected by the CI failure, is that simple classes such as:
   ```scala
   class IntAverage extends AggregateFunction[(Int, Int), Int]
   ```
   will not work out of the box, as `(Int, Int)` doesn't implement `Serializable`.
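   (For illustration, here is a rough sketch of what such an implementation could look like against the quoted interface, setting the `Serializable` bound aside. The `BoundFunction` members shown here (`name`, `inputTypes`, `resultType`) are assumptions on my part, since that interface isn't quoted above.)

   ```scala
   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.sql.connector.catalog.functions.AggregateFunction
   import org.apache.spark.sql.types.{DataType, IntegerType}

   // Sketch only: averages an int column, keeping (sum, count) as intermediate state.
   class IntAverage extends AggregateFunction[(Int, Int), Int] {
     // Assumed BoundFunction members; the real interface is defined elsewhere in this PR.
     override def name(): String = "iavg"
     override def inputTypes(): Array[DataType] = Array(IntegerType)
     override def resultType(): DataType = IntegerType

     // One state instance per group (or per partial aggregation task).
     override def newAggregationState(): (Int, Int) = (0, 0)

     // Fold each input row into the running (sum, count) state.
     override def update(state: (Int, Int), input: InternalRow): (Int, Int) =
       (state._1 + input.getInt(0), state._2 + 1)

     // Produce the final result from the intermediate state.
     override def produceResult(state: (Int, Int)): Int =
       if (state._2 == 0) 0 else state._1 / state._2
   }
   ```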
   
   The `ClassTag` constraint on `SerializerInstance` was added in #700 to support Scala Pickling as one of the serializer implementations, but it seems that PR never landed in Spark, so I'm not sure whether the constraint is still needed today. Thanks @viirya for having an offline discussion with me on this.
   
   Because of this, I'm wondering if it makes sense to replace `Serializable` with something else, such as another method:
   ```java
   Encoder<S> encoder();
   ```
   This can be implemented fairly easily by Spark users with [`Encoders`](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala). The approach is similar to today's `udaf` API. For Scala users, we could optionally provide another version of `AggregateFunction` that takes the encoder implicitly, so users don't need to supply it explicitly.
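   For example, with the hypothetical `encoder()` method above, the `(Int, Int)` state of the `IntAverage` sketch could be described using the existing `Encoders` factory methods:

   ```scala
   import org.apache.spark.sql.{Encoder, Encoders}

   // Encoder for the (Int, Int) intermediate state, built from existing public APIs.
   val stateEncoder: Encoder[(Int, Int)] =
     Encoders.tuple(Encoders.scalaInt, Encoders.scalaInt)
   ```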
   
   I'd like to hear your opinions on this, @rdblue @cloud-fan.
   



