[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19926 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19926#discussion_r157047178 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala --- @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap} + +import org.apache.spark.sql.{Dataset, SparkSession} +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.sources.v2.reader.Offset +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} +import org.apache.spark.util.{Clock, Utils} + +class MicroBatchExecution( +sparkSession: SparkSession, +name: String, +checkpointRoot: String, +analyzedPlan: LogicalPlan, +sink: Sink, +trigger: Trigger, +triggerClock: Clock, +outputMode: OutputMode, +deleteCheckpointOnStop: Boolean) + extends StreamExecution( +sparkSession, name, checkpointRoot, analyzedPlan, sink, +trigger, triggerClock, outputMode, deleteCheckpointOnStop) { + + private val triggerExecutor = trigger match { +case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock) +case OneTimeTrigger => OneTimeExecutor() +case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger") + } + + override lazy val logicalPlan: LogicalPlan = { +assert(queryExecutionThread eq Thread.currentThread, + "logicalPlan must be initialized in QueryExecutionThread " + +s"but the current thread was ${Thread.currentThread}") +var nextSourceId = 0L +val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]() +val _logicalPlan = analyzedPlan.transform { + case streamingRelation@StreamingRelation(dataSource, _, output) => +toExecutionRelationMap.getOrElseUpdate(streamingRelation, { + // Materialize source to avoid creating it in every batch + val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" + val source = 
dataSource.createSource(metadataPath) + nextSourceId += 1 + // We still need to use the previous `output` instead of `source.schema` as attributes in + // "df.logicalPlan" has already used attributes of the previous `output`. + StreamingExecutionRelation(source, output)(sparkSession) +}) +} +sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source } +uniqueSources = sources.distinct +_logicalPlan + } + + /** + * Repeatedly attempts to run batches as data arrives. + */ + protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = { +triggerExecutor.execute(() => { + startTrigger() + + if (isActive) { +reportTimeTaken("triggerExecution") { + if (currentBatchId < 0) { +// We'll do this initialization only once +populateStartOffsets(sparkSessionForStream) + sparkSession.sparkContext.setJobDescription(getBatchDescriptionString) +logDebug(s"Stream running from $committedOffsets to $availableOffsets") + } else { +constructNextBatch() + } + if (dataAvailable) { +currentStatus = currentStatus.copy(isDataAvailable = true) +updateStatusMessage("Processing new data") +runBatch(sparkSessionForStream) + } +} +// Report trigger as
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19926#discussion_r157044691 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala --- @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap} + +import org.apache.spark.sql.{Dataset, SparkSession} +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.sources.v2.reader.Offset +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} +import org.apache.spark.util.{Clock, Utils} + +class MicroBatchExecution( +sparkSession: SparkSession, +name: String, +checkpointRoot: String, +analyzedPlan: LogicalPlan, +sink: Sink, +trigger: Trigger, +triggerClock: Clock, +outputMode: OutputMode, +deleteCheckpointOnStop: Boolean) + extends StreamExecution( +sparkSession, name, checkpointRoot, analyzedPlan, sink, +trigger, triggerClock, outputMode, deleteCheckpointOnStop) { + + private val triggerExecutor = trigger match { +case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock) +case OneTimeTrigger => OneTimeExecutor() +case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger") + } + + override lazy val logicalPlan: LogicalPlan = { +assert(queryExecutionThread eq Thread.currentThread, + "logicalPlan must be initialized in QueryExecutionThread " + +s"but the current thread was ${Thread.currentThread}") +var nextSourceId = 0L +val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]() +val _logicalPlan = analyzedPlan.transform { + case streamingRelation@StreamingRelation(dataSource, _, output) => +toExecutionRelationMap.getOrElseUpdate(streamingRelation, { + // Materialize source to avoid creating it in every batch + val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" + val source = 
dataSource.createSource(metadataPath) + nextSourceId += 1 + // We still need to use the previous `output` instead of `source.schema` as attributes in + // "df.logicalPlan" has already used attributes of the previous `output`. + StreamingExecutionRelation(source, output)(sparkSession) +}) +} +sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source } +uniqueSources = sources.distinct +_logicalPlan + } + + /** + * Repeatedly attempts to run batches as data arrives. + */ + protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = { +triggerExecutor.execute(() => { + startTrigger() + + if (isActive) { +reportTimeTaken("triggerExecution") { + if (currentBatchId < 0) { +// We'll do this initialization only once +populateStartOffsets(sparkSessionForStream) + sparkSession.sparkContext.setJobDescription(getBatchDescriptionString) +logDebug(s"Stream running from $committedOffsets to $availableOffsets") + } else { +constructNextBatch() + } + if (dataAvailable) { +currentStatus = currentStatus.copy(isDataAvailable = true) +updateStatusMessage("Processing new data") +runBatch(sparkSessionForStream) + } +} +// Report trigger as finished
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19926#discussion_r157041705

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -231,14 +201,14 @@ class StreamExecution(
        * processing is done.  Thus, the Nth record in this log indicated data that is currently being
        * processed and the N-1th entry indicates which offsets have been durably committed to the sink.
        */
    -  val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))
    +  def offsetLog: OffsetSeqLog

       /**
        * A log that records the batch ids that have completed. This is used to check if a batch was
        * fully processed, and its output was committed to the sink, hence no need to process it again.
        * This is used (for instance) during restart, to help identify which batch to run next.
        */
    -  val batchCommitLog = new BatchCommitLog(sparkSession, checkpointFile("commits"))
    --- End diff --

    Let's keep `batchCommitLog` and `offsetLog` in the base class, since both subclasses need to initialize them. We can also rename `batchCommitLog` to `commitLog` to make it more general.
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19926#discussion_r156532817

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -447,296 +384,6 @@ class StreamExecution(
         }
       }

    -  /**
    -   * Populate the start offsets to start the execution at the current offsets stored in the sink
    -   * (i.e. avoid reprocessing data that we have already processed). This function must be called
    -   * before any processing occurs and will populate the following fields:
    -   *  - currentBatchId
    -   *  - committedOffsets
    -   *  - availableOffsets
    -   *  The basic structure of this method is as follows:
    -   *
    -   *  Identify (from the offset log) the offsets used to run the last batch
    -   *  IF last batch exists THEN
    -   *    Set the next batch to be executed as the last recovered batch
    -   *    Check the commit log to see which batch was committed last
    -   *    IF the last batch was committed THEN
    -   *      Call getBatch using the last batch start and end offsets
    -   *      // above line is needed since some sources assume last batch always re-executes
    -   *      Setup for a new batch i.e., start = last batch end, and identify new end
    -   *    DONE
    -   *  ELSE
    -   *    Identify a brand new batch
    -   *  DONE
    -   */
    -  private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
    -    offsetLog.getLatest() match {
    --- End diff --

    Sure, we could do that.
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19926#discussion_r156532669

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
    @@ -237,7 +237,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
               "is not supported in streaming DataFrames/Datasets and will be disabled.")
         }

    -    new StreamingQueryWrapper(new StreamExecution(
    +    new StreamingQueryWrapper(new MicroBatchExecution(
    --- End diff --

    My current thinking is to have it be a new trigger type. It can't really be a config, because continuous processing (at least in the initial implementation) won't support all operators.
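One way to picture the "new trigger type" idea from the comment above is a pattern match on the trigger that picks the execution class. This is only a sketch under stated assumptions: `TriggerSketch`, `ContinuousSketch` and `ExecutionChooser` are hypothetical names invented for illustration, not Spark classes, and the function returns a class name as a string rather than constructing anything.

```scala
// Hypothetical stand-ins for trigger types; these are NOT Spark's Trigger classes.
sealed trait TriggerSketch
final case class ProcessingTimeSketch(intervalMs: Long) extends TriggerSketch
case object OneTimeSketch extends TriggerSketch
// Assumed new trigger type that would opt a query into continuous processing.
final case class ContinuousSketch(checkpointIntervalMs: Long) extends TriggerSketch

object ExecutionChooser {
  // Picks the name of the execution class from the trigger alone, so no
  // separate config flag is needed to switch between the two modes.
  def chooseExecution(trigger: TriggerSketch): String = trigger match {
    case ContinuousSketch(_)                     => "ContinuousExecution"
    case ProcessingTimeSketch(_) | OneTimeSketch => "MicroBatchExecution"
  }
}
```

Because the choice hangs off the trigger, a query that uses operators unsupported by continuous processing simply keeps using a microbatch trigger.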
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19926#discussion_r156527867

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -71,27 +68,29 @@ class StreamExecution(

       import org.apache.spark.sql.streaming.StreamingQueryListener._

    -  private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
    +  protected val pollingDelayMs: Long = sparkSession.sessionState.conf.streamingPollingDelay

    -  private val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
    +  protected val minBatchesToRetain: Int = sparkSession.sessionState.conf.minBatchesToRetain
    --- End diff --

    Yes, tweaking the variable names may make it look better.
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19926#discussion_r156527709

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -447,296 +384,6 @@ class StreamExecution(
         }
       }

    -  /**
    -   * Populate the start offsets to start the execution at the current offsets stored in the sink
    -   * (i.e. avoid reprocessing data that we have already processed). This function must be called
    -   * before any processing occurs and will populate the following fields:
    -   *  - currentBatchId
    -   *  - committedOffsets
    -   *  - availableOffsets
    -   *  The basic structure of this method is as follows:
    -   *
    -   *  Identify (from the offset log) the offsets used to run the last batch
    -   *  IF last batch exists THEN
    -   *    Set the next batch to be executed as the last recovered batch
    -   *    Check the commit log to see which batch was committed last
    -   *    IF the last batch was committed THEN
    -   *      Call getBatch using the last batch start and end offsets
    -   *      // above line is needed since some sources assume last batch always re-executes
    -   *      Setup for a new batch i.e., start = last batch end, and identify new end
    -   *    DONE
    -   *  ELSE
    -   *    Identify a brand new batch
    -   *  DONE
    -   */
    -  private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
    -    offsetLog.getLatest() match {
    --- End diff --

    So, shall we also make them null here and let child classes override them?
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19926#discussion_r156527409

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
    @@ -237,7 +237,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
               "is not supported in streaming DataFrames/Datasets and will be disabled.")
         }

    -    new StreamingQueryWrapper(new StreamExecution(
    +    new StreamingQueryWrapper(new MicroBatchExecution(
    --- End diff --

    I mean, how do we switch between ContinuousExecution and MicroBatchExecution?
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19926#discussion_r156471697

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
    @@ -237,7 +237,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
               "is not supported in streaming DataFrames/Datasets and will be disabled.")
         }

    -    new StreamingQueryWrapper(new StreamExecution(
    +    new StreamingQueryWrapper(new MicroBatchExecution(
    --- End diff --

    Sorry, I'm not sure what you have in mind here.
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19926#discussion_r156471355

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -783,29 +430,29 @@ class StreamExecution(
         }

         while (notDone) {
    -      awaitBatchLock.lock()
    +      awaitProgressLock.lock()
           try {
    -        awaitBatchLockCondition.await(100, TimeUnit.MILLISECONDS)
    +        awaitProgressLockCondition.await(100, TimeUnit.MILLISECONDS)
             if (streamDeathCause != null) {
               throw streamDeathCause
             }
           } finally {
    -        awaitBatchLock.unlock()
    +        awaitProgressLock.unlock()
           }
         }
         logDebug(s"Unblocked at $newOffset for $source")
       }

       /** A flag to indicate that a batch has completed with no new data available. */
    -  @volatile private var noNewData = false
    +  @volatile protected var noNewData = false
    --- End diff --

    Yes. The flag is really just a test harness; it's only used in processAllAvailable, so tests can block until there's a batch (or now epoch) that doesn't contain any data.
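The wait/notify pattern discussed above (a fair lock, a condition, and a `noNewData` flag that a test can block on) can be sketched in isolation. This is a minimal illustration, not the code from the pull request: the class `ProgressSignal` and its methods `markNoNewData` and `awaitNoNewData` are hypothetical names, and only the `java.util.concurrent` calls are real APIs.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

// Hypothetical helper that mirrors the lock/condition/flag trio from the diff.
class ProgressSignal {
  // Fair lock, as in the diff, to avoid starving waiting threads.
  private val awaitProgressLock = new ReentrantLock(true)
  private val awaitProgressLockCondition = awaitProgressLock.newCondition()
  @volatile private var noNewData = false

  // Called by the execution thread after a batch (or epoch) with no data.
  def markNoNewData(): Unit = {
    awaitProgressLock.lock()
    try {
      noNewData = true
      awaitProgressLockCondition.signalAll()
    } finally {
      awaitProgressLock.unlock()
    }
  }

  // Blocks until the flag is set or the timeout passes; returns the flag.
  // The 100 ms polling wait mirrors the await call shown in the diff.
  def awaitNoNewData(timeoutMs: Long): Boolean = {
    val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs)
    awaitProgressLock.lock()
    try {
      while (!noNewData && System.nanoTime() < deadline) {
        awaitProgressLockCondition.await(100, TimeUnit.MILLISECONDS)
      }
      noNewData
    } finally {
      awaitProgressLock.unlock()
    }
  }
}
```

A test would call `awaitNoNewData` from the test thread while the query thread calls `markNoNewData` once it observes an empty batch or epoch.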
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19926#discussion_r156470973

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -447,296 +384,6 @@ class StreamExecution(
         }
       }

    -  /**
    -   * Populate the start offsets to start the execution at the current offsets stored in the sink
    -   * (i.e. avoid reprocessing data that we have already processed). This function must be called
    -   * before any processing occurs and will populate the following fields:
    -   *  - currentBatchId
    -   *  - committedOffsets
    -   *  - availableOffsets
    -   *  The basic structure of this method is as follows:
    -   *
    -   *  Identify (from the offset log) the offsets used to run the last batch
    -   *  IF last batch exists THEN
    -   *    Set the next batch to be executed as the last recovered batch
    -   *    Check the commit log to see which batch was committed last
    -   *    IF the last batch was committed THEN
    -   *      Call getBatch using the last batch start and end offsets
    -   *      // above line is needed since some sources assume last batch always re-executes
    -   *      Setup for a new batch i.e., start = last batch end, and identify new end
    -   *    DONE
    -   *  ELSE
    -   *    Identify a brand new batch
    -   *  DONE
    -   */
    -  private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
    -    offsetLog.getLatest() match {
    --- End diff --

    The offset log right now has a strict schema that commit information wouldn't fit in. I was planning to keep both logs in the continuous implementation.
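The pseudocode quoted in the diff above describes how two separate logs cooperate on restart. A rough sketch of that decision, assuming each log is reduced to "the latest batch id it contains", might look like the following. `RecoverySketch`, `StartState` and `populateStart` are hypothetical names, and the real method also restores offsets and calls `getBatch`, which this sketch leaves out.

```scala
object RecoverySketch {
  // Simplified result: which batch to run next, and whether it is a re-run
  // of a batch that was logged but never committed.
  final case class StartState(currentBatchId: Long, rerunLastBatch: Boolean)

  // latestOffsetBatch: latest batch id in the (hypothetical) offset log.
  // latestCommittedBatch: latest batch id in the (hypothetical) commit log.
  def populateStart(
      latestOffsetBatch: Option[Long],
      latestCommittedBatch: Option[Long]): StartState =
    latestOffsetBatch match {
      case None =>
        // No last batch exists: identify a brand new batch.
        StartState(0L, rerunLastBatch = false)
      case Some(last) =>
        latestCommittedBatch match {
          case Some(committed) if committed == last =>
            // The last batch was committed: set up a new batch after it.
            StartState(last + 1, rerunLastBatch = false)
          case _ =>
            // Logged but not committed: execute the last recovered batch again.
            StartState(last, rerunLastBatch = true)
        }
    }
}
```

Keeping the two logs separate, as the comment proposes, means the commit side stays a plain record of completed ids and the offset log's schema is left unchanged.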
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19926#discussion_r156468624

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -71,27 +68,29 @@ class StreamExecution(

       import org.apache.spark.sql.streaming.StreamingQueryListener._

    -  private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
    +  protected val pollingDelayMs: Long = sparkSession.sessionState.conf.streamingPollingDelay

    -  private val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
    +  protected val minBatchesToRetain: Int = sparkSession.sessionState.conf.minBatchesToRetain
    --- End diff --

    We may want to tweak the variable name, but continuous processing will still need to know how long it should retain commit and offset log entries. Unfortunately we're stuck with the config name, and I don't think it makes sense to introduce a second parallel one doing the same thing.
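To make the retention point above concrete, here is a small sketch of how a single `minBatchesToRetain` value could bound both logs, whether the entries are batches or epochs. The object `RetentionSketch`, the `purge` function, and the threshold rule (keep ids at or above `currentId - minBatchesToRetain`) are assumptions made for illustration; the in-memory `Map` stands in for a log and is not Spark's log API.

```scala
object RetentionSketch {
  // Drops entries older than the retention window from an in-memory log.
  // currentId is the id of the batch (or epoch) being processed.
  def purge(
      log: Map[Long, String],
      currentId: Long,
      minBatchesToRetain: Int): Map[Long, String] = {
    require(minBatchesToRetain > 0, "minBatchesToRetain has to be positive")
    val threshold = currentId - minBatchesToRetain
    // Assumed rule: keep every entry whose id is at or above the threshold.
    log.filter { case (id, _) => id >= threshold }
  }
}
```

The same function can be applied to an offset log and a commit log alike, which is the sense in which one setting serves both execution modes.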
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19926#discussion_r156458057

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
    @@ -237,7 +237,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
               "is not supported in streaming DataFrames/Datasets and will be disabled.")
         }

    -    new StreamingQueryWrapper(new StreamExecution(
    +    new StreamingQueryWrapper(new MicroBatchExecution(
    --- End diff --

    Shall we make the class name configurable?
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19926#discussion_r156451593

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -71,27 +68,29 @@ class StreamExecution(

       import org.apache.spark.sql.streaming.StreamingQueryListener._

    -  private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
    +  protected val pollingDelayMs: Long = sparkSession.sessionState.conf.streamingPollingDelay

    -  private val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
    +  protected val minBatchesToRetain: Int = sparkSession.sessionState.conf.minBatchesToRetain
       require(minBatchesToRetain > 0, "minBatchesToRetain has to be positive")

       /**
        * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
        */
    -  private val awaitBatchLock = new ReentrantLock(true)
    -  private val awaitBatchLockCondition = awaitBatchLock.newCondition()
    +  protected val awaitProgressLock = new ReentrantLock(true)
    +  protected val awaitProgressLockCondition: Condition = awaitProgressLock.newCondition()
    --- End diff --

    Do we really need explicit typing here?
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19926#discussion_r156461586

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -447,296 +384,6 @@ class StreamExecution(
         }
       }

    -  /**
    -   * Populate the start offsets to start the execution at the current offsets stored in the sink
    -   * (i.e. avoid reprocessing data that we have already processed). This function must be called
    -   * before any processing occurs and will populate the following fields:
    -   *  - currentBatchId
    -   *  - committedOffsets
    -   *  - availableOffsets
    -   *  The basic structure of this method is as follows:
    -   *
    -   *  Identify (from the offset log) the offsets used to run the last batch
    -   *  IF last batch exists THEN
    -   *    Set the next batch to be executed as the last recovered batch
    -   *    Check the commit log to see which batch was committed last
    -   *    IF the last batch was committed THEN
    -   *      Call getBatch using the last batch start and end offsets
    -   *      // above line is needed since some sources assume last batch always re-executes
    -   *      Setup for a new batch i.e., start = last batch end, and identify new end
    -   *    DONE
    -   *  ELSE
    -   *    Identify a brand new batch
    -   *  DONE
    -   */
    -  private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
    -    offsetLog.getLatest() match {
    --- End diff --

    While this part of the code is removed, `offsetLog` is still in the base class, and the same goes for `batchCommitLog`. `offsetLog` may be needed as a WAL, but should `batchCommitLog` be moved to `MicroBatchExecution`?
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19926#discussion_r156451756

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -71,27 +68,29 @@ class StreamExecution(

       import org.apache.spark.sql.streaming.StreamingQueryListener._

    -  private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
    +  protected val pollingDelayMs: Long = sparkSession.sessionState.conf.streamingPollingDelay

    -  private val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
    +  protected val minBatchesToRetain: Int = sparkSession.sessionState.conf.minBatchesToRetain
       require(minBatchesToRetain > 0, "minBatchesToRetain has to be positive")

       /**
        * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
        */
    -  private val awaitBatchLock = new ReentrantLock(true)
    -  private val awaitBatchLockCondition = awaitBatchLock.newCondition()
    +  protected val awaitProgressLock = new ReentrantLock(true)
    +  protected val awaitProgressLockCondition: Condition = awaitProgressLock.newCondition()

       private val initializationLatch = new CountDownLatch(1)
       private val startLatch = new CountDownLatch(1)
       private val terminationLatch = new CountDownLatch(1)

    -  val resolvedCheckpointRoot = {
    +  val resolvedCheckpointRoot: String = {
    --- End diff --

    I think it's easy for the reader to infer from the code that this is a String-typed variable.
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19926#discussion_r156457754

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -783,29 +430,29 @@ class StreamExecution(
         }

         while (notDone) {
    -      awaitBatchLock.lock()
    +      awaitProgressLock.lock()
           try {
    -        awaitBatchLockCondition.await(100, TimeUnit.MILLISECONDS)
    +        awaitProgressLockCondition.await(100, TimeUnit.MILLISECONDS)
             if (streamDeathCause != null) {
               throw streamDeathCause
             }
           } finally {
    -        awaitBatchLock.unlock()
    +        awaitProgressLock.unlock()
           }
         }
         logDebug(s"Unblocked at $newOffset for $source")
       }

       /** A flag to indicate that a batch has completed with no new data available. */
    -  @volatile private var noNewData = false
    +  @volatile protected var noNewData = false
    --- End diff --

    Will the `noNewData` flag still be useful for continuous processing?
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19926#discussion_r156458765

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---
    @@ -285,12 +285,13 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
           if (currentStream != null) currentStream.committedOffsets.toString else "not started"

         def threadState =
    -      if (currentStream != null && currentStream.microBatchThread.isAlive) "alive" else "dead"
    -    def threadStackTrace = if (currentStream != null && currentStream.microBatchThread.isAlive) {
    -      s"Thread stack trace: ${currentStream.microBatchThread.getStackTrace.mkString("\n")}"
    -    } else {
    -      ""
    -    }
    +      if (currentStream != null && currentStream.queryExecutionThread.isAlive) "alive" else "dead"
    +    def threadStackTrace =
    --- End diff --

    We may want to add a blank line above this.
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19926#discussion_r156459771

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -71,27 +68,29 @@ class StreamExecution(

       import org.apache.spark.sql.streaming.StreamingQueryListener._

    -  private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
    +  protected val pollingDelayMs: Long = sparkSession.sessionState.conf.streamingPollingDelay

    -  private val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
    +  protected val minBatchesToRetain: Int = sparkSession.sessionState.conf.minBatchesToRetain
    --- End diff --

    Since this is a base class for both microbatch and continuous processing, is it right to put this variable here?
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
GitHub user joseph-torres opened a pull request:

    https://github.com/apache/spark/pull/19926

    [SPARK-22733] Split StreamExecution into MicroBatchExecution and StreamExecution.

    ## What changes were proposed in this pull request?

    StreamExecution is now an abstract base class, which MicroBatchExecution (the current StreamExecution) inherits. When continuous processing is implemented, we'll have a new ContinuousExecution implementation of StreamExecution.

    A few fields are also renamed to make them less microbatch-specific.

    ## How was this patch tested?

    refactoring only

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/joseph-torres/spark continuous-refactor

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19926.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19926

----
commit 22d93b7d6133bffb271e6db300b936ae4dda74ab
Author: Jose Torres
Date:   2017-12-07T22:08:28Z

    Refactor StreamExecution into a parent class so continuous processing can extend it
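The shape of the refactoring described in this pull request (an abstract base class with one concrete microbatch subclass, leaving room for a continuous one) can be sketched with simplified stand-ins. Everything below is hypothetical: `StreamExecutionSketch` and `MicroBatchExecutionSketch` are illustrative names, the batches are plain in-memory sequences, and the real classes take many more constructor arguments.

```scala
// Hypothetical, simplified stand-in for the abstract base class.
abstract class StreamExecutionSketch(val name: String) {
  // Shared lifecycle stays in the base class.
  final def start(): String = {
    val result = runActivatedStream()
    s"$name finished: $result"
  }

  // Each execution mode supplies its own processing loop.
  protected def runActivatedStream(): String
}

// Microbatch flavour: walks a fixed list of batches and skips empty ones.
class MicroBatchExecutionSketch(queryName: String, batches: Seq[Seq[Int]])
    extends StreamExecutionSketch(queryName) {
  protected def runActivatedStream(): String = {
    val nonEmpty = batches.count(_.nonEmpty)
    s"ran $nonEmpty non-empty batches"
  }
}
```

A continuous implementation would, under the same assumption, be a second subclass that overrides `runActivatedStream` while reusing the shared lifecycle in `start`.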