Repository: spark
Updated Branches:
  refs/heads/master 710e4e81a -> 434d74e33


[SPARK-23503][SS] Enforce sequencing of committed epochs for Continuous Execution

## What changes were proposed in this pull request?

Made changes to EpochCoordinator so that it enforces a commit order. In case a
message for epoch n is lost and epoch (n + 1) becomes ready for commit before
epoch n does, epoch (n + 1) will wait for epoch n to be committed first.
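
For orientation before the diff, here is a minimal self-contained sketch of the sequencing scheme. The field names `lastCommittedEpoch` and `epochsWaitingToBeCommitted` mirror the patch below, but the `EpochSequencer` class and its `doCommit` callback are hypothetical stand-ins, not Spark API:

```scala
import scala.collection.mutable

// Hypothetical stand-in for EpochCoordinator's sequencing logic;
// only the two field names below mirror the actual patch.
class EpochSequencer(startEpoch: Long, doCommit: Long => Unit) {
  private var lastCommittedEpoch = startEpoch - 1
  // Epochs that became ready before their predecessors were committed.
  private val epochsWaitingToBeCommitted = mutable.HashSet.empty[Long]

  // Called once all partitions have reported commits for `epoch`.
  def epochReady(epoch: Long): Unit = {
    if (lastCommittedEpoch != epoch - 1) {
      // An earlier epoch is still outstanding; park this one for later.
      epochsWaitingToBeCommitted.add(epoch)
    } else {
      doCommit(epoch)
      lastCommittedEpoch = epoch
      // Drain consecutive epochs that were parked behind this one.
      var next = lastCommittedEpoch + 1
      while (epochsWaitingToBeCommitted.remove(next)) {
        doCommit(next)
        lastCommittedEpoch = next
        next += 1
      }
    }
  }
}

object EpochSequencerDemo extends App {
  val sequencer = new EpochSequencer(1, e => println(s"committed epoch $e"))
  sequencer.epochReady(2) // parked: epoch 1 is not committed yet
  sequencer.epochReady(3) // parked as well
  sequencer.epochReady(1) // commits 1, then drains 2 and 3 in order
}
```

Even though epochs 2 and 3 become ready first, the demo prints commits for 1, 2, 3 in sequence, which is the invariant this change adds to EpochCoordinator.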

## How was this patch tested?

Existing tests in ContinuousSuite and EpochCoordinatorSuite.
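
The re-enabled out-of-order scenarios in EpochCoordinatorSuite appear in the test diff below. As a rough sketch of what they exercise, reusing the hypothetical `EpochSequencer` above (the real suite drives EpochCoordinator through RPC messages against mocked reader and writer partitions):

```scala
import scala.collection.mutable

// Sketch of the "messages arrive in order 1 -> 3 -> 4 -> 2" scenario,
// using the hypothetical EpochSequencer above, not the real suite's helpers.
object OutOfOrderScenario extends App {
  val committed = mutable.ArrayBuffer.empty[Long]
  val sequencer = new EpochSequencer(1, e => committed += e)
  Seq(1L, 3L, 4L, 2L).foreach(sequencer.epochReady) // ready out of order
  assert(committed == Seq(1L, 2L, 3L, 4L))          // committed in order
}
```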

Author: Efim Poberezkin <e...@poberezkin.ru>

Closes #20936 from efimpoberezkin/pr/sequence-commited-epochs.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/434d74e3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/434d74e3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/434d74e3

Branch: refs/heads/master
Commit: 434d74e337465d77fa49ab65e2b5461e5ff7b5c7
Parents: 710e4e8
Author: Efim Poberezkin <e...@poberezkin.ru>
Authored: Fri May 18 16:54:39 2018 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Fri May 18 16:54:39 2018 -0700

----------------------------------------------------------------------
 .../streaming/continuous/EpochCoordinator.scala | 69 ++++++++++++++++----
 .../continuous/EpochCoordinatorSuite.scala      |  6 +-
 2 files changed, 58 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/434d74e3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
index cc68080..8877ebe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
@@ -137,30 +137,71 @@ private[continuous] class EpochCoordinator(
   private val partitionOffsets =
     mutable.Map[(Long, Int), PartitionOffset]()
 
+  private var lastCommittedEpoch = startEpoch - 1
+  // Remembers epochs that have to wait for previous epochs to be committed first.
+  private val epochsWaitingToBeCommitted = mutable.HashSet.empty[Long]
+
   private def resolveCommitsAtEpoch(epoch: Long) = {
-    val thisEpochCommits =
-      partitionCommits.collect { case ((e, _), msg) if e == epoch => msg }
+    val thisEpochCommits = findPartitionCommitsForEpoch(epoch)
     val nextEpochOffsets =
       partitionOffsets.collect { case ((e, _), o) if e == epoch => o }
 
     if (thisEpochCommits.size == numWriterPartitions &&
       nextEpochOffsets.size == numReaderPartitions) {
-      logDebug(s"Epoch $epoch has received commits from all partitions. Committing globally.")
-      // Sequencing is important here. We must commit to the writer before recording the commit
-      // in the query, or we will end up dropping the commit if we restart in the middle.
-      writer.commit(epoch, thisEpochCommits.toArray)
-      query.commit(epoch)
-
-      // Cleanup state from before this epoch, now that we know all partitions are forever past it.
-      for (k <- partitionCommits.keys.filter { case (e, _) => e < epoch }) {
-        partitionCommits.remove(k)
-      }
-      for (k <- partitionOffsets.keys.filter { case (e, _) => e < epoch }) {
-        partitionOffsets.remove(k)
+
+      // Check that last committed epoch is the previous one for sequencing of committed epochs.
+      // If not, add the epoch being currently processed to epochs waiting to be committed,
+      // otherwise commit it.
+      if (lastCommittedEpoch != epoch - 1) {
+        logDebug(s"Epoch $epoch has received commits from all partitions " +
+          s"and is waiting for epoch ${epoch - 1} to be committed first.")
+        epochsWaitingToBeCommitted.add(epoch)
+      } else {
+        commitEpoch(epoch, thisEpochCommits)
+        lastCommittedEpoch = epoch
+
+        // Commit subsequent epochs that are waiting to be committed.
+        var nextEpoch = lastCommittedEpoch + 1
+        while (epochsWaitingToBeCommitted.contains(nextEpoch)) {
+          val nextEpochCommits = findPartitionCommitsForEpoch(nextEpoch)
+          commitEpoch(nextEpoch, nextEpochCommits)
+
+          epochsWaitingToBeCommitted.remove(nextEpoch)
+          lastCommittedEpoch = nextEpoch
+          nextEpoch += 1
+        }
+
+        // Cleanup state from before last committed epoch,
+        // now that we know all partitions are forever past it.
+        for (k <- partitionCommits.keys.filter { case (e, _) => e < lastCommittedEpoch }) {
+          partitionCommits.remove(k)
+        }
+        for (k <- partitionOffsets.keys.filter { case (e, _) => e < lastCommittedEpoch }) {
+          partitionOffsets.remove(k)
+        }
       }
     }
   }
 
+  /**
+   * Collect per-partition commits for an epoch.
+   */
+  private def findPartitionCommitsForEpoch(epoch: Long): Iterable[WriterCommitMessage] = {
+    partitionCommits.collect { case ((e, _), msg) if e == epoch => msg }
+  }
+
+  /**
+   * Commit epoch to the offset log.
+   */
+  private def commitEpoch(epoch: Long, messages: Iterable[WriterCommitMessage]): Unit = {
+    logDebug(s"Epoch $epoch has received commits from all partitions " +
+      s"and is ready to be committed. Committing epoch $epoch.")
+    // Sequencing is important here. We must commit to the writer before recording the commit
+    // in the query, or we will end up dropping the commit if we restart in the middle.
+    writer.commit(epoch, messages.toArray)
+    query.commit(epoch)
+  }
+
   override def receive: PartialFunction[Any, Unit] = {
    // If we just drop these messages, we won't do any writes to the query. The lame duck tasks
     // won't shed errors or anything.

http://git-wip-us.apache.org/repos/asf/spark/blob/434d74e3/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
index 99e3056..82836dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
@@ -120,7 +120,7 @@ class EpochCoordinatorSuite
     verifyCommitsInOrderOf(List(1, 2))
   }
 
-  ignore("consequent epochs, a message for epoch k arrives after messages for 
epoch (k + 1)") {
+  test("consequent epochs, a message for epoch k arrives after messages for 
epoch (k + 1)") {
     setWriterPartitions(2)
     setReaderPartitions(2)
 
@@ -141,7 +141,7 @@ class EpochCoordinatorSuite
     verifyCommitsInOrderOf(List(1, 2))
   }
 
-  ignore("several epochs, messages arrive in order 1 -> 3 -> 4 -> 2") {
+  test("several epochs, messages arrive in order 1 -> 3 -> 4 -> 2") {
     setWriterPartitions(1)
     setReaderPartitions(1)
 
@@ -162,7 +162,7 @@ class EpochCoordinatorSuite
     verifyCommitsInOrderOf(List(1, 2, 3, 4))
   }
 
-  ignore("several epochs, messages arrive in order 1 -> 3 -> 5 -> 4 -> 2") {
+  test("several epochs, messages arrive in order 1 -> 3 -> 5 -> 4 -> 2") {
     setWriterPartitions(1)
     setReaderPartitions(1)
 

