twalthr commented on a change in pull request #18152:
URL: https://github.com/apache/flink/pull/18152#discussion_r772170828



##########
File path: 
flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java
##########
@@ -169,35 +174,61 @@ public AbstractArrowPythonAggregateFunctionOperator 
getTestOperator(
             RowType outputType,
             int[] groupingSet,
             int[] udafInputOffsets) {
+        RowType userDefinedFunctionInputType =
+                (RowType) Projection.of(udafInputOffsets).project(inputType);
+        RowType userDefinedFunctionOutputType =
+                new RowType(
+                        outputType
+                                .getFields()
+                                .subList(groupingSet.length, 
outputType.getFieldCount()));

Review comment:
       Maybe add another useful factory method to `Projection`, such as 
`Projection.range()`?

##########
File path: 
flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java
##########
@@ -216,57 +221,85 @@ public RowType getOutputType() {
     public AbstractArrowPythonAggregateFunctionOperator getTestOperator(
             Configuration config,
             PythonFunctionInfo[] pandasAggregateFunctions,
-            RowType inputType,
-            RowType outputType,
+            RowType inputRowType,
+            RowType outputRowType,
             int[] groupingSet,
             int[] udafInputOffsets) {
+        RowType userDefinedFunctionInputType =
+                (RowType) 
Projection.of(udafInputOffsets).project(inputRowType);
+        RowType userDefinedFunctionOutputType =
+                new RowType(
+                        outputRowType
+                                .getFields()
+                                .subList(
+                                        inputRowType.getFieldCount(),
+                                        outputRowType.getFieldCount()));
+
         return new 
PassThroughBatchArrowPythonOverWindowAggregateFunctionOperator(
                 config,
                 pandasAggregateFunctions,
-                inputType,
-                outputType,
+                inputRowType,
+                userDefinedFunctionInputType,
+                userDefinedFunctionOutputType,
                 new long[] {0L, Long.MIN_VALUE},
                 new long[] {0L, 2L},
                 new boolean[] {true, false},
                 new int[] {0},
-                groupingSet,
-                groupingSet,
-                udafInputOffsets,
                 3,
-                true);
+                true,
+                ProjectionCodeGenerator.generateProjection(
+                        CodeGeneratorContext.apply(new TableConfig()),
+                        "UdafInputProjection",
+                        inputRowType,
+                        userDefinedFunctionInputType,
+                        udafInputOffsets),
+                ProjectionCodeGenerator.generateProjection(
+                        CodeGeneratorContext.apply(new TableConfig()),
+                        "GroupKey",
+                        inputRowType,
+                        (RowType) 
Projection.of(groupingSet).project(inputRowType),
+                        groupingSet),
+                ProjectionCodeGenerator.generateProjection(
+                        CodeGeneratorContext.apply(new TableConfig()),
+                        "GroupSet",
+                        inputRowType,
+                        (RowType) 
Projection.of(groupingSet).project(inputRowType),
+                        groupingSet));
     }
 
     private static class 
PassThroughBatchArrowPythonOverWindowAggregateFunctionOperator
             extends BatchArrowPythonOverWindowAggregateFunctionOperator {
 
-        PassThroughBatchArrowPythonOverWindowAggregateFunctionOperator(
+        public PassThroughBatchArrowPythonOverWindowAggregateFunctionOperator(
                 Configuration config,
                 PythonFunctionInfo[] pandasAggFunctions,
                 RowType inputType,
-                RowType outputType,
+                RowType userDefinedFunctionInputType,
+                RowType userDefinedFunctionOutputType,
                 long[] lowerBoundary,
                 long[] upperBoundary,
-                boolean[] isRangeWindow,
+                boolean[] isRangeWindows,
                 int[] aggWindowIndex,
-                int[] groupKey,
-                int[] groupingSet,
-                int[] udafInputOffsets,
                 int inputTimeFieldIndex,
-                boolean asc) {
+                boolean asc,
+                GeneratedProjection inputGeneratedProjection,
+                GeneratedProjection groupKeyGeneratedProjection,
+                GeneratedProjection groupSetGeneratedProjection) {
             super(
                     config,
                     pandasAggFunctions,
                     inputType,
-                    outputType,
+                    userDefinedFunctionInputType,
+                    userDefinedFunctionOutputType,
                     lowerBoundary,
                     upperBoundary,
-                    isRangeWindow,
+                    isRangeWindows,

Review comment:
       Typo? `isRangeWindow` was renamed to `isRangeWindows` here.

##########
File path: 
flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java
##########
@@ -317,54 +322,82 @@ public RowType getOutputType() {
     public AbstractArrowPythonAggregateFunctionOperator getTestOperator(
             Configuration config,
             PythonFunctionInfo[] pandasAggregateFunctions,
-            RowType inputType,
-            RowType outputType,
+            RowType inputRowType,
+            RowType outputRowType,
             int[] groupingSet,
             int[] udafInputOffsets) {
+
+        RowType userDefinedFunctionInputType =
+                (RowType) 
Projection.of(udafInputOffsets).project(inputRowType);
+        RowType userDefinedFunctionOutputType =
+                new RowType(
+                        outputRowType
+                                .getFields()
+                                .subList(groupingSet.length, 
outputRowType.getFieldCount() - 2));
+
         // SlidingWindow(10000L, 5000L)
         return new 
PassThroughBatchArrowPythonGroupWindowAggregateFunctionOperator(
                 config,
                 pandasAggregateFunctions,
-                inputType,
-                outputType,
+                inputRowType,
+                userDefinedFunctionInputType,
+                userDefinedFunctionOutputType,
                 3,
                 100000,
                 10000L,
                 5000L,
                 new int[] {0, 1},
-                groupingSet,
-                groupingSet,
-                udafInputOffsets);
+                ProjectionCodeGenerator.generateProjection(
+                        CodeGeneratorContext.apply(new TableConfig()),
+                        "UdafInputProjection",
+                        inputRowType,
+                        userDefinedFunctionInputType,
+                        udafInputOffsets),
+                ProjectionCodeGenerator.generateProjection(
+                        CodeGeneratorContext.apply(new TableConfig()),
+                        "GroupKey",
+                        inputRowType,
+                        (RowType) 
Projection.of(groupingSet).project(inputRowType),
+                        groupingSet),
+                ProjectionCodeGenerator.generateProjection(
+                        CodeGeneratorContext.apply(new TableConfig()),
+                        "GroupSet",
+                        inputRowType,
+                        (RowType) 
Projection.of(groupingSet).project(inputRowType),
+                        groupingSet));
     }
 
     private static class 
PassThroughBatchArrowPythonGroupWindowAggregateFunctionOperator
             extends BatchArrowPythonGroupWindowAggregateFunctionOperator {
-        PassThroughBatchArrowPythonGroupWindowAggregateFunctionOperator(
+
+        public PassThroughBatchArrowPythonGroupWindowAggregateFunctionOperator(

Review comment:
       Why `public`? This is the constructor of a private nested test class, 
and it was package-private before.

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
##########
@@ -53,17 +51,11 @@
     /** The input logical type. */
     protected final RowType inputType;
 
-    /** The output logical type. */
-    protected final RowType outputType;
-
-    /** The offsets of user-defined function inputs. */
-    protected final int[] userDefinedFunctionInputOffsets;
-
     /** The user-defined function input logical type. */
-    protected transient RowType userDefinedFunctionInputType;
+    protected final RowType userDefinedFunctionInputType;
 
     /** The user-defined function output logical type. */
-    protected transient RowType userDefinedFunctionOutputType;
+    protected final RowType userDefinedFunctionOutputType;

Review comment:
       Minor nit: can we shorten the names? They are difficult to read in 
various places:
   ```
   userDefinedFunctionOutputType -> udfOutputType
   userDefinedFunctionInputType -> udfInputType
   udafInputGeneratedProjection -> udafInputProjection
   groupKeyGeneratedProjection -> groupKeyProjection
   userDefinedFunctionInputOffsets -> udfInputOffsets
   ```
   
   

##########
File path: 
flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java
##########
@@ -109,38 +114,53 @@ public AbstractArrowPythonAggregateFunctionOperator 
getTestOperator(
             RowType outputType,
             int[] groupingSet,
             int[] udafInputOffsets) {
+        RowType userDefinedFunctionInputType =
+                (RowType) Projection.of(udafInputOffsets).project(inputType);
+        RowType userDefinedFunctionOutputType =
+                new RowType(
+                        outputType
+                                .getFields()
+                                .subList(inputType.getFieldCount(), 
outputType.getFieldCount()));
+        GeneratedProjection generatedProjection =

Review comment:
       nit: inline this into the parameter for consistency? Currently the PR 
uses a local variable in some places but inlines the projection as a parameter 
in most others.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to