cchighman commented on pull request #28841: URL: https://github.com/apache/spark/pull/28841#issuecomment-650900831
> > I wonder though if structured streaming always implied an event source, particularly when streaming from a file source?
>
> Ideally it should be. It's not really 100% true for the file stream source (as it doesn't store an offset and allows reading late files), but it's partially the case, since the file stream source "sorts" the files by their timestamp, in both directions, forward and backward. You're probably thinking of "offset" as a file offset, while I mean "offset" as a mark of where we have read so far. In this case, that's the modified time, along with some processed files, since there could be new files having the same modified time (some of the files may not be picked up due to the max number of files per trigger).
>
> > For example, modifiedDateFilter applies specifically to a point in time when you begin structured streaming on a file data source. You would not have an offset yet in committedOffsets. The offset use case would imply you are restarting an existing, checkpointed stream or attempting to read from a checkpointed location, right?
>
> Yes, but you'll want to consider the case where end users "change" modifiedDateFilter before restarting the query from the checkpoint. Every option can be changed during a restart, which makes things very complicated. You'll need to consider combinations of `modifiedDateFilter`, `latestFirst`, and `maxFileAge`, and also make sure the behavior is intuitive to end users. Please refer to #28422 for what I proposed and the review comments it got.
>
> > When using an offset with a Kafka data source, some write has occurred by which a written checkpoint exists. With the file data source for files that have not yet been read or written, I'm curious how I would apply offset bounds in this way. I was thinking I would have to be reading from a data source that had used structured streaming with checkpointing in order for the offset to exist (committed).
> >
> > Does this make sense? It seems like once you've written a checkpoint while writing to a stream from the readStream dataframe that's loading files, you would have clear context to apply offset-based semantics.
>
> Yeah, ideally that should be an "offset" instead of the file stream source's own metadata, but I'm not sure how far we want to go. Probably I've been thinking too far ahead, but there's a real issue with the file stream source metadata growing, and I support that approach as it also clearly fixes the issue. I'm worried that once we introduce a relevant option without enough consideration, it will keep bugging us, since we have to respect backward compatibility.
>
> > With _startingOffsetsByTimestamp_, you have the ability to indicate start/end offsets per topic such as TopicA or TopicB. If this concept were applied to a file data source with the underlying intent that each file name represented a topic, problems begin to emerge. For example, if there are multiple files, they would have different file names, and different file names may imply a new topic.
>
> `underlying intent that each file name represented a topic` - I don't think so (if you thought I proposed that, I didn't mean it). Remember that a file is the unit of processing in the file source: Spark doesn't process a file partially; in other words, Spark never splits a file across micro-batches. If you map the file source onto the Kafka data source, a record in the Kafka data source = a file in the file data source.
> So the timestamp can represent the offset of how far Spark has read so far - though there are some issues:
>
> 1) We allow reading late files - a newly updated file may have its modified time set to a "previous time". I don't think it should be allowed, but the current file data source allows it, so that's an arguable topic.
>
> 2) There can be lots of files with the same timestamp, and Spark has a max number of files to process in a batch. That means there's a case where only some of those files get processed. That's why storing only the timestamp doesn't work here, and we still need to store some processed files as well to mitigate it.
>
> 3) This is technically really odd, but considering eventual consistency like S3's... a listing operation may not show some of the available files. This may be a blocker for using the timestamp as an offset, but even with eventual consistency we don't expect files to show up days later. (Do I understand the expectation for eventual consistency on S3 correctly?) That said, (ts as (timestamp - some margin), plus the processed files within that margin) could be used as the timestamp instead. We can leverage the purge timestamp in SeenFilesMap, though I think 7 days is too long. And latestFirst should be replaced with something else, because it simply defeats maxFileAge and makes an offset-like approach impossible.
>
> Does it make sense?

We were writing responses at the same time. Just saw this and will read through.
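
For readers following the Kafka analogy above, here is a minimal sketch contrasting the two sources. `startingOffsetsByTimestamp` is an existing Spark 3.0 option on the Kafka source (a JSON map of topic -> partition -> epoch millis); the `modifiedDateFilter` option on the file source is only the proposal under discussion in this PR, so its name and value format below are assumptions, not a shipped API. The sketch assumes a `SparkSession` named `spark`, as in spark-shell:

```scala
// Kafka source: start each partition of topicA from the first record whose
// timestamp is >= the given epoch millis (existing Spark 3.0 option).
val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "topicA")
  .option("startingOffsetsByTimestamp", """{"topicA": {"0": 1593500000000}}""")
  .load()

// File source: purely hypothetical sketch of the option discussed in this PR.
// "modifiedDateFilter" and its value format are assumptions from this thread,
// not an option that exists in Spark.
val fileDf = spark.readStream
  .format("json")
  .schema("id BIGINT, value STRING")
  .option("modifiedDateFilter", "2020-06-01T00:00:00")  // hypothetical
  .load("s3a://some-bucket/input/")
```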
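
And purely to illustrate the "(timestamp - some margin) + processed files within the margin" offset idea from point 3 above, here is a rough sketch of what such an offset could look like. This is not Spark's actual `FileStreamSourceOffset`; every name below is hypothetical:

```scala
// Hypothetical shape of a timestamp-based file source offset.
// "watermarkMs" would be (max modification time seen - some margin); files older
// than it are assumed fully processed, while files at or after it are tracked
// explicitly, so that same-timestamp files split across batches (maxFilesPerTrigger)
// and eventually consistent listings can still be handled.
case class TimestampFileOffset(
    watermarkMs: Long,
    processedWithinMargin: Set[String])

// A listed file needs processing if it is at or after the watermark and not yet
// recorded as processed. A late file whose modification time falls behind the
// watermark (issue 1 above) would be silently skipped, which is exactly why
// late files make this scheme arguable.
def shouldProcess(path: String, modifiedMs: Long, offset: TimestampFileOffset): Boolean =
  modifiedMs >= offset.watermarkMs && !offset.processedWithinMargin.contains(path)
```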
