twalthr commented on a change in pull request #18152: URL: https://github.com/apache/flink/pull/18152#discussion_r772170828
########## File path: flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupAggregateFunctionOperatorTest.java ########## @@ -169,35 +174,61 @@ public AbstractArrowPythonAggregateFunctionOperator getTestOperator( RowType outputType, int[] groupingSet, int[] udafInputOffsets) { + RowType userDefinedFunctionInputType = + (RowType) Projection.of(udafInputOffsets).project(inputType); + RowType userDefinedFunctionOutputType = + new RowType( + outputType + .getFields() + .subList(groupingSet.length, outputType.getFieldCount())); Review comment: maybe another useful constructor function for `Projection` like `Projection.range()` or so? ########## File path: flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperatorTest.java ########## @@ -216,57 +221,85 @@ public RowType getOutputType() { public AbstractArrowPythonAggregateFunctionOperator getTestOperator( Configuration config, PythonFunctionInfo[] pandasAggregateFunctions, - RowType inputType, - RowType outputType, + RowType inputRowType, + RowType outputRowType, int[] groupingSet, int[] udafInputOffsets) { + RowType userDefinedFunctionInputType = + (RowType) Projection.of(udafInputOffsets).project(inputRowType); + RowType userDefinedFunctionOutputType = + new RowType( + outputRowType + .getFields() + .subList( + inputRowType.getFieldCount(), + outputRowType.getFieldCount())); + return new PassThroughBatchArrowPythonOverWindowAggregateFunctionOperator( config, pandasAggregateFunctions, - inputType, - outputType, + inputRowType, + userDefinedFunctionInputType, + userDefinedFunctionOutputType, new long[] {0L, Long.MIN_VALUE}, new long[] {0L, 2L}, new boolean[] {true, false}, new int[] {0}, - groupingSet, - groupingSet, - udafInputOffsets, 3, - true); + true, + ProjectionCodeGenerator.generateProjection( + CodeGeneratorContext.apply(new TableConfig()), + 
"UdafInputProjection", + inputRowType, + userDefinedFunctionInputType, + udafInputOffsets), + ProjectionCodeGenerator.generateProjection( + CodeGeneratorContext.apply(new TableConfig()), + "GroupKey", + inputRowType, + (RowType) Projection.of(groupingSet).project(inputRowType), + groupingSet), + ProjectionCodeGenerator.generateProjection( + CodeGeneratorContext.apply(new TableConfig()), + "GroupSet", + inputRowType, + (RowType) Projection.of(groupingSet).project(inputRowType), + groupingSet)); } private static class PassThroughBatchArrowPythonOverWindowAggregateFunctionOperator extends BatchArrowPythonOverWindowAggregateFunctionOperator { - PassThroughBatchArrowPythonOverWindowAggregateFunctionOperator( + public PassThroughBatchArrowPythonOverWindowAggregateFunctionOperator( Configuration config, PythonFunctionInfo[] pandasAggFunctions, RowType inputType, - RowType outputType, + RowType userDefinedFunctionInputType, + RowType userDefinedFunctionOutputType, long[] lowerBoundary, long[] upperBoundary, - boolean[] isRangeWindow, + boolean[] isRangeWindows, int[] aggWindowIndex, - int[] groupKey, - int[] groupingSet, - int[] udafInputOffsets, int inputTimeFieldIndex, - boolean asc) { + boolean asc, + GeneratedProjection inputGeneratedProjection, + GeneratedProjection groupKeyGeneratedProjection, + GeneratedProjection groupSetGeneratedProjection) { super( config, pandasAggFunctions, inputType, - outputType, + userDefinedFunctionInputType, + userDefinedFunctionOutputType, lowerBoundary, upperBoundary, - isRangeWindow, + isRangeWindows, Review comment: typo ########## File path: flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.java ########## @@ -317,54 +322,82 @@ public RowType getOutputType() { public AbstractArrowPythonAggregateFunctionOperator getTestOperator( Configuration config, PythonFunctionInfo[] pandasAggregateFunctions, - RowType inputType, - RowType 
outputType, + RowType inputRowType, + RowType outputRowType, int[] groupingSet, int[] udafInputOffsets) { + + RowType userDefinedFunctionInputType = + (RowType) Projection.of(udafInputOffsets).project(inputRowType); + RowType userDefinedFunctionOutputType = + new RowType( + outputRowType + .getFields() + .subList(groupingSet.length, outputRowType.getFieldCount() - 2)); + // SlidingWindow(10000L, 5000L) return new PassThroughBatchArrowPythonGroupWindowAggregateFunctionOperator( config, pandasAggregateFunctions, - inputType, - outputType, + inputRowType, + userDefinedFunctionInputType, + userDefinedFunctionOutputType, 3, 100000, 10000L, 5000L, new int[] {0, 1}, - groupingSet, - groupingSet, - udafInputOffsets); + ProjectionCodeGenerator.generateProjection( + CodeGeneratorContext.apply(new TableConfig()), + "UdafInputProjection", + inputRowType, + userDefinedFunctionInputType, + udafInputOffsets), + ProjectionCodeGenerator.generateProjection( + CodeGeneratorContext.apply(new TableConfig()), + "GroupKey", + inputRowType, + (RowType) Projection.of(groupingSet).project(inputRowType), + groupingSet), + ProjectionCodeGenerator.generateProjection( + CodeGeneratorContext.apply(new TableConfig()), + "GroupSet", + inputRowType, + (RowType) Projection.of(groupingSet).project(inputRowType), + groupingSet)); } private static class PassThroughBatchArrowPythonGroupWindowAggregateFunctionOperator extends BatchArrowPythonGroupWindowAggregateFunctionOperator { - PassThroughBatchArrowPythonGroupWindowAggregateFunctionOperator( + + public PassThroughBatchArrowPythonGroupWindowAggregateFunctionOperator( Review comment: why `public`? ########## File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java ########## @@ -53,17 +51,11 @@ /** The input logical type. */ protected final RowType inputType; - /** The output logical type. */ - protected final RowType outputType; - - /** The offsets of user-defined function inputs. 
*/ - protected final int[] userDefinedFunctionInputOffsets; - /** The user-defined function input logical type. */ - protected transient RowType userDefinedFunctionInputType; + protected final RowType userDefinedFunctionInputType; /** The user-defined function output logical type. */ - protected transient RowType userDefinedFunctionOutputType; + protected final RowType userDefinedFunctionOutputType; Review comment: very nit: can we shorten the names? They are difficult to read at various locations: ``` userDefinedFunctionOutputType -> udfOutputType userDefinedFunctionInputType -> udfInputType udafInputGeneratedProjection -> udafInputProjection groupKeyGeneratedProjection -> groupKeyProjection userDefinedFunctionInputOffsets -> udfInputOffsets ``` ########## File path: flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeBoundedRangeOperatorTest.java ########## @@ -109,38 +114,53 @@ public AbstractArrowPythonAggregateFunctionOperator getTestOperator( RowType outputType, int[] groupingSet, int[] udafInputOffsets) { + RowType userDefinedFunctionInputType = + (RowType) Projection.of(udafInputOffsets).project(inputType); + RowType userDefinedFunctionOutputType = + new RowType( + outputType + .getFields() + .subList(inputType.getFieldCount(), outputType.getFieldCount())); + GeneratedProjection generatedProjection = Review comment: nit: inline this into the parameter for consistency? Currently the PR uses a local variable in some places and inlines the value as a parameter in most others. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org