dongjoon-hyun commented on code in PR #36885:
URL: https://github.com/apache/spark/pull/36885#discussion_r1103462394
##########
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala:
##########
@@ -360,256 +466,343 @@ private[spark] object JsonProtocol {
*
* The behavior here must match that of [[accumValueFromJson]]. Exposed for
testing.
*/
- private[util] def accumValueToJson(name: Option[String], value: Any): JValue
= {
+ private[util] def accumValueToJson(
+ name: Option[String],
+ value: Any,
+ g: JsonGenerator,
+ fieldName: Option[String] = None): Unit = {
if (name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))) {
value match {
- case v: Int => JInt(v)
- case v: Long => JInt(v)
+ case v: Int =>
+ fieldName.foreach(g.writeFieldName)
+ g.writeNumber(v)
+ case v: Long =>
+ fieldName.foreach(g.writeFieldName)
+ g.writeNumber(v)
// We only have 3 kind of internal accumulator types, so if it's not
int or long, it must be
// the blocks accumulator, whose type is `java.util.List[(BlockId,
BlockStatus)]`
case v: java.util.List[_] =>
- JArray(v.asScala.toList.flatMap {
+ fieldName.foreach(g.writeFieldName)
+ g.writeStartArray()
+ v.asScala.foreach {
case (id: BlockId, status: BlockStatus) =>
- Some(
- ("Block ID" -> id.toString) ~
- ("Status" -> blockStatusToJson(status))
- )
+ g.writeStartObject()
+ g.writeStringField("Block ID", id.toString)
+ g.writeFieldName("Status")
+ blockStatusToJson(status, g)
+ g.writeEndObject()
case _ =>
// Ignore unsupported types. A user may put `METRICS_PREFIX` in
the name. We should
// not crash.
- None
- })
+ }
+ g.writeEndArray()
case _ =>
// Ignore unsupported types. A user may put `METRICS_PREFIX` in the
name. We should not
// crash.
- JNothing
}
} else {
// For all external accumulators, just use strings
- JString(value.toString)
+ fieldName.foreach(g.writeFieldName)
+ g.writeString(value.toString)
}
}
- def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
- val shuffleReadMetrics: JValue =
- ("Remote Blocks Fetched" ->
taskMetrics.shuffleReadMetrics.remoteBlocksFetched) ~
- ("Local Blocks Fetched" ->
taskMetrics.shuffleReadMetrics.localBlocksFetched) ~
- ("Fetch Wait Time" -> taskMetrics.shuffleReadMetrics.fetchWaitTime) ~
- ("Remote Bytes Read" ->
taskMetrics.shuffleReadMetrics.remoteBytesRead) ~
- ("Remote Bytes Read To Disk" ->
taskMetrics.shuffleReadMetrics.remoteBytesReadToDisk) ~
- ("Local Bytes Read" -> taskMetrics.shuffleReadMetrics.localBytesRead) ~
- ("Total Records Read" -> taskMetrics.shuffleReadMetrics.recordsRead)
- val shuffleWriteMetrics: JValue =
- ("Shuffle Bytes Written" ->
taskMetrics.shuffleWriteMetrics.bytesWritten) ~
- ("Shuffle Write Time" -> taskMetrics.shuffleWriteMetrics.writeTime) ~
- ("Shuffle Records Written" ->
taskMetrics.shuffleWriteMetrics.recordsWritten)
- val inputMetrics: JValue =
- ("Bytes Read" -> taskMetrics.inputMetrics.bytesRead) ~
- ("Records Read" -> taskMetrics.inputMetrics.recordsRead)
- val outputMetrics: JValue =
- ("Bytes Written" -> taskMetrics.outputMetrics.bytesWritten) ~
- ("Records Written" -> taskMetrics.outputMetrics.recordsWritten)
- val updatedBlocks =
- JArray(taskMetrics.updatedBlockStatuses.toList.map { case (id, status) =>
- ("Block ID" -> id.toString) ~
- ("Status" -> blockStatusToJson(status))
- })
- ("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~
- ("Executor Deserialize CPU Time" ->
taskMetrics.executorDeserializeCpuTime) ~
- ("Executor Run Time" -> taskMetrics.executorRunTime) ~
- ("Executor CPU Time" -> taskMetrics.executorCpuTime) ~
- ("Peak Execution Memory" -> taskMetrics.peakExecutionMemory) ~
- ("Result Size" -> taskMetrics.resultSize) ~
- ("JVM GC Time" -> taskMetrics.jvmGCTime) ~
- ("Result Serialization Time" -> taskMetrics.resultSerializationTime) ~
- ("Memory Bytes Spilled" -> taskMetrics.memoryBytesSpilled) ~
- ("Disk Bytes Spilled" -> taskMetrics.diskBytesSpilled) ~
- ("Shuffle Read Metrics" -> shuffleReadMetrics) ~
- ("Shuffle Write Metrics" -> shuffleWriteMetrics) ~
- ("Input Metrics" -> inputMetrics) ~
- ("Output Metrics" -> outputMetrics) ~
- ("Updated Blocks" -> updatedBlocks)
+ def taskMetricsToJson(taskMetrics: TaskMetrics, g: JsonGenerator): Unit = {
+ def writeShuffleReadMetrics(): Unit = {
+ g.writeStartObject()
+ g.writeNumberField(
+ "Remote Blocks Fetched",
taskMetrics.shuffleReadMetrics.remoteBlocksFetched)
+ g.writeNumberField("Local Blocks Fetched",
taskMetrics.shuffleReadMetrics.localBlocksFetched)
+ g.writeNumberField("Fetch Wait Time",
taskMetrics.shuffleReadMetrics.fetchWaitTime)
+ g.writeNumberField("Remote Bytes Read",
taskMetrics.shuffleReadMetrics.remoteBytesRead)
+ g.writeNumberField(
+ "Remote Bytes Read To Disk",
taskMetrics.shuffleReadMetrics.remoteBytesReadToDisk)
+ g.writeNumberField("Local Bytes Read",
taskMetrics.shuffleReadMetrics.localBytesRead)
+ g.writeNumberField("Total Records Read",
taskMetrics.shuffleReadMetrics.recordsRead)
+ g.writeEndObject()
+ }
+ def writeShuffleWriteMetrics(): Unit = {
+ g.writeStartObject()
+ g.writeNumberField("Shuffle Bytes Written",
taskMetrics.shuffleWriteMetrics.bytesWritten)
+ g.writeNumberField("Shuffle Write Time",
taskMetrics.shuffleWriteMetrics.writeTime)
+ g.writeNumberField("Shuffle Records Written",
taskMetrics.shuffleWriteMetrics.recordsWritten)
+ g.writeEndObject()
+ }
+ def writeInputMetrics(): Unit = {
+ g.writeStartObject()
+ g.writeNumberField("Bytes Read", taskMetrics.inputMetrics.bytesRead)
+ g.writeNumberField("Records Read", taskMetrics.inputMetrics.recordsRead)
+ g.writeEndObject()
+ }
+ def writeOutputMetrics(): Unit = {
+ g.writeStartObject()
+ g.writeNumberField("Bytes Written",
taskMetrics.outputMetrics.bytesWritten)
+ g.writeNumberField("Records Written",
taskMetrics.outputMetrics.recordsWritten)
+ g.writeEndObject()
+ }
+ def writeUpdatedBlocks(): Unit = {
+ g.writeStartArray()
+ taskMetrics.updatedBlockStatuses.foreach { case (id, status) =>
+ g.writeStartObject()
+ g.writeStringField("Block ID", id.toString)
+ g.writeFieldName("Status")
+ blockStatusToJson(status, g)
+ g.writeEndObject()
+ }
+ g.writeEndArray()
+ }
+
+ g.writeStartObject()
+ g.writeNumberField("Executor Deserialize Time",
taskMetrics.executorDeserializeTime)
+ g.writeNumberField("Executor Deserialize CPU Time",
taskMetrics.executorDeserializeCpuTime)
+ g.writeNumberField("Executor Run Time", taskMetrics.executorRunTime)
+ g.writeNumberField("Executor CPU Time", taskMetrics.executorCpuTime)
+ g.writeNumberField("Peak Execution Memory",
taskMetrics.peakExecutionMemory)
+ g.writeNumberField("Result Size", taskMetrics.resultSize)
+ g.writeNumberField("JVM GC Time", taskMetrics.jvmGCTime)
+ g.writeNumberField("Result Serialization Time",
taskMetrics.resultSerializationTime)
+ g.writeNumberField("Memory Bytes Spilled", taskMetrics.memoryBytesSpilled)
+ g.writeNumberField("Disk Bytes Spilled", taskMetrics.diskBytesSpilled)
+ g.writeFieldName("Shuffle Read Metrics")
+ writeShuffleReadMetrics()
+ g.writeFieldName("Shuffle Write Metrics")
+ writeShuffleWriteMetrics()
+ g.writeFieldName("Input Metrics")
+ writeInputMetrics()
+ g.writeFieldName("Output Metrics")
+ writeOutputMetrics()
+ g.writeFieldName("Updated Blocks")
+ writeUpdatedBlocks()
+ g.writeEndObject()
}
/** Convert executor metrics to JSON. */
- def executorMetricsToJson(executorMetrics: ExecutorMetrics): JValue = {
- val metrics = ExecutorMetricType.metricToOffset.map { case (m, _) =>
- JField(m, executorMetrics.getMetricValue(m))
+ def executorMetricsToJson(executorMetrics: ExecutorMetrics, g:
JsonGenerator): Unit = {
+ g.writeStartObject()
+ ExecutorMetricType.metricToOffset.foreach { case (m, _) =>
+ g.writeNumberField(m, executorMetrics.getMetricValue(m))
}
- JObject(metrics.toSeq: _*)
+ g.writeEndObject()
}
- def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = {
- val reason = Utils.getFormattedClassName(taskEndReason)
- val json: JObject = taskEndReason match {
+ def taskEndReasonToJson(taskEndReason: TaskEndReason, g: JsonGenerator):
Unit = {
+ g.writeStartObject()
+ g.writeStringField("Reason", Utils.getFormattedClassName(taskEndReason))
+ taskEndReason match {
case fetchFailed: FetchFailed =>
- val blockManagerAddress = Option(fetchFailed.bmAddress).
- map(blockManagerIdToJson).getOrElse(JNothing)
- ("Block Manager Address" -> blockManagerAddress) ~
- ("Shuffle ID" -> fetchFailed.shuffleId) ~
- ("Map ID" -> fetchFailed.mapId) ~
- ("Map Index" -> fetchFailed.mapIndex) ~
- ("Reduce ID" -> fetchFailed.reduceId) ~
- ("Message" -> fetchFailed.message)
+ Option(fetchFailed.bmAddress).foreach { id =>
+ g.writeFieldName("Block Manager Address")
+ blockManagerIdToJson(id, g)
+ }
+ g.writeNumberField("Shuffle ID", fetchFailed.shuffleId)
+ g.writeNumberField("Map ID", fetchFailed.mapId)
+ g.writeNumberField("Map Index", fetchFailed.mapIndex)
+ g.writeNumberField("Reduce ID", fetchFailed.reduceId)
+ g.writeStringField("Message", fetchFailed.message)
case exceptionFailure: ExceptionFailure =>
- val stackTrace = stackTraceToJson(exceptionFailure.stackTrace)
- val accumUpdates = accumulablesToJson(exceptionFailure.accumUpdates)
- ("Class Name" -> exceptionFailure.className) ~
- ("Description" -> exceptionFailure.description) ~
- ("Stack Trace" -> stackTrace) ~
- ("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~
- ("Accumulator Updates" -> accumUpdates)
+ g.writeStringField("Class Name", exceptionFailure.className)
+ g.writeStringField("Description", exceptionFailure.description)
+ g.writeFieldName("Stack Trace")
+ stackTraceToJson(exceptionFailure.stackTrace, g)
+ g.writeStringField("Full Stack Trace", exceptionFailure.fullStackTrace)
+ g.writeFieldName("Accumulator Updates")
+ accumulablesToJson(exceptionFailure.accumUpdates, g)
case taskCommitDenied: TaskCommitDenied =>
- ("Job ID" -> taskCommitDenied.jobID) ~
- ("Partition ID" -> taskCommitDenied.partitionID) ~
- ("Attempt Number" -> taskCommitDenied.attemptNumber)
+ g.writeNumberField("Job ID", taskCommitDenied.jobID)
+ g.writeNumberField("Partition ID", taskCommitDenied.partitionID)
+ g.writeNumberField("Attempt Number", taskCommitDenied.attemptNumber)
case ExecutorLostFailure(executorId, exitCausedByApp, reason) =>
- ("Executor ID" -> executorId) ~
- ("Exit Caused By App" -> exitCausedByApp) ~
- ("Loss Reason" -> reason)
+ g.writeStringField("Executor ID", executorId)
+ g.writeBooleanField("Exit Caused By App", exitCausedByApp)
+ reason.foreach(g.writeStringField("Loss Reason", _))
case taskKilled: TaskKilled =>
- val accumUpdates =
JArray(taskKilled.accumUpdates.map(accumulableInfoToJson).toList)
- ("Kill Reason" -> taskKilled.reason) ~
- ("Accumulator Updates" -> accumUpdates)
- case _ => emptyJson
+ g.writeStringField("Kill Reason", taskKilled.reason)
+ g.writeArrayFieldStart("Accumulator Updates")
+ taskKilled.accumUpdates.foreach { info =>
+ accumulableInfoToJson(info, g)
+ }
+ g.writeEndArray()
+ case _ =>
+ // no extra fields to write
}
- ("Reason" -> reason) ~ json
+ g.writeEndObject()
}
- def blockManagerIdToJson(blockManagerId: BlockManagerId): JValue = {
- ("Executor ID" -> blockManagerId.executorId) ~
- ("Host" -> blockManagerId.host) ~
- ("Port" -> blockManagerId.port)
+ def blockManagerIdToJson(blockManagerId: BlockManagerId, g: JsonGenerator):
Unit = {
+ g.writeStartObject()
+ g.writeStringField("Executor ID", blockManagerId.executorId)
+ g.writeStringField("Host", blockManagerId.host)
+ g.writeNumberField("Port", blockManagerId.port)
+ g.writeEndObject()
}
- def jobResultToJson(jobResult: JobResult): JValue = {
- val result = Utils.getFormattedClassName(jobResult)
- val json = jobResult match {
- case JobSucceeded => emptyJson
+ def jobResultToJson(jobResult: JobResult, g: JsonGenerator): Unit = {
+ g.writeStartObject()
+ g.writeStringField("Result", Utils.getFormattedClassName(jobResult))
+ jobResult match {
case jobFailed: JobFailed =>
- JObject("Exception" -> exceptionToJson(jobFailed.exception))
+ g.writeFieldName("Exception")
+ exceptionToJson(jobFailed.exception, g)
+ case JobSucceeded =>
+ // Nothing else to write in case of success
}
- ("Result" -> result) ~ json
- }
-
- def rddInfoToJson(rddInfo: RDDInfo): JValue = {
- val storageLevel = storageLevelToJson(rddInfo.storageLevel)
- val parentIds = JArray(rddInfo.parentIds.map(JInt(_)).toList)
- ("RDD ID" -> rddInfo.id) ~
- ("Name" -> rddInfo.name) ~
- ("Scope" -> rddInfo.scope.map(_.toJson)) ~
- ("Callsite" -> rddInfo.callSite) ~
- ("Parent IDs" -> parentIds) ~
- ("Storage Level" -> storageLevel) ~
- ("Barrier" -> rddInfo.isBarrier) ~
- ("DeterministicLevel" -> rddInfo.outputDeterministicLevel.toString) ~
- ("Number of Partitions" -> rddInfo.numPartitions) ~
- ("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
- ("Memory Size" -> rddInfo.memSize) ~
- ("Disk Size" -> rddInfo.diskSize)
- }
-
- def storageLevelToJson(storageLevel: StorageLevel): JValue = {
- ("Use Disk" -> storageLevel.useDisk) ~
- ("Use Memory" -> storageLevel.useMemory) ~
- ("Use Off Heap" -> storageLevel.useOffHeap) ~
- ("Deserialized" -> storageLevel.deserialized) ~
- ("Replication" -> storageLevel.replication)
- }
-
- def blockStatusToJson(blockStatus: BlockStatus): JValue = {
- val storageLevel = storageLevelToJson(blockStatus.storageLevel)
- ("Storage Level" -> storageLevel) ~
- ("Memory Size" -> blockStatus.memSize) ~
- ("Disk Size" -> blockStatus.diskSize)
- }
-
- def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
- ("Host" -> executorInfo.executorHost) ~
- ("Total Cores" -> executorInfo.totalCores) ~
- ("Log Urls" -> mapToJson(executorInfo.logUrlMap)) ~
- ("Attributes" -> mapToJson(executorInfo.attributes)) ~
- ("Resources" -> resourcesMapToJson(executorInfo.resourcesInfo)) ~
- ("Resource Profile Id" -> executorInfo.resourceProfileId) ~
- ("Registration Time" -> executorInfo.registrationTime) ~
- ("Request Time" -> executorInfo.requestTime)
- }
-
- def resourcesMapToJson(m: Map[String, ResourceInformation]): JValue = {
- val jsonFields = m.map {
- case (k, v) => JField(k, v.toJson)
- }
- JObject(jsonFields.toList)
- }
-
- def blockUpdatedInfoToJson(blockUpdatedInfo: BlockUpdatedInfo): JValue = {
- ("Block Manager ID" ->
blockManagerIdToJson(blockUpdatedInfo.blockManagerId)) ~
- ("Block ID" -> blockUpdatedInfo.blockId.toString) ~
- ("Storage Level" -> storageLevelToJson(blockUpdatedInfo.storageLevel)) ~
- ("Memory Size" -> blockUpdatedInfo.memSize) ~
- ("Disk Size" -> blockUpdatedInfo.diskSize)
+ g.writeEndObject()
}
- def executorResourceRequestToJson(execReq: ExecutorResourceRequest): JValue
= {
- ("Resource Name" -> execReq.resourceName) ~
- ("Amount" -> execReq.amount) ~
- ("Discovery Script" -> execReq.discoveryScript) ~
- ("Vendor" -> execReq.vendor)
- }
-
- def executorResourceRequestMapToJson(m: Map[String,
ExecutorResourceRequest]): JValue = {
- val jsonFields = m.map {
- case (k, execReq) =>
- JField(k, executorResourceRequestToJson(execReq))
+ def rddInfoToJson(rddInfo: RDDInfo, g: JsonGenerator): Unit = {
+ g.writeStartObject()
+ g.writeNumberField("RDD ID", rddInfo.id)
+ g.writeStringField("Name", rddInfo.name)
+ rddInfo.scope.foreach { s =>
+ g.writeStringField("Scope", s.toJson)
+ }
+ g.writeStringField("Callsite", rddInfo.callSite)
+ g.writeArrayFieldStart("Parent IDs")
+ rddInfo.parentIds.foreach(g.writeNumber)
+ g.writeEndArray()
+ g.writeFieldName("Storage Level")
+ storageLevelToJson(rddInfo.storageLevel, g)
+ g.writeBooleanField("Barrier", rddInfo.isBarrier)
+ g.writeStringField("DeterministicLevel",
rddInfo.outputDeterministicLevel.toString)
+ g.writeNumberField("Number of Partitions", rddInfo.numPartitions)
+ g.writeNumberField("Number of Cached Partitions",
rddInfo.numCachedPartitions)
+ g.writeNumberField("Memory Size", rddInfo.memSize)
+ g.writeNumberField("Disk Size", rddInfo.diskSize)
+ g.writeEndObject()
+ }
+
+ def storageLevelToJson(storageLevel: StorageLevel, g: JsonGenerator): Unit =
{
+ g.writeStartObject()
+ g.writeBooleanField("Use Disk", storageLevel.useDisk)
+ g.writeBooleanField("Use Memory", storageLevel.useMemory)
+ g.writeBooleanField("Use Off Heap", storageLevel.useOffHeap)
+ g.writeBooleanField("Deserialized", storageLevel.deserialized)
+ g.writeNumberField("Replication", storageLevel.replication)
+ g.writeEndObject()
+ }
+
+ def blockStatusToJson(blockStatus: BlockStatus, g: JsonGenerator): Unit = {
+ g.writeStartObject()
+ g.writeFieldName("Storage Level")
+ storageLevelToJson(blockStatus.storageLevel, g)
+ g.writeNumberField("Memory Size", blockStatus.memSize)
+ g.writeNumberField("Disk Size", blockStatus.diskSize)
+ g.writeEndObject()
+ }
+
+ def executorInfoToJson(executorInfo: ExecutorInfo, g: JsonGenerator): Unit =
{
+ g.writeStartObject()
+ g.writeStringField("Host", executorInfo.executorHost)
+ g.writeNumberField("Total Cores", executorInfo.totalCores)
+ writeMapField("Log Urls", executorInfo.logUrlMap, g)
+ writeMapField("Attributes", executorInfo.attributes, g)
+ g.writeObjectFieldStart("Resources")
+ // TODO(SPARK-39658): here we are taking a Json4s JValue and are
converting it to
+ // a JSON string then are combining that string with Jackson-generated
JSON. This is
+ // done because ResourceInformation.toJson is a public class and exposes
Json4s
+ // JValues as part of its public API. We should reconsider the design of
that interface
+ // and explore whether we can avoid exposing third-party symbols in this
public API.
+ executorInfo.resourcesInfo.foreach { case (k, v) =>
+ g.writeFieldName(k)
+ g.writeRawValue(compact(v.toJson()))
}
- JObject(jsonFields.toList)
+ g.writeEndObject()
+ g.writeNumberField("Resource Profile Id", executorInfo.resourceProfileId)
+ executorInfo.registrationTime.foreach(g.writeNumberField("Registration
Time", _))
+ executorInfo.requestTime.foreach(g.writeNumberField("Request Time", _))
+ g.writeEndObject()
+ }
+
+ def blockUpdatedInfoToJson(blockUpdatedInfo: BlockUpdatedInfo, g:
JsonGenerator): Unit = {
+ g.writeStartObject()
+ g.writeFieldName("Block Manager ID")
+ blockManagerIdToJson(blockUpdatedInfo.blockManagerId, g)
+ g.writeStringField("Block ID", blockUpdatedInfo.blockId.toString)
+ g.writeFieldName("Storage Level")
+ storageLevelToJson(blockUpdatedInfo.storageLevel, g)
+ g.writeNumberField("Memory Size", blockUpdatedInfo.memSize)
+ g.writeNumberField("Disk Size", blockUpdatedInfo.diskSize)
+ g.writeEndObject()
+ }
+
+ def executorResourceRequestToJson(execReq: ExecutorResourceRequest, g:
JsonGenerator): Unit = {
+ g.writeStartObject()
+ g.writeStringField("Resource Name", execReq.resourceName)
+ g.writeNumberField("Amount", execReq.amount)
+ g.writeStringField("Discovery Script", execReq.discoveryScript)
+ g.writeStringField("Vendor", execReq.vendor)
+ g.writeEndObject()
+ }
+
+ def executorResourceRequestMapToJson(
+ m: Map[String, ExecutorResourceRequest],
+ g: JsonGenerator): Unit = {
+ g.writeStartObject()
+ m.foreach { case (k, execReq) =>
+ g.writeFieldName(k)
+ executorResourceRequestToJson(execReq, g)
+ }
+ g.writeEndObject()
}
- def taskResourceRequestToJson(taskReq: TaskResourceRequest): JValue = {
- ("Resource Name" -> taskReq.resourceName) ~
- ("Amount" -> taskReq.amount)
+ def taskResourceRequestToJson(taskReq: TaskResourceRequest, g:
JsonGenerator): Unit = {
+ g.writeStartObject()
+ g.writeStringField("Resource Name", taskReq.resourceName)
+ g.writeNumberField("Amount", taskReq.amount)
+ g.writeEndObject()
}
- def taskResourceRequestMapToJson(m: Map[String, TaskResourceRequest]):
JValue = {
- val jsonFields = m.map {
- case (k, taskReq) =>
- JField(k, taskResourceRequestToJson(taskReq))
+ def taskResourceRequestMapToJson(m: Map[String, TaskResourceRequest], g:
JsonGenerator): Unit = {
+ g.writeStartObject()
+ m.foreach { case (k, taskReq) =>
+ g.writeFieldName(k)
+ taskResourceRequestToJson(taskReq, g)
}
- JObject(jsonFields.toList)
+ g.writeEndObject()
}
/** ------------------------------ *
* Util JSON serialization methods |
* ------------------------------- */
- def mapToJson(m: Map[String, String]): JValue = {
- val jsonFields = m.map { case (k, v) => JField(k, JString(v)) }
- JObject(jsonFields.toList)
+ def writeMapField(name: String, m: Map[String, String], g: JsonGenerator):
Unit = {
+ g.writeObjectFieldStart(name)
+ m.foreach { case (k, v) => g.writeStringField(k, v) }
+ g.writeEndObject()
}
- def propertiesToJson(properties: Properties): JValue = {
- Option(properties).map { p =>
- mapToJson(p.asScala)
- }.getOrElse(JNothing)
+ def propertiesToJson(properties: Properties, g: JsonGenerator): Unit = {
+ g.writeStartObject()
+ properties.asScala.foreach { case (k, v) => g.writeStringField(k, v) }
+ g.writeEndObject()
}
- def UUIDToJson(id: UUID): JValue = {
- ("Least Significant Bits" -> id.getLeastSignificantBits) ~
- ("Most Significant Bits" -> id.getMostSignificantBits)
+ def UUIDToJson(id: UUID, g: JsonGenerator): Unit = {
+ g.writeStartObject()
+ g.writeNumberField("Least Significant Bits", id.getLeastSignificantBits)
+ g.writeNumberField("Most Significant Bits", id.getMostSignificantBits)
+ g.writeEndObject()
}
- def stackTraceToJson(stackTrace: Array[StackTraceElement]): JValue = {
- JArray(stackTrace.map { case line =>
- ("Declaring Class" -> line.getClassName) ~
- ("Method Name" -> line.getMethodName) ~
- ("File Name" -> line.getFileName) ~
- ("Line Number" -> line.getLineNumber)
- }.toList)
+ def stackTraceToJson(stackTrace: Array[StackTraceElement], g:
JsonGenerator): Unit = {
+ g.writeStartArray()
+ stackTrace.foreach { line =>
+ g.writeStartObject()
+ g.writeStringField("Declaring Class", line.getClassName)
+ g.writeStringField("Method Name", line.getMethodName)
+ g.writeStringField("File Name", line.getFileName)
Review Comment:
Hi, @JoshRosen and all.
This seems to break Spark History Server.
```
{"Declaring
Class":"org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1","Method
Name":"columnartorow_nextBatch_0$","File Name":null,"Line Number":-1}
```
```
23/02/10 16:54:46 ERROR ReplayListenerBus: Exception parsing Spark event log:
file:/Users/dongjoon/data/history/eventlog_v2_spark-1676069204164-1qq70hioosynfzib9rmi77wbavnao-driver-job/events_1_spark-1676069204164-1qq70hioosynfzib9rmi77wbavnao-driver-job.zstd
java.lang.IllegalArgumentException: requirement failed: Expected string, got
NULL
at scala.Predef$.require(Predef.scala:281)
at
org.apache.spark.util.JsonProtocol$JsonNodeImplicits.extractString(JsonProtocol.scala:1614)
at
org.apache.spark.util.JsonProtocol$.$anonfun$stackTraceFromJson$1(JsonProtocol.scala:1561)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at
scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
at scala.collection.AbstractIterator.to(Iterator.scala:1431)
at
scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
at
scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
at
scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
at
scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
at
org.apache.spark.util.JsonProtocol$.stackTraceFromJson(JsonProtocol.scala:1564)
at
org.apache.spark.util.JsonProtocol$.taskEndReasonFromJson(JsonProtocol.scala:1361)
at
org.apache.spark.util.JsonProtocol$.taskEndFromJson(JsonProtocol.scala:938)
at
org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:876)
at
org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:865)
at
org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:88)
at
org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:59)
at
org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$parseAppEventLogs$3(FsHistoryProvider.scala:1140)
at
org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$parseAppEventLogs$3$adapted(FsHistoryProvider.scala:1138)
at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2777)
at
org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$parseAppEventLogs$1(FsHistoryProvider.scala:1138)
at
org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$parseAppEventLogs$1$adapted(FsHistoryProvider.scala:1136)
at
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
at
org.apache.spark.deploy.history.FsHistoryProvider.parseAppEventLogs(FsHistoryProvider.scala:1136)
at
org.apache.spark.deploy.history.FsHistoryProvider.rebuildAppStore(FsHistoryProvider.scala:1117)
```
Since this is a release blocker, cc @xinrong-meng, too.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]