HeartSaVioR commented on code in PR #41540:
URL: https://github.com/apache/spark/pull/41540#discussion_r1227476433


##########
python/pyspark/sql/tests/streaming/test_streaming_listener.py:
##########
@@ -299,6 +327,166 @@ def onQueryTerminated(self, event):
         self.spark.streams.removeListener(test_listener)
         self.assertEqual(num_listeners, 
len(self.spark.streams._jsqm.listListeners()))
 
+    def test_query_started_event_fromJson(self):
+        start_event = """
+            {
+                "id" : "78923ec2-8f4d-4266-876e-1f50cf3c283b",
+                "runId" : "55a95d45-e932-4e08-9caa-0a8ecd9391e8",
+                "name" : null,
+                "timestamp" : "2023-06-09T18:13:29.741Z"
+            }
+        """
+        start_event = QueryStartedEvent.fromJson(json.loads(start_event))
+        self.check_start_event(start_event)
+        self.assertTrue(start_event.id == 
uuid.UUID("78923ec2-8f4d-4266-876e-1f50cf3c283b"))
+        self.assertTrue(start_event.runId == 
uuid.UUID("55a95d45-e932-4e08-9caa-0a8ecd9391e8"))
+        self.assertTrue(start_event.name is None)
+        self.assertTrue(start_event.timestamp == "2023-06-09T18:13:29.741Z")
+
+    def test_query_terminated_event_fromJson(self):
+        terminated_json = """
+            {
+                "id" : "78923ec2-8f4d-4266-876e-1f50cf3c283b",
+                "runId" : "55a95d45-e932-4e08-9caa-0a8ecd9391e8",
+                "exception" : "org.apache.spark.SparkException: Job aborted 
due to stage failure",
+                "errorClassOnException" : null}
+        """
+        terminated_event = 
QueryTerminatedEvent.fromJson(json.loads(terminated_json))
+        self.check_terminated_event(terminated_event, "SparkException")
+        self.assertTrue(terminated_event.id == 
uuid.UUID("78923ec2-8f4d-4266-876e-1f50cf3c283b"))
+        self.assertTrue(terminated_event.runId == 
uuid.UUID("55a95d45-e932-4e08-9caa-0a8ecd9391e8"))
+        self.assertTrue("SparkException" in terminated_event.exception)
+        self.assertTrue(terminated_event.errorClassOnException is None)
+
+    def test_streaming_query_progress_fromJson(self):
+        progress_json = """
+            {
+              "id" : "00000000-0000-0001-0000-000000000001",
+              "runId" : "00000000-0000-0001-0000-000000000002",
+              "name" : "test",
+              "timestamp" : "2016-12-05T20:54:20.827Z",
+              "batchId" : 2,
+              "numInputRows" : 678,
+              "inputRowsPerSecond" : 10.0,
+              "processedRowsPerSecond" : 5.4,
+              "batchDuration": 5,
+              "durationMs" : {
+                "getBatch" : 0
+              },
+              "eventTime" : {},

Review Comment:
   It would probably be better to set a watermark here and ensure that the 
related fields are also properly handled.



##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala:
##########
@@ -123,7 +128,17 @@ object StreamingQueryListener {
       val id: UUID,
       val runId: UUID,
       val name: String,
-      val timestamp: String) extends Event
+      val timestamp: String) extends Event {
+
+    def json: String = compact(render(jsonValue))
+
+    private def jsonValue: JValue = {

Review Comment:
   I'm not too concerned about the performance of this (it happens at most 
once per microbatch execution), but I'd agree with the move to remove json4s 
as a dependency, unless we decide to shade it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to