This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 227e26202522 [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper 227e26202522 is described below commit 227e262025229a67f43a8de452215053a9cbf662 Author: Jungtaek Lim <kabhwan.opensou...@gmail.com> AuthorDate: Wed Sep 20 11:05:06 2023 +0900 [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper ### What changes were proposed in this pull request? This PR proposes to change the behavior when user runs streaming query with Trigger.AvailableNow, which query has any source which does not support Trigger.AvailableNow. Instead of using wrapper implementation, this PR proposes to fall back to execute a single batch (a.k.a Trigger.Once). This PR introduces a new flag `spark.sql.streaming.triggerAvailableNowWrapper.enabled` to retain the behavior for advanced and extreme users. The flag is marked as internal since it's really only for extreme users who are concerned about behavioral change. Minor details would be following: * This PR does not use Trigger.Once, hence users won't see deprecation warning for Trigger.Once. * This PR will provide a warning log to inform the source(s) which doesn't support Trigger.AvailableNow, so that users can indicate which source(s) is/are preventing them to enjoy benefits of Trigger.AvailableNow. ### Why are the changes needed? We have observed a data duplication issue with 3rd party data source when it's used with Trigger.AvailableNow. The source didn't support Trigger.AvailableNow, and unfortunately is also not played well with wrapper implementation. We care more about possible correctness issue than better coverage of Trigger.AvailableNow, hence want to stop using wrapper implementation by default. We also care about not breaking existing query, so fallback to single batch execution rather than failing the query. ### Does this PR introduce _any_ user-facing change? Yes, this introduces a behavioral change for streaming query with Trigger.AvailableNow which contains any source not supporting Trigger.AvailableNow. ### How was this patch tested? Modified UT. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42940 from HeartSaVioR/SPARK-45178. Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- docs/ss-migration-guide.md | 4 ++ docs/structured-streaming-programming-guide.md | 2 + .../org/apache/spark/sql/internal/SQLConf.scala | 11 ++++ .../AsyncProgressTrackingMicroBatchExecution.scala | 7 +- .../streaming/AvailableNowDataStreamWrapper.scala | 6 ++ .../execution/streaming/MicroBatchExecution.scala | 50 +++++++++++++-- .../spark/sql/execution/streaming/memory.scala | 19 +++++- .../streaming/StreamingQueryListenerSuite.scala | 6 +- .../spark/sql/streaming/StreamingQuerySuite.scala | 6 +- .../sql/streaming/TriggerAvailableNowSuite.scala | 74 +++++++++++++++++++++- 10 files changed, 167 insertions(+), 18 deletions(-) diff --git a/docs/ss-migration-guide.md b/docs/ss-migration-guide.md index 57fe3a84e12c..3247866206ee 100644 --- a/docs/ss-migration-guide.md +++ b/docs/ss-migration-guide.md @@ -26,6 +26,10 @@ Note that this migration guide describes the items specific to Structured Stream Many items of SQL migration can be applied when migrating Structured Streaming to higher versions. Please refer [Migration Guide: SQL, Datasets and DataFrame](sql-migration-guide.html). +## Upgrading from Structured Streaming 3.5 to 4.0 + +- Since Spark 4.0, Spark falls back to single batch execution if any source in the query does not support `Trigger.AvailableNow`. This is to avoid any possible correctness, duplication, and dataloss issue due to incompatibility between source and wrapper implementation. (See [SPARK-45178](https://issues.apache.org/jira/browse/SPARK-45178) for more details.) + ## Upgrading from Structured Streaming 3.3 to 3.4 - Since Spark 3.4, `Trigger.Once` is deprecated, and users are encouraged to migrate from `Trigger.Once` to `Trigger.AvailableNow`. Please refer [SPARK-39805](https://issues.apache.org/jira/browse/SPARK-39805) for more details. diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index b84877d67c01..70e763be0d70 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -3265,6 +3265,8 @@ Here are the different kinds of triggers that are supported. if the last batch advances the watermark. This helps to maintain smaller and predictable state size and smaller latency on the output of stateful operators.</li> </ul> + NOTE: this trigger will be deactivated when there is any source which does not support Trigger.AvailableNow. + Spark will perform one-time micro-batch as a fall-back. Check the above differences for a risk of fallback. </td> </tr> <tr> diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 49a4b0bf98bb..cce85da29b62 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2180,6 +2180,17 @@ object SQLConf { .booleanConf .createWithDefault(true) + val STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED = + buildConf("spark.sql.streaming.triggerAvailableNowWrapper.enabled") + .internal() + .doc("Whether to use the wrapper implementation of Trigger.AvailableNow if the source " + + "does not support Trigger.AvailableNow. Enabling this allows the benefits of " + + "Trigger.AvailableNow with sources which don't support it, but some sources " + + "may show unexpected behavior including duplication, data loss, etc. So use with " + + "extreme care! The ideal direction is to persuade developers of source(s) to " + + "support Trigger.AvailableNow.") + .booleanConf + .createWithDefault(false) val VARIABLE_SUBSTITUTE_ENABLED = buildConf("spark.sql.variable.substitute") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala index 56cdba881753..206efb9a5450 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala @@ -50,8 +50,6 @@ class AsyncProgressTrackingMicroBatchExecution( // to cache the batch id of the last batch written to storage private val lastBatchPersistedToDurableStorage = new AtomicLong(-1) - override val triggerExecutor: TriggerExecutor = validateAndGetTrigger() - // used to check during the first batch if the pipeline is stateful private var isFirstBatch: Boolean = true @@ -94,6 +92,9 @@ class AsyncProgressTrackingMicroBatchExecution( override val commitLog = new AsyncCommitLog(sparkSession, checkpointFile("commits"), asyncWritesExecutorService) + // perform quick validation to fail faster + validateAndGetTrigger() + override def validateOffsetLogAndGetPrevOffset(latestBatchId: Long): Option[OffsetSeq] = { /* Initialize committed offsets to a committed batch, which at this * is the second latest batch id in the offset log. @@ -228,6 +229,8 @@ class AsyncProgressTrackingMicroBatchExecution( asyncWritesExecutorService.getQueue.size() > 0 || asyncWritesExecutorService.getActiveCount > 0 } + override protected def getTrigger(): TriggerExecutor = validateAndGetTrigger() + private def validateAndGetTrigger(): TriggerExecutor = { // validate that the pipeline is using a supported sink if (!extraOptions diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala index 0dc510476279..18dd2eba083a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala @@ -28,6 +28,12 @@ import org.apache.spark.sql.connector.read.streaming class AvailableNowDataStreamWrapper(val delegate: SparkDataStream) extends SparkDataStream with SupportsTriggerAvailableNow with Logging { + // See SPARK-45178 for more details. + logWarning("Activating the wrapper implementation of Trigger.AvailableNow for source " + + s"[$delegate]. Note that this might introduce possibility of deduplication, dataloss, " + + "correctness issue. Enable the config with extreme care. We strongly recommend to contact " + + "the data source developer to support Trigger.AvailableNow.") + private var fetchedOffset: streaming.Offset = _ override def initialOffset(): streaming.Offset = delegate.initialOffset() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 010ac75a73da..8edbfea3eb2c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -52,11 +52,46 @@ class MicroBatchExecution( @volatile protected var sources: Seq[SparkDataStream] = Seq.empty - protected val triggerExecutor: TriggerExecutor = trigger match { - case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock) - case OneTimeTrigger => SingleBatchExecutor() - case AvailableNowTrigger => MultiBatchExecutor() - case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger") + @volatile protected[sql] var triggerExecutor: TriggerExecutor = _ + + protected def getTrigger(): TriggerExecutor = { + assert(sources.nonEmpty, "sources should have been retrieved from the plan!") + trigger match { + case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock) + case OneTimeTrigger => SingleBatchExecutor() + case AvailableNowTrigger => + // When the flag is enabled, Spark will wrap sources which do not support + // Trigger.AvailableNow with wrapper implementation, so that Trigger.AvailableNow can + // take effect. + // When the flag is disabled, Spark will fall back to single batch execution, whenever + // it figures out any source does not support Trigger.AvailableNow. + // See SPARK-45178 for more details. + if (sparkSession.sqlContext.conf.getConf( + SQLConf.STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED)) { + logInfo("Configured to use the wrapper of Trigger.AvailableNow for query " + + s"$prettyIdString.") + MultiBatchExecutor() + } else { + val supportsTriggerAvailableNow = sources.distinct.forall { src => + val supports = src.isInstanceOf[SupportsTriggerAvailableNow] + if (!supports) { + logWarning(s"source [$src] does not support Trigger.AvailableNow. Falling back to " + + "single batch execution. Note that this may not guarantee processing new data if " + + "there is an uncommitted batch. Please consult with data source developer to " + + "support Trigger.AvailableNow.") + } + + supports + } + + if (supportsTriggerAvailableNow) { + MultiBatchExecutor() + } else { + SingleBatchExecutor() + } + } + case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger") + } } protected var watermarkTracker: WatermarkTracker = _ @@ -130,6 +165,11 @@ class MicroBatchExecution( // v2 source case r: StreamingDataSourceV2Relation => r.stream } + + // Initializing TriggerExecutor relies on `sources`, hence calling this after initializing + // sources. + triggerExecutor = getTrigger() + uniqueSources = triggerExecutor match { case _: SingleBatchExecutor => sources.distinct.map { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 732eaa8d783d..fa0744dc19b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability} import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder} -import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream, Offset => OffsetV2, SparkDataStream} +import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream, Offset => OffsetV2, ReadLimit, SparkDataStream, SupportsTriggerAvailableNow} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.connector.SimpleTableProvider import org.apache.spark.sql.types.StructType @@ -155,7 +155,10 @@ case class MemoryStream[A : Encoder]( id: Int, sqlContext: SQLContext, numPartitions: Option[Int] = None) - extends MemoryStreamBase[A](sqlContext) with MicroBatchStream with Logging { + extends MemoryStreamBase[A](sqlContext) + with MicroBatchStream + with SupportsTriggerAvailableNow + with Logging { protected val output = logicalPlan.output @@ -175,6 +178,9 @@ case class MemoryStream[A : Encoder]( @GuardedBy("this") private var endOffset = new LongOffset(-1) + @GuardedBy("this") + private var availableNowEndOffset: OffsetV2 = _ + /** * Last offset that was discarded, or -1 if no commits have occurred. Note that the value * -1 is used in calculations below and isn't just an arbitrary constant. @@ -201,7 +207,15 @@ case class MemoryStream[A : Encoder]( override def initialOffset: OffsetV2 = LongOffset(-1) + override def prepareForTriggerAvailableNow(): Unit = synchronized { + availableNowEndOffset = latestOffset(initialOffset, ReadLimit.allAvailable()) + } + override def latestOffset(): OffsetV2 = { + throw new IllegalStateException("Should not reach here!") + } + + override def latestOffset(startOffset: OffsetV2, limit: ReadLimit): OffsetV2 = { if (currentOffset.offset == -1) null else currentOffset } @@ -277,6 +291,7 @@ case class MemoryStream[A : Encoder]( endOffset = LongOffset(-1) currentOffset = new LongOffset(-1) lastOffsetCommitted = new LongOffset(-1) + availableNowEndOffset = null } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 52b740bc5c34..861e4e83ceff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -29,7 +29,7 @@ import org.scalatest.concurrent.Waiters.Waiter import org.apache.spark.SparkException import org.apache.spark.scheduler._ import org.apache.spark.sql.{Encoder, Row, SparkSession} -import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2} +import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, ReadLimit} import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.StreamingQueryListener._ @@ -314,9 +314,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { try { var numTriggers = 0 val input = new MemoryStream[Int](0, sqlContext) { - override def latestOffset(): OffsetV2 = { + override def latestOffset(startOffset: OffsetV2, limit: ReadLimit): OffsetV2 = { numTriggers += 1 - super.latestOffset() + super.latestOffset(startOffset, limit) } } val clock = new StreamManualClock() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index c3729d50ed09..9444db3e10fa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Complete import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes import org.apache.spark.sql.connector.read.InputPartition -import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2} +import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, ReadLimit} import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.sources.{MemorySink, TestForeachWriter} @@ -230,9 +230,9 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi private def dataAdded: Boolean = currentOffset.offset != -1 // latestOffset should take 50 ms the first time it is called after data is added - override def latestOffset(): OffsetV2 = synchronized { + override def latestOffset(startOffset: OffsetV2, limit: ReadLimit): OffsetV2 = synchronized { if (dataAdded) clock.waitTillTime(1050) - super.latestOffset() + super.latestOffset(startOffset, limit) } // getBatch should take 100 ms the first time it is called diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala index 65deca222073..defd5fd110de 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala @@ -24,7 +24,8 @@ import org.apache.spark.sql.catalyst.plans.logical.Range import org.apache.spark.sql.catalyst.util.stringToFile import org.apache.spark.sql.connector.read.streaming import org.apache.spark.sql.connector.read.streaming.{ReadLimit, SupportsAdmissionControl} -import org.apache.spark.sql.execution.streaming.{LongOffset, MemoryStream, Offset, SerializedOffset, Source, StreamingExecutionRelation} +import org.apache.spark.sql.execution.streaming.{LongOffset, MemoryStream, MicroBatchExecution, MultiBatchExecutor, Offset, SerializedOffset, SingleBatchExecutor, Source, StreamingExecutionRelation, StreamingQueryWrapper} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{LongType, StructType} import org.apache.spark.tags.SlowSQLTest @@ -41,6 +42,10 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest { def incrementAvailableOffset(numNewRows: Int): Unit def sourceName: String + + def reset(): Unit = { + currentOffset = 0L + } } class TestSource extends TestDataFrameProvider with Source { @@ -103,6 +108,22 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest { // remove the trailing `$` in the class name override def sourceName: String = MemoryStream.getClass.getSimpleName.dropRight(1) + + override def reset(): Unit = { + super.reset() + memoryStream.reset() + } + } + + def testWithConfigMatrix(testName: String)(testFun: => Any): Unit = { + Seq(true, false).foreach { useWrapper => + test(testName + s" (using wrapper: $useWrapper)") { + withSQLConf( + SQLConf.STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED.key -> useWrapper.toString) { + testFun + } + } + } } Seq( @@ -110,7 +131,9 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest { new TestSourceWithAdmissionControl, new TestMicroBatchStream ).foreach { testSource => - test(s"TriggerAvailableNow for multiple sources with ${testSource.getClass}") { + testWithConfigMatrix(s"TriggerAvailableNow for multiple sources with ${testSource.getClass}") { + testSource.reset() + withTempDirs { (src, target) => val checkpoint = new File(target, "chk").getCanonicalPath val targetDir = new File(target, "data").getCanonicalPath @@ -154,6 +177,7 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest { q.recentProgress.foreach { p => assert(p.sources.exists(_.description.startsWith(testSource.sourceName))) } + assertQueryUsingRightBatchExecutor(testSource, q) checkAnswer(sql(s"SELECT * from parquet.`$targetDir`"), Seq(1, 2, 3, 7, 8, 9).map(_.toString).toDF()) } finally { @@ -174,6 +198,7 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest { q2.recentProgress.foreach { p => assert(p.sources.exists(_.description.startsWith(testSource.sourceName))) } + assertQueryUsingRightBatchExecutor(testSource, q2) checkAnswer(sql(s"SELECT * from parquet.`$targetDir`"), (1 to 12).map(_.toString).toDF()) } finally { q2.stop() @@ -187,7 +212,9 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest { new TestSourceWithAdmissionControl, new TestMicroBatchStream ).foreach { testSource => - test(s"TriggerAvailableNow for single source with ${testSource.getClass}") { + testWithConfigMatrix(s"TriggerAvailableNow for single source with ${testSource.getClass}") { + testSource.reset() + val tableName = "trigger_available_now_test_table" withTable(tableName) { val df = testSource.toDF @@ -210,6 +237,7 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest { q.recentProgress.foreach { p => assert(p.sources.exists(_.description.startsWith(testSource.sourceName))) } + assertQueryUsingRightBatchExecutor(testSource, q) checkAnswer(spark.table(tableName), (1 to 3).toDF()) } finally { q.stop() @@ -225,6 +253,7 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest { q2.recentProgress.foreach { p => assert(p.sources.exists(_.description.startsWith(testSource.sourceName))) } + assertQueryUsingRightBatchExecutor(testSource, q2) checkAnswer(spark.table(tableName), (1 to 6).toDF()) } finally { q2.stop() @@ -232,4 +261,43 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest { } } } + + private def assertQueryUsingRightBatchExecutor( + testSource: TestDataFrameProvider, + query: StreamingQuery): Unit = { + val useWrapper = query.sparkSession.conf.get( + SQLConf.STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED) + + if (useWrapper) { + assertQueryUsingMultiBatchExecutor(query) + } else { + testSource match { + case _: TestMicroBatchStream => + // Trigger.AvailableNow should take effect because all sources support + // Trigger.AvailableNow. + assertQueryUsingMultiBatchExecutor(query) + + case _ => + // We fall back to single batch executor because there is a source which doesn't + // support Trigger.AvailableNow. + assertQueryUsingSingleBatchExecutor(query) + } + } + } + + private def assertQueryUsingSingleBatchExecutor(query: StreamingQuery): Unit = { + assert(getMicroBatchExecution(query).triggerExecutor.isInstanceOf[SingleBatchExecutor]) + } + + private def assertQueryUsingMultiBatchExecutor(query: StreamingQuery): Unit = { + assert(getMicroBatchExecution(query).triggerExecutor.isInstanceOf[MultiBatchExecutor]) + } + + private def getMicroBatchExecution(query: StreamingQuery): MicroBatchExecution = { + if (query.isInstanceOf[StreamingQueryWrapper]) { + query.asInstanceOf[StreamingQueryWrapper].streamingQuery.asInstanceOf[MicroBatchExecution] + } else { + query.asInstanceOf[MicroBatchExecution] + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org