Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21617#discussion_r197980605
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
    @@ -48,12 +49,13 @@ class StateOperatorProgress private[sql](
       def prettyJson: String = pretty(render(jsonValue))
     
       private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
    -    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes)
    +    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, numLateInputRows)
     
       private[sql] def jsonValue: JValue = {
         ("numRowsTotal" -> JInt(numRowsTotal)) ~
         ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
    -    ("memoryUsedBytes" -> JInt(memoryUsedBytes))
    +    ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
    +    ("numLateInputRows" -> JInt(numLateInputRows))
    --- End diff --
    
    Here you are measuring the number of "keys" filtered out of the state store because they have crossed the late threshold, correct? It may be better to rename this metric, here and in the other places it appears, to "number of evicted rows". It would be better still if we could expose the actual number of input events that arrived late.
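    To illustrate the distinction: the two counts below are a minimal, hypothetical sketch (the names `countLateInputRows` and `countEvictedKeys` are not Spark APIs). "Evicted rows" counts state-store keys dropped once the watermark passes them, while "late input rows" counts incoming events whose event time is already behind the watermark:
    
    ```scala
    // Hypothetical sketch of the two different metrics discussed above.
    object LateVsEvictedSketch {
      case class Event(key: String, eventTimeMs: Long)
    
      // "Late input rows": events in the current batch that arrive
      // with an event time older than the watermark.
      def countLateInputRows(batch: Seq[Event], watermarkMs: Long): Long =
        batch.count(_.eventTimeMs < watermarkMs)
    
      // "Evicted rows": keys removed from the state store because their
      // max event time has fallen behind the watermark.
      def countEvictedKeys(stateMaxEventTime: Map[String, Long],
                           watermarkMs: Long): Long =
        stateMaxEventTime.count { case (_, maxTs) => maxTs < watermarkMs }
    }
    ```
    
    A batch can contain many late events for a single key, so the two numbers can differ substantially; that is why naming the metric after what it actually counts matters.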


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
