JoshRosen commented on code in PR #36885:
URL: https://github.com/apache/spark/pull/36885#discussion_r900557554


##########
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala:
##########
@@ -57,298 +57,398 @@ import org.apache.spark.util.Utils.weakIntern
 private[spark] object JsonProtocol {
   // TODO: Remove this file and put JSON serialization into each individual 
class.
 
-  private implicit val format = DefaultFormats
-
   private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
     .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
 
   /** ------------------------------------------------- *
    * JSON serialization methods for SparkListenerEvents |
    * -------------------------------------------------- */
 
-  def sparkEventToJson(event: SparkListenerEvent): JValue = {
+  def sparkEventToJsonString(event: SparkListenerEvent): String = {
+    toJsonString { generator =>
+      writeSparkEventToJson(event, generator)
+    }
+  }
+
+  def toJsonString(block: JsonGenerator => Unit): String = {
+    val baos = new ByteArrayOutputStream()
+    val generator = mapper.createGenerator(baos, JsonEncoding.UTF8)
+    block(generator)
+    generator.close()
+    new String(baos.toByteArray, StandardCharsets.UTF_8)
+  }
+
+  def writeSparkEventToJson(event: SparkListenerEvent, g: JsonGenerator): Unit 
= {
     event match {
       case stageSubmitted: SparkListenerStageSubmitted =>
-        stageSubmittedToJson(stageSubmitted)
+        stageSubmittedToJson(stageSubmitted, g)
       case stageCompleted: SparkListenerStageCompleted =>
-        stageCompletedToJson(stageCompleted)
+        stageCompletedToJson(stageCompleted, g)
       case taskStart: SparkListenerTaskStart =>
-        taskStartToJson(taskStart)
+        taskStartToJson(taskStart, g)
       case taskGettingResult: SparkListenerTaskGettingResult =>
-        taskGettingResultToJson(taskGettingResult)
+        taskGettingResultToJson(taskGettingResult, g)
       case taskEnd: SparkListenerTaskEnd =>
-        taskEndToJson(taskEnd)
+        taskEndToJson(taskEnd, g)
       case jobStart: SparkListenerJobStart =>
-        jobStartToJson(jobStart)
+        jobStartToJson(jobStart, g)
       case jobEnd: SparkListenerJobEnd =>
-        jobEndToJson(jobEnd)
+        jobEndToJson(jobEnd, g)
       case environmentUpdate: SparkListenerEnvironmentUpdate =>
-        environmentUpdateToJson(environmentUpdate)
+        environmentUpdateToJson(environmentUpdate, g)
       case blockManagerAdded: SparkListenerBlockManagerAdded =>
-        blockManagerAddedToJson(blockManagerAdded)
+        blockManagerAddedToJson(blockManagerAdded, g)
       case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
-        blockManagerRemovedToJson(blockManagerRemoved)
+        blockManagerRemovedToJson(blockManagerRemoved, g)
       case unpersistRDD: SparkListenerUnpersistRDD =>
-        unpersistRDDToJson(unpersistRDD)
+        unpersistRDDToJson(unpersistRDD, g)
       case applicationStart: SparkListenerApplicationStart =>
-        applicationStartToJson(applicationStart)
+        applicationStartToJson(applicationStart, g)
       case applicationEnd: SparkListenerApplicationEnd =>
-        applicationEndToJson(applicationEnd)
+        applicationEndToJson(applicationEnd, g)
       case executorAdded: SparkListenerExecutorAdded =>
-        executorAddedToJson(executorAdded)
+        executorAddedToJson(executorAdded, g)
       case executorRemoved: SparkListenerExecutorRemoved =>
-        executorRemovedToJson(executorRemoved)
+        executorRemovedToJson(executorRemoved, g)
       case logStart: SparkListenerLogStart =>
-        logStartToJson(logStart)
+        logStartToJson(logStart, g)
       case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
-        executorMetricsUpdateToJson(metricsUpdate)
+        executorMetricsUpdateToJson(metricsUpdate, g)
       case stageExecutorMetrics: SparkListenerStageExecutorMetrics =>
-        stageExecutorMetricsToJson(stageExecutorMetrics)
+        stageExecutorMetricsToJson(stageExecutorMetrics, g)
       case blockUpdate: SparkListenerBlockUpdated =>
-        blockUpdateToJson(blockUpdate)
+        blockUpdateToJson(blockUpdate, g)
       case resourceProfileAdded: SparkListenerResourceProfileAdded =>
-        resourceProfileAddedToJson(resourceProfileAdded)
-      case _ => parse(mapper.writeValueAsString(event))
+        resourceProfileAddedToJson(resourceProfileAdded, g)
+      case _ =>
+        mapper.writeValue(g, event)
     }
   }
 
-  def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted): 
JValue = {
-    val stageInfo = stageInfoToJson(stageSubmitted.stageInfo)
-    val properties = propertiesToJson(stageSubmitted.properties)
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageSubmitted) ~
-    ("Stage Info" -> stageInfo) ~
-    ("Properties" -> properties)
+  def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted, g: 
JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Event", 
SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageSubmitted)
+    g.writeFieldName("Stage Info")
+    stageInfoToJson(stageSubmitted.stageInfo, g)
+    g.writeFieldName("Properties")
+    propertiesToJson(stageSubmitted.properties, g)
+    g.writeEndObject()
   }
 
-  def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted): 
JValue = {
-    val stageInfo = stageInfoToJson(stageCompleted.stageInfo)
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageCompleted) ~
-    ("Stage Info" -> stageInfo)
+  def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted, g: 
JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Event", 
SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageCompleted)
+    g.writeFieldName("Stage Info")
+    stageInfoToJson(stageCompleted.stageInfo, g)
+    g.writeEndObject()
   }
 
-  def taskStartToJson(taskStart: SparkListenerTaskStart): JValue = {
-    val taskInfo = taskStart.taskInfo
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskStart) ~
-    ("Stage ID" -> taskStart.stageId) ~
-    ("Stage Attempt ID" -> taskStart.stageAttemptId) ~
-    ("Task Info" -> taskInfoToJson(taskInfo))
+  def taskStartToJson(taskStart: SparkListenerTaskStart, g: JsonGenerator): 
Unit = {
+    g.writeStartObject()
+    g.writeStringField("Event", 
SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskStart)
+    g.writeNumberField("Stage ID", taskStart.stageId)
+    g.writeNumberField("Stage Attempt ID", taskStart.stageAttemptId)
+    g.writeFieldName("Task Info")
+    taskInfoToJson(taskStart.taskInfo, g)
+    g.writeEndObject()
   }
 
-  def taskGettingResultToJson(taskGettingResult: 
SparkListenerTaskGettingResult): JValue = {
+  def taskGettingResultToJson(
+      taskGettingResult: SparkListenerTaskGettingResult,
+      g: JsonGenerator): Unit = {
     val taskInfo = taskGettingResult.taskInfo
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskGettingResult) ~
-    ("Task Info" -> taskInfoToJson(taskInfo))
-  }
-
-  def taskEndToJson(taskEnd: SparkListenerTaskEnd): JValue = {
-    val taskEndReason = taskEndReasonToJson(taskEnd.reason)
-    val taskInfo = taskEnd.taskInfo
-    val executorMetrics = taskEnd.taskExecutorMetrics
-    val taskMetrics = taskEnd.taskMetrics
-    val taskMetricsJson = if (taskMetrics != null) 
taskMetricsToJson(taskMetrics) else JNothing
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskEnd) ~
-    ("Stage ID" -> taskEnd.stageId) ~
-    ("Stage Attempt ID" -> taskEnd.stageAttemptId) ~
-    ("Task Type" -> taskEnd.taskType) ~
-    ("Task End Reason" -> taskEndReason) ~
-    ("Task Info" -> taskInfoToJson(taskInfo)) ~
-    ("Task Executor Metrics" -> executorMetricsToJson(executorMetrics)) ~
-    ("Task Metrics" -> taskMetricsJson)
-  }
-
-  def jobStartToJson(jobStart: SparkListenerJobStart): JValue = {
-    val properties = propertiesToJson(jobStart.properties)
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobStart) ~
-    ("Job ID" -> jobStart.jobId) ~
-    ("Submission Time" -> jobStart.time) ~
-    ("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~  // Added in 
Spark 1.2.0
-    ("Stage IDs" -> jobStart.stageIds) ~
-    ("Properties" -> properties)
-  }
-
-  def jobEndToJson(jobEnd: SparkListenerJobEnd): JValue = {
-    val jobResult = jobResultToJson(jobEnd.jobResult)
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobEnd) ~
-    ("Job ID" -> jobEnd.jobId) ~
-    ("Completion Time" -> jobEnd.time) ~
-    ("Job Result" -> jobResult)
-  }
-
-  def environmentUpdateToJson(environmentUpdate: 
SparkListenerEnvironmentUpdate): JValue = {
+    g.writeStartObject()
+    g.writeStringField("Event", 
SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskGettingResult)
+    g.writeFieldName("Task Info")
+    taskInfoToJson(taskInfo, g)
+    g.writeEndObject()
+  }
+
+  def taskEndToJson(taskEnd: SparkListenerTaskEnd, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Event", 
SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskEnd)
+    g.writeNumberField("Stage ID", taskEnd.stageId)
+    g.writeNumberField("Stage Attempt ID", taskEnd.stageAttemptId)
+    g.writeStringField("Task Type", taskEnd.taskType)
+    g.writeFieldName("Task End Reason")
+    taskEndReasonToJson(taskEnd.reason, g)
+    g.writeFieldName("Task Info")
+    taskInfoToJson(taskEnd.taskInfo, g)
+    g.writeFieldName("Task Executor Metrics")
+    executorMetricsToJson(taskEnd.taskExecutorMetrics, g)
+    Option(taskEnd.taskMetrics).foreach { m =>
+      g.writeFieldName("Task Metrics")
+      taskMetricsToJson(m, g)
+    }

Review Comment:
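   This is correct and matches the Json4s behavior. When rendering an object, 
Json4s omits fields whose values are `JNothing`. The old code fell back to 
`JNothing` when `taskMetrics` was null, so no "Task Metrics" field was emitted. 
Skipping the field via `Option(taskEnd.taskMetrics).foreach` therefore produces 
the same output. Here's an example illustrating the Json4s behavior: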
   
   ```
   import org.json4s._
   import org.json4s.jackson.JsonMethods._
   import org.json4s.JsonDSL._
   
   val record = ("Task ID" -> 1) ~ ("Task Metrics" -> JNothing)
   
   println(compact(render(record)))
   ```
   
   outputs
   
   ```
   {"Task ID":1}
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to