asfgit closed pull request #23095: [SPARK-23886][SS] Update query status for ContinuousExecution
URL: https://github.com/apache/spark/pull/23095
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a fork's pull request once it
is merged, the diff is reproduced below for the sake of provenance:

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 2cac86599ef19..f2dda0373c7ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -146,6 +146,12 @@ class MicroBatchExecution(
     logInfo(s"Query $prettyIdString was stopped")
   }
 
+  /** Begins recording statistics about query progress for a given trigger. */
+  override protected def startTrigger(): Unit = {
+    super.startTrigger()
+    currentStatus = currentStatus.copy(isTriggerActive = true)
+  }
+
   /**
    * Repeatedly attempts to run batches as data arrives.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 392229bcb5f55..a5fbb56e27099 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -114,7 +114,6 @@ trait ProgressReporter extends Logging {
     logDebug("Starting Trigger Calculation")
     lastTriggerStartTimestamp = currentTriggerStartTimestamp
     currentTriggerStartTimestamp = triggerClock.getTimeMillis()
-    currentStatus = currentStatus.copy(isTriggerActive = true)
     currentTriggerStartOffsets = null
     currentTriggerEndOffsets = null
     currentDurationsMs.clear()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 4a7df731da67d..adbec0b00f368 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -117,6 +117,8 @@ class ContinuousExecution(
     // For at least once, we can just ignore those reports and risk duplicates.
     commitLog.getLatest() match {
       case Some((latestEpochId, _)) =>
+        updateStatusMessage("Starting new streaming query " +
+          s"and getting offsets from latest epoch $latestEpochId")
         val nextOffsets = offsetLog.get(latestEpochId).getOrElse {
           throw new IllegalStateException(
             s"Batch $latestEpochId was committed without end epoch offsets!")
@@ -128,6 +130,7 @@ class ContinuousExecution(
         nextOffsets
       case None =>
         // We are starting this stream for the first time. Offsets are all None.
+        updateStatusMessage("Starting new streaming query")
         logInfo(s"Starting new streaming query.")
         currentBatchId = 0
         OffsetSeq.fill(continuousSources.map(_ => null): _*)
@@ -260,6 +263,7 @@ class ContinuousExecution(
       epochUpdateThread.setDaemon(true)
       epochUpdateThread.start()
 
+      updateStatusMessage("Running")
       reportTimeTaken("runContinuous") {
         SQLExecution.withNewExecutionId(
           sparkSessionForQuery, lastExecution) {
@@ -319,6 +323,8 @@ class ContinuousExecution(
    * before this is called.
    */
   def commit(epoch: Long): Unit = {
+    updateStatusMessage(s"Committing epoch $epoch")
+
     assert(continuousSources.length == 1, "only one continuous source supported currently")
     assert(offsetLog.get(epoch).isDefined, s"offset for epoch $epoch not reported before commit")
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
index a0c9bcc8929eb..ca79e0248c06b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
@@ -28,9 +28,11 @@ import org.apache.spark.annotation.InterfaceStability
  * Reports information about the instantaneous status of a streaming query.
  *
  * @param message A human readable description of what the stream is currently doing.
- * @param isDataAvailable True when there is new data to be processed.
+ * @param isDataAvailable True when there is new data to be processed. Doesn't apply
+ *                        to ContinuousExecution where it is always false.
  * @param isTriggerActive True when the trigger is actively firing, false when waiting for the
- *                        next trigger time.
+ *                        next trigger time. Doesn't apply to ContinuousExecution where it is
+ *                        always false.
  *
  * @since 2.1.0
  */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueryStatusAndProgressSuite.scala
new file mode 100644
index 0000000000000..10bea7f090571
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueryStatusAndProgressSuite.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.continuous
+
+import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
+import org.apache.spark.sql.streaming.Trigger
+
+class ContinuousQueryStatusAndProgressSuite extends ContinuousSuiteBase {
+  test("StreamingQueryStatus - ContinuousExecution isDataAvailable and isTriggerActive " +
+      "should be false") {
+    import testImplicits._
+
+    val input = ContinuousMemoryStream[Int]
+
+    def assertStatus(stream: StreamExecution): Unit = {
+      assert(stream.status.isDataAvailable === false)
+      assert(stream.status.isTriggerActive === false)
+    }
+
+    val trigger = Trigger.Continuous(100)
+    testStream(input.toDF(), useV2Sink = true)(
+      StartStream(trigger),
+      Execute(assertStatus),
+      AddData(input, 0, 1, 2),
+      Execute(assertStatus),
+      CheckAnswer(0, 1, 2),
+      Execute(assertStatus),
+      StopStream,
+      Execute(assertStatus),
+      AddData(input, 3, 4, 5),
+      Execute(assertStatus),
+      StartStream(trigger),
+      Execute(assertStatus),
+      CheckAnswer(0, 1, 2, 3, 4, 5),
+      Execute(assertStatus),
+      StopStream,
+      Execute(assertStatus))
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to