[GitHub] [spark] HeartSaVioR commented on pull request #28841: [SPARK-31962][SQL][SS] Provide option to load files after a specified date when reading from a folder path
HeartSaVioR commented on pull request #28841: URL: https://github.com/apache/spark/pull/28841#issuecomment-652870311

@cchighman Thanks for understanding. I still think that even in the SS case we want to provide a timestamp to effectively filter out files; the point is whether the timestamp is static or dynamic. Let's focus on the batch case here.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
HeartSaVioR commented on pull request #28841: URL: https://github.com/apache/spark/pull/28841#issuecomment-652857478

Your math is correct and I agree it helps. The thing is, how much would it help? When we talk about streaming, we are probably talking about a query which runs for months. The lower bound is static, so the query gets a boost in the earlier batches by filtering out older files, but once the query catches up, the situation is much the same. I agree the overall complication is not a goal for this PR; I just wanted to spare myself some thinking when I go through such complications later. One more option, one more complication.
HeartSaVioR commented on pull request #28841: URL: https://github.com/apache/spark/pull/28841#issuecomment-652850721

@cchighman Thanks for reading through the huge wall of text! I agree the option can be provided to batch queries only, and we can consider how to apply it to structured streaming later (as we don't have a solid idea yet). Just to reiterate, I guess we may still want to discuss lower bound only vs. lower & upper bound, even in the batch case.

I also agree the option could be simply applied to structured streaming (lower bound only) on top of the current options; that would act as a "filter". As I mentioned, in #28422 I already proposed a similar thing, though the purpose there was applying "retention", hence the bound changed dynamically instead of being static.

That said, the problem is that this approach doesn't help cut down the file stream source metadata log, which is another known major issue in the file stream source: it remembers every file you processed and never drops anything. My viewpoint is focused on how we can minimize the entries to remember across a long query run. We have the maxFileAge option, but due to some issues it doesn't help minimize the entries. It wouldn't be good to introduce another similar option while leaving the major issue behind. That's why I cannot simply agree to applying the option to SS; it deserves another level of consideration.
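As a minimal sketch of the lower-bound "filter" semantics under discussion (plain Python against the local filesystem, not Spark's implementation; `list_files_after` is a hypothetical helper):

```python
import os
from datetime import datetime


def list_files_after(path, lower_bound):
    """Return files under `path` whose modification time is strictly after
    `lower_bound` (a datetime), mimicking a lower-bound-only modified-date
    filter for a one-shot batch read."""
    cutoff = lower_bound.timestamp()
    selected = []
    for root, _dirs, names in os.walk(path):
        for name in names:
            full = os.path.join(root, name)
            if os.path.getmtime(full) > cutoff:
                selected.append(full)
    return selected
```

Note this is a pure filter over a fresh listing each time it runs; it does nothing to bound what a long-running streaming query has to remember, which is the metadata-log concern raised above.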
HeartSaVioR commented on pull request #28841: URL: https://github.com/apache/spark/pull/28841#issuecomment-650898695

> I wonder though if structured streaming always implied an event source, particularly when streaming from a file source?

Ideally it should be. It's not 100% true for the file stream source (as it doesn't store an offset and allows reading late files), but it partially is, since the file stream source "sorts" the files by their timestamp, in both directions, forward and backward. You're probably thinking of "offset" as a file offset, while I mean "offset" as a mark of where we have read so far. In this case that would be the modified time, plus some of the processed files, since there could be new files with the same modified time (some files may not be picked up due to the max number of files per trigger).

> For example, modifiedDateFilter applies specifically to a point in time when you begin structured streaming on a file data source. You would not have an offset yet in committedOffsets. The offset use case would imply you are restarting an existing, checkpointed stream or attempting to read from a checkpointed location, right?

Yes, but you'll want to consider the case where end users "change" `modifiedDateFilter` before restarting the query from the checkpoint. Every option can be changed during restart, which makes things very complicated. You'll need to consider combinations of `modifiedDateFilter`, `latestFirst`, and `maxFileAge`, and also make sure the behavior is intuitive to end users. Please refer to #28422 for what I proposed and which review comments it got.

> When using an offset with a Kafka data source, some write has occurred by which a written checkpoint exists. With the file data source for files that have not yet been read or written, I'm curious how I would apply offset bounds in this way. I was thinking I would have to be reading from a data source that had used structured streaming with checkpointing in order for the offset to exist (committed).
>
> Does this make sense? It seems like once you've written a checkpoint while writing to a stream from the readStream dataframe that's loading files, you would have clear context to apply offset-based semantics.

Yeah, that should ideally be an "offset" instead of the file stream source's own metadata, but I'm not sure how far we want to go. I may have been thinking too far ahead, but there's a real issue with the file stream source metadata growing, and I support an approach that also clearly fixes that issue. I'm worried that if we introduce a relevant option without enough consideration, it will bug us later through the need to respect backward compatibility.

> With `startingOffsetsByTimestamp`, you have the ability to indicate start/end offsets per topic such as TopicA or TopicB. If this concept were applied to a file data source with the underlying intent that each file name represented a topic, problems begin to emerge. For example, if there are multiple files, they would have different file names, and different file names may imply a new topic.

`underlying intent that each file name represented a topic` - I don't think so. Remember that a file is the unit of processing in the file source: Spark doesn't process a file partially; in other words, Spark never splits a file across micro-batches. If you map the file source onto the Kafka data source, a record in the Kafka data source = a file in the file data source. So the timestamp can represent the offset of how far Spark has read, though there are some issues:

1) We allow reading late files - a newly updated file can have its modified time set to a "previous time". I don't think that should be allowed, but the current file data source allows it, so that's an arguable topic.
2) There can be lots of files with the same timestamp, and Spark has a max number of files to process per batch, which means there's a case where only some of them get processed. That's why storing only the timestamp doesn't work here; we still need to store some processed files as well to mitigate this.
3) This is technically really odd, but consider eventual consistency, as on S3: a listing operation may not show some of the available files. This could be a blocker to using the timestamp as an offset, but even with eventual consistency we don't expect a file to show up days later. (Do I understand the expectation for eventual consistency on S3 correctly?)

That said, (timestamp - some margin) + (processed files within the margin) could be used as the offset instead. We could leverage the purge timestamp in SeenFilesMap, though I think 7 days is too long. And `latestFirst` would have to be replaced with something else, because it simply defeats `maxFileAge` and makes an offset-like approach impossible. Does it make sense?
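The "(timestamp - margin) + processed files within the margin" idea can be sketched as follows (hypothetical, pure Python; `FileStreamOffset`, `margin`, and the method names are illustrative, not Spark internals):

```python
class FileStreamOffset:
    """Hypothetical offset for a file source: a watermark timestamp plus the
    already-processed files whose mtime falls within `margin` of it, so a
    restart can re-list the directory without reprocessing files or missing
    ones that were listed late."""

    def __init__(self, margin):
        self.margin = margin
        self.watermark = float("-inf")   # everything at or below this is done
        self.processed = {}              # name -> mtime, within the margin

    def should_process(self, name, mtime):
        if mtime <= self.watermark:
            return False                 # covered by the watermark alone
        return name not in self.processed

    def commit(self, name, mtime):
        self.processed[name] = mtime
        candidate = mtime - self.margin
        if candidate > self.watermark:
            self.watermark = candidate
            # Drop entries now covered by the watermark. This purge is what
            # keeps the state bounded, unlike a metadata log that remembers
            # every file forever.
            self.processed = {n: t for n, t in self.processed.items()
                              if t > self.watermark}
```

Because entries older than the watermark are purged on commit, the state is bounded by the number of files inside the margin, which is the property the current file stream source metadata log lacks.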
HeartSaVioR commented on pull request #28841: URL: https://github.com/apache/spark/pull/28841#issuecomment-650694094

Please take a look at how the Kafka data source options apply to both batch and streaming queries; the semantics of the option should differ between the two. http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries `startingOffsetsByTimestamp`, `startingOffsets`, `endingOffsetsByTimestamp`, `endingOffsets`. If we are not fully sure, let's only apply the option to batch queries and file an issue to address the streaming query case. Btw, that said, I prefer to have a lower bound + upper bound instead of only a lower bound, as I commented earlier during review.
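To illustrate the lower + upper bound pairing (analogous to Kafka's `startingOffsets*`/`endingOffsets*` for batch reads), a hypothetical pure-Python filter over `(name, mtime)` listings:

```python
def files_in_window(entries, start_ts=None, end_ts=None):
    """Keep files whose modification time satisfies start_ts <= mtime < end_ts.
    `entries` is an iterable of (name, mtime) pairs; a missing bound is
    unbounded, mirroring how Kafka batch reads pair starting and ending
    offsets. The upper bound is exclusive, as Kafka's ending offsets are."""
    selected = []
    for name, mtime in entries:
        if start_ts is not None and mtime < start_ts:
            continue  # older than the lower bound
        if end_ts is not None and mtime >= end_ts:
            continue  # at or past the (exclusive) upper bound
        selected.append(name)
    return selected
```

With both bounds a batch query reads a well-defined window; with only a lower bound, "everything newer than X" keeps growing as new files land, which is part of why the bounds question matters even for the batch case.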
HeartSaVioR commented on pull request #28841: URL: https://github.com/apache/spark/pull/28841#issuecomment-650477921

The PR has lots of changed lines which are not actual changes (indentation only). Indentation is covered by the style guide, and the original lines didn't seem to violate it (whereas these changed lines do). Could you please go through the PR changes and revert them? Thanks in advance!