This is an automated email from the ASF dual-hosted git repository. brkyvz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 4c37a8a [SPARK-30143][SS] Add a timeout on stopping a streaming query 4c37a8a is described below commit 4c37a8a3f4a489b52f1919d2db84f6e32c6a05cd Author: Burak Yavuz <brk...@gmail.com> AuthorDate: Fri Dec 13 15:16:00 2019 -0800 [SPARK-30143][SS] Add a timeout on stopping a streaming query ### What changes were proposed in this pull request? Add a timeout configuration for StreamingQuery.stop(). ### Why are the changes needed? The stop() method on a streaming query awaits the termination of the stream execution thread. However, depending on the streaming source implementation, the stream execution thread may block forever (as with the Kafka source, which runs UninterruptibleThreads). This causes control flow applications to hang indefinitely as well. We'd like to introduce a timeout on stopping the execution thread, so that the control flow thread can decide what action to take if the timeout is hit. ### Does this PR introduce any user-facing change? By default, no. If the timeout configuration is set, a TimeoutException will be thrown if a stream cannot be stopped within the given timeout. ### How was this patch tested? Unit tests. Closes #26771 from brkyvz/stopTimeout. 
Lead-authored-by: Burak Yavuz <brk...@gmail.com> Co-authored-by: Burak Yavuz <bu...@databricks.com> Signed-off-by: Burak Yavuz <brk...@gmail.com> --- .../org/apache/spark/sql/internal/SQLConf.scala | 7 ++ .../execution/streaming/MicroBatchExecution.scala | 3 +- .../sql/execution/streaming/StreamExecution.scala | 26 +++- .../streaming/continuous/ContinuousExecution.scala | 3 +- .../spark/sql/streaming/DataStreamWriter.scala | 11 +- .../spark/sql/streaming/StreamingQuery.scala | 12 +- .../sql/streaming/StreamingQueryManager.scala | 3 +- .../streaming/JavaDataStreamReaderWriterSuite.java | 5 +- .../apache/spark/sql/streaming/StreamSuite.scala | 35 +++++- .../sql/streaming/util/BlockOnStopSource.scala | 132 +++++++++++++++++++++ 10 files changed, 224 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index c54008c..91347cf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1298,6 +1298,13 @@ object SQLConf { .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(10L) + val STREAMING_STOP_TIMEOUT = + buildConf("spark.sql.streaming.stopTimeout") + .doc("How long to wait for the streaming execution thread to stop when calling the " + + "streaming query's stop() method in milliseconds. 
0 or negative values wait indefinitely.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefault(0L) + val STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL = buildConf("spark.sql.streaming.noDataProgressEventInterval") .internal() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 5fe1f92..872c367 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -150,8 +150,7 @@ class MicroBatchExecution( state.set(TERMINATED) if (queryExecutionThread.isAlive) { sparkSession.sparkContext.cancelJobGroup(runId.toString) - queryExecutionThread.interrupt() - queryExecutionThread.join() + interruptAndAwaitExecutionThreadTermination() // microBatchThread may spawn new jobs, so we need to cancel again to prevent a leak sparkSession.sparkContext.cancelJobGroup(runId.toString) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index f470ad3..1cb3955 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming import java.io.{InterruptedIOException, IOException, UncheckedIOException} import java.nio.channels.ClosedByInterruptException import java.util.UUID -import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit} +import java.util.concurrent.{CountDownLatch, ExecutionException, TimeoutException, TimeUnit} import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.locks.ReentrantLock @@ -435,6 +435,30 @@ abstract class 
StreamExecution( } /** + * Interrupts the query execution thread and awaits its termination until until it exceeds the + * timeout. The timeout can be set on "spark.sql.streaming.stopTimeout". + * + * @throws TimeoutException If the thread cannot be stopped within the timeout + */ + @throws[TimeoutException] + protected def interruptAndAwaitExecutionThreadTermination(): Unit = { + val timeout = math.max( + sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_STOP_TIMEOUT), 0) + queryExecutionThread.interrupt() + queryExecutionThread.join(timeout) + if (queryExecutionThread.isAlive) { + val stackTraceException = new SparkException("The stream thread was last executing:") + stackTraceException.setStackTrace(queryExecutionThread.getStackTrace) + val timeoutException = new TimeoutException( + s"Stream Execution thread failed to stop within $timeout milliseconds (specified by " + + s"${SQLConf.STREAMING_STOP_TIMEOUT.key}). See the cause on what was " + + "being executed in the streaming query thread.") + timeoutException.initCause(stackTraceException) + throw timeoutException + } + } + + /** * Blocks the current thread until processing for data from the given `source` has reached at * least the given `Offset`. This method is intended for use primarily when writing tests. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index 8c7371e..481552a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -427,8 +427,7 @@ class ContinuousExecution( if (queryExecutionThread.isAlive) { // The query execution thread will clean itself up in the finally clause of runContinuous. // We just need to interrupt the long running job. 
- queryExecutionThread.interrupt() - queryExecutionThread.join() + interruptAndAwaitExecutionThreadTermination() } logInfo(s"Query $prettyIdString was stopped") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index 74170b1..62a1add 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.streaming import java.util.Locale +import java.util.concurrent.TimeoutException import scala.collection.JavaConverters._ @@ -238,10 +239,18 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { /** * Starts the execution of the streaming query, which will continually output results to the given * path as new data arrives. The returned [[StreamingQuery]] object can be used to interact with - * the stream. + * the stream. 
Throws a `TimeoutException` if the following conditions are met: + * - Another run of the same streaming query, that is a streaming query + * sharing the same checkpoint location, is already active on the same + * Spark Driver + * - The SQL configuration `spark.sql.streaming.stopActiveRunOnRestart` + * is enabled + * - The active run cannot be stopped within the timeout controlled by + * the SQL configuration `spark.sql.streaming.stopTimeout` * * @since 2.0.0 */ + @throws[TimeoutException] def start(): StreamingQuery = { if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) { throw new AnalysisException("Hive data source can only be used with tables, you can not " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala index 47ddc88..85d980e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.streaming import java.util.UUID +import java.util.concurrent.TimeoutException import org.apache.spark.annotation.Evolving import org.apache.spark.sql.SparkSession @@ -142,10 +143,17 @@ trait StreamingQuery { def processAllAvailable(): Unit /** - * Stops the execution of this query if it is running. This method blocks until the threads - * performing execution has stopped. + * Stops the execution of this query if it is running. This waits until the termination of the + * query execution threads or until a timeout is hit. + * + * By default stop will block indefinitely. You can configure a timeout by the configuration + * `spark.sql.streaming.stopTimeout`. A timeout of 0 (or negative) milliseconds will block + * indefinitely. If a `TimeoutException` is thrown, users can retry stopping the stream. If the + * issue persists, it is advisable to kill the Spark application. 
+ * * @since 2.0.0 */ + @throws[TimeoutException] def stop(): Unit /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index e64f67c..810f4a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.streaming import java.util.{ConcurrentModificationException, UUID} -import java.util.concurrent.TimeUnit +import java.util.concurrent.{TimeoutException, TimeUnit} import javax.annotation.concurrent.GuardedBy import scala.collection.JavaConverters._ @@ -321,6 +321,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo * @param trigger [[Trigger]] for the query. * @param triggerClock [[Clock]] to use for the triggering. */ + @throws[TimeoutException] private[sql] def startQuery( userSpecifiedName: Option[String], userSpecifiedCheckpointLocation: Option[String], diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java index 48cdb26..5903623 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java @@ -18,6 +18,7 @@ package test.org.apache.spark.sql.streaming; import java.io.File; +import java.util.concurrent.TimeoutException; import org.junit.After; import org.junit.Before; @@ -52,7 +53,7 @@ public class JavaDataStreamReaderWriterSuite { } @Test - public void testForeachBatchAPI() { + public void testForeachBatchAPI() throws TimeoutException { StreamingQuery query = spark .readStream() .textFile(input) @@ -66,7 +67,7 @@ public class 
JavaDataStreamReaderWriterSuite { } @Test - public void testForeachAPI() { + public void testForeachAPI() throws TimeoutException { StreamingQuery query = spark .readStream() .textFile(input) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index a637b42..297d6c7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -19,8 +19,9 @@ package org.apache.spark.sql.streaming import java.io.{File, InterruptedIOException, IOException, UncheckedIOException} import java.nio.channels.ClosedByInterruptException -import java.util.concurrent.{CountDownLatch, ExecutionException, TimeoutException, TimeUnit} +import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit} +import scala.concurrent.TimeoutException import scala.reflect.ClassTag import scala.util.control.ControlThrowable @@ -42,7 +43,7 @@ import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreCon import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.StreamSourceProvider -import org.apache.spark.sql.streaming.util.StreamManualClock +import org.apache.spark.sql.streaming.util.{BlockOnStopSourceProvider, StreamManualClock} import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.apache.spark.util.Utils @@ -1125,6 +1126,36 @@ class StreamSuite extends StreamTest { } ) } + + // ProcessingTime trigger generates MicroBatchExecution, and ContinuousTrigger starts a + // ContinuousExecution + Seq(Trigger.ProcessingTime("1 second"), Trigger.Continuous("1 second")).foreach { trigger => + test(s"SPARK-30143: stop waits until timeout if blocked - trigger: $trigger") { + BlockOnStopSourceProvider.enableBlocking() + val sq = 
spark.readStream.format(classOf[BlockOnStopSourceProvider].getName) + .load() + .writeStream + .format("console") + .trigger(trigger) + .start() + failAfter(60.seconds) { + val startTime = System.nanoTime() + withSQLConf(SQLConf.STREAMING_STOP_TIMEOUT.key -> "2000") { + intercept[TimeoutException] { + sq.stop() + } + } + val duration = (System.nanoTime() - startTime) / 1e6 + assert(duration >= 2000, + s"Should have waited more than 2000 millis, but waited $duration millis") + + BlockOnStopSourceProvider.disableBlocking() + withSQLConf(SQLConf.STREAMING_STOP_TIMEOUT.key -> "0") { + sq.stop() + } + } + } + } } abstract class FakeSource extends StreamSourceProvider { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockOnStopSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockOnStopSource.scala new file mode 100644 index 0000000..f25758c --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockOnStopSource.scala @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming.util + +import java.util +import java.util.concurrent.CountDownLatch + +import scala.collection.JavaConverters._ + +import org.apache.zookeeper.KeeperException.UnimplementedException + +import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext} +import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider} +import org.apache.spark.sql.connector.catalog.TableCapability.CONTINUOUS_READ +import org.apache.spark.sql.connector.read.{streaming, InputPartition, Scan, ScanBuilder} +import org.apache.spark.sql.connector.read.streaming.{ContinuousPartitionReaderFactory, ContinuousStream, PartitionOffset} +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source} +import org.apache.spark.sql.sources.StreamSourceProvider +import org.apache.spark.sql.types.{LongType, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +/** The V1 and V2 provider of a streaming source, which blocks indefinitely on the call of stop() */ +object BlockOnStopSourceProvider { + private var _latch: CountDownLatch = _ + val schema: StructType = new StructType().add("id", LongType) + + /** Set the latch that we will use to block the streaming query thread. 
*/ + def enableBlocking(): Unit = { + if (_latch == null || _latch.getCount == 0) { + _latch = new CountDownLatch(1) + } + } + + def disableBlocking(): Unit = { + if (_latch != null) { + _latch.countDown() + _latch = null + } + } +} + +class BlockOnStopSourceProvider extends StreamSourceProvider with TableProvider { + override def getTable(options: CaseInsensitiveStringMap): Table = { + new BlockOnStopSourceTable(BlockOnStopSourceProvider._latch) + } + + override def sourceSchema( + sqlContext: SQLContext, + schema: Option[StructType], + providerName: String, + parameters: Map[String, String]): (String, StructType) = { + "blockingSource" -> BlockOnStopSourceProvider.schema + } + + override def createSource( + sqlContext: SQLContext, + metadataPath: String, + schema: Option[StructType], + providerName: String, + parameters: Map[String, String]): Source = { + new BlockOnStopSource(sqlContext.sparkSession, BlockOnStopSourceProvider._latch) + } +} + +/** A V1 Streaming Source which blocks on stop(). It does not produce any data. */ +class BlockOnStopSource(spark: SparkSession, latch: CountDownLatch) extends Source { + // Blocks until latch countdowns + override def stop(): Unit = latch.await() + + // Boiler-plate + override val schema: StructType = BlockOnStopSourceProvider.schema + override def getOffset: Option[Offset] = Some(LongOffset(0)) + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { + spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema) + } +} + +/** A V2 Table, which can create a blocking streaming source for ContinuousExecution. 
*/ +class BlockOnStopSourceTable(latch: CountDownLatch) extends Table with SupportsRead { + override def schema(): StructType = BlockOnStopSourceProvider.schema + + override def name(): String = "blockingSource" + + override def capabilities(): util.Set[TableCapability] = Set(CONTINUOUS_READ).asJava + + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + new ScanBuilder { + override def build(): Scan = new Scan { + override def readSchema(): StructType = schema() + + override def toContinuousStream(checkpointLocation: String): ContinuousStream = { + new BlockOnStopContinuousStream(latch) + } + } + } + } +} + +/** + * A V2 Streaming Source which blocks on stop(). It does not produce any data. We use this for + * testing stopping in ContinuousExecution. + */ +class BlockOnStopContinuousStream(latch: CountDownLatch) extends ContinuousStream { + + // Blocks until latch countdowns + override def stop(): Unit = latch.await() + + // Boiler-plate + override def planInputPartitions(start: streaming.Offset): Array[InputPartition] = Array.empty + override def mergeOffsets(offsets: Array[PartitionOffset]): streaming.Offset = LongOffset(0L) + override def deserializeOffset(json: String): streaming.Offset = LongOffset(0L) + override def initialOffset(): Offset = LongOffset(0) + override def commit(end: streaming.Offset): Unit = {} + override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory = { + throw new UnimplementedException + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org