bogao007 commented on code in PR #40895:
URL: https://github.com/apache/spark/pull/40895#discussion_r1173337245


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala:
##########
@@ -17,6 +17,265 @@
 
 package org.apache.spark.sql.streaming
 
+import java.{util => ju}
+import java.lang.{Long => JLong}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDoubleToJValue, 
safeMapToJValue}
+import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
+
+/**
+ * Information about progress made in the execution of a [[StreamingQuery]] 
during
+ * a trigger.
+ * @param json A JSON string that contains the entire streaming query progress.
+ */
+@Evolving
 class StreamingQueryProgress private[sql] (val json: String) {
-  // TODO(SPARK-43128): (Implement full object by parsing from json).
+  private val jsonMapper = new 
ObjectMapper().registerModule(DefaultScalaModule)
+
+  /** Deserialize streaming query progress json string to 
[[LegacyStreamingQueryProgress]] */
+  def fromJson(): LegacyStreamingQueryProgress = {
+    jsonMapper.readValue(json, classOf[LegacyStreamingQueryProgress])
+  }
+}
+
+/**
+ * Information about updates made to stateful operators in a 
[[StreamingQuery]] during a trigger.
+ */
+@Evolving
+class StateOperatorProgress private[spark](
+    val operatorName: String,
+    val numRowsTotal: Long,
+    val numRowsUpdated: Long,
+    val allUpdatesTimeMs: Long,
+    val numRowsRemoved: Long,
+    val allRemovalsTimeMs: Long,
+    val commitTimeMs: Long,
+    val memoryUsedBytes: Long,
+    val numRowsDroppedByWatermark: Long,
+    val numShufflePartitions: Long,
+    val numStateStoreInstances: Long,
+    val customMetrics: ju.Map[String, JLong] = new ju.HashMap()
+  ) extends Serializable {
+
+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this progress. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  private[sql] def jsonValue: JValue = {
+    ("operatorName" -> JString(operatorName)) ~
+      ("numRowsTotal" -> JInt(numRowsTotal)) ~
+      ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
+      ("allUpdatesTimeMs" -> JInt(allUpdatesTimeMs)) ~
+      ("numRowsRemoved" -> JInt(numRowsRemoved)) ~
+      ("allRemovalsTimeMs" -> JInt(allRemovalsTimeMs)) ~
+      ("commitTimeMs" -> JInt(commitTimeMs)) ~
+      ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
+      ("numRowsDroppedByWatermark" -> JInt(numRowsDroppedByWatermark)) ~
+      ("numShufflePartitions" -> JInt(numShufflePartitions)) ~
+      ("numStateStoreInstances" -> JInt(numStateStoreInstances)) ~
+      ("customMetrics" -> {
+        if (!customMetrics.isEmpty) {
+          val keys = customMetrics.keySet.asScala.toSeq.sorted
+          keys.map { k => k -> JInt(customMetrics.get(k).toLong) : JObject 
}.reduce(_ ~ _)
+        } else {
+          JNothing
+        }
+      })
+  }
+
+  override def toString: String = prettyJson
+}
+
+/**
+ * Information about progress made in the execution of a [[StreamingQuery]] 
during
+ * a trigger. Each event relates to processing done for a single trigger of 
the streaming
+ * query. Events are emitted even when no new data is available to be 
processed.
+ *
+ * @param id A unique query id that persists across restarts. See 
`StreamingQuery.id()`.
+ * @param runId A query id that is unique for every start/restart. See 
`StreamingQuery.runId()`.
+ * @param name User-specified name of the query, null if not specified.
+ * @param timestamp Beginning time of the trigger in ISO8601 format, i.e. UTC 
timestamps.
+ * @param batchId A unique id for the current batch of data being processed.  
Note that in the
+ *                case of retries after a failure a given batchId may be 
executed more than once.
+ *                Similarly, when there is no data to be processed, the 
batchId will not be
+ *                incremented.
+ * @param numInputRows The total number of records read from all the sources.
+ * @param inputRowsPerSecond The total rate at which data is arriving from all 
the sources.
+ * @param processedRowsPerSecond The total rate at which data from all the 
sources is being
+ *                               processed by Spark.
+ * @param batchDuration The process duration of each batch.
+ * @param durationMs The amount of time taken to perform various operations in 
milliseconds.
+ * @param eventTime Statistics of event time seen in this batch. It may 
contain the following keys:
+ *                 {{{
+ *                   "max" -> "2016-12-05T20:54:20.827Z"  // maximum event 
time seen in this trigger
+ *                   "min" -> "2016-12-05T20:54:20.827Z"  // minimum event 
time seen in this trigger
+ *                   "avg" -> "2016-12-05T20:54:20.827Z"  // average event 
time seen in this trigger
+ *                   "watermark" -> "2016-12-05T20:54:20.827Z"  // watermark 
used in this trigger
+ *                 }}}
+ *                 All timestamps are in ISO8601 format, i.e. UTC timestamps.
+ * @param stateOperators Information about operators in the query that store 
state.
+ * @param sources detailed statistics on data being read from each of the 
streaming sources.
+ * @since 2.1.0
+ */
+@Evolving
+class LegacyStreamingQueryProgress private[spark](
+    val id: UUID,
+    val runId: UUID,
+    val name: String,
+    val timestamp: String,
+    val batchId: Long,
+    val numInputRows: Long,
+    val inputRowsPerSecond: Double,
+    val processedRowsPerSecond: Double,
+    val batchDuration: Long,
+    val durationMs: ju.Map[String, JLong],
+    val eventTime: ju.Map[String, String],
+    val stateOperators: Array[StateOperatorProgress],
+    val sources: Array[SourceProgress],
+    val sink: SinkProgress) extends Serializable {

Review Comment:
   I removed `observedMetrics` from the original class because deserializing 
its `GenericRowWithSchema` type runs into issues. If we don't need this field, 
that should be okay; otherwise I'll need to investigate the deserialization issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to