LuciferYang commented on code in PR #36885:
URL: https://github.com/apache/spark/pull/36885#discussion_r898806559


##########
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala:
##########
@@ -731,183 +925,180 @@ private[spark] object JsonProtocol {
     SparkListenerJobStart(jobId, submissionTime, stageInfos, properties)
   }
 
-  def jobEndFromJson(json: JValue): SparkListenerJobEnd = {
-    val jobId = (json \ "Job ID").extract[Int]
+  def jobEndFromJson(json: JsonNode): SparkListenerJobEnd = {
+    val jobId = json.get("Job ID").intValue
     val completionTime =
-      jsonOption(json \ "Completion Time").map(_.extract[Long]).getOrElse(-1L)
-    val jobResult = jobResultFromJson(json \ "Job Result")
+      jsonOption(json.get("Completion Time")).map(_.longValue).getOrElse(-1L)
+    val jobResult = jobResultFromJson(json.get("Job Result"))
     SparkListenerJobEnd(jobId, completionTime, jobResult)
   }
 
-  def resourceProfileAddedFromJson(json: JValue): 
SparkListenerResourceProfileAdded = {
-    val profId = (json \ "Resource Profile Id").extract[Int]
-    val executorReqs = executorResourceRequestMapFromJson(json \ "Executor 
Resource Requests")
-    val taskReqs = taskResourceRequestMapFromJson(json \ "Task Resource 
Requests")
+  def resourceProfileAddedFromJson(json: JsonNode): 
SparkListenerResourceProfileAdded = {
+    val profId = json.get("Resource Profile Id").intValue
+    val executorReqs = executorResourceRequestMapFromJson(json.get("Executor 
Resource Requests"))
+    val taskReqs = taskResourceRequestMapFromJson(json.get("Task Resource 
Requests"))
     val rp = new ResourceProfile(executorReqs.toMap, taskReqs.toMap)
     rp.setResourceProfileId(profId)
     SparkListenerResourceProfileAdded(rp)
   }
 
-  def executorResourceRequestFromJson(json: JValue): ExecutorResourceRequest = 
{
-    val rName = (json \ "Resource Name").extract[String]
-    val amount = (json \ "Amount").extract[Int]
-    val discoveryScript = (json \ "Discovery Script").extract[String]
-    val vendor = (json \ "Vendor").extract[String]
+  def executorResourceRequestFromJson(json: JsonNode): ExecutorResourceRequest 
= {
+    val rName = json.get("Resource Name").textValue
+    val amount = json.get("Amount").intValue
+    val discoveryScript = json.get("Discovery Script").textValue
+    val vendor = json.get("Vendor").textValue
     new ExecutorResourceRequest(rName, amount, discoveryScript, vendor)
   }
 
-  def taskResourceRequestFromJson(json: JValue): TaskResourceRequest = {
-    val rName = (json \ "Resource Name").extract[String]
-    val amount = (json \ "Amount").extract[Int]
+  def taskResourceRequestFromJson(json: JsonNode): TaskResourceRequest = {
+    val rName = json.get("Resource Name").textValue
+    val amount = json.get("Amount").intValue
     new TaskResourceRequest(rName, amount)
   }
 
-  def taskResourceRequestMapFromJson(json: JValue): Map[String, 
TaskResourceRequest] = {
-    val jsonFields = json.asInstanceOf[JObject].obj
-    jsonFields.collect { case JField(k, v) =>
-      val req = taskResourceRequestFromJson(v)
-      (k, req)
+  def taskResourceRequestMapFromJson(json: JsonNode): Map[String, 
TaskResourceRequest] = {
+    json.fields().asScala.collect { case field =>
+      val req = taskResourceRequestFromJson(field.getValue)
+      (field.getKey, req)
     }.toMap
   }
 
-  def executorResourceRequestMapFromJson(json: JValue): Map[String, 
ExecutorResourceRequest] = {
-    val jsonFields = json.asInstanceOf[JObject].obj
-    jsonFields.collect { case JField(k, v) =>
-      val req = executorResourceRequestFromJson(v)
-      (k, req)
+  def executorResourceRequestMapFromJson(json: JsonNode): Map[String, 
ExecutorResourceRequest] = {
+    json.fields().asScala.collect { case field =>
+      val req = executorResourceRequestFromJson(field.getValue)
+      (field.getKey, req)
     }.toMap
   }
 
-  def environmentUpdateFromJson(json: JValue): SparkListenerEnvironmentUpdate 
= {
+  def environmentUpdateFromJson(json: JsonNode): 
SparkListenerEnvironmentUpdate = {
     // For compatible with previous event logs
-    val hadoopProperties = jsonOption(json \ "Hadoop 
Properties").map(mapFromJson(_).toSeq)
+    val hadoopProperties = jsonOption(json.get("Hadoop 
Properties")).map(mapFromJson(_).toSeq)
       .getOrElse(Seq.empty)
-    val metricsProperties = jsonOption(json \ "Metrics 
Properties").map(mapFromJson(_).toSeq)
+    val metricsProperties = jsonOption(json.get("Metrics 
Properties")).map(mapFromJson(_).toSeq)
       .getOrElse(Seq.empty)
     val environmentDetails = Map[String, Seq[(String, String)]](
-      "JVM Information" -> mapFromJson(json \ "JVM Information").toSeq,
-      "Spark Properties" -> mapFromJson(json \ "Spark Properties").toSeq,
+      "JVM Information" -> mapFromJson(json.get("JVM Information")).toSeq,
+      "Spark Properties" -> mapFromJson(json.get("Spark Properties")).toSeq,
       "Hadoop Properties" -> hadoopProperties,
-      "System Properties" -> mapFromJson(json \ "System Properties").toSeq,
+      "System Properties" -> mapFromJson(json.get("System Properties")).toSeq,
       "Metrics Properties" -> metricsProperties,
-      "Classpath Entries" -> mapFromJson(json \ "Classpath Entries").toSeq)
+      "Classpath Entries" -> mapFromJson(json.get("Classpath Entries")).toSeq)
     SparkListenerEnvironmentUpdate(environmentDetails)
   }
 
-  def blockManagerAddedFromJson(json: JValue): SparkListenerBlockManagerAdded 
= {
-    val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
-    val maxMem = (json \ "Maximum Memory").extract[Long]
-    val time = jsonOption(json \ 
"Timestamp").map(_.extract[Long]).getOrElse(-1L)
-    val maxOnHeapMem = jsonOption(json \ "Maximum Onheap 
Memory").map(_.extract[Long])
-    val maxOffHeapMem = jsonOption(json \ "Maximum Offheap 
Memory").map(_.extract[Long])
+  def blockManagerAddedFromJson(json: JsonNode): 
SparkListenerBlockManagerAdded = {
+    val blockManagerId = blockManagerIdFromJson(json.get("Block Manager ID"))
+    val maxMem = json.get("Maximum Memory").longValue
+    val time = 
jsonOption(json.get("Timestamp")).map(_.longValue).getOrElse(-1L)
+    val maxOnHeapMem = jsonOption(json.get("Maximum Onheap 
Memory")).map(_.longValue)
+    val maxOffHeapMem = jsonOption(json.get("Maximum Offheap 
Memory")).map(_.longValue)
     SparkListenerBlockManagerAdded(time, blockManagerId, maxMem, maxOnHeapMem, 
maxOffHeapMem)
   }
 
-  def blockManagerRemovedFromJson(json: JValue): 
SparkListenerBlockManagerRemoved = {
-    val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
-    val time = jsonOption(json \ 
"Timestamp").map(_.extract[Long]).getOrElse(-1L)
+  def blockManagerRemovedFromJson(json: JsonNode): 
SparkListenerBlockManagerRemoved = {
+    val blockManagerId = blockManagerIdFromJson(json.get("Block Manager ID"))
+    val time = 
jsonOption(json.get("Timestamp")).map(_.longValue).getOrElse(-1L)
     SparkListenerBlockManagerRemoved(time, blockManagerId)
   }
 
-  def unpersistRDDFromJson(json: JValue): SparkListenerUnpersistRDD = {
-    SparkListenerUnpersistRDD((json \ "RDD ID").extract[Int])
+  def unpersistRDDFromJson(json: JsonNode): SparkListenerUnpersistRDD = {
+    SparkListenerUnpersistRDD(json.get("RDD ID").intValue)
   }
 
-  def applicationStartFromJson(json: JValue): SparkListenerApplicationStart = {
-    val appName = (json \ "App Name").extract[String]
-    val appId = jsonOption(json \ "App ID").map(_.extract[String])
-    val time = (json \ "Timestamp").extract[Long]
-    val sparkUser = (json \ "User").extract[String]
-    val appAttemptId = jsonOption(json \ "App Attempt 
ID").map(_.extract[String])
-    val driverLogs = jsonOption(json \ "Driver Logs").map(mapFromJson)
-    val driverAttributes = jsonOption(json \ "Driver 
Attributes").map(mapFromJson)
+  def applicationStartFromJson(json: JsonNode): SparkListenerApplicationStart 
= {
+    val appName = json.get("App Name").textValue
+    val appId = jsonOption(json.get("App ID")).map(_.asText())
+    val time = json.get("Timestamp").longValue
+    val sparkUser = json.get("User").textValue
+    val appAttemptId = jsonOption(json.get("App Attempt ID")).map(_.asText())
+    val driverLogs = jsonOption(json.get("Driver Logs")).map(mapFromJson)
+    val driverAttributes = jsonOption(json.get("Driver 
Attributes")).map(mapFromJson)
     SparkListenerApplicationStart(appName, appId, time, sparkUser, 
appAttemptId, driverLogs,
       driverAttributes)
   }
 
-  def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {
-    SparkListenerApplicationEnd((json \ "Timestamp").extract[Long])
+  def applicationEndFromJson(json: JsonNode): SparkListenerApplicationEnd = {
+    SparkListenerApplicationEnd(json.get("Timestamp").longValue)
   }
 
-  def executorAddedFromJson(json: JValue): SparkListenerExecutorAdded = {
-    val time = (json \ "Timestamp").extract[Long]
-    val executorId = (json \ "Executor ID").extract[String]
-    val executorInfo = executorInfoFromJson(json \ "Executor Info")
+  def executorAddedFromJson(json: JsonNode): SparkListenerExecutorAdded = {
+    val time = json.get("Timestamp").longValue
+    val executorId = json.get("Executor ID").textValue
+    val executorInfo = executorInfoFromJson(json.get("Executor Info"))
     SparkListenerExecutorAdded(time, executorId, executorInfo)
   }
 
-  def executorRemovedFromJson(json: JValue): SparkListenerExecutorRemoved = {
-    val time = (json \ "Timestamp").extract[Long]
-    val executorId = (json \ "Executor ID").extract[String]
-    val reason = (json \ "Removed Reason").extract[String]
+  def executorRemovedFromJson(json: JsonNode): SparkListenerExecutorRemoved = {
+    val time = json.get("Timestamp").longValue
+    val executorId = json.get("Executor ID").textValue
+    val reason = json.get("Removed Reason").textValue
     SparkListenerExecutorRemoved(time, executorId, reason)
   }
 
-  def logStartFromJson(json: JValue): SparkListenerLogStart = {
-    val sparkVersion = (json \ "Spark Version").extract[String]
+  def logStartFromJson(json: JsonNode): SparkListenerLogStart = {
+    val sparkVersion = json.get("Spark Version").textValue
     SparkListenerLogStart(sparkVersion)
   }
 
-  def executorMetricsUpdateFromJson(json: JValue): 
SparkListenerExecutorMetricsUpdate = {
-    val execInfo = (json \ "Executor ID").extract[String]
-    val accumUpdates = (json \ "Metrics Updated").extract[List[JValue]].map { 
json =>
-      val taskId = (json \ "Task ID").extract[Long]
-      val stageId = (json \ "Stage ID").extract[Int]
-      val stageAttemptId = (json \ "Stage Attempt ID").extract[Int]
+  def executorMetricsUpdateFromJson(json: JsonNode): 
SparkListenerExecutorMetricsUpdate = {
+    val execInfo = json.get("Executor ID").textValue
+    val accumUpdates = json.get("Metrics Updated").elements.asScala.map { json 
=>
+      val taskId = json.get("Task ID").longValue
+      val stageId = json.get("Stage ID").intValue
+      val stageAttemptId = json.get("Stage Attempt ID").intValue
       val updates =
-        (json \ "Accumulator 
Updates").extract[List[JValue]].map(accumulableInfoFromJson)
+        json.get("Accumulator 
Updates").elements.asScala.map(accumulableInfoFromJson).toArray.toSeq
       (taskId, stageId, stageAttemptId, updates)
-    }
-    val executorUpdates = (json \ "Executor Metrics Updated") match {
-      case JNothing => Map.empty[(Int, Int), ExecutorMetrics]
-      case value: JValue => value.extract[List[JValue]].map { json =>
-        val stageId = (json \ "Stage ID").extract[Int]
-        val stageAttemptId = (json \ "Stage Attempt ID").extract[Int]
-        val executorMetrics = executorMetricsFromJson(json \ "Executor 
Metrics")
+    }.toArray.toSeq
+    val executorUpdates = jsonOption(json.get("Executor Metrics Updated")).map 
{ value =>
+      value.elements.asScala.map { json =>
+        val stageId = json.get("Stage ID").intValue
+        val stageAttemptId = json.get("Stage Attempt ID").intValue
+        val executorMetrics = executorMetricsFromJson(json.get("Executor 
Metrics"))
         ((stageId, stageAttemptId) -> executorMetrics)
       }.toMap
-    }
+    }.getOrElse(Map.empty)

Review Comment:
   seems should be `Map.empty[(Int, Int), ExecutorMetrics]`, otherwise 
compilation will fail with `Scala-2.13`:
   
   ```
   [error] 
/home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala:1061:64:
 type mismatch;
   [error]  found   : 
scala.collection.Map[_1,org.apache.spark.executor.ExecutorMetrics] where type 
_1 <: (Int, Int)
   [error]  required: scala.collection.Map[(Int, 
Int),org.apache.spark.executor.ExecutorMetrics]
   [error] Note: _1 <: (Int, Int), but trait Map is invariant in type K.
   [error] You may wish to investigate a wildcard type such as `_ <: (Int, 
Int)`. (SLS 3.2.10)
   [error]     SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates, 
executorUpdates)
   [error]                                                                ^
   [error] one error found
   [error] (core / Compile / compileIncremental) Compilation failed
   [error] Total time: 136 s (02:16), completed Jun 16, 2022 2:07:49 AM
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to