peter-toth commented on issue #23731: [SPARK-26572][SQL] fix aggregate codegen result evaluation
URL: https://github.com/apache/spark/pull/23731#issuecomment-460158941
 
 
   The reason I think this is a code generation issue is that if you disable `spark.sql.codegen.wholeStage`, the result is correct.
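   A quick way to compare the two code paths (a sketch, where `query` stands for the reproduction query from the ticket, it is not something from this PR):
   ```scala
   // Run the same query with whole-stage codegen off and on and compare:
   spark.conf.set("spark.sql.codegen.wholeStage", false)
   query.show()   // correct result

   spark.conf.set("spark.sql.codegen.wholeStage", true)
   query.show()   // wrong result when whole-stage codegen is enabled
   ```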
   
   This is the physical plan of the example in the ticket:
   ```
   == Physical Plan ==
   *(3) Project [idx#4, id#6L]
   +- *(3) BroadcastHashJoin [idx#4], [idx#9], Inner, BuildLeft
      :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      :  +- *(1) Project [value#1 AS idx#4]
      :     +- LocalTableScan [value#1]
      +- *(3) HashAggregate(keys=[idx#9], functions=[], output=[idx#9, id#6L])
         +- Exchange hashpartitioning(idx#9, 5)
            +- *(2) HashAggregate(keys=[idx#9], functions=[], output=[idx#9])
               +- *(2) Project [value#1 AS idx#9]
                  +- LocalTableScan [value#1]
   ```
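   For context, here is a sketch of a query shape that should produce a plan like this (my approximation, not necessarily the exact code from the ticket):
   ```scala
   // assuming a SparkSession named `spark` is in scope (e.g. in spark-shell)
   import org.apache.spark.sql.functions._
   import spark.implicits._

   val values = Seq(0, 1, 2).toDF("value")

   // build side of the broadcast join
   val left = values.select($"value".as("idx"))

   // stream side: a keys-only HashAggregate; once the optimizer collapses the
   // Project into the Aggregate, the nondeterministic
   // monotonically_increasing_id() ends up in the aggregate's resultExpressions
   val right = values.select($"value".as("idx"))
     .distinct()
     .withColumn("id", monotonically_increasing_id())

   broadcast(left).join(right, "idx").explain()
   ```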
   and if you take a look at the code of stage 3 (I left some comments in it about what my PR changes):
   ```
       ...
       // this method is called for every aggregation key
       private void agg_doAggregateWithKeysOutput_0(UnsafeRow agg_keyTerm_0, UnsafeRow agg_bufferTerm_0)
               throws java.io.IOException {
           ((org.apache.spark.sql.execution.metric.SQLMetric) references[4] /* numOutputRows */).add(1);

           int agg_value_4 = agg_keyTerm_0.getInt(0);
           // this PR moves the agg_value_5 calculation and the agg_count_0 increment from the broadcast join loop to here

           // generate join key for stream side
           boolean bhj_isNull_0 = false;
           long bhj_value_0 = -1L;
           if (!false) {
               bhj_value_0 = (long) agg_value_4;
           }
           // find matches from HashRelation
           scala.collection.Iterator bhj_matches_0 = bhj_isNull_0 ? null
                   : (scala.collection.Iterator) bhj_relation_0.get(bhj_value_0);
           if (bhj_matches_0 != null) {
               while (bhj_matches_0.hasNext()) {
                   UnsafeRow bhj_matched_0 = (UnsafeRow) bhj_matches_0.next();
                   {
                       ((org.apache.spark.sql.execution.metric.SQLMetric) references[6] /* numOutputRows */).add(1);

                       int bhj_value_2 = bhj_matched_0.getInt(0);
                       boolean project_isNull_0 = false;
                       UTF8String project_value_0 = null;
                       if (!false) {
                           project_value_0 = UTF8String.fromString(String.valueOf(bhj_value_2));
                       }
                       // the nondeterministic id is evaluated here, inside the join loop,
                       // so it is recomputed for every matched row instead of once per key
                       final long agg_value_5 = partitionMask + agg_count_0;
                       agg_count_0++;
                       boolean project_isNull_2 = false;
                       UTF8String project_value_2 = null;
                       if (!false) {
                           project_value_2 = UTF8String.fromString(String.valueOf(agg_value_5));
                       }
       ...
   ```
   So to hit this issue, both the hash aggregate and the broadcast join have to be in the same codegen stage, and it is also important that the aggregate is on the "stream" side of the join. This is probably a rare combination, which explains why the issue hasn't come up earlier.
   (I also think that operators other than broadcast join that generate a loop might be affected as well, but I didn't look into that.)
   Still, I think this is an issue with the generated code of `HashAggregateExec`, and it seems to me that we can force the evaluation of `resultExpressions` before generating the broadcast join code (i.e. before calling `consume()`), as sketched below.
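   Schematically, the idea looks like this inside `HashAggregateExec`'s output code generation (a hedged sketch of the approach, not the exact diff of this PR):
   ```scala
   // Sketch: bind the result expressions to the grouping key and force their
   // evaluation with evaluateVariables() before calling consume(), so a
   // nondeterministic expression is materialized once per aggregation key and
   // cannot be re-evaluated inside a downstream operator's loop.
   ctx.INPUT_ROW = keyTerm
   ctx.currentVars = null
   val resultVars = resultExpressions.map { e =>
     BindReferences.bindReference(e, groupingAttributes).genCode(ctx)
   }
   // force evaluation here, before the downstream (e.g. broadcast join) code
   val evaluateResultVars = evaluateVariables(resultVars)
   s"""
      |$evaluateResultVars
      |${consume(ctx, resultVars)}
    """.stripMargin
   ```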
