Copilot commented on code in PR #5896:
URL: https://github.com/apache/texera/pull/5896#discussion_r3456378379


##########
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/aggregate/AggregateOpExec.scala:
##########
@@ -47,9 +47,14 @@ class AggregateOpExec(descString: String) extends OperatorExecutor {
 
     // Initialize distributedAggregations if it's not yet initialized
     if (distributedAggregations == null) {
-      distributedAggregations = desc.aggregations.map(agg =>
-        agg.getAggFunc(tuple.getSchema.getAttribute(agg.attribute).getType)
-      )
+      distributedAggregations = desc.aggregations.map { agg =>
+        // COUNT(*) has a blank attribute and no input column to look up; pass a null
+        // type since its result type does not depend on any input attribute.
+        val attrType =
+          if (agg.attribute == null || agg.attribute.trim.isEmpty) null
+          else tuple.getSchema.getAttribute(agg.attribute).getType
+        agg.getAggFunc(attrType)
+      }

Review Comment:
   COUNT(*) is supposed to ignore the attribute entirely, but this 
initialization still looks up `tuple.getSchema.getAttribute(agg.attribute)` 
whenever `attribute` is non-blank. If an attribute value "leaks through" (e.g., 
from an older workflow) and doesn't exist in the input schema, AggregateOpExec 
will throw even though COUNT(*) shouldn't depend on any column. Consider keying 
this guard off `agg.aggFunction == COUNT_STAR` instead of whether `attribute` 
is blank.
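
   A minimal sketch of the suggested guard, using hypothetical stand-in types 
(`AggFn`, `Agg`, `Schema`, `resolveAttrType` are illustrative names, not 
Texera's actual classes). It keeps the PR's convention of passing a null type 
for COUNT(*) and only changes what the guard is keyed on:

```scala
// Hypothetical stand-ins for the real operator types; all names are assumptions.
object CountStarGuardSketch {
  sealed trait AggFn
  case object CountStar extends AggFn
  case object Sum extends AggFn

  sealed trait AttrType
  case object IntType extends AttrType

  final case class Agg(aggFunction: AggFn, attribute: String)

  // Minimal schema: the lookup throws when the attribute is missing.
  final case class Schema(attrs: Map[String, AttrType]) {
    def getType(name: String): AttrType =
      attrs.getOrElse(name, throw new RuntimeException(s"$name is not in the schema"))
  }

  // Key the guard off the aggregation function, not off a blank `attribute`,
  // so a stale attribute on COUNT(*) never reaches the schema lookup.
  def resolveAttrType(agg: Agg, schema: Schema): AttrType =
    if (agg.aggFunction == CountStar) null // same null convention as the PR
    else schema.getType(agg.attribute)
}
```

   With this shape, a COUNT(*) entry that still carries a column name from an 
older workflow resolves without a lookup, while every other function still 
fails loudly on a missing column.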



##########
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/aggregate/AggregateOpDesc.scala:
##########
@@ -78,9 +78,14 @@ class AggregateOpDesc extends LogicalOp {
           val inputSchema = inputSchemas(operatorInfo.inputPorts.head.id)
           val outputSchema = Schema(
             groupByKeys.map(key => inputSchema.getAttribute(key)) ++
-              localAggregations.map(agg =>
-                agg.getAggregationAttribute(inputSchema.getAttribute(agg.attribute).getType)
-              )
+              localAggregations.map { agg =>
+                // A blank attribute (COUNT(*)) has no input column to look up
+                // so a null attrType is safe here.
+                val attrType =
+                  if (agg.attribute == null || agg.attribute.trim.isEmpty) null
+                  else inputSchema.getAttribute(agg.attribute).getType
+                agg.getAggregationAttribute(attrType)
+              }

Review Comment:
   Schema propagation has the same issue as the executor: it treats a blank 
`attribute` as the signal for COUNT(*), but COUNT(*) should ignore `attribute` 
even if it is non-blank (e.g., leaked config) and should not dereference a 
potentially non-existent input column. Using `agg.aggFunction == COUNT_STAR` 
here avoids erroneous `getAttribute(...)` lookups for COUNT(*).
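
   One way the schema side could look, sketched with hypothetical types 
(`outputColumn`, `resultAttribute`, and the `LongType` result for COUNT(*) are 
assumptions for illustration, not the project's actual API). Matching on the 
function makes it structurally impossible for the COUNT(*) branch to read 
`attribute`:

```scala
// Hypothetical model of schema propagation; names and result types are assumptions.
object CountStarSchemaSketch {
  sealed trait AggFn
  case object CountStar extends AggFn
  case object Max extends AggFn

  sealed trait AttrType
  case object LongType extends AttrType
  case object DoubleType extends AttrType

  final case class Agg(aggFunction: AggFn, attribute: String, resultAttribute: String)

  // Computes the output column (name, type) for one aggregation.
  def outputColumn(agg: Agg, input: Map[String, AttrType]): (String, AttrType) =
    agg.aggFunction match {
      // COUNT(*) has a fixed result type and never dereferences `attribute`.
      case CountStar => agg.resultAttribute -> LongType
      // Other functions depend on the input column and fail if it is missing.
      case Max =>
        val inputType = input.getOrElse(
          agg.attribute,
          throw new NoSuchElementException(s"unknown attribute: ${agg.attribute}")
        )
        agg.resultAttribute -> inputType
    }
}
```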



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
