[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19926


---




[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-14 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r157047178
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.sources.v2.reader.Offset
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.util.{Clock, Utils}
+
+class MicroBatchExecution(
+    sparkSession: SparkSession,
+    name: String,
+    checkpointRoot: String,
+    analyzedPlan: LogicalPlan,
+    sink: Sink,
+    trigger: Trigger,
+    triggerClock: Clock,
+    outputMode: OutputMode,
+    deleteCheckpointOnStop: Boolean)
+  extends StreamExecution(
+    sparkSession, name, checkpointRoot, analyzedPlan, sink,
+    trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+  private val triggerExecutor = trigger match {
+    case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
+    case OneTimeTrigger => OneTimeExecutor()
+    case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
+  }
+
+  override lazy val logicalPlan: LogicalPlan = {
+    assert(queryExecutionThread eq Thread.currentThread,
+      "logicalPlan must be initialized in QueryExecutionThread " +
+        s"but the current thread was ${Thread.currentThread}")
+    var nextSourceId = 0L
+    val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]()
+    val _logicalPlan = analyzedPlan.transform {
+      case streamingRelation@StreamingRelation(dataSource, _, output) =>
+        toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
+          // Materialize source to avoid creating it in every batch
+          val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
+          val source = dataSource.createSource(metadataPath)
+          nextSourceId += 1
+          // We still need to use the previous `output` instead of `source.schema` as attributes in
+          // "df.logicalPlan" has already used attributes of the previous `output`.
+          StreamingExecutionRelation(source, output)(sparkSession)
+        })
+    }
+    sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
+    uniqueSources = sources.distinct
+    _logicalPlan
+  }
+
+  /**
+   * Repeatedly attempts to run batches as data arrives.
+   */
+  protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
+    triggerExecutor.execute(() => {
+      startTrigger()
+
+      if (isActive) {
+        reportTimeTaken("triggerExecution") {
+          if (currentBatchId < 0) {
+            // We'll do this initialization only once
+            populateStartOffsets(sparkSessionForStream)
+            sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
+            logDebug(s"Stream running from $committedOffsets to $availableOffsets")
+          } else {
+            constructNextBatch()
+          }
+          if (dataAvailable) {
+            currentStatus = currentStatus.copy(isDataAvailable = true)
+            updateStatusMessage("Processing new data")
+            runBatch(sparkSessionForStream)
+          }
+        }
+        // Report trigger as

[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-14 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r157044691
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.sources.v2.reader.Offset
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.util.{Clock, Utils}
+
+class MicroBatchExecution(
+    sparkSession: SparkSession,
+    name: String,
+    checkpointRoot: String,
+    analyzedPlan: LogicalPlan,
+    sink: Sink,
+    trigger: Trigger,
+    triggerClock: Clock,
+    outputMode: OutputMode,
+    deleteCheckpointOnStop: Boolean)
+  extends StreamExecution(
+    sparkSession, name, checkpointRoot, analyzedPlan, sink,
+    trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+  private val triggerExecutor = trigger match {
+    case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
+    case OneTimeTrigger => OneTimeExecutor()
+    case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
+  }
+
+  override lazy val logicalPlan: LogicalPlan = {
+    assert(queryExecutionThread eq Thread.currentThread,
+      "logicalPlan must be initialized in QueryExecutionThread " +
+        s"but the current thread was ${Thread.currentThread}")
+    var nextSourceId = 0L
+    val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]()
+    val _logicalPlan = analyzedPlan.transform {
+      case streamingRelation@StreamingRelation(dataSource, _, output) =>
+        toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
+          // Materialize source to avoid creating it in every batch
+          val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
+          val source = dataSource.createSource(metadataPath)
+          nextSourceId += 1
+          // We still need to use the previous `output` instead of `source.schema` as attributes in
+          // "df.logicalPlan" has already used attributes of the previous `output`.
+          StreamingExecutionRelation(source, output)(sparkSession)
+        })
+    }
+    sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
+    uniqueSources = sources.distinct
+    _logicalPlan
+  }
+
+  /**
+   * Repeatedly attempts to run batches as data arrives.
+   */
+  protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
+    triggerExecutor.execute(() => {
+      startTrigger()
+
+      if (isActive) {
+        reportTimeTaken("triggerExecution") {
+          if (currentBatchId < 0) {
+            // We'll do this initialization only once
+            populateStartOffsets(sparkSessionForStream)
+            sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
+            logDebug(s"Stream running from $committedOffsets to $availableOffsets")
+          } else {
+            constructNextBatch()
+          }
+          if (dataAvailable) {
+            currentStatus = currentStatus.copy(isDataAvailable = true)
+            updateStatusMessage("Processing new data")
+            runBatch(sparkSessionForStream)
+          }
+        }
+        // Report trigger as finished

[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-14 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r157041705
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -231,14 +201,14 @@ class StreamExecution(
    * processing is done.  Thus, the Nth record in this log indicated data that is currently being
    * processed and the N-1th entry indicates which offsets have been durably committed to the sink.
    */
-  val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))
+  def offsetLog: OffsetSeqLog
 
   /**
    * A log that records the batch ids that have completed. This is used to check if a batch was
    * fully processed, and its output was committed to the sink, hence no need to process it again.
    * This is used (for instance) during restart, to help identify which batch to run next.
    */
-  val batchCommitLog = new BatchCommitLog(sparkSession, checkpointFile("commits"))
--- End diff --

let's keep `batchCommitLog` and `offsetLog` in the base class since both 
subclasses need to initialize them. And we can rename `batchCommitLog` to 
`commitLog` to make it more general.
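
A minimal sketch of that suggestion, reusing the constructors already quoted in the diff above (illustrative only, not the final code):

```scala
// Illustrative sketch, inside the abstract StreamExecution base class: keep both
// logs concrete here so MicroBatchExecution and a future ContinuousExecution share
// the same initialization, and rename `batchCommitLog` to the more general `commitLog`.
val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))
val commitLog = new BatchCommitLog(sparkSession, checkpointFile("commits"))
```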


---




[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-12 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r156532817
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -447,296 +384,6 @@ class StreamExecution(
     }
   }
 
-  /**
-   * Populate the start offsets to start the execution at the current offsets stored in the sink
-   * (i.e. avoid reprocessing data that we have already processed). This function must be called
-   * before any processing occurs and will populate the following fields:
-   *  - currentBatchId
-   *  - committedOffsets
-   *  - availableOffsets
-   *  The basic structure of this method is as follows:
-   *
-   *  Identify (from the offset log) the offsets used to run the last batch
-   *  IF last batch exists THEN
-   *    Set the next batch to be executed as the last recovered batch
-   *    Check the commit log to see which batch was committed last
-   *    IF the last batch was committed THEN
-   *      Call getBatch using the last batch start and end offsets
-   *      // above line is needed since some sources assume last batch always re-executes
-   *      Setup for a new batch i.e., start = last batch end, and identify new end
-   *    DONE
-   *  ELSE
-   *    Identify a brand new batch
-   *  DONE
-   */
-  private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
-    offsetLog.getLatest() match {
--- End diff --

Sure, we could do that.


---




[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-12 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r156532669
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
@@ -237,7 +237,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
         "is not supported in streaming DataFrames/Datasets and will be disabled.")
     }
 
-    new StreamingQueryWrapper(new StreamExecution(
+    new StreamingQueryWrapper(new MicroBatchExecution(
--- End diff --

My current thinking is to have it be a new trigger type. It can't really be 
a config, because continuous processing (at least in the initial 
implementation) won't support all operators.
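
For illustration, the dispatch could end up looking roughly like this in `StreamingQueryManager` (hypothetical sketch only; `ContinuousTrigger` and `ContinuousExecution` are not part of this PR, and the argument names follow the `MicroBatchExecution` constructor quoted in this thread):

```scala
// Hypothetical sketch: the trigger type, rather than a config, selects the engine.
val execution = trigger match {
  case _: ContinuousTrigger =>
    new ContinuousExecution(
      sparkSession, name, checkpointRoot, analyzedPlan, sink,
      trigger, triggerClock, outputMode, deleteCheckpointOnStop)
  case _ =>
    new MicroBatchExecution(
      sparkSession, name, checkpointRoot, analyzedPlan, sink,
      trigger, triggerClock, outputMode, deleteCheckpointOnStop)
}
new StreamingQueryWrapper(execution)
```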


---




[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-12 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r156527867
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -71,27 +68,29 @@ class StreamExecution(
 
   import org.apache.spark.sql.streaming.StreamingQueryListener._
 
-  private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
+  protected val pollingDelayMs: Long = sparkSession.sessionState.conf.streamingPollingDelay
 
-  private val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
+  protected val minBatchesToRetain: Int = sparkSession.sessionState.conf.minBatchesToRetain
--- End diff --

yes, tweaking the var names may make it look better



---




[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-12 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r156527709
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -447,296 +384,6 @@ class StreamExecution(
     }
   }
 
-  /**
-   * Populate the start offsets to start the execution at the current offsets stored in the sink
-   * (i.e. avoid reprocessing data that we have already processed). This function must be called
-   * before any processing occurs and will populate the following fields:
-   *  - currentBatchId
-   *  - committedOffsets
-   *  - availableOffsets
-   *  The basic structure of this method is as follows:
-   *
-   *  Identify (from the offset log) the offsets used to run the last batch
-   *  IF last batch exists THEN
-   *    Set the next batch to be executed as the last recovered batch
-   *    Check the commit log to see which batch was committed last
-   *    IF the last batch was committed THEN
-   *      Call getBatch using the last batch start and end offsets
-   *      // above line is needed since some sources assume last batch always re-executes
-   *      Setup for a new batch i.e., start = last batch end, and identify new end
-   *    DONE
-   *  ELSE
-   *    Identify a brand new batch
-   *  DONE
-   */
-  private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
-    offsetLog.getLatest() match {
--- End diff --

so, shall we also make them null here and let child classes override them?


---




[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-12 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r156527409
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
@@ -237,7 +237,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
         "is not supported in streaming DataFrames/Datasets and will be disabled.")
     }
 
-    new StreamingQueryWrapper(new StreamExecution(
+    new StreamingQueryWrapper(new MicroBatchExecution(
--- End diff --

I mean, how do we switch between ContinuousExecution and MicroBatchExecution?


---




[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-12 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r156471697
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
@@ -237,7 +237,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
         "is not supported in streaming DataFrames/Datasets and will be disabled.")
     }
 
-    new StreamingQueryWrapper(new StreamExecution(
+    new StreamingQueryWrapper(new MicroBatchExecution(
--- End diff --

Sorry, I'm not sure what you have in mind here.


---




[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-12 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r156471355
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -783,29 +430,29 @@ class StreamExecution(
     }
 
     while (notDone) {
-      awaitBatchLock.lock()
+      awaitProgressLock.lock()
       try {
-        awaitBatchLockCondition.await(100, TimeUnit.MILLISECONDS)
+        awaitProgressLockCondition.await(100, TimeUnit.MILLISECONDS)
         if (streamDeathCause != null) {
          throw streamDeathCause
        }
       } finally {
-        awaitBatchLock.unlock()
+        awaitProgressLock.unlock()
       }
     }
     logDebug(s"Unblocked at $newOffset for $source")
   }
 
   /** A flag to indicate that a batch has completed with no new data available. */
-  @volatile private var noNewData = false
+  @volatile protected var noNewData = false
--- End diff --

Yes. The flag is really just a test harness; it's only used in 
processAllAvailable, so tests can block until there's a batch (or now epoch) 
that doesn't contain any data.
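
A simplified sketch of how `processAllAvailable` uses the flag, with the lock and condition names as renamed in this PR (details elided; not the verbatim implementation):

```scala
// Simplified sketch: block until a trigger completes with no new data,
// which is what StreamTest relies on to know a query has caught up.
override def processAllAvailable(): Unit = {
  if (streamDeathCause != null) throw streamDeathCause
  awaitProgressLock.lock()
  try {
    noNewData = false
    while (isActive && !noNewData) {
      // The query thread signals this condition whenever a trigger finishes.
      awaitProgressLockCondition.await(100, TimeUnit.MILLISECONDS)
      if (streamDeathCause != null) throw streamDeathCause
    }
  } finally {
    awaitProgressLock.unlock()
  }
}
```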


---




[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-12 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r156470973
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -447,296 +384,6 @@ class StreamExecution(
     }
   }
 
-  /**
-   * Populate the start offsets to start the execution at the current offsets stored in the sink
-   * (i.e. avoid reprocessing data that we have already processed). This function must be called
-   * before any processing occurs and will populate the following fields:
-   *  - currentBatchId
-   *  - committedOffsets
-   *  - availableOffsets
-   *  The basic structure of this method is as follows:
-   *
-   *  Identify (from the offset log) the offsets used to run the last batch
-   *  IF last batch exists THEN
-   *    Set the next batch to be executed as the last recovered batch
-   *    Check the commit log to see which batch was committed last
-   *    IF the last batch was committed THEN
-   *      Call getBatch using the last batch start and end offsets
-   *      // above line is needed since some sources assume last batch always re-executes
-   *      Setup for a new batch i.e., start = last batch end, and identify new end
-   *    DONE
-   *  ELSE
-   *    Identify a brand new batch
-   *  DONE
-   */
-  private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
-    offsetLog.getLatest() match {
--- End diff --

The offset log right now has a strict schema that commit information 
wouldn't fit in. I was planning to keep both logs in the continuous 
implementation.
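
For reference, a condensed sketch of the micro-batch recovery path the removed doc comment describes, as it moves into MicroBatchExecution (paraphrased; the real method also restores conf/metadata and calls getBatch on each source before setting up the next batch):

```scala
// Condensed sketch of populateStartOffsets, paraphrasing the pseudocode in the
// doc comment quoted above; not the verbatim Spark code.
private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
  offsetLog.getLatest() match {
    case Some((latestBatchId, nextOffsets)) =>
      // Re-run the last batch recorded in the offset WAL.
      currentBatchId = latestBatchId
      availableOffsets = nextOffsets.toStreamProgress(sources)
      if (batchCommitLog.getLatest().map(_._1).contains(latestBatchId)) {
        // The last batch was fully committed: start a fresh batch after it.
        committedOffsets = availableOffsets
        currentBatchId = latestBatchId + 1
        constructNextBatch()
      }
    case None =>
      // No prior run: start a brand new batch 0.
      currentBatchId = 0
      constructNextBatch()
  }
}
```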


---




[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-12 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r156468624
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -71,27 +68,29 @@ class StreamExecution(
 
   import org.apache.spark.sql.streaming.StreamingQueryListener._
 
-  private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
+  protected val pollingDelayMs: Long = sparkSession.sessionState.conf.streamingPollingDelay
 
-  private val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
+  protected val minBatchesToRetain: Int = sparkSession.sessionState.conf.minBatchesToRetain
--- End diff --

We may want to tweak the variable name, but continuous processing will 
still need to know how long it should retain commit and offset log entries. 
Unfortunately we're stuck with the config name, and I don't think it makes 
sense to introduce a second parallel one doing the same thing.
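
For context, a paraphrased sketch of how the micro-batch path applies the setting; a continuous engine would purge its own logs the same way, which is why the value belongs in the base class (exact call sites differ in the real code):

```scala
// Paraphrased sketch: drop WAL and commit-log entries for batches older than
// the last `minBatchesToRetain` batches.
if (minBatchesToRetain < currentBatchId) {
  offsetLog.purge(currentBatchId - minBatchesToRetain)
  batchCommitLog.purge(currentBatchId - minBatchesToRetain)
}
```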


---




[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-12 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r156458057
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
@@ -237,7 +237,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
         "is not supported in streaming DataFrames/Datasets and will be disabled.")
     }
 
-    new StreamingQueryWrapper(new StreamExecution(
+    new StreamingQueryWrapper(new MicroBatchExecution(
--- End diff --

shall we make the class name configurable?


---




[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-12 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r156451593
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -71,27 +68,29 @@ class StreamExecution(
 
   import org.apache.spark.sql.streaming.StreamingQueryListener._
 
-  private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
+  protected val pollingDelayMs: Long = sparkSession.sessionState.conf.streamingPollingDelay
 
-  private val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
+  protected val minBatchesToRetain: Int = sparkSession.sessionState.conf.minBatchesToRetain
   require(minBatchesToRetain > 0, "minBatchesToRetain has to be positive")
 
   /**
    * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
    */
-  private val awaitBatchLock = new ReentrantLock(true)
-  private val awaitBatchLockCondition = awaitBatchLock.newCondition()
+  protected val awaitProgressLock = new ReentrantLock(true)
+  protected val awaitProgressLockCondition: Condition = awaitProgressLock.newCondition()
--- End diff --

do we really need explicit typing here?


---




[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-12 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r156461586
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -447,296 +384,6 @@ class StreamExecution(
     }
   }
 
-  /**
-   * Populate the start offsets to start the execution at the current offsets stored in the sink
-   * (i.e. avoid reprocessing data that we have already processed). This function must be called
-   * before any processing occurs and will populate the following fields:
-   *  - currentBatchId
-   *  - committedOffsets
-   *  - availableOffsets
-   *  The basic structure of this method is as follows:
-   *
-   *  Identify (from the offset log) the offsets used to run the last batch
-   *  IF last batch exists THEN
-   *    Set the next batch to be executed as the last recovered batch
-   *    Check the commit log to see which batch was committed last
-   *    IF the last batch was committed THEN
-   *      Call getBatch using the last batch start and end offsets
-   *      // above line is needed since some sources assume last batch always re-executes
-   *      Setup for a new batch i.e., start = last batch end, and identify new end
-   *    DONE
-   *  ELSE
-   *    Identify a brand new batch
-   *  DONE
-   */
-  private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
-    offsetLog.getLatest() match {
--- End diff --

while this part of the code is removed, `offsetLog` is still in the base class, and the same is true for `batchCommitLog`.

`offsetLog` may be needed as a WAL, but should `batchCommitLog` be moved to MicroBatchExecution?


---




[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-12 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r156451756
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -71,27 +68,29 @@ class StreamExecution(
 
   import org.apache.spark.sql.streaming.StreamingQueryListener._
 
-  private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
+  protected val pollingDelayMs: Long = sparkSession.sessionState.conf.streamingPollingDelay
 
-  private val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
+  protected val minBatchesToRetain: Int = sparkSession.sessionState.conf.minBatchesToRetain
   require(minBatchesToRetain > 0, "minBatchesToRetain has to be positive")
 
   /**
    * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
    */
-  private val awaitBatchLock = new ReentrantLock(true)
-  private val awaitBatchLockCondition = awaitBatchLock.newCondition()
+  protected val awaitProgressLock = new ReentrantLock(true)
+  protected val awaitProgressLockCondition: Condition = awaitProgressLock.newCondition()
 
   private val initializationLatch = new CountDownLatch(1)
   private val startLatch = new CountDownLatch(1)
   private val terminationLatch = new CountDownLatch(1)
 
-  val resolvedCheckpointRoot = {
+  val resolvedCheckpointRoot: String = {
--- End diff --

I think it's easy for the reader to derive that it is a String-typed 
variable from the code


---




[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-12 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r156457754
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -783,29 +430,29 @@ class StreamExecution(
     }
 
     while (notDone) {
-      awaitBatchLock.lock()
+      awaitProgressLock.lock()
       try {
-        awaitBatchLockCondition.await(100, TimeUnit.MILLISECONDS)
+        awaitProgressLockCondition.await(100, TimeUnit.MILLISECONDS)
         if (streamDeathCause != null) {
          throw streamDeathCause
        }
       } finally {
-        awaitBatchLock.unlock()
+        awaitProgressLock.unlock()
      }
     }
     logDebug(s"Unblocked at $newOffset for $source")
   }
 
   /** A flag to indicate that a batch has completed with no new data available. */
-  @volatile private var noNewData = false
+  @volatile protected var noNewData = false
--- End diff --

will the noNewData flag still be useful for continuous processing?


---




[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-12 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r156458765
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---
@@ -285,12 +285,13 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
       if (currentStream != null) currentStream.committedOffsets.toString else "not started"
 
     def threadState =
-      if (currentStream != null && currentStream.microBatchThread.isAlive) "alive" else "dead"
-    def threadStackTrace = if (currentStream != null && currentStream.microBatchThread.isAlive) {
-      s"Thread stack trace: ${currentStream.microBatchThread.getStackTrace.mkString("\n")}"
-    } else {
-      ""
-    }
+      if (currentStream != null && currentStream.queryExecutionThread.isAlive) "alive" else "dead"
+    def threadStackTrace =
--- End diff --

we may want to add a new line above this


---




[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-12 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19926#discussion_r156459771
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -71,27 +68,29 @@ class StreamExecution(
 
   import org.apache.spark.sql.streaming.StreamingQueryListener._
 
-  private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
+  protected val pollingDelayMs: Long = sparkSession.sessionState.conf.streamingPollingDelay
 
-  private val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
+  protected val minBatchesToRetain: Int = sparkSession.sessionState.conf.minBatchesToRetain
--- End diff --

since this is a base class for both microbatch and continuous processing, 
is it right to put this variable here?


---




[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...

2017-12-07 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/19926

[SPARK-22733] Split StreamExecution into MicroBatchExecution and StreamExecution.

## What changes were proposed in this pull request?

StreamExecution is now an abstract base class, which MicroBatchExecution 
(the current StreamExecution) inherits. When continuous processing is 
implemented, we'll have a new ContinuousExecution implementation of 
StreamExecution.

A few fields are also renamed to make them less microbatch-specific.
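
Schematically, the resulting hierarchy looks like the sketch below (constructor arguments elided; `ContinuousExecution` is future work and not part of this PR):

```scala
// Schematic sketch only; the real signatures are in the diff.
abstract class StreamExecution( /* session, name, checkpointRoot, plan, sink, trigger, ... */ ) {
  // Shared machinery: lifecycle, progress reporting, offset bookkeeping.
  protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit
}

// The existing micro-batch engine (previously the concrete StreamExecution).
class MicroBatchExecution( /* same args */ ) extends StreamExecution( /* ... */ ) {
  protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
    // trigger-driven batch loop (see the MicroBatchExecution.scala diff in the review thread)
  }
}

// Planned follow-up, not in this PR:
// class ContinuousExecution( /* ... */ ) extends StreamExecution( /* ... */ )
```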

## How was this patch tested?

refactoring only


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark continuous-refactor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19926.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19926


commit 22d93b7d6133bffb271e6db300b936ae4dda74ab
Author: Jose Torres 
Date:   2017-12-07T22:08:28Z

Refactor StreamExecution into a parent class so continuous processing can 
extend it




---
