JoshRosen commented on code in PR #36885:
URL: https://github.com/apache/spark/pull/36885#discussion_r911507266


##########
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala:
##########
@@ -663,66 +854,69 @@ private[spark] object JsonProtocol {
       case `stageExecutorMetrics` => stageExecutorMetricsFromJson(json)
       case `blockUpdate` => blockUpdateFromJson(json)
       case `resourceProfileAdded` => resourceProfileAddedFromJson(json)
-      case other => mapper.readValue(compact(render(json)), Utils.classForName(other))
+      case other => mapper.readValue(json.toString, Utils.classForName(other))
         .asInstanceOf[SparkListenerEvent]
     }
   }
 
-  def stageSubmittedFromJson(json: JValue): SparkListenerStageSubmitted = {
-    val stageInfo = stageInfoFromJson(json \ "Stage Info")
-    val properties = propertiesFromJson(json \ "Properties")
+  def stageSubmittedFromJson(json: JsonNode): SparkListenerStageSubmitted = {
+    val stageInfo = stageInfoFromJson(json.get("Stage Info"))
+    val properties = propertiesFromJson(json.get("Properties"))
     SparkListenerStageSubmitted(stageInfo, properties)
   }
 
-  def stageCompletedFromJson(json: JValue): SparkListenerStageCompleted = {
-    val stageInfo = stageInfoFromJson(json \ "Stage Info")
+  def stageCompletedFromJson(json: JsonNode): SparkListenerStageCompleted = {
+    val stageInfo = stageInfoFromJson(json.get("Stage Info"))
     SparkListenerStageCompleted(stageInfo)
   }
 
-  def taskStartFromJson(json: JValue): SparkListenerTaskStart = {
-    val stageId = (json \ "Stage ID").extract[Int]
+  def taskStartFromJson(json: JsonNode): SparkListenerTaskStart = {
+    val stageId = json.get("Stage ID").intValue
     val stageAttemptId =
-      jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
-    val taskInfo = taskInfoFromJson(json \ "Task Info")
+      jsonOption(json.get("Stage Attempt ID")).map(_.intValue).getOrElse(0)
+    val taskInfo = taskInfoFromJson(json.get("Task Info"))
     SparkListenerTaskStart(stageId, stageAttemptId, taskInfo)
   }
 
-  def taskGettingResultFromJson(json: JValue): SparkListenerTaskGettingResult = {
-    val taskInfo = taskInfoFromJson(json \ "Task Info")
+  def taskGettingResultFromJson(json: JsonNode): SparkListenerTaskGettingResult = {
+    val taskInfo = taskInfoFromJson(json.get("Task Info"))
     SparkListenerTaskGettingResult(taskInfo)
   }
 
   /** Extract the executor metrics from JSON. */
-  def executorMetricsFromJson(json: JValue): ExecutorMetrics = {
+  def executorMetricsFromJson(maybeJson: JsonNode): ExecutorMetrics = {
+    // Executor metrics might be absent in JSON from very old Spark versions.
+    // In this case we return zero values for each metric.
     val metrics =
       ExecutorMetricType.metricToOffset.map { case (metric, _) =>
-        metric -> jsonOption(json \ metric).map(_.extract[Long]).getOrElse(0L)
+        val metricValueJson = jsonOption(maybeJson).flatMap(json => jsonOption(json.get(metric)))
+        metric -> metricValueJson.map(_.longValue).getOrElse(0L)
       }
     new ExecutorMetrics(metrics.toMap)
   }
 
-  def taskEndFromJson(json: JValue): SparkListenerTaskEnd = {
-    val stageId = (json \ "Stage ID").extract[Int]
+  def taskEndFromJson(json: JsonNode): SparkListenerTaskEnd = {
+    val stageId = json.get("Stage ID").intValue
     val stageAttemptId =
-      jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
-    val taskType = (json \ "Task Type").extract[String]
-    val taskEndReason = taskEndReasonFromJson(json \ "Task End Reason")
-    val taskInfo = taskInfoFromJson(json \ "Task Info")
-    val executorMetrics = executorMetricsFromJson(json \ "Task Executor Metrics")
-    val taskMetrics = taskMetricsFromJson(json \ "Task Metrics")
+      jsonOption(json.get("Stage Attempt ID")).map(_.intValue).getOrElse(0)

Review Comment:
   Pushed 03555c6f6074a87c64c059eeddf665b9ffc69c9a to add implicit `extract*` 
methods that perform type checking. This may have a _slight_ performance impact, 
but the new parsing code is still ~2x faster than the old Json4s code in my 
local tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to