This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dee294b  [SPARK-38056][WEB UI] Fix issue of Structured streaming not 
working in history server when using LevelDB
dee294b is described below

commit dee294b453b550471028fdbd9e17952963504a3a
Author: kuwii <kuwii.some...@gmail.com>
AuthorDate: Wed Feb 9 16:59:38 2022 +0900

    [SPARK-38056][WEB UI] Fix issue of Structured streaming not working in 
history server when using LevelDB
    
    ### What changes were proposed in this pull request?
    
    Change type of `org.apache.spark.sql.streaming.ui.StreamingQueryData.runId` 
from `UUID` to `String`.
    
    ### Why are the changes needed?
    
    In 
[SPARK-31953](https://github.com/apache/spark/commit/4f9667035886a67e6c9a4e8fad2efa390e87ca68),
 structured streaming support was added to the history server. However, this does not 
work when the history server uses LevelDB instead of the in-memory KV store.
    
    - LevelDB does not support `UUID` as a key.
    - If `spark.history.store.path` is set in the history server to use LevelDB, 
an error will occur when writing info to the store during event replay.
    - `StreamingQueryStatusListener` will throw exceptions when writing info, 
saying `java.lang.IllegalArgumentException: Type java.util.UUID not allowed as 
key.`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added tests in `StreamingQueryStatusListenerSuite` to test whether 
`StreamingQueryData` can be successfully written to the in-memory store, LevelDB, 
and RocksDB.
    
    Closes #35356 from kuwii/hs-streaming-fix.
    
    Authored-by: kuwii <kuwii.some...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../ui/StreamingQueryStatisticsPage.scala          |  4 +-
 .../ui/StreamingQueryStatusListener.scala          |  6 +-
 .../sql/streaming/ui/StreamingQueryPageSuite.scala |  2 +-
 .../ui/StreamingQueryStatusListenerSuite.scala     | 64 +++++++++++++++++++---
 4 files changed, 62 insertions(+), 14 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
index 97691d9..e13ac4e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming.ui
 
 import java.{util => ju}
 import java.lang.{Long => JLong}
-import java.util.{Locale, UUID}
+import java.util.Locale
 import javax.servlet.http.HttpServletRequest
 
 import scala.collection.JavaConverters._
@@ -59,7 +59,7 @@ private[ui] class StreamingQueryStatisticsPage(parent: 
StreamingQueryTab)
     require(parameterId != null && parameterId.nonEmpty, "Missing id 
parameter")
 
     val query = parent.store.allQueryUIData.find { uiData =>
-      uiData.summary.runId.equals(UUID.fromString(parameterId))
+      uiData.summary.runId.equals(parameterId)
     }.getOrElse(throw new IllegalArgumentException(s"Failed to find streaming 
query $parameterId"))
 
     val resources = generateLoadResources(request)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
index fdd3754..b59ec04 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
@@ -75,7 +75,7 @@ private[sql] class StreamingQueryStatusListener(
     store.write(new StreamingQueryData(
       event.name,
       event.id,
-      event.runId,
+      event.runId.toString,
       isActive = true,
       None,
       startTimestamp
@@ -100,7 +100,7 @@ private[sql] class StreamingQueryStatusListener(
 
   override def onQueryTerminated(
       event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
-    val querySummary = store.read(classOf[StreamingQueryData], event.runId)
+    val querySummary = store.read(classOf[StreamingQueryData], 
event.runId.toString)
     val curTime = System.currentTimeMillis()
     store.write(new StreamingQueryData(
       querySummary.name,
@@ -118,7 +118,7 @@ private[sql] class StreamingQueryStatusListener(
 private[sql] class StreamingQueryData(
     val name: String,
     val id: UUID,
-    @KVIndexParam val runId: UUID,
+    @KVIndexParam val runId: String,
     @KVIndexParam("active") val isActive: Boolean,
     val exception: Option[String],
     @KVIndexParam("startTimestamp") val startTimestamp: Long,
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
index 246fa1f..78ade6a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
@@ -103,7 +103,7 @@ class StreamingQueryPageSuite extends SharedSparkSession 
with BeforeAndAfter {
     when(summary.isActive).thenReturn(true)
     when(summary.name).thenReturn("query")
     when(summary.id).thenReturn(id)
-    when(summary.runId).thenReturn(id)
+    when(summary.runId).thenReturn(id.toString)
     when(summary.startTimestamp).thenReturn(1L)
     when(summary.exception).thenReturn(None)
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
index 91c55d5..eee1a7c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
@@ -28,8 +28,9 @@ import 
org.apache.spark.sql.execution.ui.StreamingQueryStatusStore
 import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.streaming.{StreamingQueryListener, 
StreamingQueryProgress, StreamTest}
 import org.apache.spark.sql.streaming
-import org.apache.spark.status.ElementTrackingStore
-import org.apache.spark.util.kvstore.InMemoryStore
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+import org.apache.spark.util.Utils
+import org.apache.spark.util.kvstore.{InMemoryStore, KVStore, RocksDB}
 
 class StreamingQueryStatusListenerSuite extends StreamTest {
 
@@ -48,7 +49,7 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
     // result checking
     assert(queryStore.allQueryUIData.count(_.summary.isActive) == 1)
     assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(uiData 
=>
-      uiData.summary.runId == runId && uiData.summary.name.equals("test")))
+      uiData.summary.runId == runId.toString && 
uiData.summary.name.equals("test")))
 
     // handle query progress event
     val progress = mock(classOf[StreamingQueryProgress], RETURNS_SMART_NULLS)
@@ -64,7 +65,7 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
 
     // result checking
     val activeQuery =
-      
queryStore.allQueryUIData.filter(_.summary.isActive).find(_.summary.runId == 
runId)
+      
queryStore.allQueryUIData.filter(_.summary.isActive).find(_.summary.runId == 
runId.toString)
     assert(activeQuery.isDefined)
     assert(activeQuery.get.summary.isActive)
     assert(activeQuery.get.recentProgress.length == 1)
@@ -81,7 +82,8 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
     listener.onQueryTerminated(terminateEvent)
 
     
assert(!queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.isActive)
-    
assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.runId
 == runId)
+    assert(
+      
queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.runId == 
runId.toString)
     
assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.id 
== id)
   }
 
@@ -110,10 +112,12 @@ class StreamingQueryStatusListenerSuite extends 
StreamTest {
     // result checking
     assert(queryStore.allQueryUIData.count(_.summary.isActive) == 1)
     assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).length == 1)
-    
assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(_.summary.runId
 == runId1))
+    assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(
+      _.summary.runId == runId1.toString))
     assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(uiData 
=>
-      uiData.summary.runId == runId1 && uiData.summary.id == id))
-    
assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.runId
 == runId0)
+      uiData.summary.runId == runId1.toString && uiData.summary.id == id))
+    assert(
+      
queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.runId == 
runId0.toString)
     
assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.id 
== id)
   }
 
