JoshRosen commented on code in PR #36885:
URL: https://github.com/apache/spark/pull/36885#discussion_r911493733
##########
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala:
##########
@@ -663,66 +854,69 @@ private[spark] object JsonProtocol {
case `stageExecutorMetrics` => stageExecutorMetricsFromJson(json)
case `blockUpdate` => blockUpdateFromJson(json)
case `resourceProfileAdded` => resourceProfileAddedFromJson(json)
- case other => mapper.readValue(compact(render(json)),
Utils.classForName(other))
+ case other => mapper.readValue(json.toString, Utils.classForName(other))
.asInstanceOf[SparkListenerEvent]
}
}
- def stageSubmittedFromJson(json: JValue): SparkListenerStageSubmitted = {
- val stageInfo = stageInfoFromJson(json \ "Stage Info")
- val properties = propertiesFromJson(json \ "Properties")
+ def stageSubmittedFromJson(json: JsonNode): SparkListenerStageSubmitted = {
+ val stageInfo = stageInfoFromJson(json.get("Stage Info"))
+ val properties = propertiesFromJson(json.get("Properties"))
SparkListenerStageSubmitted(stageInfo, properties)
}
- def stageCompletedFromJson(json: JValue): SparkListenerStageCompleted = {
- val stageInfo = stageInfoFromJson(json \ "Stage Info")
+ def stageCompletedFromJson(json: JsonNode): SparkListenerStageCompleted = {
+ val stageInfo = stageInfoFromJson(json.get("Stage Info"))
SparkListenerStageCompleted(stageInfo)
}
- def taskStartFromJson(json: JValue): SparkListenerTaskStart = {
- val stageId = (json \ "Stage ID").extract[Int]
+ def taskStartFromJson(json: JsonNode): SparkListenerTaskStart = {
+ val stageId = json.get("Stage ID").intValue
val stageAttemptId =
- jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
- val taskInfo = taskInfoFromJson(json \ "Task Info")
+ jsonOption(json.get("Stage Attempt ID")).map(_.intValue).getOrElse(0)
+ val taskInfo = taskInfoFromJson(json.get("Task Info"))
SparkListenerTaskStart(stageId, stageAttemptId, taskInfo)
}
- def taskGettingResultFromJson(json: JValue): SparkListenerTaskGettingResult
= {
- val taskInfo = taskInfoFromJson(json \ "Task Info")
+ def taskGettingResultFromJson(json: JsonNode):
SparkListenerTaskGettingResult = {
+ val taskInfo = taskInfoFromJson(json.get("Task Info"))
SparkListenerTaskGettingResult(taskInfo)
}
/** Extract the executor metrics from JSON. */
- def executorMetricsFromJson(json: JValue): ExecutorMetrics = {
+ def executorMetricsFromJson(maybeJson: JsonNode): ExecutorMetrics = {
+ // Executor metrics might be absent in JSON from very old Spark versions.
+ // In this case we return zero values for each metric.
val metrics =
ExecutorMetricType.metricToOffset.map { case (metric, _) =>
- metric -> jsonOption(json \ metric).map(_.extract[Long]).getOrElse(0L)
+ val metricValueJson = jsonOption(maybeJson).flatMap(json =>
jsonOption(json.get(metric)))
+ metric -> metricValueJson.map(_.longValue).getOrElse(0L)
}
new ExecutorMetrics(metrics.toMap)
}
- def taskEndFromJson(json: JValue): SparkListenerTaskEnd = {
- val stageId = (json \ "Stage ID").extract[Int]
+ def taskEndFromJson(json: JsonNode): SparkListenerTaskEnd = {
+ val stageId = json.get("Stage ID").intValue
val stageAttemptId =
- jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
- val taskType = (json \ "Task Type").extract[String]
- val taskEndReason = taskEndReasonFromJson(json \ "Task End Reason")
- val taskInfo = taskInfoFromJson(json \ "Task Info")
- val executorMetrics = executorMetricsFromJson(json \ "Task Executor
Metrics")
- val taskMetrics = taskMetricsFromJson(json \ "Task Metrics")
+ jsonOption(json.get("Stage Attempt ID")).map(_.intValue).getOrElse(0)
Review Comment:
I did some experiments in a notebook with code similar to this:
```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.JsonDSL._
import org.json4s.DefaultFormats
import com.fasterxml.jackson.core.{JsonEncoding, JsonGenerator}
import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode,
ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
implicit val format = DefaultFormats
private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
```
and test cases like
```scala
import scala.util.Try
val json = """{"value": 21474836470}"""
val json4sResult = Try((parse(json) \ "value").extract[Int])
val jacksonResult = Try(mapper.readTree(json).get("value").intValue)
assert(json4sResult == jacksonResult, s"json4s: ${json4sResult}, jackson:
${jacksonResult}")
```
This uncovered one subtle corner case: if a field contains a string where a
number is expected, my new code will silently return the zero value for the
numeric type, whereas the old code would have failed with an exception:
```scala
import scala.util.Try
val json = """{"value": "1"}"""
val json4sResult = Try((parse(json) \ "value").extract[Long])
val jacksonResult = Try(mapper.readTree(json).get("value").longValue)
assert(json4sResult == jacksonResult, s"json4s: ${json4sResult}, jackson:
${jacksonResult}")
// outputs: AssertionError: assertion failed: json4s:
Failure(org.json4s.MappingException: Do not know how to convert JString(1) into
long), jackson: Success(0)
```
Similarly, `.textValue` will return `null` for non-string nodes, whereas the old
code converted the value to a string:
```scala
import scala.util.Try
val json = """{"value":1}"""
val json4sResult = Try((parse(json) \ "value").extract[String])
val jacksonResult = Try(mapper.readTree(json).get("value").textValue)
assert(json4sResult == jacksonResult, s"json4s: ${json4sResult}, jackson:
${jacksonResult}")
// outputs: AssertionError: assertion failed: json4s: Success(1), jackson:
Success(null)
```
These behavior differences aren't an issue if we assume that Spark is always
parsing well-formed JSON that it generated itself, but they do create a risk of
silent and confusing behavior if Spark reads malformed JSON. Given this,
I'm going to explore whether I can write helper functions that add type checking
so as to match the old behavior.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]