Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/21617#discussion_r197981651
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -48,12 +49,13 @@ class StateOperatorProgress private[sql](

   def prettyJson: String = pretty(render(jsonValue))

   private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
-    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes)
+    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, numLateInputRows)

   private[sql] def jsonValue: JValue = {
     ("numRowsTotal" -> JInt(numRowsTotal)) ~
     ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
-    ("memoryUsedBytes" -> JInt(memoryUsedBytes))
+    ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
+    ("numLateInputRows" -> JInt(numLateInputRows))
--- End diff ---
@arunmahadevan
> Here you are measuring the number of "keys" filtered out of the state
store since they have crossed the late threshold correct ?
No, it is based on "input" rows which are filtered out due to the watermark
threshold. Note that the meaning of "input" here is relative: it doesn't
refer to the input rows of the overall query, but to the input rows of the
state operator.
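
To make the distinction concrete, here is a minimal sketch (the query shape,
thresholds, and app name are illustrative, and `numLateInputRows` is the
field this PR proposes) of where the metric would surface:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

val spark = SparkSession.builder().appName("late-rows-sketch").getOrCreate()
import spark.implicits._

// Rows whose event time is older than the watermark by the time they reach
// the stateful aggregation are dropped there; those dropped rows are what
// numLateInputRows would count.
val counts = spark.readStream
  .format("rate")
  .load()
  .withWatermark("timestamp", "10 minutes")
  .groupBy(window($"timestamp", "5 minutes"))
  .count()

val query = counts.writeStream
  .format("console")
  .outputMode("update")
  .start()

// StateOperatorProgress would then carry the new counter alongside
// numRowsTotal / numRowsUpdated / memoryUsedBytes (lastProgress is null
// until the first progress is reported).
Option(query.lastProgress).foreach { progress =>
  progress.stateOperators.foreach(op => println(op.prettyJson))
}
```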
> Its better if we could rather expose the actual number of events that
were late.
I guess the comment stems from the point I just clarified, but I would say
that to get a correct count of late events, we would have to filter them out
in the first phase of the query (not in the state operator). For now, filters
placed before the state operator affect the count.
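
For example, continuing the (hypothetical) sketch above, a filter sitting
between the watermark and the aggregation drops some late rows before they
ever reach the state operator, so they never show up in the operator-level
count:

```scala
// Late rows with odd values are removed by the filter, not by the state
// operator, so numLateInputRows would undercount the query's late events.
val filtered = spark.readStream
  .format("rate")
  .load()
  .withWatermark("timestamp", "10 minutes")
  .filter($"value" % 2 === 0)
  .groupBy(window($"timestamp", "5 minutes"))
  .count()
```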