This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 227e26202522 [SPARK-45178][SS] Fallback to execute a single batch for 
Trigger.AvailableNow with unsupported sources rather than using wrapper
227e26202522 is described below

commit 227e262025229a67f43a8de452215053a9cbf662
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Wed Sep 20 11:05:06 2023 +0900

    [SPARK-45178][SS] Fallback to execute a single batch for 
Trigger.AvailableNow with unsupported sources rather than using wrapper
    
    ### What changes were proposed in this pull request?
    
    This PR changes the behavior when a user runs a streaming query with
    Trigger.AvailableNow and the query has any source that does not support
    Trigger.AvailableNow. Instead of using the wrapper implementation, Spark
    falls back to executing a single batch (a.k.a. Trigger.Once).
    
    This PR introduces a new flag
    `spark.sql.streaming.triggerAvailableNowWrapper.enabled` to retain the
    previous behavior. The flag is marked as internal since it is only intended
    for advanced users who are concerned about the behavioral change.
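
    A minimal sketch of opting back into the wrapper, assuming Spark is on the
    classpath and a local session is acceptable. The object name, application
    name, and master URL are placeholders chosen for illustration; only the
    config key comes from this PR.

    ```scala
    import org.apache.spark.sql.SparkSession

    // Hypothetical driver object; only the config key below comes from this PR.
    object EnableAvailableNowWrapper {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("available-now-wrapper-demo") // placeholder name
          .master("local[2]")                    // placeholder master
          .getOrCreate()

        // Restore the wrapper-based behavior for sources that do not
        // implement SupportsTriggerAvailableNow. The default is false.
        spark.conf.set("spark.sql.streaming.triggerAvailableNowWrapper.enabled", "true")

        // Read the value back to confirm what the session will use.
        println(spark.conf.get("spark.sql.streaming.triggerAvailableNowWrapper.enabled"))

        spark.stop()
      }
    }
    ```

    As the config documentation in this PR warns, enabling the flag may bring
    back duplication or data loss with some sources, so it should be set only
    after checking how the source behaves under the wrapper.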
    
    Minor details are as follows:
    
    * This PR does not use Trigger.Once, hence users won't see the deprecation
      warning for Trigger.Once.
    * This PR logs a warning naming the source(s) that do not support
      Trigger.AvailableNow, so that users can identify which source(s) prevent
      them from getting the benefits of Trigger.AvailableNow.
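
    Putting the points above together, the executor selection can be sketched
    as below. This is a simplified, self-contained model and not the code in
    MicroBatchExecution: `ExecutorKind`, `SourceInfo`, and `FallbackSketch` are
    hypothetical names introduced for illustration, and `wrapperEnabled` stands
    in for the new flag.

    ```scala
    // Hypothetical stand-ins for Spark's internal types; illustrative only.
    sealed trait ExecutorKind
    case object MultiBatch extends ExecutorKind  // Trigger.AvailableNow semantics
    case object SingleBatch extends ExecutorKind // one-time micro-batch fallback

    final case class SourceInfo(name: String, supportsAvailableNow: Boolean)

    object FallbackSketch {
      def chooseExecutor(wrapperEnabled: Boolean, sources: Seq[SourceInfo]): ExecutorKind = {
        require(sources.nonEmpty, "sources should have been retrieved from the plan")
        if (wrapperEnabled) {
          // Flag enabled: unsupported sources get wrapped, so multi-batch is used.
          MultiBatch
        } else {
          // Flag disabled: any unsupported source forces the single batch fallback.
          // This sketch reports every unsupported source it finds.
          val unsupported = sources.distinct.filterNot(_.supportsAvailableNow)
          unsupported.foreach { s =>
            println(s"source [${s.name}] does not support Trigger.AvailableNow; " +
              "falling back to single batch execution")
          }
          if (unsupported.isEmpty) MultiBatch else SingleBatch
        }
      }
    }
    ```

    For example, `FallbackSketch.chooseExecutor(false, Seq(SourceInfo("a", true),
    SourceInfo("b", false)))` selects `SingleBatch`, while the same call with
    `wrapperEnabled = true` selects `MultiBatch`.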
    
    ### Why are the changes needed?
    
    We have observed a data duplication issue with a 3rd party data source when
    it is used with Trigger.AvailableNow. The source did not support
    Trigger.AvailableNow and, unfortunately, also did not work well with the
    wrapper implementation.
    
    We care more about possible correctness issues than about broader coverage
    of Trigger.AvailableNow, hence we want to stop using the wrapper
    implementation by default. We also care about not breaking existing
    queries, so we fall back to single batch execution rather than failing the
    query.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this introduces a behavioral change for streaming queries with
    Trigger.AvailableNow that contain any source not supporting
    Trigger.AvailableNow.
    
    ### How was this patch tested?
    
    Modified UT.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #42940 from HeartSaVioR/SPARK-45178.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 docs/ss-migration-guide.md                         |  4 ++
 docs/structured-streaming-programming-guide.md     |  2 +
 .../org/apache/spark/sql/internal/SQLConf.scala    | 11 ++++
 .../AsyncProgressTrackingMicroBatchExecution.scala |  7 +-
 .../streaming/AvailableNowDataStreamWrapper.scala  |  6 ++
 .../execution/streaming/MicroBatchExecution.scala  | 50 +++++++++++++--
 .../spark/sql/execution/streaming/memory.scala     | 19 +++++-
 .../streaming/StreamingQueryListenerSuite.scala    |  6 +-
 .../spark/sql/streaming/StreamingQuerySuite.scala  |  6 +-
 .../sql/streaming/TriggerAvailableNowSuite.scala   | 74 +++++++++++++++++++++-
 10 files changed, 167 insertions(+), 18 deletions(-)

diff --git a/docs/ss-migration-guide.md b/docs/ss-migration-guide.md
index 57fe3a84e12c..3247866206ee 100644
--- a/docs/ss-migration-guide.md
+++ b/docs/ss-migration-guide.md
@@ -26,6 +26,10 @@ Note that this migration guide describes the items specific 
to Structured Stream
 Many items of SQL migration can be applied when migrating Structured Streaming 
to higher versions.
 Please refer [Migration Guide: SQL, Datasets and 
DataFrame](sql-migration-guide.html).
 
+## Upgrading from Structured Streaming 3.5 to 4.0
+
+- Since Spark 4.0, Spark falls back to single batch execution if any source in 
the query does not support `Trigger.AvailableNow`. This is to avoid any 
possible correctness, duplication, and dataloss issue due to incompatibility 
between source and wrapper implementation. (See 
[SPARK-45178](https://issues.apache.org/jira/browse/SPARK-45178) for more 
details.)
+
 ## Upgrading from Structured Streaming 3.3 to 3.4
 
 - Since Spark 3.4, `Trigger.Once` is deprecated, and users are encouraged to 
migrate from `Trigger.Once` to `Trigger.AvailableNow`. Please refer 
[SPARK-39805](https://issues.apache.org/jira/browse/SPARK-39805) for more 
details.
diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index b84877d67c01..70e763be0d70 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -3265,6 +3265,8 @@ Here are the different kinds of triggers that are 
supported.
                 if the last batch advances the watermark. This helps to 
maintain smaller and predictable
                 state size and smaller latency on the output of stateful 
operators.</li>
         </ul>
+        NOTE: this trigger will be deactivated when there is any source which 
does not support Trigger.AvailableNow.
+        Spark will perform one-time micro-batch as a fall-back. Check the 
above differences for a risk of fallback.
     </td>
   </tr>
   <tr>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 49a4b0bf98bb..cce85da29b62 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2180,6 +2180,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED =
+    buildConf("spark.sql.streaming.triggerAvailableNowWrapper.enabled")
+      .internal()
+      .doc("Whether to use the wrapper implementation of Trigger.AvailableNow 
if the source " +
+        "does not support Trigger.AvailableNow. Enabling this allows the 
benefits of " +
+        "Trigger.AvailableNow with sources which don't support it, but some 
sources " +
+        "may show unexpected behavior including duplication, data loss, etc. 
So use with " +
+        "extreme care! The ideal direction is to persuade developers of 
source(s) to " +
+        "support Trigger.AvailableNow.")
+      .booleanConf
+      .createWithDefault(false)
 
   val VARIABLE_SUBSTITUTE_ENABLED =
     buildConf("spark.sql.variable.substitute")
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
index 56cdba881753..206efb9a5450 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
@@ -50,8 +50,6 @@ class AsyncProgressTrackingMicroBatchExecution(
   // to cache the batch id of the last batch written to storage
   private val lastBatchPersistedToDurableStorage = new AtomicLong(-1)
 
-  override val triggerExecutor: TriggerExecutor = validateAndGetTrigger()
-
   // used to check during the first batch if the pipeline is stateful
   private var isFirstBatch: Boolean = true
 
@@ -94,6 +92,9 @@ class AsyncProgressTrackingMicroBatchExecution(
   override val commitLog =
     new AsyncCommitLog(sparkSession, checkpointFile("commits"), 
asyncWritesExecutorService)
 
+  // perform quick validation to fail faster
+  validateAndGetTrigger()
+
   override def validateOffsetLogAndGetPrevOffset(latestBatchId: Long): 
Option[OffsetSeq] = {
     /* Initialize committed offsets to a committed batch, which at this
      * is the second latest batch id in the offset log.
@@ -228,6 +229,8 @@ class AsyncProgressTrackingMicroBatchExecution(
     asyncWritesExecutorService.getQueue.size() > 0 || 
asyncWritesExecutorService.getActiveCount > 0
   }
 
+  override protected def getTrigger(): TriggerExecutor = 
validateAndGetTrigger()
+
   private def validateAndGetTrigger(): TriggerExecutor = {
     // validate that the pipeline is using a supported sink
     if (!extraOptions
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala
index 0dc510476279..18dd2eba083a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala
@@ -28,6 +28,12 @@ import org.apache.spark.sql.connector.read.streaming
 class AvailableNowDataStreamWrapper(val delegate: SparkDataStream)
   extends SparkDataStream with SupportsTriggerAvailableNow with Logging {
 
+  // See SPARK-45178 for more details.
+  logWarning("Activating the wrapper implementation of Trigger.AvailableNow 
for source " +
+    s"[$delegate]. Note that this might introduce possibility of 
duplication, dataloss, " +
+    "correctness issue. Enable the config with extreme care. We strongly 
recommend to contact " +
+    "the data source developer to support Trigger.AvailableNow.")
+
   private var fetchedOffset: streaming.Offset = _
 
   override def initialOffset(): streaming.Offset = delegate.initialOffset()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 010ac75a73da..8edbfea3eb2c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -52,11 +52,46 @@ class MicroBatchExecution(
 
   @volatile protected var sources: Seq[SparkDataStream] = Seq.empty
 
-  protected val triggerExecutor: TriggerExecutor = trigger match {
-    case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
-    case OneTimeTrigger => SingleBatchExecutor()
-    case AvailableNowTrigger => MultiBatchExecutor()
-    case _ => throw new IllegalStateException(s"Unknown type of trigger: 
$trigger")
+  @volatile protected[sql] var triggerExecutor: TriggerExecutor = _
+
+  protected def getTrigger(): TriggerExecutor = {
+    assert(sources.nonEmpty, "sources should have been retrieved from the 
plan!")
+    trigger match {
+      case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
+      case OneTimeTrigger => SingleBatchExecutor()
+      case AvailableNowTrigger =>
+        // When the flag is enabled, Spark will wrap sources which do not 
support
+        // Trigger.AvailableNow with wrapper implementation, so that 
Trigger.AvailableNow can
+        // take effect.
+        // When the flag is disabled, Spark will fall back to single batch 
execution, whenever
+        // it figures out any source does not support Trigger.AvailableNow.
+        // See SPARK-45178 for more details.
+        if (sparkSession.sqlContext.conf.getConf(
+            SQLConf.STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED)) {
+          logInfo("Configured to use the wrapper of Trigger.AvailableNow for 
query " +
+            s"$prettyIdString.")
+          MultiBatchExecutor()
+        } else {
+          val supportsTriggerAvailableNow = sources.distinct.forall { src =>
+            val supports = src.isInstanceOf[SupportsTriggerAvailableNow]
+            if (!supports) {
+              logWarning(s"source [$src] does not support 
Trigger.AvailableNow. Falling back to " +
+                "single batch execution. Note that this may not guarantee 
processing new data if " +
+                "there is an uncommitted batch. Please consult with data 
source developer to " +
+                "support Trigger.AvailableNow.")
+            }
+
+            supports
+          }
+
+          if (supportsTriggerAvailableNow) {
+            MultiBatchExecutor()
+          } else {
+            SingleBatchExecutor()
+          }
+        }
+      case _ => throw new IllegalStateException(s"Unknown type of trigger: 
$trigger")
+    }
   }
 
   protected var watermarkTracker: WatermarkTracker = _
@@ -130,6 +165,11 @@ class MicroBatchExecution(
       // v2 source
       case r: StreamingDataSourceV2Relation => r.stream
     }
+
+    // Initializing TriggerExecutor relies on `sources`, hence calling this 
after initializing
+    // sources.
+    triggerExecutor = getTrigger()
+
     uniqueSources = triggerExecutor match {
       case _: SingleBatchExecutor =>
         sources.distinct.map {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 732eaa8d783d..fa0744dc19b2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -34,7 +34,7 @@ import 
org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, 
TableCapability}
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, 
PartitionReaderFactory, Scan, ScanBuilder}
-import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, 
MicroBatchStream, Offset => OffsetV2, SparkDataStream}
+import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, 
MicroBatchStream, Offset => OffsetV2, ReadLimit, SparkDataStream, 
SupportsTriggerAvailableNow}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.connector.SimpleTableProvider
 import org.apache.spark.sql.types.StructType
@@ -155,7 +155,10 @@ case class MemoryStream[A : Encoder](
     id: Int,
     sqlContext: SQLContext,
     numPartitions: Option[Int] = None)
-  extends MemoryStreamBase[A](sqlContext) with MicroBatchStream with Logging {
+  extends MemoryStreamBase[A](sqlContext)
+  with MicroBatchStream
+  with SupportsTriggerAvailableNow
+  with Logging {
 
   protected val output = logicalPlan.output
 
@@ -175,6 +178,9 @@ case class MemoryStream[A : Encoder](
   @GuardedBy("this")
   private var endOffset = new LongOffset(-1)
 
+  @GuardedBy("this")
+  private var availableNowEndOffset: OffsetV2 = _
+
   /**
    * Last offset that was discarded, or -1 if no commits have occurred. Note 
that the value
    * -1 is used in calculations below and isn't just an arbitrary constant.
@@ -201,7 +207,15 @@ case class MemoryStream[A : Encoder](
 
   override def initialOffset: OffsetV2 = LongOffset(-1)
 
+  override def prepareForTriggerAvailableNow(): Unit = synchronized {
+    availableNowEndOffset = latestOffset(initialOffset, 
ReadLimit.allAvailable())
+  }
+
   override def latestOffset(): OffsetV2 = {
+    throw new IllegalStateException("Should not reach here!")
+  }
+
+  override def latestOffset(startOffset: OffsetV2, limit: ReadLimit): OffsetV2 
= {
     if (currentOffset.offset == -1) null else currentOffset
   }
 
@@ -277,6 +291,7 @@ case class MemoryStream[A : Encoder](
     endOffset = LongOffset(-1)
     currentOffset = new LongOffset(-1)
     lastOffsetCommitted = new LongOffset(-1)
+    availableNowEndOffset = null
   }
 }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 52b740bc5c34..861e4e83ceff 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -29,7 +29,7 @@ import org.scalatest.concurrent.Waiters.Waiter
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.{Encoder, Row, SparkSession}
-import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
+import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, 
ReadLimit}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamingQueryListener._
@@ -314,9 +314,9 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
       try {
         var numTriggers = 0
         val input = new MemoryStream[Int](0, sqlContext) {
-          override def latestOffset(): OffsetV2 = {
+          override def latestOffset(startOffset: OffsetV2, limit: ReadLimit): 
OffsetV2 = {
             numTriggers += 1
-            super.latestOffset()
+            super.latestOffset(startOffset, limit)
           }
         }
         val clock = new StreamManualClock()
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index c3729d50ed09..9444db3e10fa 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -43,7 +43,7 @@ import 
org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Complete
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.connector.read.InputPartition
-import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
+import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, 
ReadLimit}
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.{MemorySink, 
TestForeachWriter}
@@ -230,9 +230,9 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
       private def dataAdded: Boolean = currentOffset.offset != -1
 
       // latestOffset should take 50 ms the first time it is called after data 
is added
-      override def latestOffset(): OffsetV2 = synchronized {
+      override def latestOffset(startOffset: OffsetV2, limit: ReadLimit): 
OffsetV2 = synchronized {
         if (dataAdded) clock.waitTillTime(1050)
-        super.latestOffset()
+        super.latestOffset(startOffset, limit)
       }
 
       // getBatch should take 100 ms the first time it is called
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala
index 65deca222073..defd5fd110de 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala
@@ -24,7 +24,8 @@ import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.catalyst.util.stringToFile
 import org.apache.spark.sql.connector.read.streaming
 import org.apache.spark.sql.connector.read.streaming.{ReadLimit, 
SupportsAdmissionControl}
-import org.apache.spark.sql.execution.streaming.{LongOffset, MemoryStream, 
Offset, SerializedOffset, Source, StreamingExecutionRelation}
+import org.apache.spark.sql.execution.streaming.{LongOffset, MemoryStream, 
MicroBatchExecution, MultiBatchExecutor, Offset, SerializedOffset, 
SingleBatchExecutor, Source, StreamingExecutionRelation, StreamingQueryWrapper}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{LongType, StructType}
 import org.apache.spark.tags.SlowSQLTest
 
@@ -41,6 +42,10 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest {
     def incrementAvailableOffset(numNewRows: Int): Unit
 
     def sourceName: String
+
+    def reset(): Unit = {
+      currentOffset = 0L
+    }
   }
 
   class TestSource extends TestDataFrameProvider with Source {
@@ -103,6 +108,22 @@ class TriggerAvailableNowSuite extends 
FileStreamSourceTest {
 
     // remove the trailing `$` in the class name
     override def sourceName: String = 
MemoryStream.getClass.getSimpleName.dropRight(1)
+
+    override def reset(): Unit = {
+      super.reset()
+      memoryStream.reset()
+    }
+  }
+
+  def testWithConfigMatrix(testName: String)(testFun: => Any): Unit = {
+    Seq(true, false).foreach { useWrapper =>
+      test(testName + s" (using wrapper: $useWrapper)") {
+        withSQLConf(
+          SQLConf.STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED.key -> 
useWrapper.toString) {
+          testFun
+        }
+      }
+    }
   }
 
   Seq(
@@ -110,7 +131,9 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest 
{
     new TestSourceWithAdmissionControl,
     new TestMicroBatchStream
   ).foreach { testSource =>
-    test(s"TriggerAvailableNow for multiple sources with 
${testSource.getClass}") {
+    testWithConfigMatrix(s"TriggerAvailableNow for multiple sources with 
${testSource.getClass}") {
+      testSource.reset()
+
       withTempDirs { (src, target) =>
         val checkpoint = new File(target, "chk").getCanonicalPath
         val targetDir = new File(target, "data").getCanonicalPath
@@ -154,6 +177,7 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest 
{
           q.recentProgress.foreach { p =>
             
assert(p.sources.exists(_.description.startsWith(testSource.sourceName)))
           }
+          assertQueryUsingRightBatchExecutor(testSource, q)
           checkAnswer(sql(s"SELECT * from parquet.`$targetDir`"),
             Seq(1, 2, 3, 7, 8, 9).map(_.toString).toDF())
         } finally {
@@ -174,6 +198,7 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest 
{
           q2.recentProgress.foreach { p =>
             
assert(p.sources.exists(_.description.startsWith(testSource.sourceName)))
           }
+          assertQueryUsingRightBatchExecutor(testSource, q2)
           checkAnswer(sql(s"SELECT * from parquet.`$targetDir`"), (1 to 
12).map(_.toString).toDF())
         } finally {
           q2.stop()
@@ -187,7 +212,9 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest 
{
     new TestSourceWithAdmissionControl,
     new TestMicroBatchStream
   ).foreach { testSource =>
-    test(s"TriggerAvailableNow for single source with ${testSource.getClass}") 
{
+    testWithConfigMatrix(s"TriggerAvailableNow for single source with 
${testSource.getClass}") {
+      testSource.reset()
+
       val tableName = "trigger_available_now_test_table"
       withTable(tableName) {
         val df = testSource.toDF
@@ -210,6 +237,7 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest 
{
           q.recentProgress.foreach { p =>
             
assert(p.sources.exists(_.description.startsWith(testSource.sourceName)))
           }
+          assertQueryUsingRightBatchExecutor(testSource, q)
           checkAnswer(spark.table(tableName), (1 to 3).toDF())
         } finally {
           q.stop()
@@ -225,6 +253,7 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest 
{
           q2.recentProgress.foreach { p =>
             
assert(p.sources.exists(_.description.startsWith(testSource.sourceName)))
           }
+          assertQueryUsingRightBatchExecutor(testSource, q2)
           checkAnswer(spark.table(tableName), (1 to 6).toDF())
         } finally {
           q2.stop()
@@ -232,4 +261,43 @@ class TriggerAvailableNowSuite extends 
FileStreamSourceTest {
       }
     }
   }
+
+  private def assertQueryUsingRightBatchExecutor(
+      testSource: TestDataFrameProvider,
+      query: StreamingQuery): Unit = {
+    val useWrapper = query.sparkSession.conf.get(
+      SQLConf.STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED)
+
+    if (useWrapper) {
+      assertQueryUsingMultiBatchExecutor(query)
+    } else {
+      testSource match {
+        case _: TestMicroBatchStream =>
+          // Trigger.AvailableNow should take effect because all sources 
support
+          // Trigger.AvailableNow.
+          assertQueryUsingMultiBatchExecutor(query)
+
+        case _ =>
+          // We fall back to single batch executor because there is a source 
which doesn't
+          // support Trigger.AvailableNow.
+          assertQueryUsingSingleBatchExecutor(query)
+      }
+    }
+  }
+
+  private def assertQueryUsingSingleBatchExecutor(query: StreamingQuery): Unit 
= {
+    
assert(getMicroBatchExecution(query).triggerExecutor.isInstanceOf[SingleBatchExecutor])
+  }
+
+  private def assertQueryUsingMultiBatchExecutor(query: StreamingQuery): Unit 
= {
+    
assert(getMicroBatchExecution(query).triggerExecutor.isInstanceOf[MultiBatchExecutor])
+  }
+
+  private def getMicroBatchExecution(query: StreamingQuery): 
MicroBatchExecution = {
+    if (query.isInstanceOf[StreamingQueryWrapper]) {
+      
query.asInstanceOf[StreamingQueryWrapper].streamingQuery.asInstanceOf[MicroBatchExecution]
+    } else {
+      query.asInstanceOf[MicroBatchExecution]
+    }
+  }
 }

