ChangjiGuo created FLINK-25475:
----------------------------------

             Summary: When windowAgg and groupAgg are included at the same 
time, there is no assigner generated but MiniBatch optimization is still used.
                 Key: FLINK-25475
                 URL: https://issues.apache.org/jira/browse/FLINK-25475
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.14.0
            Reporter: ChangjiGuo
         Attachments: image-2021-12-29-16-04-50-211.png, 
image-2021-12-29-16-05-15-519.png

If the RelNode tree contains both a windowAgg and a groupAgg, 
MiniBatchIntervalInferRule does not add a StreamExecMiniBatchAssigner node. 
However, MiniBatchGroupAggFunction, MiniBatchLocalGroupAggFunction, or 
MiniBatchGlobalGroupAggFunction is still generated when the plan is translated 
into transformations.

The translation only checks whether mini-batch is enabled, not whether an 
assigner was actually generated:
{code:java}
val operator = if (isMiniBatchEnabled) {
  val aggFunction = new MiniBatchGroupAggFunction(
    aggsHandler,
    recordEqualiser,
    accTypes,
    inputRowType,
    inputCountIndex,
    generateUpdateBefore)

  new KeyedMapBundleOperator(
    aggFunction,
    AggregateUtil.createMiniBatchTrigger(tableConfig))
} else {
  val aggFunction = new GroupAggFunction(
    tableConfig.getMinIdleStateRetentionTime,
    tableConfig.getMaxIdleStateRetentionTime,
    aggsHandler,
    recordEqualiser,
    accTypes,
    inputCountIndex,
    generateUpdateBefore)

  val operator = new KeyedProcessOperator[RowData, RowData, RowData](aggFunction)
  operator
} {code}
For example:

before:

!image-2021-12-29-16-04-50-211.png!

after:
!image-2021-12-29-16-05-15-519.png!

The WatermarkAssigner still sends its watermarks downstream, so the 
finishBundle method is called frequently, which is not the expected 
mini-batch behavior.
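
The effect can be illustrated with a toy model. This is only a sketch, not 
Flink code: ToyBundleOperator, countFlushes, and the idea of measuring 
watermark spacing in records are all hypothetical names and simplifications 
made up for this example. It assumes that a bundle is flushed either when it 
reaches its size limit or when a watermark arrives, and it compares sparse 
watermarks (roughly what a mini-batch assigner would produce) with dense 
watermarks passed through unchanged.

{code:java}
import java.util.ArrayList;
import java.util.List;

/** Toy model of a count-triggered bundle operator. Not Flink's actual classes. */
public class MiniBatchSketch {

    /** Buffers records; flushes when the bundle is full or a watermark arrives. */
    static final class ToyBundleOperator {
        private final int maxBundleSize;
        private final List<String> bundle = new ArrayList<>();
        private int flushes = 0;

        ToyBundleOperator(int maxBundleSize) {
            this.maxBundleSize = maxBundleSize;
        }

        void processElement(String record) {
            bundle.add(record);
            if (bundle.size() >= maxBundleSize) {
                finishBundle();
            }
        }

        // Assumption of this sketch: every watermark forces a flush.
        void processWatermark(long timestamp) {
            finishBundle();
        }

        // Only non-empty bundles are counted as a flush.
        private void finishBundle() {
            if (!bundle.isEmpty()) {
                flushes++;
                bundle.clear();
            }
        }

        int flushes() {
            return flushes;
        }
    }

    /** Feeds `records` elements and emits a watermark after every `watermarkEvery` elements. */
    public static int countFlushes(int records, int maxBundleSize, int watermarkEvery) {
        ToyBundleOperator op = new ToyBundleOperator(maxBundleSize);
        for (int i = 1; i <= records; i++) {
            op.processElement("r" + i);
            if (i % watermarkEvery == 0) {
                op.processWatermark(i);
            }
        }
        return op.flushes();
    }

    public static void main(String[] args) {
        // Sparse watermarks: the bundle fills up before each watermark.
        System.out.println(countFlushes(1000, 500, 500)); // prints 2
        // Dense watermarks: each one flushes a bundle of only 10 records.
        System.out.println(countFlushes(1000, 500, 10));  // prints 100
    }
}
{code}

In this model the size limit of 500 never takes effect in the second case, 
because each watermark empties the bundle long before it fills up.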



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
