Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/20936#discussion_r178116567
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
---
@@ -145,18 +149,42 @@ private[continuous] class EpochCoordinator(
if (thisEpochCommits.size == numWriterPartitions &&
nextEpochOffsets.size == numReaderPartitions) {
- logDebug(s"Epoch $epoch has received commits from all partitions.
Committing globally.")
- // Sequencing is important here. We must commit to the writer before
recording the commit
- // in the query, or we will end up dropping the commit if we restart
in the middle.
- writer.commit(epoch, thisEpochCommits.toArray)
- query.commit(epoch)
-
- // Cleanup state from before this epoch, now that we know all
partitions are forever past it.
- for (k <- partitionCommits.keys.filter { case (e, _) => e < epoch })
{
- partitionCommits.remove(k)
- }
- for (k <- partitionOffsets.keys.filter { case (e, _) => e < epoch })
{
- partitionOffsets.remove(k)
+
+ // Check that last committed epoch is the previous one for
sequencing of committed epochs.
+ if (lastCommittedEpoch == epoch - 1) {
+ logDebug(s"Epoch $epoch has received commits from all partitions.
Committing globally.")
+ // Sequencing is important here. We must commit to the writer
before recording the commit
+ // in the query, or we will end up dropping the commit if we
restart in the middle.
+ writer.commit(epoch, thisEpochCommits.toArray)
+ query.commit(epoch)
+ lastCommittedEpoch = epoch
+
+ // Commit subsequent epochs that are waiting to be committed.
+ var nextEpoch = lastCommittedEpoch + 1
+ while (epochsWaitingToBeCommitted.contains(nextEpoch)) {
+ val nextEpochCommits =
+ partitionCommits.collect { case ((e, _), msg) if e ==
nextEpoch => msg }
+ logDebug(s"Committing epoch $nextEpoch.")
+ writer.commit(nextEpoch, nextEpochCommits.toArray)
+ query.commit(nextEpoch)
--- End diff --
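There's a bit of duplicated logic here - the `writer.commit(...)` followed by
`query.commit(...)` sequence appears both for the current epoch and again in the
loop over the waiting epochs. Helper methods would probably be nice.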
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org