Repository: spark
Updated Branches:
  refs/heads/master 98909c398 -> 6039b1323


[SPARK-24351][SS] offsetLog/commitLog purge thresholdBatchId should be computed from the current committed epoch, not currentBatchId, in CP mode

## What changes were proposed in this pull request?
Compute the thresholdBatchId used to purge metadata from the current committed epoch instead of currentBatchId in continuous processing (CP) mode, to avoid deleting all committed metadata in some cases, as described in [SPARK-24351](https://issues.apache.org/jira/browse/SPARK-24351).
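
For illustration, a minimal sketch of the corrected threshold arithmetic (a hypothetical helper, not part of the patch; it assumes, as in `HDFSMetadataLog`, that `purge(t)` removes entries with batchId below `t`):

```scala
// Hypothetical helper mirroring the fix: purge relative to the last committed
// epoch so that exactly minLogEntriesToMaintain committed entries survive,
// even when currentBatchId has raced far ahead in continuous processing mode.
def purgeThreshold(committedEpoch: Long, minLogEntriesToMaintain: Long): Option[Long] =
  if (minLogEntriesToMaintain <= committedEpoch) {
    Some(committedEpoch + 1 - minLogEntriesToMaintain) // keep the latest N committed epochs
  } else {
    None // not enough committed epochs yet; purge nothing
  }
```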

## How was this patch tested?
Added a new unit test.

Author: Huang Tengfei <tengfe...@gmail.com>

Closes #21400 from ivoson/branch-cp-meta.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6039b132
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6039b132
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6039b132

Branch: refs/heads/master
Commit: 6039b132304cc77ed39e4ca7813850507ae0b440
Parents: 98909c3
Author: Huang Tengfei <tengfe...@gmail.com>
Authored: Fri Jun 1 10:47:53 2018 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Fri Jun 1 10:47:53 2018 -0700

----------------------------------------------------------------------
 .../continuous/ContinuousExecution.scala        | 11 +++--
 .../streaming/continuous/ContinuousSuite.scala  | 46 ++++++++++++++++++++
 2 files changed, 54 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6039b132/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index d16b24c..e3d0cea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -318,9 +318,14 @@ class ContinuousExecution(
       }
     }
 
-    if (minLogEntriesToMaintain < currentBatchId) {
-      offsetLog.purge(currentBatchId - minLogEntriesToMaintain)
-      commitLog.purge(currentBatchId - minLogEntriesToMaintain)
+    // Since currentBatchId increases independently in CP mode, the current committed epoch may
+    // be far behind currentBatchId. It is not safe to discard metadata with a thresholdBatchId
+    // computed from currentBatchId. Because minLogEntriesToMaintain specifies the minimum
+    // number of batches that must be retained and made recoverable, we should keep that many
+    // committed metadata entries.
+    if (minLogEntriesToMaintain <= epoch) {
+      offsetLog.purge(epoch + 1 - minLogEntriesToMaintain)
+      commitLog.purge(epoch + 1 - minLogEntriesToMaintain)
     }
 
     awaitProgressLock.lock()
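
To make the change concrete, a hypothetical walkthrough (again assuming `purge(t)` drops all entries with batchId below `t`):

```scala
// Hypothetical numbers illustrating why the old threshold was unsafe in CP mode.
val minLogEntriesToMaintain = 2L
val epoch = 5L             // latest committed epoch
val currentBatchId = 100L  // epoch counter, racing ahead of commits

val newThreshold = epoch + 1 - minLogEntriesToMaintain      // 4: epochs 4 and 5 survive
val oldThreshold = currentBatchId - minLogEntriesToMaintain // 98: every committed entry is wiped
```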

http://git-wip-us.apache.org/repos/asf/spark/blob/6039b132/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index cd1704a..4980b0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -297,3 +297,49 @@ class ContinuousStressSuite extends ContinuousSuiteBase {
       CheckAnswerRowsContains(scala.Range(0, 25000).map(Row(_))))
   }
 }
+
+class ContinuousMetaSuite extends ContinuousSuiteBase {
+  import testImplicits._
+
+  // We need to specify spark.sql.streaming.minBatchesToRetain to do the following test.
+  override protected def createSparkSession = new TestSparkSession(
+    new SparkContext(
+      "local[10]",
+      "continuous-stream-test-sql-context",
+      sparkConf.set("spark.sql.testkey", "true")
+        .set("spark.sql.streaming.minBatchesToRetain", "2")))
+
+  test("SPARK-24351: check offsetLog/commitLog retained in the checkpoint 
directory") {
+    withTempDir { checkpointDir =>
+      val input = ContinuousMemoryStream[Int]
+      val df = input.toDF().mapPartitions(iter => {
+        // Sleep the task thread for 300 ms to make epoch processing take about
+        // 3x the epoch creation interval, so the gap between the last committed
+        // epoch and currentBatchId grows over time.
+        Thread.sleep(300)
+        iter.map(row => row.getInt(0) * 2)
+      })
+
+      testStream(df)(
+        StartStream(trigger = Trigger.Continuous(100),
+          checkpointLocation = checkpointDir.getAbsolutePath),
+        AddData(input, 1),
+        CheckAnswer(2),
+        // Make sure epoch 2 has been committed before the following validation.
+        AwaitEpoch(2),
+        StopStream,
+        AssertOnQuery(q => {
+          q.commitLog.getLatest() match {
+            case Some((latestEpochId, _)) =>
+              val commitLogValidateResult = q.commitLog.get(latestEpochId - 1).isDefined &&
+                q.commitLog.get(latestEpochId - 2).isEmpty
+              val offsetLogValidateResult = q.offsetLog.get(latestEpochId - 1).isDefined &&
+                q.offsetLog.get(latestEpochId - 2).isEmpty
+              commitLogValidateResult && offsetLogValidateResult
+            case None => false
+          }
+        })
+      )
+    }
+  }
+}
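
Under `minBatchesToRetain = 2`, the assertion above checks that exactly the two most recent epochs survive the purge. A small sketch of that invariant (a hypothetical helper, not part of the patch):

```scala
// With minBatchesToRetain = N, only the latest N epochs should remain after a purge.
def retainedAfterPurge(latestEpochId: Long, minBatchesToRetain: Long) =
  (latestEpochId - minBatchesToRetain + 1) to latestEpochId

// e.g. retainedAfterPurge(7L, 2L) == (6L to 7L): epochs 6 and 7 remain, matching the
// test's check that latestEpochId - 1 is defined and latestEpochId - 2 is empty.
```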

