????????????????watermark????????????????????????watermark????????????????????????????
public void inputWatermark(Watermark watermark, int channelIndex) {
// ignore the input watermark if its input channel, or all
input channels are idle (i.e. overall the valve is idle).
if (lastOutputStreamStatus.isActive() &&
channelStatuses[channelIndex].streamStatus.isActive()) {
long watermarkMillis = watermark.getTimestamp();
// if the input watermark's value is less than the last
received watermark for its input channel, ignore it also.
if (watermarkMillis >
channelStatuses[channelIndex].watermark) {
channelStatuses[channelIndex].watermark =
watermarkMillis;
// previously unaligned input channels are now
aligned if its watermark has caught up
if
(!channelStatuses[channelIndex].isWatermarkAligned && watermarkMillis
>= lastOutputWatermark) {
channelStatuses[channelIndex].isWatermarkAligned = true;
}
// now, attempt to find a new min watermark
across all aligned channels
findAndOutputNewMinWatermarkAcrossAlignedChannels();
}
}
}
private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
long newMinWatermark = Long.MAX_VALUE;
boolean hasAlignedChannels = false;
// determine new overall watermark by considering only
watermark-aligned channels across all channels
for (InputChannelStatus channelStatus : channelStatuses) {
if (channelStatus.isWatermarkAligned) {
hasAlignedChannels = true;
newMinWatermark =
Math.min(channelStatus.watermark, newMinWatermark);
}
}
// we acknowledge and output the new overall watermark if it
really is aggregated
// from some remaining aligned channel, and is also larger than
the last output watermark
if (hasAlignedChannels && newMinWatermark >
lastOutputWatermark) {
lastOutputWatermark = newMinWatermark;
outputHandler.handleWatermark(new
Watermark(lastOutputWatermark));
}
}
???????????????????????? channelIndex
????????watermark???????????????????????????????????? channelIndex
??????watermark????????????????????????