viirya commented on code in PR #42940:
URL: https://github.com/apache/spark/pull/42940#discussion_r1329632673


##########
docs/ss-migration-guide.md:
##########
@@ -26,6 +26,10 @@ Note that this migration guide describes the items specific to Structured Stream
 Many items of SQL migration can be applied when migrating Structured Streaming to higher versions.
 Please refer [Migration Guide: SQL, Datasets and DataFrame](sql-migration-guide.html).
 
+## Upgrading from Structured Streaming 3.5 to 4.0
+
+- Since Spark 4.0, Spark falls back to single batch execution if any source in the query does not support `Trigger.AvailableNow`. This is to avoid any possible correctness, duplication, and dataloss issue due to incompatibility between source and wrapper implementation. (See [SPARK-45178](https://issues.apache.org/jira/browse/SPARK-45178) for more details.)

Review Comment:
   If this is about correctness, do you plan to backport this to the 3.4/3.5 branches?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala:
##########
@@ -28,6 +28,12 @@ import org.apache.spark.sql.connector.read.streaming
 class AvailableNowDataStreamWrapper(val delegate: SparkDataStream)
   extends SparkDataStream with SupportsTriggerAvailableNow with Logging {
 
+  // See SPARK-45178 for more details.
+  logWarning(s"Activating the wrapper implementation of Trigger.AvailableNow for source " +
+    s"[$delegate]. Note that this might introduce possibility of deduplication, dataloss, " +
+    s"correctness issue. Enable the config with extreme care. We strongly recommend to contact " +
+    "the data source developer to support Trigger.AvailableNow.")

Review Comment:
   ```suggestion
     logWarning("Activating the wrapper implementation of Trigger.AvailableNow for source " +
       s"[$delegate]. Note that this might introduce possibility of deduplication, dataloss, " +
       "correctness issue. Enable the config with extreme care. We strongly recommend to contact " +
       "the data source developer to support Trigger.AvailableNow.")
   ```
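
Comparing the suggestion with the original hunk, the only visible change is that the `s` interpolator is dropped from string literals that contain no `$` placeholder. A minimal sketch of why that does not change the message, assuming a hypothetical `delegate` value in place of the real `SparkDataStream`:

```scala
object InterpolatorDemo {
  // Hypothetical stand-in for the wrapped source; not the real SparkDataStream.
  val delegate: String = "MySource"

  // Original style: every literal carries the `s` prefix, even without placeholders.
  val withPrefix: String =
    s"Activating the wrapper implementation of Trigger.AvailableNow for source " +
      s"[$delegate]."

  // Suggested style: `s` only on the literal that actually interpolates.
  val withoutPrefix: String =
    "Activating the wrapper implementation of Trigger.AvailableNow for source " +
      s"[$delegate]."

  def main(args: Array[String]): Unit = {
    // Both forms build the same string; the prefix is redundant without `$`.
    assert(withPrefix == withoutPrefix)
    println(withoutPrefix)
  }
}
```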



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala:
##########
@@ -201,7 +207,15 @@ case class MemoryStream[A : Encoder](
 
   override def initialOffset: OffsetV2 = LongOffset(-1)
 
+  override def prepareForTriggerAvailableNow(): Unit = synchronized {
+    availableNowEndOffset = latestOffset(initialOffset, ReadLimit.allAvailable())
+  }
+
   override def latestOffset(): OffsetV2 = {
+    throw new IllegalStateException("Should not reach here!")
+  }

Review Comment:
   Not sure why this cannot be called. Can `MemoryStream` no longer be used as a `MicroBatchStream` after this change?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -52,11 +52,46 @@ class MicroBatchExecution(
 
   @volatile protected var sources: Seq[SparkDataStream] = Seq.empty
 
-  protected val triggerExecutor: TriggerExecutor = trigger match {
-    case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
-    case OneTimeTrigger => SingleBatchExecutor()
-    case AvailableNowTrigger => MultiBatchExecutor()
-    case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
+  @volatile protected[sql] var triggerExecutor: TriggerExecutor = _
+
+  protected def getTrigger(): TriggerExecutor = {
+    assert(sources.nonEmpty, "sources should have been retrieved from the plan!")
+    trigger match {
+      case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
+      case OneTimeTrigger => SingleBatchExecutor()
+      case AvailableNowTrigger =>
+        // When the flag is enabled, Spark will wrap sources which does not support

Review Comment:
   ```suggestion
           // When the flag is enabled, Spark will wrap sources which do not support
   ```



##########
docs/ss-migration-guide.md:
##########
@@ -26,6 +26,10 @@ Note that this migration guide describes the items specific to Structured Stream
 Many items of SQL migration can be applied when migrating Structured Streaming to higher versions.
 Please refer [Migration Guide: SQL, Datasets and DataFrame](sql-migration-guide.html).
 
+## Upgrading from Structured Streaming 3.5 to 4.0
+
+- Since Spark 4.0, Spark falls back to single batch execution if any source in the query does not support `Trigger.AvailableNow`. This is to avoid any possible correctness, duplication, and dataloss issue due to incompatibility between source and wrapper implementation. (See [SPARK-45178](https://issues.apache.org/jira/browse/SPARK-45178) for more details.)

Review Comment:
   For "single batch execution", isn't it deprecated now? Do we guarantee that it can still be used in future versions?
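
For readers following the thread, the fallback described in the migration note can be sketched roughly as below. This is only an illustration of the decision, not the PR's code: `Source`, `NativeSource`, `LegacySource`, `Executor`, and the `wrapUnsupported` flag are hypothetical names, and the real logic lives in `MicroBatchExecution`.

```scala
// Hypothetical model of the sources in a query; not Spark's real classes.
sealed trait Source { def name: String }
// A source that supports Trigger.AvailableNow natively.
final case class NativeSource(name: String) extends Source
// A source that does not support Trigger.AvailableNow.
final case class LegacySource(name: String) extends Source

// Hypothetical stand-ins for the single-batch and multi-batch executors.
sealed trait Executor
case object SingleBatch extends Executor
case object MultiBatch extends Executor

object AvailableNowFallback {
  // `wrapUnsupported` models the flag mentioned in the diff comment: when it is
  // enabled, unsupported sources are wrapped and multi-batch execution is kept.
  def chooseExecutor(sources: Seq[Source], wrapUnsupported: Boolean): Executor = {
    val unsupported = sources.collect { case s: LegacySource => s }
    if (unsupported.isEmpty || wrapUnsupported) MultiBatch
    else SingleBatch // fall back when any source lacks support
  }

  def main(args: Array[String]): Unit = {
    val mixed = Seq(NativeSource("kafka-like"), LegacySource("custom"))
    println(chooseExecutor(mixed, wrapUnsupported = false))
    println(chooseExecutor(mixed, wrapUnsupported = true))
  }
}
```

Under these assumptions, a single unsupported source is enough to switch the whole query to single batch execution unless the flag is enabled.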



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

