swuferhong commented on code in PR #22684: URL: https://github.com/apache/flink/pull/22684#discussion_r1214139725
########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala: ########## @@ -231,25 +230,23 @@ object ProjectionCodeGenerator { inputTerm: String, targetType: LogicalType, index: Int): GeneratedExpression = { - val fieldType = getFieldTypes(inputType).get(index) - val resultTypeTerm = primitiveTypeTermForType(fieldType) - val defaultValue = primitiveDefaultValue(fieldType) - val readCode = rowFieldReadAccess(index.toString, inputTerm, fieldType) - val Seq(fieldTerm, nullTerm) = - ctx.addReusableLocalVariables((resultTypeTerm, "field"), ("boolean", "isNull")) - - val inputCode = - s""" - |$nullTerm = $inputTerm.isNullAt($index); - |$fieldTerm = $defaultValue; - |if (!$nullTerm) { - | $fieldTerm = $readCode; - |} - """.stripMargin.trim - - val expression = GeneratedExpression(fieldTerm, nullTerm, inputCode, fieldType) + val fieldExpr = getReuseFieldExprForAggFunc(ctx, inputType, inputTerm, index) // Convert the projected value type to sum agg func target type. - ScalarOperatorGens.generateCast(ctx, expression, targetType, true) + ScalarOperatorGens.generateCast(ctx, fieldExpr, targetType, true) Review Comment: nit: Removing the IDEA warning: Changing to `ScalarOperatorGens.generateCast(ctx, fieldExpr, targetType, nullOnFailure = true)` ########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala: ########## @@ -231,25 +230,23 @@ object ProjectionCodeGenerator { inputTerm: String, targetType: LogicalType, index: Int): GeneratedExpression = { - val fieldType = getFieldTypes(inputType).get(index) - val resultTypeTerm = primitiveTypeTermForType(fieldType) - val defaultValue = primitiveDefaultValue(fieldType) - val readCode = rowFieldReadAccess(index.toString, inputTerm, fieldType) - val Seq(fieldTerm, nullTerm) = - ctx.addReusableLocalVariables((resultTypeTerm, "field"), ("boolean", "isNull")) - - val inputCode = - s""" - |$nullTerm = $inputTerm.isNullAt($index); - |$fieldTerm = $defaultValue; - |if (!$nullTerm) { - | $fieldTerm = $readCode; - |} - """.stripMargin.trim - - val expression = GeneratedExpression(fieldTerm, nullTerm, inputCode, fieldType) + val fieldExpr = getReuseFieldExprForAggFunc(ctx, inputType, inputTerm, index) // Convert the projected value type to sum agg func target type. - ScalarOperatorGens.generateCast(ctx, expression, targetType, true) + ScalarOperatorGens.generateCast(ctx, fieldExpr, targetType, true) + } + + /** Get reuse field expr if it has been evaluated before for adaptive local hash aggregation. */ + def getReuseFieldExprForAggFunc( + ctx: CodeGeneratorContext, + inputType: LogicalType, + inputTerm: String, + index: Int) = { + // reuse the field access code if it has been evaluated before Review Comment: -> // Reuse the field access code if it has been evaluated before. ########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala: ########## @@ -231,25 +230,23 @@ object ProjectionCodeGenerator { inputTerm: String, targetType: LogicalType, index: Int): GeneratedExpression = { - val fieldType = getFieldTypes(inputType).get(index) - val resultTypeTerm = primitiveTypeTermForType(fieldType) - val defaultValue = primitiveDefaultValue(fieldType) - val readCode = rowFieldReadAccess(index.toString, inputTerm, fieldType) - val Seq(fieldTerm, nullTerm) = - ctx.addReusableLocalVariables((resultTypeTerm, "field"), ("boolean", "isNull")) - - val inputCode = - s""" - |$nullTerm = $inputTerm.isNullAt($index); - |$fieldTerm = $defaultValue; - |if (!$nullTerm) { - | $fieldTerm = $readCode; - |} - """.stripMargin.trim - - val expression = GeneratedExpression(fieldTerm, nullTerm, inputCode, fieldType) + val fieldExpr = getReuseFieldExprForAggFunc(ctx, inputType, inputTerm, index) // Convert the projected value type to sum agg func target type. - ScalarOperatorGens.generateCast(ctx, expression, targetType, true) + ScalarOperatorGens.generateCast(ctx, fieldExpr, targetType, true) + } + + /** Get reuse field expr if it has been evaluated before for adaptive local hash aggregation. */ + def getReuseFieldExprForAggFunc( + ctx: CodeGeneratorContext, + inputType: LogicalType, + inputTerm: String, + index: Int) = { Review Comment: Add type annotation : ` index: Int): GeneratedExpression = {` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org