GitHub user tcondie opened a pull request:
https://github.com/apache/spark/pull/15949
[Spark-18339] [SQL] Don't push down current_timestamp for filters in StructuredStreaming
## What changes were proposed in this pull request?
For the following workflow:
1. I have a column called `time`, at minute-level precision, in a streaming DataFrame.
2. I want to perform a `groupBy` on `time` followed by a `count`.
3. Then I want my MemorySink to contain only the last 30 minutes of counts, which I do with
`.where('time >= current_timestamp().cast("long") - 30 * 60)`
What happens is that the `filter` gets pushed down below the aggregation, so it is applied to the source data feeding the aggregation instead of to the aggregation's result (which is where I actually want to filter).
I guess the main issue here is that `current_timestamp` is non-deterministic in the streaming context, so a filter that uses it shouldn't be pushed down.
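To make the failure mode concrete, here is a toy, single-process model in Scala. It is not Spark's implementation: `FilterPushdownModel`, `Trigger`, `filterAfterAggregation` and `filterBeforeAggregation` are hypothetical names used only for illustration. It assumes each trigger sees one fixed "now" (in seconds) and that complete mode emits the entire aggregation state on every trigger. Under those assumptions, a filter pushed below the aggregation only drops old rows from each new batch, so groups that aged out during earlier triggers remain in the output.

```scala
// Toy model of a complete-mode streaming count; NOT Spark code.
// All names here are hypothetical and exist only to illustrate the pushdown issue.
object FilterPushdownModel {
  // One trigger: an assumed fixed "current time" (seconds) plus the event times it delivers.
  final case class Trigger(now: Long, events: Seq[Long])

  // Running aggregation state: event time -> count.
  private def addCounts(state: Map[Long, Long], events: Seq[Long]): Map[Long, Long] =
    events.foldLeft(state)((s, e) => s.updated(e, s.getOrElse(e, 0L) + 1L))

  // Intended plan: aggregate every input row, then filter the aggregation
  // result using the latest trigger's timestamp.
  def filterAfterAggregation(triggers: Seq[Trigger], windowSecs: Long): Map[Long, Long] = {
    val state = triggers.foldLeft(Map.empty[Long, Long])((s, t) => addCounts(s, t.events))
    triggers.lastOption match {
      case Some(last) => state.filter { case (time, _) => time >= last.now - windowSecs }
      case None       => state // no triggers, so the state is empty
    }
  }

  // Pushed-down plan: each batch's input is filtered with that batch's timestamp
  // before reaching the state, and the full state is emitted (complete mode).
  def filterBeforeAggregation(triggers: Seq[Trigger], windowSecs: Long): Map[Long, Long] =
    triggers.foldLeft(Map.empty[Long, Long]) { (s, t) =>
      addCounts(s, t.events.filter(_ >= t.now - windowSecs))
    }

  def main(args: Array[String]): Unit = {
    val triggers = Seq(
      Trigger(now = 10L, events = Seq(1L, 5L, 10L)),
      Trigger(now = 25L, events = Seq(20L, 25L))
    )
    println(filterAfterAggregation(triggers, 10L))
    println(filterBeforeAggregation(triggers, 10L))
  }
}
```

With the two triggers in `main` and a 10-second window, filtering after the aggregation leaves only the counts for times 20 and 25, while the pushed-down plan still reports times 1, 5 and 10 from the first trigger, because those rows passed the filter when they arrived and were never re-checked.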
Whether this requires us to store the `current_timestamp` for each trigger of the streaming job is something to discuss.
@brkyvz @zsxwing @tdas
## How was this patch tested?
A test was added to StreamingAggregationSuite to ensure the above use case is handled. The test injects a stream of time values (in seconds) into a query that runs in complete mode and outputs only the (count) aggregation results for the past 10 seconds.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tcondie/spark SPARK-18339
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/15949.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #15949
----
commit 729ecedab4b61804a9717cadbd4f2c7b6aa50176
Author: Tyson Condie <[email protected]>
Date: 2016-11-18T23:26:25Z
added CurrentBatchTimestamp
commit b8a1f71bef2aa4c11c08178b2250bd995e952601
Author: Tyson Condie <[email protected]>
Date: 2016-11-18T23:35:27Z
update comment
commit 8f0a27329ea711d1936c2df11a310129e22eb9b5
Author: Tyson Condie <[email protected]>
Date: 2016-11-20T20:41:49Z
add test for filtering time-based aggregation
----