David Christle created FLINK-34252:
--------------------------------------

             Summary: WatermarkAssignerOperator should not emit 
WatermarkStatus.IDLE under continuous data flow
                 Key: FLINK-34252
                 URL: https://issues.apache.org/jira/browse/FLINK-34252
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.18.1, 1.17.2, 1.16.3
            Reporter: David Christle


The WatermarkAssignerOperator in the table runtime incorrectly transitions to 
the IDLE state even while data is flowing continuously. This occurs under 
normal operating conditions, where the interval between data elements is 
shorter than the configured idleTimeout, and causes recurring, unnecessary 
transitions between the ACTIVE and IDLE states.

_Detail:_
In the current implementation, the lastRecordTime variable, which tracks the 
time of the last received data element, is updated only when the 
WatermarkStatus transitions from IDLE to ACTIVE. It is not updated while the 
WatermarkStatus is already ACTIVE, so even under continuous data flow the 
condition `(currentTime - lastRecordTime > idleTimeout)` eventually becomes 
true and the WatermarkStatus is erroneously marked IDLE.
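
To make the mechanism concrete, here is a minimal, self-contained sketch of the 
idle check described above. It is a simplified model, not the actual 
WatermarkAssignerOperator code: the class IdleTrackerSketch, the 
refreshWhileActive switch, the onTimer method, and the timing values are all 
assumptions made for illustration. Only lastRecordTime, idleTimeout, and the 
condition itself come from the description.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the idle detection described in this issue.
class IdleTrackerSketch {
    enum Status { ACTIVE, IDLE }

    private final long idleTimeout;
    // false models the reported behavior; true models refreshing on every element.
    private final boolean refreshWhileActive;
    private long lastRecordTime;
    private Status status = Status.ACTIVE;
    final List<Status> emitted = new ArrayList<>();

    IdleTrackerSketch(long idleTimeout, boolean refreshWhileActive, long startTime) {
        this.idleTimeout = idleTimeout;
        this.refreshWhileActive = refreshWhileActive;
        this.lastRecordTime = startTime;
    }

    void processElement(long currentTime) {
        if (status == Status.IDLE) {
            // IDLE -> ACTIVE: the only place the reported behavior updates lastRecordTime.
            status = Status.ACTIVE;
            emitted.add(Status.ACTIVE);
            lastRecordTime = currentTime;
        } else if (refreshWhileActive) {
            // Assumed fix: also refresh the timestamp while already ACTIVE.
            lastRecordTime = currentTime;
        }
    }

    // Periodic idleness check, using the condition quoted in the description.
    void onTimer(long currentTime) {
        if (status == Status.ACTIVE && currentTime - lastRecordTime > idleTimeout) {
            status = Status.IDLE;
            emitted.add(Status.IDLE);
        }
    }

    // One element every 10 time units, one idle check every 50, idleTimeout = 100.
    static List<Status> simulate(boolean refreshWhileActive) {
        IdleTrackerSketch tracker = new IdleTrackerSketch(100, refreshWhileActive, 0);
        for (long now = 10; now <= 500; now += 10) {
            tracker.processElement(now);
            if (now % 50 == 0) {
                tracker.onTimer(now);
            }
        }
        return tracker.emitted;
    }

    public static void main(String[] args) {
        System.out.println("stale lastRecordTime:  " + simulate(false));
        System.out.println("refreshed every record: " + simulate(true));
    }
}
```

In this model, elements arrive ten times more often than idleTimeout requires, 
yet the run with the stale timestamp still alternates between IDLE and ACTIVE, 
while the run that refreshes lastRecordTime on every element emits no status 
change at all.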

It is unclear to me whether this bug produces any incorrect results 
downstream, since once the WatermarkStatus is in the IDLE state, the next 
processElement call causes a WatermarkStatus.ACTIVE to be emitted. 
Nevertheless, we should eliminate this flip-flopping between states.

The test I wrote fails without the fix and illustrates the flip-flops:

{noformat}
[ERROR] Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.030 s <<< FAILURE! -- in org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTest
[ERROR] org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTest.testIdleStateAvoidanceWithConsistentDataFlow -- Time elapsed: 0.013 s <<< FAILURE!
java.lang.AssertionError:

Expecting
  [WatermarkStatus(IDLE),
    WatermarkStatus(ACTIVE),
    WatermarkStatus(IDLE),
    WatermarkStatus(ACTIVE),
    WatermarkStatus(IDLE),
    WatermarkStatus(ACTIVE),
    WatermarkStatus(IDLE),
    WatermarkStatus(ACTIVE),
    WatermarkStatus(IDLE)]
not to contain
  [WatermarkStatus(IDLE)]
but found
  [WatermarkStatus(IDLE)]
{noformat}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
