This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new f156db2e3852 [SPARK-48048][CONNECT][SS] Added client side listener support for Scala f156db2e3852 is described below commit f156db2e385234f8ebc933898133ff174a7d38aa Author: bogao007 <bo....@databricks.com> AuthorDate: Fri May 3 10:07:49 2024 +0900 [SPARK-48048][CONNECT][SS] Added client side listener support for Scala ### What changes were proposed in this pull request? Added client side Streaming Listener support for Scala ### Why are the changes needed? Support Streaming Listener on client side for Spark Connect which has better user experience (no breaking change compared to legacy mode) compared to previous server side listener. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added unit test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46287 from bogao007/client-listener. Authored-by: bogao007 <bo....@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../spark/sql/streaming/DataStreamWriter.scala | 6 + .../sql/streaming/StreamingQueryListener.scala | 65 ++++++++- .../sql/streaming/StreamingQueryListenerBus.scala | 152 +++++++++++++++++++++ .../sql/streaming/StreamingQueryManager.scala | 34 ++--- .../CheckConnectJvmClientCompatibility.scala | 4 + .../sql/streaming/ClientStreamingQuerySuite.scala | 151 +++++++++++++------- .../sql/streaming/StreamingQueryListener.scala | 62 +++++++++ 7 files changed, 396 insertions(+), 78 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index f05d29c6f1ab..fe68f3cb0b57 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ 
-36,6 +36,7 @@ import org.apache.spark.sql.execution.streaming.AvailableNowTrigger import org.apache.spark.sql.execution.streaming.ContinuousTrigger import org.apache.spark.sql.execution.streaming.OneTimeTrigger import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger +import org.apache.spark.sql.streaming.StreamingQueryListener.QueryStartedEvent import org.apache.spark.sql.types.NullType import org.apache.spark.util.SparkSerDeUtils @@ -297,6 +298,11 @@ final class DataStreamWriter[T] private[sql] (ds: Dataset[T]) extends Logging { .build() val resp = ds.sparkSession.execute(startCmd).head + if (resp.getWriteStreamOperationStartResult.hasQueryStartedEventJson) { + val event = QueryStartedEvent.fromJson( + resp.getWriteStreamOperationStartResult.getQueryStartedEventJson) + ds.sparkSession.streams.streamingQueryListenerBus.postToAll(event) + } RemoteStreamingQuery.fromStartCommandResponse(ds.sparkSession, resp) } diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala index 404bd1b078ba..fcb4bdcb327b 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala @@ -19,8 +19,9 @@ package org.apache.spark.sql.streaming import java.util.UUID -import org.json4s.{JObject, JString} -import org.json4s.JsonAST.JValue +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule} +import org.json4s.{JObject, JString, JValue} import org.json4s.JsonDSL.{jobject2assoc, pair2Assoc} import org.json4s.jackson.JsonMethods.{compact, render} @@ -120,6 +121,21 @@ object StreamingQueryListener extends Serializable { } } + private[spark] object 
QueryStartedEvent { + private val mapper = { + val ret = new ObjectMapper() with ClassTagExtensions + ret.registerModule(DefaultScalaModule) + ret.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + ret + } + + private[spark] def jsonString(event: QueryStartedEvent): String = + mapper.writeValueAsString(event) + + private[spark] def fromJson(json: String): QueryStartedEvent = + mapper.readValue[QueryStartedEvent](json) + } + /** * Event representing any progress updates in a query. * @param progress @@ -136,6 +152,21 @@ object StreamingQueryListener extends Serializable { private def jsonValue: JValue = JObject("progress" -> progress.jsonValue) } + private[spark] object QueryProgressEvent { + private val mapper = { + val ret = new ObjectMapper() with ClassTagExtensions + ret.registerModule(DefaultScalaModule) + ret.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + ret + } + + private[spark] def jsonString(event: QueryProgressEvent): String = + mapper.writeValueAsString(event) + + private[spark] def fromJson(json: String): QueryProgressEvent = + mapper.readValue[QueryProgressEvent](json) + } + /** * Event representing that query is idle and waiting for new data to process. * @@ -161,6 +192,21 @@ object StreamingQueryListener extends Serializable { } } + private[spark] object QueryIdleEvent { + private val mapper = { + val ret = new ObjectMapper() with ClassTagExtensions + ret.registerModule(DefaultScalaModule) + ret.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + ret + } + + private[spark] def jsonString(event: QueryIdleEvent): String = + mapper.writeValueAsString(event) + + private[spark] def fromJson(json: String): QueryIdleEvent = + mapper.readValue[QueryIdleEvent](json) + } + /** * Event representing that termination of a query.
* @@ -199,4 +245,19 @@ object StreamingQueryListener extends Serializable { ("errorClassOnException" -> JString(errorClassOnException.orNull)) } } + + private[spark] object QueryTerminatedEvent { + private val mapper = { + val ret = new ObjectMapper() with ClassTagExtensions + ret.registerModule(DefaultScalaModule) + ret.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + ret + } + + private[spark] def jsonString(event: QueryTerminatedEvent): String = + mapper.writeValueAsString(event) + + private[spark] def fromJson(json: String): QueryTerminatedEvent = + mapper.readValue[QueryTerminatedEvent](json) + } } diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListenerBus.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListenerBus.scala new file mode 100644 index 000000000000..b62a89b8417e --- /dev/null +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListenerBus.scala @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming + +import java.util.concurrent.CopyOnWriteArrayList + +import scala.jdk.CollectionConverters._ + +import org.apache.spark.connect.proto.{Command, ExecutePlanResponse, Plan, StreamingQueryEventType} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connect.client.CloseableIterator +import org.apache.spark.sql.streaming.StreamingQueryListener.{Event, QueryIdleEvent, QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent} + +class StreamingQueryListenerBus(sparkSession: SparkSession) extends Logging { + private val listeners = new CopyOnWriteArrayList[StreamingQueryListener]() + private var executionThread: Option[Thread] = Option.empty + + val lock = new Object() + + def close(): Unit = { + listeners.forEach(remove(_)) + } + + def append(listener: StreamingQueryListener): Unit = lock.synchronized { + listeners.add(listener) + + if (listeners.size() == 1) { + var iter: Option[CloseableIterator[ExecutePlanResponse]] = Option.empty + try { + iter = Some(registerServerSideListener()) + } catch { + case e: Exception => + logWarning("Failed to add the listener, please add it again.", e) + listeners.remove(listener) + return + } + executionThread = Some(new Thread(new Runnable { + def run(): Unit = { + queryEventHandler(iter.get) + } + })) + // Start the thread + executionThread.get.start() + } + } + + def remove(listener: StreamingQueryListener): Unit = lock.synchronized { + if (listeners.size() == 1) { + val cmdBuilder = Command.newBuilder() + cmdBuilder.getStreamingQueryListenerBusCommandBuilder + .setRemoveListenerBusListener(true) + try { + sparkSession.execute(cmdBuilder.build()) + } catch { + case e: Exception => + logWarning("Failed to remove the listener, please remove it again.", e) + return + } + if (executionThread.isDefined) { + executionThread.get.interrupt() + executionThread = Option.empty + } + } + listeners.remove(listener) + } + + def list(): 
Array[StreamingQueryListener] = lock.synchronized { + listeners.asScala.toArray + } + + def registerServerSideListener(): CloseableIterator[ExecutePlanResponse] = { + val cmdBuilder = Command.newBuilder() + cmdBuilder.getStreamingQueryListenerBusCommandBuilder + .setAddListenerBusListener(true) + + val plan = Plan.newBuilder().setCommand(cmdBuilder.build()).build() + val iterator = sparkSession.client.execute(plan) + while (iterator.hasNext) { + val response = iterator.next() + if (response.getStreamingQueryListenerEventsResult.hasListenerBusListenerAdded && + response.getStreamingQueryListenerEventsResult.getListenerBusListenerAdded) { + return iterator + } + } + iterator + } + + def queryEventHandler(iter: CloseableIterator[ExecutePlanResponse]): Unit = { + try { + while (iter.hasNext) { + val response = iter.next() + val listenerEvents = response.getStreamingQueryListenerEventsResult.getEventsList + listenerEvents.forEach(event => { + event.getEventType match { + case StreamingQueryEventType.QUERY_PROGRESS_EVENT => + postToAll(QueryProgressEvent.fromJson(event.getEventJson)) + case StreamingQueryEventType.QUERY_IDLE_EVENT => + postToAll(QueryIdleEvent.fromJson(event.getEventJson)) + case StreamingQueryEventType.QUERY_TERMINATED_EVENT => + postToAll(QueryTerminatedEvent.fromJson(event.getEventJson)) + case _ => + logWarning(s"Unknown StreamingQueryListener event: $event") + } + }) + } + } catch { + case e: Exception => + logWarning("StreamingQueryListenerBus Handler thread received exception, all client" + + " side listeners are removed and handler thread is terminated.", e) + lock.synchronized { + executionThread = Option.empty + listeners.forEach(remove(_)) + } + } + } + + def postToAll(event: Event): Unit = lock.synchronized { + listeners.forEach(listener => + try { + event match { + case t: QueryStartedEvent => + listener.onQueryStarted(t) + case t: QueryProgressEvent => + listener.onQueryProgress(t) + case t: QueryIdleEvent => + listener.onQueryIdle(t) + 
case t: QueryTerminatedEvent => + listener.onQueryTerminated(t) + case _ => logWarning(s"Unknown StreamingQueryListener event: $event") + } + } catch { + case e: Exception => + logWarning(s"Listener $listener threw an exception", e) + }) + } +} diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index fd33efd72193..7efced227d6d 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -22,16 +22,13 @@ import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} import scala.jdk.CollectionConverters._ -import com.google.protobuf.ByteString - import org.apache.spark.annotation.Evolving import org.apache.spark.connect.proto.Command import org.apache.spark.connect.proto.StreamingQueryManagerCommand import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.connect.common.{InvalidPlanInput, StreamingListenerPacket} -import org.apache.spark.util.SparkSerDeUtils +import org.apache.spark.sql.connect.common.InvalidPlanInput /** * A class to manage all the [[StreamingQuery]] active in a `SparkSession`. 
@@ -50,6 +47,12 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo private lazy val listenerCache: ConcurrentMap[String, StreamingQueryListener] = new ConcurrentHashMap() + private[spark] val streamingQueryListenerBus = new StreamingQueryListenerBus(sparkSession) + + private[spark] def close(): Unit = { + streamingQueryListenerBus.close() + } + /** * Returns a list of active queries associated with this SQLContext * @@ -153,17 +156,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo * @since 3.5.0 */ def addListener(listener: StreamingQueryListener): Unit = { - // TODO: [SPARK-44400] Improve the Listener to provide users a way to access the Spark session - // and perform arbitrary actions inside the Listener. Right now users can use - // `val spark = SparkSession.builder.getOrCreate()` to create a Spark session inside the - // Listener, but this is a legacy session instead of a connect remote session. - val id = UUID.randomUUID.toString - cacheListenerById(id, listener) - executeManagerCmd( - _.getAddListenerBuilder - .setListenerPayload(ByteString.copyFrom(SparkSerDeUtils - .serialize(StreamingListenerPacket(id, listener)))) - .setId(id)) + streamingQueryListenerBus.append(listener) } /** @@ -172,11 +165,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo * @since 3.5.0 */ def removeListener(listener: StreamingQueryListener): Unit = { - val id = getIdByListener(listener) - executeManagerCmd( - _.getRemoveListenerBuilder - .setId(id)) - removeCachedListener(id) + streamingQueryListenerBus.remove(listener) } /** @@ -185,10 +174,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo * @since 3.5.0 */ def listListeners(): Array[StreamingQueryListener] = { - executeManagerCmd(_.setListListeners(true)).getListListeners.getListenerIdsList.asScala - .filter(listenerCache.containsKey(_)) - .map(listenerCache.get(_)) - .toArray + 
streamingQueryListenerBus.list() } private def executeManagerCmd( diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala index f73290c5ce29..2fa631d1674c 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala @@ -440,6 +440,10 @@ object CheckConnectJvmClientCompatibility { "org.apache.spark.sql.streaming.RemoteStreamingQuery"), ProblemFilters.exclude[MissingClassProblem]( "org.apache.spark.sql.streaming.RemoteStreamingQuery$"), + // Skip client side listener specific class + ProblemFilters.exclude[MissingClassProblem]( + "org.apache.spark.sql.streaming.StreamingQueryListenerBus" + ), // Encoders are in the wrong JAR ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Encoders"), diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala index 38712a0f1f63..5ea06cb9634a 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala @@ -508,6 +508,33 @@ class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging { assert(spark.streams.listListeners().length == 0) } + test("listener events") { + val listener = new MyListener() + spark.streams.addListener(listener) + + val q = spark.readStream + .format("rate") + .load() + .writeStream + .format("console") + .start() + + try { + q.processAllAvailable() + 
eventually(timeout(30.seconds)) { + assert(q.isActive) + assert(listener.start.length == 1) + assert(listener.progress.nonEmpty) + } + } finally { + q.stop() + eventually(timeout(30.seconds)) { + assert(!q.isActive) + assert(listener.terminate.nonEmpty) + } + } + } + test("foreachBatch") { // Starts a streaming query with a foreachBatch function, which writes batchId and row count // to a temp view. The test verifies that the view is populated with data. @@ -543,6 +570,78 @@ class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging { q.stop() } } + + abstract class EventCollector extends StreamingQueryListener { + protected def tablePostfix: String + + protected def handleOnQueryStarted(event: QueryStartedEvent): Unit = { + val df = spark.createDataFrame(Seq((event.json, 0))) + df.write.mode("append").saveAsTable(s"listener_start_events$tablePostfix") + } + + protected def handleOnQueryProgress(event: QueryProgressEvent): Unit = { + val df = spark.createDataFrame(Seq((event.json, 0))) + df.write.mode("append").saveAsTable(s"listener_progress_events$tablePostfix") + } + + protected def handleOnQueryTerminated(event: QueryTerminatedEvent): Unit = { + val df = spark.createDataFrame(Seq((event.json, 0))) + df.write.mode("append").saveAsTable(s"listener_terminated_events$tablePostfix") + } + } + + /** + * V1: Initial interface of StreamingQueryListener containing methods `onQueryStarted`, + * `onQueryProgress`, `onQueryTerminated`. It is prior to Spark 3.5. + */ + class EventCollectorV1 extends EventCollector { + override protected def tablePostfix: String = "_v1" + + override def onQueryStarted(event: QueryStartedEvent): Unit = handleOnQueryStarted(event) + + override def onQueryProgress(event: QueryProgressEvent): Unit = handleOnQueryProgress(event) + + override def onQueryTerminated(event: QueryTerminatedEvent): Unit = + handleOnQueryTerminated(event) + } + + /** + * V2: The interface after the method `onQueryIdle` is added. It is Spark 3.5+. 
+ */ + class EventCollectorV2 extends EventCollector { + override protected def tablePostfix: String = "_v2" + + override def onQueryStarted(event: QueryStartedEvent): Unit = handleOnQueryStarted(event) + + override def onQueryProgress(event: QueryProgressEvent): Unit = handleOnQueryProgress(event) + + override def onQueryIdle(event: QueryIdleEvent): Unit = {} + + override def onQueryTerminated(event: QueryTerminatedEvent): Unit = + handleOnQueryTerminated(event) + } + + class MyListener extends StreamingQueryListener { + var start: Seq[String] = Seq.empty + var progress: Seq[String] = Seq.empty + var terminate: Seq[String] = Seq.empty + + override def onQueryStarted(event: QueryStartedEvent): Unit = { + start = start :+ event.json + } + + override def onQueryProgress(event: QueryProgressEvent): Unit = { + progress = progress :+ event.json + } + + override def onQueryIdle(event: QueryIdleEvent): Unit = { + // Do nothing + } + + override def onQueryTerminated(event: QueryTerminatedEvent): Unit = { + terminate = terminate :+ event.json + } + } } class TestForeachWriter[T] extends ForeachWriter[T] { @@ -570,58 +669,6 @@ case class TestClass(value: Int) { override def toString: String = value.toString } -abstract class EventCollector extends StreamingQueryListener { - private lazy val spark = SparkSession.builder().getOrCreate() - - protected def tablePostfix: String - - protected def handleOnQueryStarted(event: QueryStartedEvent): Unit = { - val df = spark.createDataFrame(Seq((event.json, 0))) - df.write.mode("append").saveAsTable(s"listener_start_events$tablePostfix") - } - - protected def handleOnQueryProgress(event: QueryProgressEvent): Unit = { - val df = spark.createDataFrame(Seq((event.json, 0))) - df.write.mode("append").saveAsTable(s"listener_progress_events$tablePostfix") - } - - protected def handleOnQueryTerminated(event: QueryTerminatedEvent): Unit = { - val df = spark.createDataFrame(Seq((event.json, 0))) - 
df.write.mode("append").saveAsTable(s"listener_terminated_events$tablePostfix") - } -} - -/** - * V1: Initial interface of StreamingQueryListener containing methods `onQueryStarted`, - * `onQueryProgress`, `onQueryTerminated`. It is prior to Spark 3.5. - */ -class EventCollectorV1 extends EventCollector { - override protected def tablePostfix: String = "_v1" - - override def onQueryStarted(event: QueryStartedEvent): Unit = handleOnQueryStarted(event) - - override def onQueryProgress(event: QueryProgressEvent): Unit = handleOnQueryProgress(event) - - override def onQueryTerminated(event: QueryTerminatedEvent): Unit = - handleOnQueryTerminated(event) -} - -/** - * V2: The interface after the method `onQueryIdle` is added. It is Spark 3.5+. - */ -class EventCollectorV2 extends EventCollector { - override protected def tablePostfix: String = "_v2" - - override def onQueryStarted(event: QueryStartedEvent): Unit = handleOnQueryStarted(event) - - override def onQueryProgress(event: QueryProgressEvent): Unit = handleOnQueryProgress(event) - - override def onQueryIdle(event: QueryIdleEvent): Unit = {} - - override def onQueryTerminated(event: QueryTerminatedEvent): Unit = - handleOnQueryTerminated(event) -} - class ForeachBatchFn(val viewName: String) extends VoidFunction2[DataFrame, java.lang.Long] with Serializable { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala index 484ed0245ddf..c1ceed048ae2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.streaming import java.util.UUID +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule} import 
org.json4s.{JObject, JString} import org.json4s.JsonAST.JValue import org.json4s.JsonDSL.{jobject2assoc, pair2Assoc} @@ -140,6 +142,21 @@ object StreamingQueryListener extends Serializable { } } + private[spark] object QueryStartedEvent { + private val mapper = { + val ret = new ObjectMapper() with ClassTagExtensions + ret.registerModule(DefaultScalaModule) + ret.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + ret + } + + private[spark] def jsonString(event: QueryStartedEvent): String = + mapper.writeValueAsString(event) + + private[spark] def fromJson(json: String): QueryStartedEvent = + mapper.readValue[QueryStartedEvent](json) + } + /** * Event representing any progress updates in a query. * @param progress The query progress updates. @@ -154,6 +171,21 @@ object StreamingQueryListener extends Serializable { private def jsonValue: JValue = JObject("progress" -> progress.jsonValue) } + private[spark] object QueryProgressEvent { + private val mapper = { + val ret = new ObjectMapper() with ClassTagExtensions + ret.registerModule(DefaultScalaModule) + ret.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + ret + } + + private[spark] def jsonString(event: QueryProgressEvent): String = + mapper.writeValueAsString(event) + + private[spark] def fromJson(json: String): QueryProgressEvent = + mapper.readValue[QueryProgressEvent](json) + } + /** * Event representing that query is idle and waiting for new data to process. 
* @@ -177,6 +209,21 @@ object StreamingQueryListener extends Serializable { } } + private[spark] object QueryIdleEvent { + private val mapper = { + val ret = new ObjectMapper() with ClassTagExtensions + ret.registerModule(DefaultScalaModule) + ret.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + ret + } + + private[spark] def jsonString(event: QueryIdleEvent): String = + mapper.writeValueAsString(event) + + private[spark] def fromJson(json: String): QueryIdleEvent = + mapper.readValue[QueryIdleEvent](json) + } + /** * Event representing that termination of a query. * @@ -211,4 +258,19 @@ object StreamingQueryListener extends Serializable { ("errorClassOnException" -> JString(errorClassOnException.orNull)) } } + + private[spark] object QueryTerminatedEvent { + private val mapper = { + val ret = new ObjectMapper() with ClassTagExtensions + ret.registerModule(DefaultScalaModule) + ret.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + ret + } + + private[spark] def jsonString(event: QueryTerminatedEvent): String = + mapper.writeValueAsString(event) + + private[spark] def fromJson(json: String): QueryTerminatedEvent = + mapper.readValue[QueryTerminatedEvent](json) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org