Hi,

We have detected an issue with the SparkRunner and watermarks.
*Pipeline*: Read from two Kafka sources => apply a fixed window of 1 minute to both PCollections => apply SqlTransform with the query "select c.datetime, c.country ,s.name, s.id from `kafka_source1` as s join `kafka_source2` as c on s.name = c.name" => write the emitted output to a Kafka sink.

We are using the watermark policy provided in https://github.com/apache/beam/blob/8869fcebdb9ddff375d8e7b408f1d971e1257815/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java#L74 (CustomTimestampPolicyWithLimitedDelay), with maxDelay set to 0.

Because we apply a fixed window of 1 minute and the element timestamps are monotonically increasing, we expect the output for the window to be emitted shortly after the current processing time crosses 12:02:00 (within a reasonable delay, say 10 seconds). Instead, we only get the result of the window after a long delay. The Spark logs suggest that the watermark is lagging:

19/09/05 12:02:50 INFO GlobalWatermarkHolder: Put new watermark block: {0=SparkWatermarks{lowWatermark=2019-09-05T11:57:05.558Z, highWatermark=2019-09-05T11:57:06.302Z, synchronizedProcessingTime=2019-09-05T11:55:00.500Z}, 1=SparkWatermarks{lowWatermark=2019-09-05T11:57:05.120Z, highWatermark=2019-09-05T11:57:06.686Z, synchronizedProcessingTime=2019-09-05T11:55:00.500Z}}
19/09/05 12:02:50 INFO GlobalWatermarkHolder$WatermarkAdvancingStreamingListener: Batch with timestamp: 1567684500500 has completed, watermarks have been updated.

As you can see, when the current processing time is 12:02:50, the highWatermark is only 11:57:06, and the gap between processing time and highWatermark keeps growing as processing time progresses.

We ran the same pipeline with the same data on the Flink runner and the Direct runner and did not see this issue. On those runners the watermark stays almost equal to processing time.
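To make the expectation and the observed lag concrete, here is a small plain-Java sketch (no Beam dependency) that does the arithmetic. It assumes that the 'datetime' field of the sample records below is the event timestamp and is in UTC, and that the log line time "12:02:50" is also UTC. It further assumes the default trigger, under which a fixed window's pane fires once the watermark passes the end of the window. The class and method names are hypothetical and only for illustration. The sketch computes the 1-minute window end for the first and last sample records, then the gap between the log time and the partition 0 highWatermark. It also decodes the batch timestamp 1567684500500 from the log.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: arithmetic only, not part of Beam or Spark.
public class WindowWatermarkCheck {
    // Format of the 'datetime' field in the sample records, e.g. "2019-09-05 12-01-19 481704".
    private static final DateTimeFormatter DATETIME_FMT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH-mm-ss SSSSSS");

    // Size of the fixed windows applied to both PCollections.
    private static final long WINDOW_MILLIS = 60_000L;

    // Assumption: 'datetime' is the event timestamp and is expressed in UTC.
    static Instant eventTime(String datetime) {
        return LocalDateTime.parse(datetime, DATETIME_FMT).toInstant(ZoneOffset.UTC);
    }

    // Exclusive end of the zero-offset, 1-minute fixed window containing ts.
    static Instant windowEnd(Instant ts) {
        long ms = ts.toEpochMilli();
        return Instant.ofEpochMilli(ms - Math.floorMod(ms, WINDOW_MILLIS) + WINDOW_MILLIS);
    }

    // How far a watermark (or batch time) trails a given processing time.
    static Duration lag(Instant processingTime, Instant watermark) {
        return Duration.between(watermark, processingTime);
    }

    public static void main(String[] args) {
        // First and last sample records across both topics land in the same window.
        Instant first = eventTime("2019-09-05 12-01-19 481704");
        Instant last = eventTime("2019-09-05 12-01-28 713951");
        System.out.println("window end: " + windowEnd(first) + " / " + windowEnd(last));
        // prints: window end: 2019-09-05T12:02:00Z / 2019-09-05T12:02:00Z

        // Values copied from the GlobalWatermarkHolder log; assumes the log clock is UTC.
        Instant logTime = Instant.parse("2019-09-05T12:02:50Z");
        Instant highWatermark0 = Instant.parse("2019-09-05T11:57:06.302Z");
        Instant batchTime = Instant.ofEpochMilli(1567684500500L);

        System.out.println("watermark lag: " + lag(logTime, highWatermark0)); // PT5M43.698S
        System.out.println("batch time: " + batchTime);                      // 2019-09-05T11:55:00.500Z
        System.out.println("batch lag: " + lag(logTime, batchTime));         // PT7M49.5S
    }
}
```

Under these assumptions, the window [12:01:00, 12:02:00) can only fire after the watermark reaches 12:02:00. At 12:02:50 the highWatermark trails the wall clock by about 5m44s. The batch timestamp 1567684500500 decodes to 11:55:00.500Z, the same value as synchronizedProcessingTime in the log.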
Sample input data:

kafka_source1:
value:{'id': 1, 'name': 'test0', 'datetime': '2019-09-05 12-01-19 481704'}
value:{'id': 1, 'name': 'test1', 'datetime': '2019-09-05 12-01-20 491764'}
value:{'id': 1, 'name': 'test0', 'datetime': '2019-09-05 12-01-21 493494'}

kafka_source2:
value:{'country': 'India', 'name': 'test0', 'datetime': '2019-09-05 12-01-26 704060'}
value:{'country': 'USA', 'name': 'test1', 'datetime': '2019-09-05 12-01-27 712300'}
value:{'country': 'USA', 'name': 'test2', 'datetime': '2019-09-05 12-01-28 713951'}

What could be the issue here?

Regards,
shanta
