LuciferYang commented on code in PR #39642:
URL: https://github.com/apache/spark/pull/39642#discussion_r1083634888


##########
sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala:
##########
@@ -271,4 +278,254 @@ class KVStoreProtobufSerializerSuite extends 
SparkFunSuite {
       assert(result.endTimestamp == input.endTimestamp)
     }
   }
+
+  test("StreamingQueryProgressWrapper") {
+    val normalInput = {
+      val stateOperatorProgress0 = new StateOperatorProgress(
+        operatorName = "op-0",
+        numRowsTotal = 1L,
+        numRowsUpdated = 2L,
+        allUpdatesTimeMs = 3L,
+        numRowsRemoved = 4L,
+        allRemovalsTimeMs = 5L,
+        commitTimeMs = 6L,
+        memoryUsedBytes = 7L,
+        numRowsDroppedByWatermark = 8L,
+        numShufflePartitions = 9L,
+        numStateStoreInstances = 10L,
+        customMetrics = Map(
+          "custom-metrics-00" -> JLong.valueOf("10"),
+          "custom-metrics-01" -> JLong.valueOf("11")).asJava
+      )
+      val stateOperatorProgress1 = new StateOperatorProgress(
+        operatorName = null,
+        numRowsTotal = 11L,
+        numRowsUpdated = 12L,
+        allUpdatesTimeMs = 13L,
+        numRowsRemoved = 14L,
+        allRemovalsTimeMs = 15L,
+        commitTimeMs = 16L,
+        memoryUsedBytes = 17L,
+        numRowsDroppedByWatermark = 18L,
+        numShufflePartitions = 19L,
+        numStateStoreInstances = 20L,
+        customMetrics = Map(
+          "custom-metrics-10" -> JLong.valueOf("20"),
+          "custom-metrics-11" -> JLong.valueOf("21")).asJava
+      )
+      val source0 = new SourceProgress(
+        description = "description-0",
+        startOffset = "startOffset-0",
+        endOffset = "endOffset-0",
+        latestOffset = "latestOffset-0",
+        numInputRows = 10L,
+        inputRowsPerSecond = 11.0,
+        processedRowsPerSecond = 12.0,
+        metrics = Map(
+          "metrics-00" -> "10",
+          "metrics-01" -> "11").asJava
+      )
+      val source1 = new SourceProgress(
+        description = "description-1",
+        startOffset = "startOffset-1",
+        endOffset = "endOffset-1",
+        latestOffset = "latestOffset-1",
+        numInputRows = 20L,
+        inputRowsPerSecond = 21.0,
+        processedRowsPerSecond = 22.0,
+        metrics = Map(
+          "metrics-10" -> "20",
+          "metrics-11" -> "21").asJava
+      )
+      val sink = new SinkProgress(
+        description = "sink-0",
+        numOutputRows = 30,
+        metrics = Map(
+          "metrics-20" -> "30",
+          "metrics-21" -> "31").asJava
+      )
+      val schema1 = new StructType()
+        .add("c1", "long")
+        .add("c2", "double")
+      val schema2 = new StructType()
+        .add("rc", "long")
+        .add("min_q", "string")
+        .add("max_q", "string")
+      val observedMetrics = Map[String, Row](
+        "event1" -> new GenericRowWithSchema(Array(1L, 3.0d), schema1),
+        "event2" -> new GenericRowWithSchema(Array(1L, "hello", "world"), 
schema2)
+      ).asJava
+      val progress = new StreamingQueryProgress(
+        id = UUID.randomUUID(),
+        runId = UUID.randomUUID(),
+        name = "name-1",
+        timestamp = "2023-01-03T09:14:04.175Z",
+        batchId = 1L,
+        batchDuration = 2L,
+        durationMs = Map(
+          "duration-0" -> JLong.valueOf("10"),
+          "duration-1" -> JLong.valueOf("11")).asJava,
+        eventTime = Map(
+          "eventTime-0" -> "20",
+          "eventTime-1" -> "21").asJava,
+        stateOperators = Array(stateOperatorProgress0, stateOperatorProgress1),
+        sources = Array(source0, source1),
+        sink = sink,
+        observedMetrics = observedMetrics
+      )
+      new StreamingQueryProgressWrapper(progress)
+    }
+
+    val withNullInput = {
+      val stateOperatorProgress0 = new StateOperatorProgress(
+        operatorName = null,
+        numRowsTotal = 1L,
+        numRowsUpdated = 2L,
+        allUpdatesTimeMs = 3L,
+        numRowsRemoved = 4L,
+        allRemovalsTimeMs = 5L,
+        commitTimeMs = 6L,
+        memoryUsedBytes = 7L,
+        numRowsDroppedByWatermark = 8L,
+        numShufflePartitions = 9L,
+        numStateStoreInstances = 10L,
+        customMetrics = null
+      )
+      val stateOperatorProgress1 = new StateOperatorProgress(
+        operatorName = null,
+        numRowsTotal = 11L,
+        numRowsUpdated = 12L,
+        allUpdatesTimeMs = 13L,
+        numRowsRemoved = 14L,
+        allRemovalsTimeMs = 15L,
+        commitTimeMs = 16L,
+        memoryUsedBytes = 17L,
+        numRowsDroppedByWatermark = 18L,
+        numShufflePartitions = 19L,
+        numStateStoreInstances = 20L,
+        customMetrics = null
+      )
+      val source0 = new SourceProgress(
+        description = null,
+        startOffset = null,
+        endOffset = null,
+        latestOffset = null,
+        numInputRows = 10L,
+        inputRowsPerSecond = 11.0,
+        processedRowsPerSecond = 12.0,
+        metrics = null
+      )
+      val source1 = new SourceProgress(
+        description = null,
+        startOffset = null,
+        endOffset = null,
+        latestOffset = null,
+        numInputRows = 10L,
+        inputRowsPerSecond = 11.0,
+        processedRowsPerSecond = 12.0,
+        metrics = null
+      )
+      val sink = new SinkProgress(
+        description = null,
+        numOutputRows = 30,
+        metrics = null
+      )
+      val progress = new StreamingQueryProgress(
+        id = null,
+        runId = null,
+        name = null,
+        timestamp = null,
+        batchId = 1L,
+        batchDuration = 2L,
+        durationMs = null,
+        eventTime = null,
+        stateOperators = Array(stateOperatorProgress0, stateOperatorProgress1),
+        sources = Array(source0, source1),
+        sink = sink,
+        observedMetrics = null
+      )
+      new StreamingQueryProgressWrapper(progress)
+    }
+
+    Seq((false, normalInput), (true, withNullInput)).foreach { case 
(hasNullValue, input) =>
+      // Do serialization and deserialization
+      val bytes = serializer.serialize(input)
+      val result = serializer.deserialize(bytes, 
classOf[StreamingQueryProgressWrapper])
+
+      // Assertion results
+      val progress = input.progress
+      val resultProcess = result.progress
+      assert(progress.id == resultProcess.id)
+      assert(progress.runId == resultProcess.runId)
+      assert(progress.name == resultProcess.name)
+      assert(progress.timestamp == resultProcess.timestamp)
+      assert(progress.batchId == resultProcess.batchId)
+      assert(progress.batchDuration == resultProcess.batchDuration)
+      if (hasNullValue) {
+        assert(resultProcess.durationMs.isEmpty)

Review Comment:
   A null map input comes back as an empty map after the serialize/deserialize round trip.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to