[GitHub] [spark] HeartSaVioR commented on pull request #28841: [SPARK-31962][SQL][SS] Provide option to load files after a specified date when reading from a folder path

2020-07-02 Thread GitBox


HeartSaVioR commented on pull request #28841:
URL: https://github.com/apache/spark/pull/28841#issuecomment-652870311


   @cchighman 
   Thanks for understanding. I still think that even in the SS case we want to provide a timestamp to filter out files effectively - the point is whether the timestamp is static or dynamic. Let's focus on the batch case here.
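   For the batch case, a minimal sketch of the intended usage follows. Note that `modifiedDateFilter` is the option name used in this thread, not necessarily the final API, and the timestamp format here is an assumption:
   
   ```scala
   import org.apache.spark.sql.SparkSession
   
   val spark = SparkSession.builder().appName("batch-filter-sketch").getOrCreate()
   
   // Hypothetical sketch: `modifiedDateFilter` and its value format are taken
   // from this discussion and may differ from whatever is finally merged.
   val df = spark.read
     .format("parquet")
     .option("modifiedDateFilter", "2020-06-01T00:00:00")  // static lower bound on file modification time
     .load("/path/to/folder")
   ```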






[GitHub] [spark] HeartSaVioR commented on pull request #28841: [SPARK-31962][SQL][SS] Provide option to load files after a specified date when reading from a folder path

2020-07-02 Thread GitBox


HeartSaVioR commented on pull request #28841:
URL: https://github.com/apache/spark/pull/28841#issuecomment-652857478


   Your math is correct and I agree it helps. The thing is, how much would it help?
   
   When we talk about streaming, we are probably talking about a query that runs for months. The lower bound is static, so the query will get a boost in the earlier batches by filtering out older files, but once the query catches up, the situation would be much the same as without the filter.
   
   I agree the overall complication is not a goal for this PR - I just want to be able to think less when I go through such complications. One more option, one more complication.






[GitHub] [spark] HeartSaVioR commented on pull request #28841: [SPARK-31962][SQL][SS] Provide option to load files after a specified date when reading from a folder path

2020-07-02 Thread GitBox


HeartSaVioR commented on pull request #28841:
URL: https://github.com/apache/spark/pull/28841#issuecomment-652850721


   @cchighman Thanks for reading through the huge wall of text! 
   
   I agree the option can be provided to batch queries only, and we can consider how to apply the option to structured streaming later (as we don't have a solid idea yet). Just to reiterate, I guess we may still want to discuss lower bound only vs. lower & upper bound, even in the batch case.
   
   I also agree the option could simply be applied to structured streaming (lower bound only) on top of the current options. That would act as a "filter". As I mentioned in #28422, I already proposed something similar, though the purpose there was to apply "retention", hence the bound changed dynamically instead of being static.
   
   That said, the problem is that this approach doesn't help cut down the file stream source metadata log, which is another known major issue in the file stream source. The file stream source remembers every file it has processed and never drops anything. My viewpoint is focused on how we can minimize the entries to remember across a long query run. We have the `maxFileAge` option, but due to some issues it doesn't help minimize the entries. It wouldn't be good to introduce another similar option but leave the major issue behind. 
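   For reference, this is a minimal sketch of how `maxFileAge` is set on a file stream source today (the schema and path are placeholders); the option caps which files are considered eligible, but the metadata log still grows:
   
   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.types.{StringType, StructField, StructType}
   
   val spark = SparkSession.builder().appName("max-file-age-sketch").getOrCreate()
   val schema = StructType(Seq(StructField("value", StringType)))
   
   // `maxFileAge` ignores files older than the threshold relative to the newest
   // file seen, but the log of already-processed files still grows forever.
   val stream = spark.readStream
     .format("json")
     .schema(schema)
     .option("maxFileAge", "7d")
     .load("/data/incoming/")
   ```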
   
   That's why I can't simply agree to applying the option to SS. It deserves another level of consideration.






[GitHub] [spark] HeartSaVioR commented on pull request #28841: [SPARK-31962][SQL][SS] Provide option to load files after a specified date when reading from a folder path

2020-06-28 Thread GitBox


HeartSaVioR commented on pull request #28841:
URL: https://github.com/apache/spark/pull/28841#issuecomment-650898695


   > I wonder though if structured streaming always implied an event source, 
particularly when streaming from a file source?
   
   Ideally it should be. That's not 100% true for the file stream source (as it doesn't store an offset and allows reading late files), but it partially holds, as the file stream source "sorts" the files by their timestamp, in both directions, forward and backward. You're probably thinking of "offset" as a file offset, while I mean "offset" as a mark for where we have read so far. In this case that is the modified time, plus some processed files, since there could be new files having the same modified time (some of the files may not be picked up due to the max number of files per trigger).
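   To make the shape of such an "offset" concrete, here is a purely hypothetical sketch - not Spark's actual offset class:
   
   ```scala
   // Hypothetical sketch only. A timestamp alone isn't enough to mark "where
   // we have read so far": with maxFilesPerTrigger, some files sharing the
   // latest modified time may be deferred to a later batch, so the files
   // already processed at that timestamp must be recorded as well.
   case class FileTimestampOffset(
       latestModifiedTs: Long,     // newest modification time read so far (epoch millis)
       processedAtTs: Set[String]  // paths already processed at exactly that timestamp
   )
   ```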
   
   > For example, modifiedDateFilter applies specifically to a point in time 
when you begin structured streaming on a file data source. You would not have 
an offset yet in commitedOffsets. The offset use case would imply you are 
restarting an existing, checkpointed stream or attempting to read from a 
checkpointed location, right?
   
   Yes, but you'll want to consider the case where end users "change" `modifiedDateFilter` before restarting the query from the checkpoint. Every option can be changed during a restart, which makes things very complicated. You'll need to consider combinations of `modifiedDateFilter`, `latestFirst`, and `maxFileAge`, and also make sure the behavior is intuitive to the end users. Please refer to #28422 for what I proposed and which review comments it got.
   
   > When using an offset with a Kafka data source, some write has occurred by 
which a written checkpoint exists. With the file data source for files that 
have not yet been read or written, I'm curious how I would apply offset bounds 
in this way. I was thinking I would have to be reading from a data source that 
had used structured streaming with checkpointing in order for the offset to 
exist (committed).
   > 
   > Does this make sense? It seems like once you've written a checkpoint while 
writing to a stream from the readStream dataframe that's loading files, you 
would have clear context to apply offset-based semantics.
   
   Yeah, that should be ideally an "offset" instead of file stream source's own 
metadata, but I'm not sure how far we want to go. Probably I've been thinking 
too far, but there's a real issue on file stream source metadata growing, and I 
support the way it also clearly fixes the issue. I'm worried that once we 
introduce a relevent option without enough consideration, that would bug us via 
respecting backward compatiblity.
   
   > With _startingOffsetsByTimestamp_, you have the ability to indicate 
start/end offsets per topic such as TopicA or TopicB. If this concept were 
applied to a file data source with the underlying intent that each file name 
represented a topic, problems begin to emerge. For example, if there are 
multiple files, they would have different file names, different file names may 
imply a new topic.
   
   `underlying intent that each file name represented a topic` - I don't think so. Remember that a file is the unit of processing in the file source: Spark doesn't process a file partially; in other words, Spark never splits a file across micro-batches. If you map the file source onto the Kafka data source, a record in the Kafka data source = a file in the file data source.
   
   So the timestamp can represent the offset marking how far Spark has read - though there are some issues:
   
   1) We allow reading late files - there can be a newly updated file whose modified time is set to a "previous time". I don't think that should be allowed, but the current file data source allows it, so that's an arguable topic.
   
   2) There can be lots of files with the same timestamp, and Spark has a max number of files to process in a batch. That means there's a case where only some of those files get processed. That's why storing only the timestamp doesn't work here, and we still need to store some processed files as well to mitigate that.
   
   3) This is technically really odd, but consider eventual consistency like S3... a listing operation may not show some of the available files. This may be a blocker for using the timestamp as an offset, but even with eventual consistency we don't expect files to show up days later. (Do I understand the expectation for eventual consistency on S3 correctly?) That said, (timestamp - some margin, plus the processed files within the margin) could be used as the timestamp instead - see the sketch below. We could leverage the purge timestamp in SeenFilesMap, though I think 7 days is too long. And `latestFirst` should be replaced with something else, because it simply defeats `maxFileAge` and makes it impossible to apply an offset-like approach.
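   A rough sketch of that margin idea (hypothetical names, not Spark's implementation):
   
   ```scala
   // Hypothetical sketch. To tolerate eventually consistent listings, keep the
   // offset timestamp a safety margin behind the newest modified time, and
   // remember the files already processed within that margin.
   case class MarginedFileOffset(
       watermarkTs: Long,                  // newest modified time minus a safety margin
       processedWithinMargin: Set[String]  // paths already processed with ts >= watermarkTs
   )
   
   // A file is new if it falls at/after the watermark and wasn't processed yet;
   // anything older is assumed to have been fully listed in earlier batches.
   def shouldProcess(path: String, modifiedTs: Long, offset: MarginedFileOffset): Boolean =
     modifiedTs >= offset.watermarkTs && !offset.processedWithinMargin.contains(path)
   ```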
   
   Does it make sense?




[GitHub] [spark] HeartSaVioR commented on pull request #28841: [SPARK-31962][SQL][SS] Provide option to load files after a specified date when reading from a folder path

2020-06-27 Thread GitBox


HeartSaVioR commented on pull request #28841:
URL: https://github.com/apache/spark/pull/28841#issuecomment-650694094


   Please take a look at how Kafka data source options apply to both batch and streaming queries. The semantics of the option should be applied differently in each mode.
   
   
http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries
   
   `startingOffsetsByTimestamp`, `startingOffsets`, `endingOffsetsByTimestamp`, 
`endingOffsets`
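   
   For instance, a minimal sketch of a Kafka batch read bounded by timestamps; the broker address, topic, and timestamp values (epoch millis) are placeholders:
   
   ```scala
   import org.apache.spark.sql.SparkSession
   
   val spark = SparkSession.builder().appName("kafka-bounds-sketch").getOrCreate()
   
   // Both options take a JSON map of topic -> (partition -> epoch millis);
   // the ending bound is only applicable to batch queries.
   val df = spark.read
     .format("kafka")
     .option("kafka.bootstrap.servers", "host1:9092")
     .option("subscribe", "topicA")
     .option("startingOffsetsByTimestamp", """{"topicA": {"0": 1593561600000}}""")
     .option("endingOffsetsByTimestamp", """{"topicA": {"0": 1593648000000}}""")
     .load()
   ```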
   
   If we are not fully sure about it, let's only apply the option to batch queries and file an issue to address the streaming query case.
   
   Btw, that said, I'd prefer to have lower bound + upper bound instead of only a lower bound, as I commented earlier during review.






[GitHub] [spark] HeartSaVioR commented on pull request #28841: [SPARK-31962][SQL][SS] Provide option to load files after a specified date when reading from a folder path

2020-06-26 Thread GitBox


HeartSaVioR commented on pull request #28841:
URL: https://github.com/apache/spark/pull/28841#issuecomment-650477921


   The PR has lots of changed lines which are not actually changed except for indentation. Indentation is one of the style guide rules, and the original lines didn't seem to violate the guide (whereas these changed lines do violate it). 
   
   Could you please go through the PR changes and revert them? Thanks in advance!


