Github user arunmahadevan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21617#discussion_r197984227
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -48,12 +49,13 @@ class StateOperatorProgress private[sql](
def prettyJson: String = pretty(render(jsonValue))
private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
- new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes)
+ new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, numLateInputRows)
private[sql] def jsonValue: JValue = {
("numRowsTotal" -> JInt(numRowsTotal)) ~
("numRowsUpdated" -> JInt(numRowsUpdated)) ~
- ("memoryUsedBytes" -> JInt(memoryUsedBytes))
+ ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
+ ("numLateInputRows" -> JInt(numLateInputRows))
--- End diff ---
What I meant was: if the input to the state operator is the result of the
aggregate, then we would not be counting the actual input rows to the group by.
There would be at most one row per key, which would give the impression that
there are fewer late events than there really are; see the sketch below.
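To illustrate (a minimal sketch, not code from this PR; the query shape, key name, and row counts here are assumptions):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder
  .appName("late-rows-sketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Stand-in source; a real job would read events carrying an event-time
// column and a key. Assume many rows arrive late for the same key.
val events = spark.readStream
  .format("rate")
  .load()                                  // columns: timestamp, value
  .withColumn("key", lit("k1"))
  .withWatermark("timestamp", "10 minutes")

// Partial (map-side) aggregation runs before the stateful operator, so
// e.g. 5 late rows for "k1" in one batch can reach the state store as a
// single partial-aggregate row. A late-row count taken at the state
// operator would then report 1, not 5.
val counts = events
  .groupBy(window($"timestamp", "5 minutes"), $"key")
  .count()

counts.writeStream.outputMode("update").format("console").start()
```

In a query like this, counting late rows after the partial aggregate rather than at the raw input would understate how many events were actually dropped.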
If this is not the case, then I am fine.
---