GitHub user tdas opened a pull request:
https://github.com/apache/spark/pull/21220
[SPARK-24157][SS] Enabled no-data batches in MicroBatchExecution for
streaming aggregation and deduplication.
## What changes were proposed in this pull request?
This PR enables MicroBatchExecution to run no-data batches if some
SparkPlan requires running another batch to output results based on an updated
watermark / processing time. In this PR, I have enabled streaming aggregation
and streaming deduplication to automatically run an additional batch even if no
new data is available. In future PRs, I will enable streaming join and
mapGroupsWithState as well.
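To make the motivation concrete, here is a minimal sketch (not part of this PR; the source, sink, and option choices are illustrative) of the kind of query that benefits: a watermarked streaming aggregation in Append mode, which can only emit a finalized window and clean up its state after the watermark passes the window end.

```scala
// Illustrative sketch only: a watermarked streaming aggregation in Append mode.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

val spark = SparkSession.builder().appName("no-data-batch-example").getOrCreate()
import spark.implicits._

val events = spark.readStream
  .format("rate")                    // built-in test source producing (timestamp, value)
  .option("rowsPerSecond", "10")
  .load()

val counts = events
  .withWatermark("timestamp", "10 seconds")
  .groupBy(window($"timestamp", "1 minute"))
  .count()

// In Append mode a window is emitted (and its state evicted) only once the watermark
// passes the window end. Previously that could only happen in a batch triggered by new
// data; with no-data batches, an empty batch can run purely to act on the advanced watermark.
val query = counts.writeStream
  .outputMode("append")
  .format("console")
  .start()
```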
Major changes/refactoring done in this PR.
- Refactoring MicroBatchExecution - A major point of confusion in the
MicroBatchExecution control flow (at least to me) was that
`populateStartOffsets` internally called `constructNextBatch`, which was not
obvious from the name "populateStartOffsets" and made the control flow of
the main trigger execution loop very confusing (the main loop in
`runActivatedStream` called `constructNextBatch`, but only if
`populateStartOffsets` had not already called it). The refactoring makes this
cleaner; a rough sketch of the resulting loop follows this change list.
  - `populateStartOffsets` now only updates `availableOffsets` and
`committedOffsets`; it does not call `constructNextBatch`.
  - The main loop in `runActivatedStream` calls `constructNextBatch`, which
returns true or false depending on whether the next batch is ready to execute.
This method is now idempotent: if a batch has already been constructed, it
keeps returning true until that batch has been executed.
  - If the next batch is ready, we call `runBatch`; otherwise we sleep.
  - That's it.
- Refactoring watermark management logic - This has been refactored out
of `MicroBatchExecution` into a separate class to simplify
`MicroBatchExecution`.
- New method `shouldRunAnotherBatch` in `IncrementalExecution` - This
returns true if there is any stateful operation in the last execution plan that
requires another batch for state cleanup, etc. This is used to decide whether
to construct a batch or not in `constructNextBatch`.
- Changes to stream testing framework - Many tests used `CheckLastBatch` to
validate answers. This assumed that there would be no more batches after the
last set of input had been processed, so the last batch was the one whose
output corresponded to the last input. This is no longer true. To account
for that, I made two changes.
  - `CheckNewAnswer` is a new test action that verifies the new rows
generated since the last time the answer was checked by `CheckAnswer`,
`CheckNewAnswer` or `CheckLastBatch`. It is agnostic to how many batches
occurred between the last check and now. To make this easier, I added a
common trait shared by MemorySink and MemorySinkV2 to abstract out some common
methods.
  - `assertNumStateRows` has been updated in the same way to be batch-agnostic
when checking the total number of state rows and how many state rows were
updated (it sums up the updates since the last check).
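To make the refactored control flow concrete, here is a minimal, self-contained sketch of the loop shape described above. Every name and body in it is illustrative and heavily simplified; it is not the actual MicroBatchExecution code.

```scala
// Illustrative sketch only; simplified names and bodies, not the real MicroBatchExecution.
object TriggerLoopSketch {
  @volatile var isActive: Boolean = true
  private var batchConstructed: Boolean = false
  private val pollingIntervalMs: Long = 100L

  // Stand-ins for the real checks.
  private def checkSourcesForNewData(): Boolean = false
  private def shouldRunAnotherBatch(): Boolean = false // e.g. watermark advanced, state to clean up

  // Only restores availableOffsets / committedOffsets; never constructs a batch.
  private def populateStartOffsets(): Unit = ()

  // Idempotent: once a batch is constructed, keeps returning true until runBatch() executes it.
  private def constructNextBatch(): Boolean = {
    if (!batchConstructed) {
      batchConstructed = checkSourcesForNewData() || shouldRunAnotherBatch()
    }
    batchConstructed
  }

  private def runBatch(): Unit = { batchConstructed = false }

  def runActivatedStream(): Unit = {
    populateStartOffsets()
    while (isActive) {
      if (constructNextBatch()) runBatch()
      else Thread.sleep(pollingIntervalMs)
    }
  }
}
```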
## How was this patch tested?
- Changes made to existing tests - Tests have been changed in one of the
following patterns.
  - Tests that fed the last input again just to force another batch to be
executed (so that state was cleaned up / output was generated) were simplified
by removing the extra input.
  - Tests using aggregation + watermark had `CheckLastBatch` replaced
with `CheckNewAnswer` to make them batch-agnostic; a sketch of the new pattern
follows this list.
- New tests were added to check whether the flag works for streaming aggregation
and deduplication.
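For illustration, the new test pattern looks roughly like this (a sketch written against the StreamTest DSL, assumed to live inside a suite that extends StreamTest; the query and the specific values are illustrative, not copied from the actual tests):

```scala
// Sketch of the new test pattern; assumes a suite extending StreamTest, values are illustrative.
val inputData = MemoryStream[Int]
val result = inputData.toDS()
  .withColumn("eventTime", $"value".cast("timestamp"))
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates()
  .select($"eventTime".cast("long").as[Long])

testStream(result, OutputMode.Append)(
  AddData(inputData, 10, 11, 12),
  CheckNewAnswer(10, 11, 12),                 // only the rows produced since the last check
  AddData(inputData, 25),                     // watermark advances to 15 seconds
  CheckNewAnswer(25),                         // no extra "dummy" input needed to trigger cleanup
  assertNumStateRows(total = 1, updated = 1)  // sums up updates since the last check
)
```

Because `CheckNewAnswer` and `assertNumStateRows` are agnostic to how many (possibly no-data) batches ran in between, the test no longer needs to re-feed the last input to force state cleanup.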
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tdas/spark SPARK-24157
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/21220.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #21220
----
commit 2ccdc60012b06201b1cf48e8216c21525f731e5a
Author: Tathagata Das <tathagata.das1565@...>
Date: 2018-05-02T22:15:31Z
Enabled no-data batches in MicroBatchExecution for streaming aggregation
and deduplication.
----