HeartSaVioR opened a new pull request #30427:
URL: https://github.com/apache/spark/pull/30427


   
   ### What changes were proposed in this pull request?
   
   This PR proposes to add the watermark gap information to the SS UI page. Please 
refer to the screenshots below to see what we'd like to show in the UI.
   
   ![Screen Shot 2020-11-19 at 6 56 47 
PM](https://user-images.githubusercontent.com/1317309/99669029-d5d4c080-2ab1-11eb-9c63-d05b3e1ab391.png)
   ![Screen Shot 2020-11-19 at 7 00 21 
PM](https://user-images.githubusercontent.com/1317309/99669049-dbcaa180-2ab1-11eb-8789-10b35857dda0.png)
   
   Please note that this PR doesn't plot the watermark value itself - knowing the 
gap between the actual wall clock and the watermark looks more useful than the 
absolute value.
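   
   As a side note, the same gap can also be observed programmatically. The sketch 
below is my own illustration (not code from this PR): it derives the gap from the 
query's last `StreamingQueryProgress`, assuming a running query named `query` 
whose plan defines a watermark.
   
   ```scala
   // Illustrative sketch, not this PR's UI code: compute the gap between the
   // wall clock at trigger time (progress.timestamp) and the current watermark.
   // Assumes a running StreamingQuery named `query` with a watermark defined.
   import java.time.Instant

   val progress = query.lastProgress
   // eventTime carries a "watermark" entry (an ISO-8601 string) when one is defined
   Option(progress.eventTime.get("watermark")).foreach { wm =>
     val gapMs = Instant.parse(progress.timestamp).toEpochMilli -
       Instant.parse(wm).toEpochMilli
     println(s"watermark gap: ${gapMs / 1000.0} seconds")
   }
   ```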
   
   ### Why are the changes needed?
   
   Watermark is one of the major metrics end users need to track for 
stateful queries. The watermark defines "when" the output will be emitted in 
append mode, hence knowing the gap between the wall clock and the watermark (over 
the input data) is very helpful for setting expectations about the output.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, the SS UI query page will contain the watermark gap information.
   
   ### How was this patch tested?
   
   Basic UT added. Manually tested with two queries:
   
   > simple case
   
   You'll see a consistent watermark gap of (15 seconds + a): 10 seconds come 
from the delay in the watermark definition, and 5 seconds from the trigger interval.
   
   ```scala
   import org.apache.spark.sql.streaming.Trigger
   // the two imports below are preset in spark-shell; listed for running elsewhere
   import org.apache.spark.sql.functions._
   import spark.implicits._
   
   spark.conf.set("spark.sql.shuffle.partitions", "10")
   
   val query = spark
     .readStream
     .format("rate")
     .option("rowsPerSecond", 1000)
     .option("rampUpTime", "10s")
     .load()
     .selectExpr("timestamp", "mod(value, 100) as mod", "value")
     .withWatermark("timestamp", "10 seconds")
     .groupBy(window($"timestamp", "1 minute", "10 seconds"), $"mod")
     .agg(max("value").as("max_value"), min("value").as("min_value"), avg("value").as("avg_value"))
     .writeStream
     .format("console")
     .trigger(Trigger.ProcessingTime("5 seconds"))
     .outputMode("append")
     .start()
   
   query.awaitTermination()
   ```
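   
   The 15-second floor above is just arithmetic over the query's settings; a quick 
back-of-envelope check (assumption: the steady-state gap is roughly watermark 
delay + trigger interval, plus a small scheduling overhead):
   
   ```scala
   // Back-of-envelope for the expected minimum watermark gap in the simple case.
   val watermarkDelaySec = 10  // from .withWatermark("timestamp", "10 seconds")
   val triggerIntervalSec = 5  // from Trigger.ProcessingTime("5 seconds")
   val expectedMinGapSec = watermarkDelaySec + triggerIntervalSec
   println(s"expected minimum gap: ~$expectedMinGapSec seconds")
   ```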
   
   > complicated case
   
   This randomizes the timestamp, hence producing a random watermark gap. The gap 
won't be smaller than 15 seconds, as described earlier.
   
   ```scala
   import org.apache.spark.sql.streaming.Trigger
   // the two imports below are preset in spark-shell; listed for running elsewhere
   import org.apache.spark.sql.functions._
   import spark.implicits._
   
   spark.conf.set("spark.sql.shuffle.partitions", "10")
   
   val query = spark
     .readStream
     .format("rate")
     .option("rowsPerSecond", 1000)
     .option("rampUpTime", "10s")
     .load()
     .selectExpr("timestamp", "mod(value, 100) as mod", "value")
     .withWatermark("timestamp", "10 seconds")
     .groupBy(window($"timestamp", "1 minute", "10 seconds"), $"mod")
     .agg(max("value").as("max_value"), min("value").as("min_value"), avg("value").as("avg_value"))
     .writeStream
     .format("console")
     .trigger(Trigger.ProcessingTime("5 seconds"))
     .outputMode("append")
     .start()
   
   query.awaitTermination()
   ```

