JoshRosen opened a new pull request, #36885:
URL: https://github.com/apache/spark/pull/36885

   ### What changes were proposed in this pull request?
   
   This PR improves the performance of `org.apache.spark.util.JsonProtocol` by 
replacing all uses of Json4s with uses of Jackson `JsonGenerator`. In my 
benchmarking so far, I have seen ~2x speedups for reading events and ~3x 
speedups for writing them.
   
   JsonProtocol is used by `EventLoggingListener` to emit JSON event logs 
capturing the firehose of SparkListener events. The history server uses 
`ReplayListenerBus` to parse the event logs and replay the event stream in 
order to rebuild and display a terminated Spark application's web UI. 
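   
   Spark event logs are newline-delimited JSON, with one event object per line, so a replay-style reader can handle them one line at a time. The sketch below only illustrates that idea and is not `ReplayListenerBus` itself. It assumes jackson-databind (which Spark already depends on) is on the classpath, and `ReplaySketch` and `countEventTypes` are hypothetical names. It parses each line and tallies events by their `"Event"` field, which is the field JsonProtocol uses to decide which event type to build.

   ```scala
   import com.fasterxml.jackson.databind.ObjectMapper
   import scala.io.Source

   // Hypothetical illustration of line-by-line event log replay.
   object ReplaySketch {
     private val mapper = new ObjectMapper()

     // Treats each non-empty line as one JSON event and counts events
     // by the value of their "Event" field.
     def countEventTypes(log: Source): Map[String, Int] =
       log.getLines()
         .filter(_.trim.nonEmpty)
         .map(line => mapper.readTree(line).get("Event").asText())
         .foldLeft(Map.empty[String, Int]) { (acc, name) =>
           acc.updated(name, acc.getOrElse(name, 0) + 1)
         }
   }
   ```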
   
   Today, JsonProtocol uses the Json4s library's ASTs when writing and reading 
JSON. This existing approach was chosen because we require fine-grained control 
over how the JSON is generated (so the listener events themselves can't be 
directly passed to an ObjectMapper). JsonProtocol needs to be backwards- and 
forwards-compatible so that a single history server application can display UIs 
from multiple versions of Spark: as a result, the code has special logic for 
handling fields which might be missing in event logs produced from older Spark 
versions.
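   
   To make the missing-field handling concrete, here is a minimal sketch of tolerant parsing with Jackson's tree model (`ObjectMapper.readTree` from jackson-databind, assumed to be on the classpath). It is not JsonProtocol's actual parsing code: the `AppStart` case class and the `optString` helper are hypothetical. A field that older event logs may omit is read as an `Option`, so its absence yields `None` instead of an error.

   ```scala
   import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}

   object TolerantReadSketch {
     private val mapper = new ObjectMapper()

     // Hypothetical record type; not Spark's SparkListenerApplicationStart.
     final case class AppStart(appName: String, appId: Option[String], time: Long)

     // JsonNode.get returns null for a missing field; map that (and an
     // explicit JSON null) to None.
     private def optString(node: JsonNode, field: String): Option[String] =
       Option(node.get(field)).filterNot(_.isNull).map(_.asText())

     def parseAppStart(json: String): AppStart = {
       val node = mapper.readTree(json)
       AppStart(
         appName = node.get("App Name").asText(),
         appId = optString(node, "App ID"), // may be absent in older logs
         time = node.get("Timestamp").asLong())
     }
   }
   ```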
   
   Performance profiling revealed that JsonProtocol spends a significant 
proportion of its time in Json4s AST calls. Much of this overhead comes from 
temporary Scala collections which are allocated when constructing the ASTs or 
when extracting fields from them.
   
   This PR aims to improve performance by replacing all direct uses of Json4s 
with calls to Jackson's `JsonGenerator`, a low-level imperative API for 
generating JSON. 
   
   As a simple example, let's consider the `SparkListenerApplicationStart` 
event: this produces a JSON record like:
   
   ```json
   {"Event":"SparkListenerApplicationStart","App Name":"Spark shell","App ID":"app-20220615090229-0000","Timestamp":1655308948551,"User":"joshrosen"}
   ```
   
   With Json4s, this is expressed as:
   
   ```scala
   // Excerpt from JsonProtocol; the json4s DSL supplies `~` and the pair conversions.
   import org.json4s._
   import org.json4s.JsonDSL._

   def applicationStartToJson(applicationStart: SparkListenerApplicationStart): JValue = {
     ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.applicationStart) ~
     ("App Name" -> applicationStart.appName) ~
     // Optional fields become JNothing, which is dropped when the AST is rendered.
     ("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
     ("Timestamp" -> applicationStart.time) ~
     ("User" -> applicationStart.sparkUser) ~
     ("App Attempt ID" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing)) ~
     ("Driver Logs" -> applicationStart.driverLogs.map(mapToJson).getOrElse(JNothing)) ~
     ("Driver Attributes" -> applicationStart.driverAttributes.map(mapToJson).getOrElse(JNothing))
   }
   ```
   
   With Jackson JsonGenerator, this is expressed as:
   
   ```scala
   import com.fasterxml.jackson.core.JsonGenerator

   def applicationStartToJson(
       applicationStart: SparkListenerApplicationStart,
       g: JsonGenerator): Unit = {
     g.writeStartObject()
     g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.applicationStart)
     g.writeStringField("App Name", applicationStart.appName)
     // Optional fields are simply not written when absent.
     applicationStart.appId.foreach(g.writeStringField("App ID", _))
     g.writeNumberField("Timestamp", applicationStart.time)
     g.writeStringField("User", applicationStart.sparkUser)
     applicationStart.appAttemptId.foreach(g.writeStringField("App Attempt ID", _))
     applicationStart.driverLogs.foreach(writeMapField("Driver Logs", _, g))
     applicationStart.driverAttributes.foreach(writeMapField("Driver Attributes", _, g))
     g.writeEndObject()
   }
   ```
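   
   To show where a `JsonGenerator` like `g` comes from, here is a minimal, self-contained sketch that drives a method in this style with jackson-core's `JsonFactory` and a `StringWriter`. The `AppStart` case class and the `writeMapField` helper are hypothetical stand-ins (the PR's own `writeMapField` is not shown here), and a real event logger would write to its output stream rather than to a `String`.

   ```scala
   import java.io.StringWriter
   import com.fasterxml.jackson.core.{JsonFactory, JsonGenerator}

   object GeneratorSketch {
     private val factory = new JsonFactory()

     // Hypothetical stand-in for Spark's event class.
     final case class AppStart(
         appName: String,
         appId: Option[String],
         time: Long,
         sparkUser: String,
         driverLogs: Option[Map[String, String]])

     // Hypothetical helper: writes `name` followed by a nested object
     // of string key/value pairs.
     def writeMapField(name: String, m: Map[String, String], g: JsonGenerator): Unit = {
       g.writeObjectFieldStart(name)
       m.foreach { case (k, v) => g.writeStringField(k, v) }
       g.writeEndObject()
     }

     def appStartToJson(e: AppStart, g: JsonGenerator): Unit = {
       g.writeStartObject()
       g.writeStringField("Event", "SparkListenerApplicationStart")
       g.writeStringField("App Name", e.appName)
       e.appId.foreach(g.writeStringField("App ID", _)) // omitted when None
       g.writeNumberField("Timestamp", e.time)
       g.writeStringField("User", e.sparkUser)
       e.driverLogs.foreach(writeMapField("Driver Logs", _, g))
       g.writeEndObject()
     }

     // Renders one event as compact JSON; closing the generator flushes it.
     def toJsonString(e: AppStart): String = {
       val writer = new StringWriter()
       val g = factory.createGenerator(writer)
       try appStartToJson(e, g) finally g.close()
       writer.toString
     }
   }
   ```

   Because optional fields are written only when present, `None` values simply do not appear in the output, which matches the Json4s version's use of `JNothing`.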
   
   The JsonGenerator code is more verbose but is significantly faster. This 
example is a relatively straightforward conversion, but there's a bit of 
trickiness in more complex cases (such as serialization of task metrics or 
resource profiles). I will leave comments in the GitHub PR review to highlight 
some of the less straightforward cases. 
   
   ### Why are the changes needed?
   
   Improving JsonProtocol performance has several benefits:
   
   - Decreased history server load times, especially for long-running Spark 
applications.
   - Improved EventLoggingListener throughput: this reduces the likelihood that 
the listener will fall behind and drop events (which can lead to confusing and 
inconsistent Spark UIs).
   - Reduced resource consumption on the driver, which may improve the 
throughput of task scheduling.
   
   In addition, this is a stepping-stone towards eventually being able to 
remove our Json4s dependency:
   
   - Today Spark uses Json4s 3.x and this causes library conflicts for end 
users who want to upgrade to 4.x; see 
https://github.com/apache/spark/pull/33630 for one example.
   - To _completely_ remove Json4s we'll need to update several other parts of 
Spark (including code used for ML model serialization); this PR is just a first 
step towards that goal if we decide to pursue it.
   - In this PR, I continue to use Json4s in test code; I think it's fine to 
keep Json4s as a test-only dependency.
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   
   ### How was this patch tested?
   
   #### Correctness
   
   This PR passes all tests in `JsonProtocolSuite`, `SQLJsonProtocolSuite`, and 
`HistoryServerSuite`. I think these suites provide sufficient test coverage of 
our ability to parse old event logs. I believe they also contain 
output-comparison tests that check the generated JSON (but I still need to 
confirm this).
   
   #### Performance
   
   So far, I have only measured performance via laptop microbenchmarks.
   
   To measure event parsing performance, I generated a large event log by 
running `(1 to 5).foreach(_ => spark.range(1, 100 * 1000, 1, 100 * 1000).foreach(_ => ()))` 
(five jobs of 100,000 tasks each) and then used the history server's logs to 
measure the time taken to load the UI. This PR's improvements cut the loading 
time from ~34 seconds to ~16 seconds.
   
   To measure event logging overheads, I used YourKit asynchronous CPU 
profiling to measure `spark.range(1, 100 * 1000, 1, 100 * 1000).foreach(_ => ())`. 
For this benchmark, this PR's changes cut total time in `logEvent` (including 
callees) from ~9.5 seconds to ~2.9 seconds.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
