Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/21469#discussion_r206738287
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -48,12 +49,24 @@ class StateOperatorProgress private[sql](
def prettyJson: String = pretty(render(jsonValue))
private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
- new StateOperatorProgress(numRowsTotal, newNumRowsUpdated,
memoryUsedBytes)
+ new StateOperatorProgress(numRowsTotal, newNumRowsUpdated,
memoryUsedBytes, customMetrics)
private[sql] def jsonValue: JValue = {
- ("numRowsTotal" -> JInt(numRowsTotal)) ~
- ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
- ("memoryUsedBytes" -> JInt(memoryUsedBytes))
+ def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T =>
JValue): JValue = {
+ if (map.isEmpty) return JNothing
+ val keys = map.keySet.asScala.toSeq.sorted
+ keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_
~ _)
+ }
+
+ val jsonVal = ("numRowsTotal" -> JInt(numRowsTotal)) ~
+ ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
+ ("memoryUsedBytes" -> JInt(memoryUsedBytes))
+
+ if (!customMetrics.isEmpty) {
--- End diff --
You are already handling the case of map being empty in `safeMapToJValue`
by adding JNothing. Doesnt JNothing values just get dropped from the json text
any way?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]