bogao007 commented on code in PR #40892:
URL: https://github.com/apache/spark/pull/40892#discussion_r1173392468


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala:
##########
@@ -17,6 +17,297 @@
 
 package org.apache.spark.sql.streaming
 
-class StreamingQueryProgress private[sql] (val json: String) {
-  // TODO(SPARK-43128): (Implement full object by parsing from json).
+import java.{util => ju}
+import java.lang.{Long => JLong}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.fasterxml.jackson.annotation.{JsonSetter, Nulls}
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDoubleToJValue, 
safeMapToJValue}
+import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
+
+/**
+ * Information about updates made to stateful operators in a 
[[StreamingQuery]] during a trigger.
+ */
+@Evolving
+class StateOperatorProgress private[spark] (
+    val operatorName: String,
+    val numRowsTotal: Long,
+    val numRowsUpdated: Long,
+    val allUpdatesTimeMs: Long,
+    val numRowsRemoved: Long,
+    val allRemovalsTimeMs: Long,
+    val commitTimeMs: Long,
+    val memoryUsedBytes: Long,
+    val numRowsDroppedByWatermark: Long,
+    val numShufflePartitions: Long,
+    val numStateStoreInstances: Long,
+    val customMetrics: ju.Map[String, JLong] = new ju.HashMap())
+    extends Serializable {
+
+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this progress. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  private[sql] def copy(
+      newNumRowsUpdated: Long,
+      newNumRowsDroppedByWatermark: Long): StateOperatorProgress =
+    new StateOperatorProgress(
+      operatorName = operatorName,
+      numRowsTotal = numRowsTotal,
+      numRowsUpdated = newNumRowsUpdated,
+      allUpdatesTimeMs = allUpdatesTimeMs,
+      numRowsRemoved = numRowsRemoved,
+      allRemovalsTimeMs = allRemovalsTimeMs,
+      commitTimeMs = commitTimeMs,
+      memoryUsedBytes = memoryUsedBytes,
+      numRowsDroppedByWatermark = newNumRowsDroppedByWatermark,
+      numShufflePartitions = numShufflePartitions,
+      numStateStoreInstances = numStateStoreInstances,
+      customMetrics = customMetrics)
+
+  private[sql] def jsonValue: JValue = {
+    ("operatorName" -> JString(operatorName)) ~
+      ("numRowsTotal" -> JInt(numRowsTotal)) ~
+      ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
+      ("allUpdatesTimeMs" -> JInt(allUpdatesTimeMs)) ~
+      ("numRowsRemoved" -> JInt(numRowsRemoved)) ~
+      ("allRemovalsTimeMs" -> JInt(allRemovalsTimeMs)) ~
+      ("commitTimeMs" -> JInt(commitTimeMs)) ~
+      ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
+      ("numRowsDroppedByWatermark" -> JInt(numRowsDroppedByWatermark)) ~
+      ("numShufflePartitions" -> JInt(numShufflePartitions)) ~
+      ("numStateStoreInstances" -> JInt(numStateStoreInstances)) ~
+      ("customMetrics" -> {
+        if (!customMetrics.isEmpty) {
+          val keys = customMetrics.keySet.asScala.toSeq.sorted
+          keys.map { k => k -> JInt(customMetrics.get(k).toLong): JObject 
}.reduce(_ ~ _)
+        } else {
+          JNothing
+        }
+      })
+  }
+
+  override def toString: String = prettyJson
+}
+
+/**
+ * Information about progress made in the execution of a [[StreamingQuery]] 
during a trigger. Each
+ * event relates to processing done for a single trigger of the streaming 
query. Events are
+ * emitted even when no new data is available to be processed.
+ *
+ * @param id
+ *   A unique query id that persists across restarts. See 
`StreamingQuery.id()`.
+ * @param runId
+ *   A query id that is unique for every start/restart. See 
`StreamingQuery.runId()`.
+ * @param name
+ *   User-specified name of the query, null if not specified.
+ * @param timestamp
+ *   Beginning time of the trigger in ISO8601 format, i.e. UTC timestamps.
+ * @param batchId
+ *   A unique id for the current batch of data being processed. Note that in 
the case of retries
+ *   after a failure a given batchId may be executed more than once. Similarly, 
when there is no
+ *   data to be processed, the batchId will not be incremented.
+ * @param batchDuration
+ *   The process duration of each batch.
+ * @param durationMs
+ *   The amount of time taken to perform various operations in milliseconds.
+ * @param eventTime
+ *   Statistics of event time seen in this batch. It may contain the following 
keys:
+ *   {{{
+ *                   "max" -> "2016-12-05T20:54:20.827Z"  // maximum event 
time seen in this trigger
+ *                   "min" -> "2016-12-05T20:54:20.827Z"  // minimum event 
time seen in this trigger
+ *                   "avg" -> "2016-12-05T20:54:20.827Z"  // average event 
time seen in this trigger
+ *                   "watermark" -> "2016-12-05T20:54:20.827Z"  // watermark 
used in this trigger
+ *   }}}
+ *   All timestamps are in ISO8601 format, i.e. UTC timestamps.
+ * @param stateOperators
+ *   Information about operators in the query that store state.
+ * @param sources
+ *   detailed statistics on data being read from each of the streaming sources.
+ * @since 3.5.0
+ */
+@Evolving
+class StreamingQueryProgress private[spark] (
+    val id: UUID,
+    val runId: UUID,
+    val name: String,
+    val timestamp: String,
+    val batchId: Long,
+    val batchDuration: Long,
+    val durationMs: ju.Map[String, JLong],
+    val eventTime: ju.Map[String, String],
+    val stateOperators: Array[StateOperatorProgress],
+    val sources: Array[SourceProgress],
+    val sink: SinkProgress,
+    @JsonSetter(nulls = Nulls.AS_EMPTY)
+    @JsonDeserialize(contentAs = classOf[GenericRowWithSchema])
+    val observedMetrics: ju.Map[String, Row])
+    extends Serializable {
+
+  /** The aggregate (across all sources) number of records processed in a 
trigger. */
+  def numInputRows: Long = sources.map(_.numInputRows).sum

Review Comment:
   Aggregated metrics like `numInputRows`, `inputRowsPerSecond` and 
`processedRowsPerSecond` are calculated inside this class. Can they be 
safely deserialized from the JSON string? It might also be worth adding them 
to the test case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to