JoshRosen commented on code in PR #36885:
URL: https://github.com/apache/spark/pull/36885#discussion_r911493733


##########
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala:
##########
@@ -663,66 +854,69 @@ private[spark] object JsonProtocol {
       case `stageExecutorMetrics` => stageExecutorMetricsFromJson(json)
       case `blockUpdate` => blockUpdateFromJson(json)
       case `resourceProfileAdded` => resourceProfileAddedFromJson(json)
-      case other => mapper.readValue(compact(render(json)), Utils.classForName(other))
+      case other => mapper.readValue(json.toString, Utils.classForName(other))
         .asInstanceOf[SparkListenerEvent]
     }
   }
 
-  def stageSubmittedFromJson(json: JValue): SparkListenerStageSubmitted = {
-    val stageInfo = stageInfoFromJson(json \ "Stage Info")
-    val properties = propertiesFromJson(json \ "Properties")
+  def stageSubmittedFromJson(json: JsonNode): SparkListenerStageSubmitted = {
+    val stageInfo = stageInfoFromJson(json.get("Stage Info"))
+    val properties = propertiesFromJson(json.get("Properties"))
     SparkListenerStageSubmitted(stageInfo, properties)
   }
 
-  def stageCompletedFromJson(json: JValue): SparkListenerStageCompleted = {
-    val stageInfo = stageInfoFromJson(json \ "Stage Info")
+  def stageCompletedFromJson(json: JsonNode): SparkListenerStageCompleted = {
+    val stageInfo = stageInfoFromJson(json.get("Stage Info"))
     SparkListenerStageCompleted(stageInfo)
   }
 
-  def taskStartFromJson(json: JValue): SparkListenerTaskStart = {
-    val stageId = (json \ "Stage ID").extract[Int]
+  def taskStartFromJson(json: JsonNode): SparkListenerTaskStart = {
+    val stageId = json.get("Stage ID").intValue
     val stageAttemptId =
-      jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
-    val taskInfo = taskInfoFromJson(json \ "Task Info")
+      jsonOption(json.get("Stage Attempt ID")).map(_.intValue).getOrElse(0)
+    val taskInfo = taskInfoFromJson(json.get("Task Info"))
     SparkListenerTaskStart(stageId, stageAttemptId, taskInfo)
   }
 
-  def taskGettingResultFromJson(json: JValue): SparkListenerTaskGettingResult = {
-    val taskInfo = taskInfoFromJson(json \ "Task Info")
+  def taskGettingResultFromJson(json: JsonNode): SparkListenerTaskGettingResult = {
+    val taskInfo = taskInfoFromJson(json.get("Task Info"))
     SparkListenerTaskGettingResult(taskInfo)
   }
 
   /** Extract the executor metrics from JSON. */
-  def executorMetricsFromJson(json: JValue): ExecutorMetrics = {
+  def executorMetricsFromJson(maybeJson: JsonNode): ExecutorMetrics = {
+    // Executor metrics might be absent in JSON from very old Spark versions.
+    // In this case we return zero values for each metric.
     val metrics =
       ExecutorMetricType.metricToOffset.map { case (metric, _) =>
-        metric -> jsonOption(json \ metric).map(_.extract[Long]).getOrElse(0L)
+        val metricValueJson = jsonOption(maybeJson).flatMap(json => jsonOption(json.get(metric)))
+        metric -> metricValueJson.map(_.longValue).getOrElse(0L)
       }
     new ExecutorMetrics(metrics.toMap)
   }
 
-  def taskEndFromJson(json: JValue): SparkListenerTaskEnd = {
-    val stageId = (json \ "Stage ID").extract[Int]
+  def taskEndFromJson(json: JsonNode): SparkListenerTaskEnd = {
+    val stageId = json.get("Stage ID").intValue
     val stageAttemptId =
-      jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
-    val taskType = (json \ "Task Type").extract[String]
-    val taskEndReason = taskEndReasonFromJson(json \ "Task End Reason")
-    val taskInfo = taskInfoFromJson(json \ "Task Info")
-    val executorMetrics = executorMetricsFromJson(json \ "Task Executor Metrics")
-    val taskMetrics = taskMetricsFromJson(json \ "Task Metrics")
+      jsonOption(json.get("Stage Attempt ID")).map(_.intValue).getOrElse(0)

Review Comment:
   I did some experiments in a notebook with code similar to this:
   
   ```scala
   import org.json4s._
   import org.json4s.jackson.JsonMethods._
   import org.json4s.JsonDSL._
   import org.json4s.DefaultFormats
   
   import com.fasterxml.jackson.core.{JsonEncoding, JsonGenerator}
   import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode, ObjectMapper}
   import com.fasterxml.jackson.module.scala.DefaultScalaModule
   
   implicit val format = DefaultFormats
   
   private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
     .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
   ```
   
   and test cases like
   
   ```scala
   import scala.util.Try
   val json = """{"value": 21474836470}"""
   val json4sResult = Try((parse(json) \ "value").extract[Int])
   val jacksonResult = Try(mapper.readTree(json).get("value").intValue)
   assert(json4sResult == jacksonResult, s"json4s: ${json4sResult}, jackson: ${jacksonResult}")
   ```
   
   This uncovered one subtle corner case: if you have a string field where a number is expected, then my new code will silently return the zero value for the numeric type, whereas the old code would have failed with an exception:
   
   ```scala
   import scala.util.Try
   val json = """{"value": "1"}"""
   val json4sResult = Try((parse(json) \ "value").extract[Long])
   val jacksonResult = Try(mapper.readTree(json).get("value").longValue)
   assert(json4sResult == jacksonResult, s"json4s: ${json4sResult}, jackson: ${jacksonResult}")
   // outputs: AssertionError: assertion failed: json4s: Failure(org.json4s.MappingException: Do not know how to convert JString(1) into long), jackson: Success(0)
   ```
   
   Similarly, `.textValue` will return `null` for non-Strings:
   
   ```scala
   import scala.util.Try
   val json = """{"value":1}"""
   val json4sResult = Try((parse(json) \ "value").extract[String])
   val jacksonResult = Try(mapper.readTree(json).get("value").textValue)
   assert(json4sResult == jacksonResult, s"json4s: ${json4sResult}, jackson: ${jacksonResult}")
   // outputs: AssertionError: assertion failed: json4s: Success(1), jackson: Success(null)
   ```
   
   These behavior differences aren't an issue if we assume that Spark is always parsing well-formed JSON generated by itself, but they pose a potential risk of silent and confusing behavior in case Spark reads malformed JSON. Given this, I'm going to explore whether I can write helper functions that add type checking so as to match the old behavior.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to