This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f1acedad4e [SPARK-42100][SQL][UI] Protect NPE in `SQLExecutionUIDataSerializer#serialize`
0f1acedad4e is described below

commit 0f1acedad4e73bca1b60fd4feb244a5d910a6682
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Tue Jan 17 23:33:45 2023 -0800

    [SPARK-42100][SQL][UI] Protect NPE in `SQLExecutionUIDataSerializer#serialize`
    
    ### What changes were proposed in this pull request?
    This PR aims to protect against NPEs in `SQLExecutionUIDataSerializer#serialize`, because `description`, `details`, `physicalPlanDescription` and `modifiedConfigs` are initialized as null in `LiveExecutionData`.
    
    ### Why are the changes needed?
    A submitted `SparkListenerJobStart` event may carry an executionId that has not been registered yet, for example the `Listing leaf files and directories for ${number} paths` job that runs before the SQL job executes. In this scenario, `SQLAppStatusListener#onJobStart` registers a new `LiveExecutionData(executionId)` object with default values (null description, null details, and so on) and writes it into the kvstore. An NPE is then thrown when the data is serialized with the Protobuf serializer.
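    
    For context, the generated protobuf builder setters reject null arguments (as the stack trace below shows for `setDescription`). A minimal, self-contained Scala sketch of the failure mode, using a stand-in builder in place of the generated `StoreTypes` class:
    
    ```
    // Stand-in for a protobuf-generated builder: generated setters null-check
    // their argument and throw, like
    // StoreTypes.SQLExecutionUIData.Builder#setDescription in the trace below.
    class FakeBuilder {
      private var description: String = ""
      def setDescription(value: String): FakeBuilder = {
        if (value == null) throw new NullPointerException
        description = value
        this
      }
    }
    
    // A LiveExecutionData registered by onJobStart still has description == null,
    // so serializing it hits the NPE:
    new FakeBuilder().setDescription(null) // throws NullPointerException
    ```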
    
    For example, run the following commands:
    
    ```
    export LIVE_UI_LOCAL_STORE_DIR=/tmp/spark-ui
    mvn clean install -pl sql/core -Dtest=none -DwildcardSuites=org.apache.spark.sql.DynamicPartitionPruningV1SuiteAEOff -am
    ```
    
    The test will succeed, but error messages like the following will be printed:
    
    ```
    14:46:44.514 ERROR org.apache.spark.scheduler.AsyncEventQueue: Listener SQLAppStatusListener threw an exception
    java.lang.NullPointerException
            at org.apache.spark.status.protobuf.StoreTypes$SQLExecutionUIData$Builder.setDescription(StoreTypes.java:46500)
            at org.apache.spark.status.protobuf.sql.SQLExecutionUIDataSerializer.serialize(SQLExecutionUIDataSerializer.scala:34)
            at org.apache.spark.status.protobuf.sql.SQLExecutionUIDataSerializer.serialize(SQLExecutionUIDataSerializer.scala:28)
            at org.apache.spark.status.protobuf.KVStoreProtobufSerializer.serialize(KVStoreProtobufSerializer.scala:30)
            at org.apache.spark.util.kvstore.RocksDB.write(RocksDB.java:188)
            at org.apache.spark.status.ElementTrackingStore.write(ElementTrackingStore.scala:123)
            at org.apache.spark.status.ElementTrackingStore.write(ElementTrackingStore.scala:127)
            at org.apache.spark.status.LiveEntity.write(LiveEntity.scala:50)
            at org.apache.spark.sql.execution.ui.SQLAppStatusListener.update(SQLAppStatusListener.scala:456)
            at org.apache.spark.sql.execution.ui.SQLAppStatusListener.onJobStart(SQLAppStatusListener.scala:124)
            at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:37)
            at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
            at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
            at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
            at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
            at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
            at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
            at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
            at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
            at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
            at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
            at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
            at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1444)
            at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
    ```
    
    So we need to guard against the NPE scenario above.
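    
    The guard used by this patch is the standard `Option(...)` null-wrapping idiom; a minimal runnable sketch (the `setIfPresent` helper and `NullGuardDemo` object are illustrative, not part of the patch):
    
    ```
    object NullGuardDemo extends App {
      // Option(x) is None when x is null, so foreach skips the setter entirely.
      def setIfPresent(value: String)(setter: String => Unit): Unit =
        Option(value).foreach(setter)
    
      var written: Option[String] = None
      setIfPresent(null)(v => written = Some(v))       // skipped: value is null
      setIfPresent("SELECT 1")(v => written = Some(v)) // applied
      assert(written.contains("SELECT 1"))
    }
    ```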
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Pass GitHub Actions
    
    Closes #39623 from LuciferYang/SPARK-42100.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../protobuf/sql/SQLExecutionUIDataSerializer.scala       | 12 +++++++-----
 .../sql/execution/ui/SQLAppStatusListenerSuite.scala      | 15 +++++++++++++++
 2 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
index 80a36e1b02b..dc76ab9a4e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
@@ -31,11 +31,13 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe[SQLExecutionUIData] {
     val builder = StoreTypes.SQLExecutionUIData.newBuilder()
     builder.setExecutionId(ui.executionId)
     builder.setRootExecutionId(ui.rootExecutionId)
-    builder.setDescription(ui.description)
-    builder.setDetails(ui.details)
-    builder.setPhysicalPlanDescription(ui.physicalPlanDescription)
-    ui.modifiedConfigs.foreach {
-      case (k, v) => builder.putModifiedConfigs(k, v)
+    Option(ui.description).foreach(builder.setDescription)
+    Option(ui.details).foreach(builder.setDetails)
+    Option(ui.physicalPlanDescription).foreach(builder.setPhysicalPlanDescription)
+    if (ui.modifiedConfigs != null) {
+      ui.modifiedConfigs.foreach {
+        case (k, v) => builder.putModifiedConfigs(k, v)
+      }
     }
     ui.metrics.foreach(m => builder.addMetrics(SQLPlanMetricSerializer.serialize(m)))
     builder.setSubmissionTime(ui.submissionTime)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 26bb35cf0e5..81c745029fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -1010,6 +1010,21 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
     spark.sparkContext.listenerBus.waitUntilEmpty(10000)
     assert(received)
   }
+
+  test("SPARK-42100: onJobStart handle event with unregistered executionId 
shouldn't throw NPE") {
+    val statusStore = createStatusStore()
+    val listener = statusStore.listener.get
+
+    val executionId = 5
+    // Using protobuf serialization would throw an NPE here before SPARK-42100
+    listener.onJobStart(SparkListenerJobStart(
+      jobId = 0,
+      time = System.currentTimeMillis(),
+      stageInfos = Nil,
+      createProperties(executionId)))
+
+    assertJobs(statusStore.execution(executionId), running = Seq(0))
+  }
 }
 
 class SQLAppStatusListenerWithInMemoryStoreSuite extends SQLAppStatusListenerSuite {

