swuferhong commented on code in PR #22684:
URL: https://github.com/apache/flink/pull/22684#discussion_r1214139725


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala:
##########
@@ -231,25 +230,23 @@ object ProjectionCodeGenerator {
       inputTerm: String,
       targetType: LogicalType,
       index: Int): GeneratedExpression = {
-    val fieldType = getFieldTypes(inputType).get(index)
-    val resultTypeTerm = primitiveTypeTermForType(fieldType)
-    val defaultValue = primitiveDefaultValue(fieldType)
-    val readCode = rowFieldReadAccess(index.toString, inputTerm, fieldType)
-    val Seq(fieldTerm, nullTerm) =
-      ctx.addReusableLocalVariables((resultTypeTerm, "field"), ("boolean", 
"isNull"))
-
-    val inputCode =
-      s"""
-         |$nullTerm = $inputTerm.isNullAt($index);
-         |$fieldTerm = $defaultValue;
-         |if (!$nullTerm) {
-         |  $fieldTerm = $readCode;
-         |}
-           """.stripMargin.trim
-
-    val expression = GeneratedExpression(fieldTerm, nullTerm, inputCode, 
fieldType)
+    val fieldExpr = getReuseFieldExprForAggFunc(ctx, inputType, inputTerm, 
index)
     // Convert the projected value type to sum agg func target type.
-    ScalarOperatorGens.generateCast(ctx, expression, targetType, true)
+    ScalarOperatorGens.generateCast(ctx, fieldExpr, targetType, true)

Review Comment:
   nit: Removing the IDEA warning:  Changing to 
`ScalarOperatorGens.generateCast(ctx, fieldExpr, targetType, nullOnFailure = 
true)`



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala:
##########
@@ -231,25 +230,23 @@ object ProjectionCodeGenerator {
       inputTerm: String,
       targetType: LogicalType,
       index: Int): GeneratedExpression = {
-    val fieldType = getFieldTypes(inputType).get(index)
-    val resultTypeTerm = primitiveTypeTermForType(fieldType)
-    val defaultValue = primitiveDefaultValue(fieldType)
-    val readCode = rowFieldReadAccess(index.toString, inputTerm, fieldType)
-    val Seq(fieldTerm, nullTerm) =
-      ctx.addReusableLocalVariables((resultTypeTerm, "field"), ("boolean", 
"isNull"))
-
-    val inputCode =
-      s"""
-         |$nullTerm = $inputTerm.isNullAt($index);
-         |$fieldTerm = $defaultValue;
-         |if (!$nullTerm) {
-         |  $fieldTerm = $readCode;
-         |}
-           """.stripMargin.trim
-
-    val expression = GeneratedExpression(fieldTerm, nullTerm, inputCode, 
fieldType)
+    val fieldExpr = getReuseFieldExprForAggFunc(ctx, inputType, inputTerm, 
index)
     // Convert the projected value type to sum agg func target type.
-    ScalarOperatorGens.generateCast(ctx, expression, targetType, true)
+    ScalarOperatorGens.generateCast(ctx, fieldExpr, targetType, true)
+  }
+
+  /** Get reuse field expr if it has been evaluated before for adaptive local 
hash aggregation. */
+  def getReuseFieldExprForAggFunc(
+      ctx: CodeGeneratorContext,
+      inputType: LogicalType,
+      inputTerm: String,
+      index: Int) = {
+    // reuse the field access code if it has been evaluated before

Review Comment:
   ->  // Reuse the field access code if it has been evaluated before.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala:
##########
@@ -231,25 +230,23 @@ object ProjectionCodeGenerator {
       inputTerm: String,
       targetType: LogicalType,
       index: Int): GeneratedExpression = {
-    val fieldType = getFieldTypes(inputType).get(index)
-    val resultTypeTerm = primitiveTypeTermForType(fieldType)
-    val defaultValue = primitiveDefaultValue(fieldType)
-    val readCode = rowFieldReadAccess(index.toString, inputTerm, fieldType)
-    val Seq(fieldTerm, nullTerm) =
-      ctx.addReusableLocalVariables((resultTypeTerm, "field"), ("boolean", 
"isNull"))
-
-    val inputCode =
-      s"""
-         |$nullTerm = $inputTerm.isNullAt($index);
-         |$fieldTerm = $defaultValue;
-         |if (!$nullTerm) {
-         |  $fieldTerm = $readCode;
-         |}
-           """.stripMargin.trim
-
-    val expression = GeneratedExpression(fieldTerm, nullTerm, inputCode, 
fieldType)
+    val fieldExpr = getReuseFieldExprForAggFunc(ctx, inputType, inputTerm, 
index)
     // Convert the projected value type to sum agg func target type.
-    ScalarOperatorGens.generateCast(ctx, expression, targetType, true)
+    ScalarOperatorGens.generateCast(ctx, fieldExpr, targetType, true)
+  }
+
+  /** Get reuse field expr if it has been evaluated before for adaptive local 
hash aggregation. */
+  def getReuseFieldExprForAggFunc(
+      ctx: CodeGeneratorContext,
+      inputType: LogicalType,
+      inputTerm: String,
+      index: Int) = {

Review Comment:
   Add type annotation : ` index: Int): GeneratedExpression = {`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to