HeartSaVioR opened a new pull request #30427: URL: https://github.com/apache/spark/pull/30427
### What changes were proposed in this pull request?

This PR proposes to add the watermark gap information to the SS UI page. Please refer to the screenshots below to see what we'd like to show in the UI.

Please note that this PR doesn't plot the watermark value itself. Knowing the gap between the actual wall clock and the watermark looks more useful than the absolute value.

### Why are the changes needed?

Watermark is one of the major metrics end users need to track for stateful queries. The watermark defines "when" the output will be emitted in append mode. Knowing the gap between the wall clock and the watermark (which is derived from the event time of the input data) therefore helps users estimate when output will be emitted.

### Does this PR introduce _any_ user-facing change?

Yes, the SS UI query page will contain the watermark gap information.

### How was this patch tested?

Basic UT added. Manually tested with two queries:

> simple case

You'll see a consistent watermark gap of (15 seconds + α): 10 seconds come from the delay in the watermark definition, and 5 seconds come from the trigger interval.

```
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

// Run in spark-shell, where `spark` is predefined; the implicits enable the $"..." column syntax.
import spark.implicits._

spark.conf.set("spark.sql.shuffle.partitions", "10")

val query = spark
  .readStream
  .format("rate")
  .option("rowsPerSecond", 1000)
  .option("rampUpTime", "10s")
  .load()
  .selectExpr("timestamp", "mod(value, 100) as mod", "value")
  .withWatermark("timestamp", "10 seconds")
  .groupBy(window($"timestamp", "1 minute", "10 seconds"), $"mod")
  .agg(max("value").as("max_value"), min("value").as("min_value"), avg("value").as("avg_value"))
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .outputMode("append")
  .start()

query.awaitTermination()
```

> complicated case

This randomizes the timestamp, hence producing a random watermark gap. The gap won't be smaller than 15 seconds, for the reason described above.
```
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

// Run in spark-shell, where `spark` is predefined; the implicits enable the $"..." column syntax.
import spark.implicits._

spark.conf.set("spark.sql.shuffle.partitions", "10")

val query = spark
  .readStream
  .format("rate")
  .option("rowsPerSecond", 1000)
  .option("rampUpTime", "10s")
  .load()
  .selectExpr("timestamp", "mod(value, 100) as mod", "value")
  .withWatermark("timestamp", "10 seconds")
  .groupBy(window($"timestamp", "1 minute", "10 seconds"), $"mod")
  .agg(max("value").as("max_value"), min("value").as("min_value"), avg("value").as("avg_value"))
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .outputMode("append")
  .start()

query.awaitTermination()
```
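The gap described above can be approximated from the query's progress reports by subtracting the watermark from the batch (trigger) timestamp, treating the trigger timestamp as the wall clock. The following is a minimal sketch, not the PR's implementation. It assumes both values arrive as ISO-8601 UTC strings, as `StreamingQueryProgress.timestamp` and `eventTime.get("watermark")` report them, and that an unset watermark shows up as epoch 0. `WatermarkGap`, `parseProgressTimestamp`, and `watermarkGapSeconds` are hypothetical names.

```scala
import java.time.Instant

object WatermarkGap {
  // Parses an ISO-8601 UTC timestamp such as "2020-11-19T10:00:05.000Z"
  // into epoch milliseconds.
  def parseProgressTimestamp(ts: String): Long = Instant.parse(ts).toEpochMilli

  // Gap in seconds between the batch timestamp and the watermark.
  // Returns None while the watermark is still at epoch 0 (assumed to mean "not set yet").
  def watermarkGapSeconds(batchTimestamp: String, watermark: String): Option[Double] = {
    val batchMs = parseProgressTimestamp(batchTimestamp)
    val watermarkMs = parseProgressTimestamp(watermark)
    if (watermarkMs > 0L) Some((batchMs - watermarkMs) / 1000.0) else None
  }

  def main(args: Array[String]): Unit = {
    println(watermarkGapSeconds("2020-11-19T10:00:20.000Z", "2020-11-19T10:00:05.000Z"))
    println(watermarkGapSeconds("2020-11-19T10:00:00.000Z", "1970-01-01T00:00:00.000Z"))
  }
}
```

In spark-shell this could be applied to the query above with something like `query.recentProgress.flatMap(p => Option(p.eventTime.get("watermark")).flatMap(w => WatermarkGap.watermarkGapSeconds(p.timestamp, w)))`. The `Option(...)` guard covers queries without a watermark, where the `"watermark"` key is absent and `get` returns null.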
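Why the simple case settles at about 15 seconds can be checked with a toy model. It assumes triggers fire exactly every 5 seconds and that each batch contains rows up to its own trigger time, with no processing lag. It also assumes the watermark used by a batch is the maximum event time seen in earlier batches minus the 10-second delay, and that the watermark never moves backwards. The "+ α" in the simple case would come from effects this model ignores, such as processing latency. `WatermarkGapModel` and `gapsMs` are hypothetical names, and this is an illustration only, not Spark's code.

```scala
object WatermarkGapModel {
  // Returns the gap (batch time minus watermark) in ms for every batch
  // that runs with a watermark already set.
  def gapsMs(startMs: Long, triggerMs: Long, delayMs: Long, batches: Int): List[Long] = {
    var maxEventTimeMs = Long.MinValue // max event time observed so far
    var watermarkMs = 0L               // epoch 0 stands for "no watermark yet"
    val gaps = scala.collection.mutable.ListBuffer.empty[Long]
    for (k <- 1 to batches) {
      val batchTimeMs = startMs + k * triggerMs
      // This batch runs with the watermark computed after the previous batch.
      if (watermarkMs > 0L) gaps += batchTimeMs - watermarkMs
      // Assumption: the batch holds rows up to its own trigger time.
      maxEventTimeMs = math.max(maxEventTimeMs, batchTimeMs)
      // Watermark for the next batch; it never moves backwards.
      watermarkMs = math.max(watermarkMs, maxEventTimeMs - delayMs)
    }
    gaps.toList
  }

  def main(args: Array[String]): Unit =
    println(gapsMs(startMs = 1600000000000L, triggerMs = 5000L, delayMs = 10000L, batches = 6))
}
```

Each gap is one trigger interval (the watermark lags by one batch) plus the watermark delay: 5 s + 10 s = 15 s.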