Tzu-Li (Gordon) Tai created FLINK-7721:
------------------------------------------

             Summary: StatusWatermarkValve should output a new min watermark 
only if it was aggregated from aligned channels
                 Key: FLINK-7721
                 URL: https://issues.apache.org/jira/browse/FLINK-7721
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.3.2, 1.2.1, 1.4.0
            Reporter: Tzu-Li (Gordon) Tai
            Assignee: Tzu-Li (Gordon) Tai
            Priority: Blocker
             Fix For: 1.2.2, 1.4.0, 1.3.3


Context:
{code}
long newMinWatermark = Long.MAX_VALUE;

for (InputChannelStatus channelStatus : channelStatuses) {
    if (channelStatus.isWatermarkAligned) {
        newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
    }
}
{code}

In the calculation of the new min watermark in 
{{StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels()}}, 
there is no verification that the calculated new min watermark 
{{newMinWatermark}} was actually aggregated from at least one aligned channel.

In the corner case where no input channel is currently aligned but some are 
still active, the loop never updates {{newMinWatermark}}, so it keeps its 
initial value of {{Long.MAX_VALUE}}, which we would then incorrectly emit as 
the new watermark.

The fix is simply to emit the aggregated watermark only if it was actually 
calculated from at least one aligned input channel (in addition to the 
existing constraint that it must be larger than the last emitted watermark). 
This change also safely covers the case where a {{Long.MAX_VALUE}} was 
genuinely aggregated from one of the input channels.
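
A minimal, self-contained sketch of the proposed check, not the actual patch. 
The class {{MinWatermarkSketch}}, the nested {{ChannelStatus}} type, the 
{{hasAlignedChannels}} flag, and the {{OptionalLong}} return value are all 
assumptions made for illustration; the real valve keeps this state internally 
and hands the watermark to its output handler instead of returning it.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;

public class MinWatermarkSketch {

    /** Hypothetical stand-in for the valve's per-channel state. */
    static final class ChannelStatus {
        final long watermark;
        final boolean isWatermarkAligned;

        ChannelStatus(long watermark, boolean isWatermarkAligned) {
            this.watermark = watermark;
            this.isWatermarkAligned = isWatermarkAligned;
        }
    }

    /**
     * Returns the watermark to emit, or an empty result if nothing should be
     * emitted. A value is returned only if (a) at least one aligned channel
     * contributed to the minimum and (b) the minimum is larger than the last
     * emitted watermark.
     */
    static OptionalLong findNewMinWatermark(
            List<ChannelStatus> channelStatuses, long lastOutputWatermark) {

        long newMinWatermark = Long.MAX_VALUE;
        boolean hasAlignedChannels = false; // assumed name for the new check

        for (ChannelStatus channelStatus : channelStatuses) {
            if (channelStatus.isWatermarkAligned) {
                hasAlignedChannels = true;
                newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
            }
        }

        if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
            return OptionalLong.of(newMinWatermark);
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Corner case from this issue: channels exist, but none is aligned.
        List<ChannelStatus> noneAligned = Arrays.asList(
                new ChannelStatus(10L, false),
                new ChannelStatus(20L, false));
        System.out.println(findNewMinWatermark(noneAligned, 5L).isPresent());

        // A Long.MAX_VALUE that really came from an aligned channel.
        List<ChannelStatus> genuineMax = Collections.singletonList(
                new ChannelStatus(Long.MAX_VALUE, true));
        System.out.println(findNewMinWatermark(genuineMax, 5L).isPresent());
    }
}
```

Tracking alignment with a separate flag, rather than treating 
{{Long.MAX_VALUE}} as a sentinel, is what lets the second case in {{main}} 
still emit a genuinely aggregated {{Long.MAX_VALUE}}.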



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
