GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/21220

    [SPARK-24157][SS] Enabled no-data batches in MicroBatchExecution for 
streaming aggregation and deduplication.

    ## What changes were proposed in this pull request?
    This PR enables MicroBatchExecution to run no-data batches if some 
SparkPlan requires running another batch to output results based on an updated 
watermark / processing time. In this PR, I have enabled streaming aggregations 
and streaming deduplication to automatically run an additional batch even if no 
new data is available. In future PRs, I will enable streaming joins and 
mapGroupsWithState as well.
    
    Major changes/refactoring done in this PR.
    - Refactoring MicroBatchExecution - A major point of confusion in the 
MicroBatchExecution control flow (at least to me) was that 
`populateStartOffsets` internally called `constructNextBatch`. This was not 
obvious from the name "populateStartOffsets" and made the control flow 
from the main trigger execution loop very confusing (the main loop in 
`runActivatedStream` called `constructNextBatch`, but only if 
`populateStartOffsets` hadn't already called it). The refactoring 
makes this cleaner:
        - `populateStartOffsets` only updates `availableOffsets` and 
`committedOffsets`. It does not call `constructNextBatch`.
        - Main loop in `runActivatedStream` calls `constructNextBatch` which 
returns true or false reflecting whether the next batch is ready for executing. 
This method is now idempotent; if a batch has already been constructed, then it 
will always return true until the batch has been executed.
        - If the next batch is ready, we call `runBatch`; otherwise we sleep.
        - That's it.
    
    - Refactoring watermark management logic - This has been moved out of 
`MicroBatchExecution` into a separate class to simplify 
`MicroBatchExecution`.
    
    - New method `shouldRunAnotherBatch` in `IncrementalExecution` - This 
returns true if there is any stateful operation in the last execution plan that 
requires another batch for state cleanup, etc. This is used to decide whether 
to construct a batch or not in `constructNextBatch`.
    
    - Changes to stream testing framework - Many tests used CheckLastBatch to 
validate answers. This assumed that there would be no more batches after the 
last set of input had been processed, so that the last batch was the one with 
output corresponding to the last input. This is no longer true. To account 
for that, I made two changes.
        - `CheckNewAnswer` is a new test action that verifies the new rows 
generated since the last time the answer was checked by `CheckAnswer`, 
`CheckNewAnswer` or `CheckLastBatch`. This is agnostic to how many batches 
occurred between the last check and now. To make this easier, I added a 
common trait between MemorySink and MemorySinkV2 to abstract out some common 
methods.
        - `assertNumStateRows` has been updated in the same way to be agnostic 
to batches when checking the total number of state rows and how many state rows 
were updated (it sums up the updates since the last check).
    
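    The refactored control flow described in the first bullet above can be 
illustrated with a small toy model. This is only a sketch in Python, not the 
Scala code in this PR: the class `MicroBatchLoopSketch`, its fields, and the 
list of pending inputs are hypothetical stand-ins, and only the method names 
mirror `constructNextBatch` and `runBatch`. It shows the idempotence property: 
once a batch has been constructed, repeated calls return true without building 
a second batch until the first one has run.

```python
class MicroBatchLoopSketch:
    """Toy model of the refactored trigger loop; not Spark's actual code."""

    def __init__(self, pending_inputs):
        self.pending_inputs = list(pending_inputs)  # hypothetical source data
        self.constructed_batch = None               # built but not yet executed
        self.executed = []                          # batches that have run

    def construct_next_batch(self):
        """Return True if a batch is ready; idempotent until the batch runs."""
        if self.constructed_batch is not None:
            return True  # already constructed, do not build a second one
        if self.pending_inputs:
            self.constructed_batch = self.pending_inputs.pop(0)
            return True
        return False

    def run_batch(self):
        """Execute the constructed batch and clear it."""
        self.executed.append(self.constructed_batch)
        self.constructed_batch = None

    def run_one_trigger(self):
        """One iteration of the main loop: run the batch if ready, else sleep."""
        if self.construct_next_batch():
            self.run_batch()
            return "ran"
        return "slept"  # a real loop would wait for the next trigger here
```

    In this model the caller never needs to know whether some earlier step 
already constructed a batch, which is the confusion the refactoring removes.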
    
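    The role of `shouldRunAnotherBatch` in deciding whether to construct a 
batch can be sketched in the same spirit. This is a minimal Python 
illustration, not the implementation in this PR. The class `StatefulOpSketch` 
and the function `should_construct_batch` are hypothetical, and the condition 
"the watermark has advanced past the one the operator last used" is an 
assumption made for the example; the description above only says that some 
stateful operation requires another batch for state cleanup, etc.

```python
from dataclasses import dataclass


@dataclass
class StatefulOpSketch:
    """Hypothetical stand-in for a stateful operator in the last plan."""
    name: str
    last_watermark_ms: int  # watermark the operator used in the last batch

    def should_run_another_batch(self, new_watermark_ms):
        # Assumption for this sketch: work is pending (state cleanup, output)
        # whenever the watermark has advanced since the last batch.
        return new_watermark_ms > self.last_watermark_ms


def should_run_another_batch(last_plan_ops, new_watermark_ms):
    """True if any stateful operator in the last plan wants another batch."""
    return any(op.should_run_another_batch(new_watermark_ms)
               for op in last_plan_ops)


def should_construct_batch(new_data_available, last_plan_ops, new_watermark_ms):
    """A batch is built for new data, or as a no-data batch when required."""
    return new_data_available or should_run_another_batch(
        last_plan_ops, new_watermark_ms)
```

    With no stateful operators in the plan, the sketch never asks for a no-data 
batch, so a stateless query would behave as before.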
    
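    To see why checking the last batch stops working once no-data batches can 
follow the last input, and how a check on new rows stays batch agnostic, 
consider the following toy sink. It is a Python sketch only: 
`MemorySinkSketch` and its methods are hypothetical and are not the 
MemorySink / MemorySinkV2 API.

```python
class MemorySinkSketch:
    """Toy in-memory sink that records the rows written by each batch."""

    def __init__(self):
        self.batches = []       # one list of rows per executed batch
        self._rows_checked = 0  # number of rows already seen by a check

    def add_batch(self, rows):
        self.batches.append(list(rows))

    def all_rows(self):
        return [row for batch in self.batches for row in batch]

    def last_batch_rows(self):
        """What a last-batch style check would look at."""
        return self.batches[-1] if self.batches else []

    def new_rows_since_last_check(self):
        """What a new-answer style check would look at: every row written
        since the previous check, however many batches produced them."""
        rows = self.all_rows()
        new_rows = rows[self._rows_checked:]
        self._rows_checked = len(rows)
        return new_rows
```

    If a data batch writes a row and an empty no-data batch follows it, 
`last_batch_rows()` is empty while `new_rows_since_last_check()` still returns 
the row, which is the behaviour the tests need.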
    ## How was this patch tested?
    - Changes made to existing tests - Tests have been changed following one 
of these patterns.
        - Tests where the last input was given again to force another batch to 
be executed (so that state was cleaned up / output generated) were simplified 
by removing the extra input.
        - Tests using aggregation+watermark had CheckLastBatch replaced 
with CheckNewAnswer to make them batch agnostic.
    - New tests added to check whether the flag works for streaming aggregation 
and deduplication
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-24157

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21220.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21220
    
----
commit 2ccdc60012b06201b1cf48e8216c21525f731e5a
Author: Tathagata Das <tathagata.das1565@...>
Date:   2018-05-02T22:15:31Z

    Enabled no-data batches in MicroBatchExecution for streaming aggregation 
and deduplication.

----