@@ -210,4 +214,48 @@ class StreamingQueryStatusListenerSuite extends StreamTest 
{
     addQueryProgress()
     checkQueryProcessData(5)
   }
+
+  test("SPARK-38056: test writing StreamingQueryData to an in-memory store") {
+    testStreamingQueryData(new InMemoryStore())
+  }
+
+  test("SPARK-38056: test writing StreamingQueryData to a LevelDB store") {
+    assume(!Utils.isMacOnAppleSilicon)
+    val testDir = Utils.createTempDir()
+    val kvStore = KVUtils.open(testDir, getClass.getName)
+    try {
+      testStreamingQueryData(kvStore)
+    } finally {
+      kvStore.close()
+      Utils.deleteRecursively(testDir)
+    }
+  }
+
+  test("SPARK-38056: test writing StreamingQueryData to a RocksDB store") {
+    assume(!Utils.isMacOnAppleSilicon)
+    val testDir = Utils.createTempDir()
+    val kvStore = new RocksDB(testDir)
+    try {
+      testStreamingQueryData(kvStore)
+    } finally {
+      kvStore.close()
+      Utils.deleteRecursively(testDir)
+    }
+  }
+
+  private def testStreamingQueryData(kvStore: KVStore): Unit = {
+    val id = UUID.randomUUID()
+    val testData = new StreamingQueryData(
+      "some-query",
+      id,
+      id.toString,
+      isActive = false,
+      None,
+      1L,
+      None
+    )
+    val store = new ElementTrackingStore(kvStore, sparkConf)
+    store.write(testData)
+    store.close(closeParent = false)
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to