Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20936#discussion_r178116567
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
    @@ -145,18 +149,42 @@ private[continuous] class EpochCoordinator(
     
         if (thisEpochCommits.size == numWriterPartitions &&
           nextEpochOffsets.size == numReaderPartitions) {
    -      logDebug(s"Epoch $epoch has received commits from all partitions. Committing globally.")
    -      // Sequencing is important here. We must commit to the writer before recording the commit
    -      // in the query, or we will end up dropping the commit if we restart in the middle.
    -      writer.commit(epoch, thisEpochCommits.toArray)
    -      query.commit(epoch)
    -
    -      // Cleanup state from before this epoch, now that we know all partitions are forever past it.
    -      for (k <- partitionCommits.keys.filter { case (e, _) => e < epoch }) {
    -        partitionCommits.remove(k)
    -      }
    -      for (k <- partitionOffsets.keys.filter { case (e, _) => e < epoch }) {
    -        partitionOffsets.remove(k)
    +
    +      // Check that last committed epoch is the previous one for sequencing of committed epochs.
    +      if (lastCommittedEpoch == epoch - 1) {
    +        logDebug(s"Epoch $epoch has received commits from all partitions. Committing globally.")
    +        // Sequencing is important here. We must commit to the writer before recording the commit
    +        // in the query, or we will end up dropping the commit if we restart in the middle.
    +        writer.commit(epoch, thisEpochCommits.toArray)
    +        query.commit(epoch)
    +        lastCommittedEpoch = epoch
    +
    +        // Commit subsequent epochs that are waiting to be committed.
    +        var nextEpoch = lastCommittedEpoch + 1
    +        while (epochsWaitingToBeCommitted.contains(nextEpoch)) {
    +          val nextEpochCommits =
    +            partitionCommits.collect { case ((e, _), msg) if e == nextEpoch => msg }
    +          logDebug(s"Committing epoch $nextEpoch.")
    +          writer.commit(nextEpoch, nextEpochCommits.toArray)
    +          query.commit(nextEpoch)
    --- End diff --
    
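    There's a bit of duplicated logic here: the writer.commit / query.commit sequence appears both for the current epoch and inside the loop over waiting epochs. Helper methods would probably be nice.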
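
One way the suggested helper could look is sketched below. This is a minimal illustration under stated assumptions, not the change that was merged in the PR: `EpochWriter`, `EpochQuery`, `CommitSequencer`, `commitEpoch` and `onEpochComplete` are all hypothetical names, commit messages are simplified to `String`, and the per-epoch state cleanup from the original diff is omitted. The point is that the "commit to the writer, then record in the query" sequence lives in one place and is reused both for the epoch that just completed and for any buffered later epochs.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the streaming writer and query; the real
// EpochCoordinator uses Spark's own writer and query objects instead.
trait EpochWriter { def commit(epoch: Long, messages: Array[String]): Unit }
trait EpochQuery { def commit(epoch: Long): Unit }

// Hypothetical class holding only the state needed for ordered commits.
class CommitSequencer(writer: EpochWriter, query: EpochQuery) {
  // (epoch, partitionId) -> commit message reported by that partition
  val partitionCommits = mutable.Map.empty[(Long, Int), String]
  // Epochs that are fully reported but blocked behind an earlier epoch
  val epochsWaitingToBeCommitted = mutable.HashSet.empty[Long]
  var lastCommittedEpoch: Long = 0L

  // Hypothetical helper: the duplicated commit sequence, written once.
  private def commitEpoch(epoch: Long): Unit = {
    val messages = partitionCommits.collect { case ((e, _), msg) if e == epoch => msg }
    // Commit to the writer before recording the commit in the query, so a
    // restart in between cannot drop the commit.
    writer.commit(epoch, messages.toArray)
    query.commit(epoch)
    lastCommittedEpoch = epoch
  }

  // Called once every partition has reported for `epoch`.
  def onEpochComplete(epoch: Long): Unit = {
    if (lastCommittedEpoch == epoch - 1) {
      commitEpoch(epoch)
      // Drain later epochs that were ready but waiting on this one.
      // remove returns true only if the epoch was in the waiting set.
      var next = lastCommittedEpoch + 1
      while (epochsWaitingToBeCommitted.remove(next)) {
        commitEpoch(next)
        next += 1
      }
    } else {
      // An earlier epoch is still outstanding; defer this one.
      epochsWaitingToBeCommitted += epoch
    }
  }
}
```

With this shape, if epoch 2 completes before epoch 1, epoch 2 is parked in `epochsWaitingToBeCommitted`; when epoch 1 completes, both are committed in order through the same helper.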

