LuciferYang commented on code in PR #36885:
URL: https://github.com/apache/spark/pull/36885#discussion_r898806559
##########
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala:
##########
@@ -731,183 +925,180 @@ private[spark] object JsonProtocol {
SparkListenerJobStart(jobId, submissionTime, stageInfos, properties)
}
- def jobEndFromJson(json: JValue): SparkListenerJobEnd = {
- val jobId = (json \ "Job ID").extract[Int]
+ def jobEndFromJson(json: JsonNode): SparkListenerJobEnd = {
+ val jobId = json.get("Job ID").intValue
val completionTime =
- jsonOption(json \ "Completion Time").map(_.extract[Long]).getOrElse(-1L)
- val jobResult = jobResultFromJson(json \ "Job Result")
+ jsonOption(json.get("Completion Time")).map(_.longValue).getOrElse(-1L)
+ val jobResult = jobResultFromJson(json.get("Job Result"))
SparkListenerJobEnd(jobId, completionTime, jobResult)
}
- def resourceProfileAddedFromJson(json: JValue):
SparkListenerResourceProfileAdded = {
- val profId = (json \ "Resource Profile Id").extract[Int]
- val executorReqs = executorResourceRequestMapFromJson(json \ "Executor
Resource Requests")
- val taskReqs = taskResourceRequestMapFromJson(json \ "Task Resource
Requests")
+ def resourceProfileAddedFromJson(json: JsonNode):
SparkListenerResourceProfileAdded = {
+ val profId = json.get("Resource Profile Id").intValue
+ val executorReqs = executorResourceRequestMapFromJson(json.get("Executor
Resource Requests"))
+ val taskReqs = taskResourceRequestMapFromJson(json.get("Task Resource
Requests"))
val rp = new ResourceProfile(executorReqs.toMap, taskReqs.toMap)
rp.setResourceProfileId(profId)
SparkListenerResourceProfileAdded(rp)
}
- def executorResourceRequestFromJson(json: JValue): ExecutorResourceRequest =
{
- val rName = (json \ "Resource Name").extract[String]
- val amount = (json \ "Amount").extract[Int]
- val discoveryScript = (json \ "Discovery Script").extract[String]
- val vendor = (json \ "Vendor").extract[String]
+ def executorResourceRequestFromJson(json: JsonNode): ExecutorResourceRequest
= {
+ val rName = json.get("Resource Name").textValue
+ val amount = json.get("Amount").intValue
+ val discoveryScript = json.get("Discovery Script").textValue
+ val vendor = json.get("Vendor").textValue
new ExecutorResourceRequest(rName, amount, discoveryScript, vendor)
}
- def taskResourceRequestFromJson(json: JValue): TaskResourceRequest = {
- val rName = (json \ "Resource Name").extract[String]
- val amount = (json \ "Amount").extract[Int]
+ def taskResourceRequestFromJson(json: JsonNode): TaskResourceRequest = {
+ val rName = json.get("Resource Name").textValue
+ val amount = json.get("Amount").intValue
new TaskResourceRequest(rName, amount)
}
- def taskResourceRequestMapFromJson(json: JValue): Map[String,
TaskResourceRequest] = {
- val jsonFields = json.asInstanceOf[JObject].obj
- jsonFields.collect { case JField(k, v) =>
- val req = taskResourceRequestFromJson(v)
- (k, req)
+ def taskResourceRequestMapFromJson(json: JsonNode): Map[String,
TaskResourceRequest] = {
+ json.fields().asScala.collect { case field =>
+ val req = taskResourceRequestFromJson(field.getValue)
+ (field.getKey, req)
}.toMap
}
- def executorResourceRequestMapFromJson(json: JValue): Map[String,
ExecutorResourceRequest] = {
- val jsonFields = json.asInstanceOf[JObject].obj
- jsonFields.collect { case JField(k, v) =>
- val req = executorResourceRequestFromJson(v)
- (k, req)
+ def executorResourceRequestMapFromJson(json: JsonNode): Map[String,
ExecutorResourceRequest] = {
+ json.fields().asScala.collect { case field =>
+ val req = executorResourceRequestFromJson(field.getValue)
+ (field.getKey, req)
}.toMap
}
- def environmentUpdateFromJson(json: JValue): SparkListenerEnvironmentUpdate
= {
+ def environmentUpdateFromJson(json: JsonNode):
SparkListenerEnvironmentUpdate = {
// For compatible with previous event logs
- val hadoopProperties = jsonOption(json \ "Hadoop
Properties").map(mapFromJson(_).toSeq)
+ val hadoopProperties = jsonOption(json.get("Hadoop
Properties")).map(mapFromJson(_).toSeq)
.getOrElse(Seq.empty)
- val metricsProperties = jsonOption(json \ "Metrics
Properties").map(mapFromJson(_).toSeq)
+ val metricsProperties = jsonOption(json.get("Metrics
Properties")).map(mapFromJson(_).toSeq)
.getOrElse(Seq.empty)
val environmentDetails = Map[String, Seq[(String, String)]](
- "JVM Information" -> mapFromJson(json \ "JVM Information").toSeq,
- "Spark Properties" -> mapFromJson(json \ "Spark Properties").toSeq,
+ "JVM Information" -> mapFromJson(json.get("JVM Information")).toSeq,
+ "Spark Properties" -> mapFromJson(json.get("Spark Properties")).toSeq,
"Hadoop Properties" -> hadoopProperties,
- "System Properties" -> mapFromJson(json \ "System Properties").toSeq,
+ "System Properties" -> mapFromJson(json.get("System Properties")).toSeq,
"Metrics Properties" -> metricsProperties,
- "Classpath Entries" -> mapFromJson(json \ "Classpath Entries").toSeq)
+ "Classpath Entries" -> mapFromJson(json.get("Classpath Entries")).toSeq)
SparkListenerEnvironmentUpdate(environmentDetails)
}
- def blockManagerAddedFromJson(json: JValue): SparkListenerBlockManagerAdded
= {
- val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
- val maxMem = (json \ "Maximum Memory").extract[Long]
- val time = jsonOption(json \
"Timestamp").map(_.extract[Long]).getOrElse(-1L)
- val maxOnHeapMem = jsonOption(json \ "Maximum Onheap
Memory").map(_.extract[Long])
- val maxOffHeapMem = jsonOption(json \ "Maximum Offheap
Memory").map(_.extract[Long])
+ def blockManagerAddedFromJson(json: JsonNode):
SparkListenerBlockManagerAdded = {
+ val blockManagerId = blockManagerIdFromJson(json.get("Block Manager ID"))
+ val maxMem = json.get("Maximum Memory").longValue
+ val time =
jsonOption(json.get("Timestamp")).map(_.longValue).getOrElse(-1L)
+ val maxOnHeapMem = jsonOption(json.get("Maximum Onheap
Memory")).map(_.longValue)
+ val maxOffHeapMem = jsonOption(json.get("Maximum Offheap
Memory")).map(_.longValue)
SparkListenerBlockManagerAdded(time, blockManagerId, maxMem, maxOnHeapMem,
maxOffHeapMem)
}
- def blockManagerRemovedFromJson(json: JValue):
SparkListenerBlockManagerRemoved = {
- val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
- val time = jsonOption(json \
"Timestamp").map(_.extract[Long]).getOrElse(-1L)
+ def blockManagerRemovedFromJson(json: JsonNode):
SparkListenerBlockManagerRemoved = {
+ val blockManagerId = blockManagerIdFromJson(json.get("Block Manager ID"))
+ val time =
jsonOption(json.get("Timestamp")).map(_.longValue).getOrElse(-1L)
SparkListenerBlockManagerRemoved(time, blockManagerId)
}
- def unpersistRDDFromJson(json: JValue): SparkListenerUnpersistRDD = {
- SparkListenerUnpersistRDD((json \ "RDD ID").extract[Int])
+ def unpersistRDDFromJson(json: JsonNode): SparkListenerUnpersistRDD = {
+ SparkListenerUnpersistRDD(json.get("RDD ID").intValue)
}
- def applicationStartFromJson(json: JValue): SparkListenerApplicationStart = {
- val appName = (json \ "App Name").extract[String]
- val appId = jsonOption(json \ "App ID").map(_.extract[String])
- val time = (json \ "Timestamp").extract[Long]
- val sparkUser = (json \ "User").extract[String]
- val appAttemptId = jsonOption(json \ "App Attempt
ID").map(_.extract[String])
- val driverLogs = jsonOption(json \ "Driver Logs").map(mapFromJson)
- val driverAttributes = jsonOption(json \ "Driver
Attributes").map(mapFromJson)
+ def applicationStartFromJson(json: JsonNode): SparkListenerApplicationStart
= {
+ val appName = json.get("App Name").textValue
+ val appId = jsonOption(json.get("App ID")).map(_.asText())
+ val time = json.get("Timestamp").longValue
+ val sparkUser = json.get("User").textValue
+ val appAttemptId = jsonOption(json.get("App Attempt ID")).map(_.asText())
+ val driverLogs = jsonOption(json.get("Driver Logs")).map(mapFromJson)
+ val driverAttributes = jsonOption(json.get("Driver
Attributes")).map(mapFromJson)
SparkListenerApplicationStart(appName, appId, time, sparkUser,
appAttemptId, driverLogs,
driverAttributes)
}
- def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {
- SparkListenerApplicationEnd((json \ "Timestamp").extract[Long])
+ def applicationEndFromJson(json: JsonNode): SparkListenerApplicationEnd = {
+ SparkListenerApplicationEnd(json.get("Timestamp").longValue)
}
- def executorAddedFromJson(json: JValue): SparkListenerExecutorAdded = {
- val time = (json \ "Timestamp").extract[Long]
- val executorId = (json \ "Executor ID").extract[String]
- val executorInfo = executorInfoFromJson(json \ "Executor Info")
+ def executorAddedFromJson(json: JsonNode): SparkListenerExecutorAdded = {
+ val time = json.get("Timestamp").longValue
+ val executorId = json.get("Executor ID").textValue
+ val executorInfo = executorInfoFromJson(json.get("Executor Info"))
SparkListenerExecutorAdded(time, executorId, executorInfo)
}
- def executorRemovedFromJson(json: JValue): SparkListenerExecutorRemoved = {
- val time = (json \ "Timestamp").extract[Long]
- val executorId = (json \ "Executor ID").extract[String]
- val reason = (json \ "Removed Reason").extract[String]
+ def executorRemovedFromJson(json: JsonNode): SparkListenerExecutorRemoved = {
+ val time = json.get("Timestamp").longValue
+ val executorId = json.get("Executor ID").textValue
+ val reason = json.get("Removed Reason").textValue
SparkListenerExecutorRemoved(time, executorId, reason)
}
- def logStartFromJson(json: JValue): SparkListenerLogStart = {
- val sparkVersion = (json \ "Spark Version").extract[String]
+ def logStartFromJson(json: JsonNode): SparkListenerLogStart = {
+ val sparkVersion = json.get("Spark Version").textValue
SparkListenerLogStart(sparkVersion)
}
- def executorMetricsUpdateFromJson(json: JValue):
SparkListenerExecutorMetricsUpdate = {
- val execInfo = (json \ "Executor ID").extract[String]
- val accumUpdates = (json \ "Metrics Updated").extract[List[JValue]].map {
json =>
- val taskId = (json \ "Task ID").extract[Long]
- val stageId = (json \ "Stage ID").extract[Int]
- val stageAttemptId = (json \ "Stage Attempt ID").extract[Int]
+ def executorMetricsUpdateFromJson(json: JsonNode):
SparkListenerExecutorMetricsUpdate = {
+ val execInfo = json.get("Executor ID").textValue
+ val accumUpdates = json.get("Metrics Updated").elements.asScala.map { json
=>
+ val taskId = json.get("Task ID").longValue
+ val stageId = json.get("Stage ID").intValue
+ val stageAttemptId = json.get("Stage Attempt ID").intValue
val updates =
- (json \ "Accumulator
Updates").extract[List[JValue]].map(accumulableInfoFromJson)
+ json.get("Accumulator
Updates").elements.asScala.map(accumulableInfoFromJson).toArray.toSeq
(taskId, stageId, stageAttemptId, updates)
- }
- val executorUpdates = (json \ "Executor Metrics Updated") match {
- case JNothing => Map.empty[(Int, Int), ExecutorMetrics]
- case value: JValue => value.extract[List[JValue]].map { json =>
- val stageId = (json \ "Stage ID").extract[Int]
- val stageAttemptId = (json \ "Stage Attempt ID").extract[Int]
- val executorMetrics = executorMetricsFromJson(json \ "Executor
Metrics")
+ }.toArray.toSeq
+ val executorUpdates = jsonOption(json.get("Executor Metrics Updated")).map
{ value =>
+ value.elements.asScala.map { json =>
+ val stageId = json.get("Stage ID").intValue
+ val stageAttemptId = json.get("Stage Attempt ID").intValue
+ val executorMetrics = executorMetricsFromJson(json.get("Executor
Metrics"))
((stageId, stageAttemptId) -> executorMetrics)
}.toMap
- }
+ }.getOrElse(Map.empty)
Review Comment:
It seems this should be `Map.empty[(Int, Int), ExecutorMetrics]`; otherwise
compilation will fail with Scala 2.13:
```
[error]
/home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala:1061:64:
type mismatch;
[error] found :
scala.collection.Map[_1,org.apache.spark.executor.ExecutorMetrics] where type
_1 <: (Int, Int)
[error] required: scala.collection.Map[(Int,
Int),org.apache.spark.executor.ExecutorMetrics]
[error] Note: _1 <: (Int, Int), but trait Map is invariant in type K.
[error] You may wish to investigate a wildcard type such as `_ <: (Int,
Int)`. (SLS 3.2.10)
[error] SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates,
executorUpdates)
[error] ^
[error] one error found
[error] (core / Compile / compileIncremental) Compilation failed
[error] Total time: 136 s (02:16), completed Jun 16, 2022 2:07:49 AM
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]