GitHub user tcondie opened a pull request:

    https://github.com/apache/spark/pull/15949

    [Spark-18339] [SQL] Don't push down current_timestamp for filters in StructuredStreaming

    ## What changes were proposed in this pull request?
    
    For the following workflow:
    1. I have a column called `time`, at minute-level precision, in a streaming DataFrame.
    2. I want to group by `time` and count.
    3. I then want my MemorySink to hold only the last 30 minutes of counts, so I add:
       .where('time >= current_timestamp().cast("long") - 30 * 60)

    What happens is that the `filter` gets pushed down below the aggregation, so it is applied to the source data of the aggregation instead of to the result of the aggregation (which is where I actually want to filter).
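    To make the difference concrete, here is a minimal plain-Scala model of a streaming `groupBy(time).count()` in complete mode. It uses no Spark APIs, and every name in it is hypothetical. `filterAfterAgg` applies the time predicate to the aggregated result, which is what the query intends; `filterBeforeAgg` applies it to each incoming micro-batch before aggregating, mimicking the pushed-down plan. `now` stands in for `current_timestamp` at each trigger, and all times are in seconds.

```scala
// Hypothetical plain-Scala model (no Spark APIs) of a streaming
// groupBy(time).count() whose complete-mode output is filtered by time.
object PushdownSketch {
  // Running aggregation state: time value (seconds) -> count.
  type State = Map[Long, Long]

  // Fold one micro-batch of time values into the running counts.
  def aggregate(state: State, batch: Seq[Long]): State =
    batch.foldLeft(state)((s, t) => s.updated(t, s.getOrElse(t, 0L) + 1L))

  // Intended plan: aggregate every incoming row, then keep only the
  // aggregated groups that fall inside the window ending at `now`.
  def filterAfterAgg(state: State, batch: Seq[Long], now: Long, window: Long): (State, State) = {
    val next = aggregate(state, batch)
    (next, next.filter { case (t, _) => t >= now - window })
  }

  // Pushed-down plan: filter the incoming rows, then aggregate. Complete
  // mode emits the whole state, so groups admitted earlier are never dropped.
  def filterBeforeAgg(state: State, batch: Seq[Long], now: Long, window: Long): (State, State) = {
    val next = aggregate(state, batch.filter(_ >= now - window))
    (next, next)
  }

  // Run (now, batch) triggers through a plan; returns each trigger's output.
  def run(
      plan: (State, Seq[Long], Long, Long) => (State, State),
      triggers: Seq[(Long, Seq[Long])],
      window: Long): Seq[State] =
    triggers.foldLeft((Map.empty[Long, Long], Vector.empty[State])) {
      case ((state, outputs), (now, batch)) =>
        val (next, out) = plan(state, batch, now, window)
        (next, outputs :+ out)
    }._2
}
```

    With triggers at `now` = 10, 20 and 30 (batches `Seq(1, 2, 3)`, `Seq(15, 15)` and `Seq(25)`) and a 10-second window, the last output of `filterAfterAgg` is `Map(25 -> 1)`, while `filterBeforeAgg` still emits groups 1, 2, 3 and 15: rows admitted by an earlier trigger stay in the aggregation state and are never re-checked against the later `now`.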
    I guess the main issue here is that `current_timestamp` is non-deterministic in the streaming context, so a filter that uses it shouldn't be pushed down.
    Whether this requires us to store the `current_timestamp` for each trigger of the streaming job is something to discuss.
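    One possible reading of "store the `current_timestamp` for each trigger" is sketched below, assuming each trigger is identified by a numeric batch id: the clock is read once per batch, and that value is reused for every row of the batch and for any re-execution of it. `BatchTimestamps`, `timestampFor` and `inWindow` are hypothetical names, not Spark APIs, and the in-memory map only stands in for whatever durable per-batch record a real engine would need.

```scala
import scala.collection.mutable

// Hypothetical sketch (not a Spark API): read the clock once per trigger,
// keyed by batch id, and reuse that value for every row of the batch and
// for any re-execution of the same batch.
final class BatchTimestamps(clock: () => Long) {
  // In-memory stand-in for a durable per-batch record.
  private val byBatch = mutable.Map.empty[Long, Long]

  // The clock is consulted only the first time `batchId` is seen.
  def timestampFor(batchId: Long): Long =
    byBatch.getOrElseUpdate(batchId, clock())

  // The PR's time predicate, evaluated against the batch's fixed timestamp
  // (time values and window in seconds).
  def inWindow(batchId: Long, time: Long, windowSeconds: Long): Boolean =
    time >= timestampFor(batchId) - windowSeconds
}
```

    With a fake clock that moves from 1000 to 5000 between calls, `timestampFor(0L)` keeps returning 1000, so a replay of batch 0 filters exactly as its first run did, while batch 1 is evaluated against 5000.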
    
    @brkyvz @zsxwing @tdas 
    
    ## How was this patch tested?
    
    A test was added to StreamingAggregationSuite to ensure the above use case is handled. The test injects a stream of time values (in seconds) into a query that runs in complete mode and outputs only the (count) aggregation results for the past 10 seconds.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tcondie/spark SPARK-18339

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/15949.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #15949
    
----
commit 729ecedab4b61804a9717cadbd4f2c7b6aa50176
Author: Tyson Condie <[email protected]>
Date:   2016-11-18T23:26:25Z

    added CurrentBatchTimestamp

commit b8a1f71bef2aa4c11c08178b2250bd995e952601
Author: Tyson Condie <[email protected]>
Date:   2016-11-18T23:35:27Z

    update comment

commit 8f0a27329ea711d1936c2df11a310129e22eb9b5
Author: Tyson Condie <[email protected]>
Date:   2016-11-20T20:41:49Z

    add test for filtering time-based aggregation

----

