HeartSaVioR commented on pull request #28841:
URL: https://github.com/apache/spark/pull/28841#issuecomment-650898695


   > I wonder though if structured streaming always implied an event source, 
particularly when streaming from a file source?
   
   Ideally it should. That's not 100% true for the file stream source (it 
doesn't store an offset and allows reading late files), but it partially 
holds, since the file stream source "sorts" the files by their timestamp, in 
either direction, forward or backward. You're probably thinking of "offset" as 
a file offset, while I mean "offset" as a marker of how far we have read so 
far. In this case that would be the modified time, plus some of the processed 
files, since there can be new files with the same modified time (some files may 
not be picked up due to the max number of files per trigger).
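
   To make the "modified time plus some processed files" idea concrete, here 
is a minimal Python sketch. It is not Spark code: `FileOffset`, `next_batch` 
and the `(path, modified_time)` listing format are hypothetical names chosen 
for illustration, and `max_files` only stands in for the max number of files 
per trigger.

```python
from dataclasses import dataclass
from typing import FrozenSet, List, Tuple

# Hypothetical listing entry (not a Spark type): (path, modified time in ms).
FileEntry = Tuple[str, int]


@dataclass(frozen=True)
class FileOffset:
    """Hypothetical offset: newest modified time read so far, plus the files
    already read at exactly that modified time."""
    timestamp: int = -1
    seen_at_timestamp: FrozenSet[str] = frozenset()


def next_batch(listing: List[FileEntry], offset: FileOffset, max_files: int):
    """Pick up to max_files unread files, oldest first, and advance the offset."""
    candidates = sorted(
        (
            (path, ts)
            for path, ts in listing
            if ts > offset.timestamp
            or (ts == offset.timestamp and path not in offset.seen_at_timestamp)
        ),
        key=lambda entry: (entry[1], entry[0]),
    )
    batch = candidates[:max_files]
    if not batch:
        return [], offset

    new_ts = batch[-1][1]
    # Keep the already-seen files only if the timestamp did not advance.
    seen = set(offset.seen_at_timestamp) if new_ts == offset.timestamp else set()
    seen.update(path for path, ts in batch if ts == new_ts)
    return [path for path, _ in batch], FileOffset(new_ts, frozenset(seen))


listing = [("a", 100), ("b", 100), ("c", 100), ("d", 200)]
batch1, offset1 = next_batch(listing, FileOffset(), max_files=2)
batch2, offset2 = next_batch(listing, offset1, max_files=2)
```

In this sketch the first batch stops in the middle of the files sharing 
modified time 100, so the offset has to remember which of them were read; the 
timestamp alone would either skip "c" or re-read "a" and "b". A file that shows 
up later with an older modified time is never picked up, which is exactly where 
a strict offset differs from the current behavior of allowing late files.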
   
   > For example, modifiedDateFilter applies specifically to a point in time 
when you begin structured streaming on a file data source. You would not have 
an offset yet in commitedOffsets. The offset use case would imply you are 
restarting an existing, checkpointed stream or attempting to read from a 
checkpointed location, right?
   
   Yes, but you'll want to consider the case where end users change 
`modifiedDateFilter` before restarting the query from the checkpoint. Every 
option can be changed across a restart, which makes things very complicated. 
You'll need to consider combinations of `modifiedDateFilter`, `latestFirst`, 
and `maxFileAge`, and also make sure the behavior is intuitive to end users. 
Please refer to #28422 for what I proposed and the review comments it got.
   
   > When using an offset with a Kafka data source, some write has occurred by 
which a written checkpoint exists. With the file data source for files that 
have not yet been read or written, I'm curious how I would apply offset bounds 
in this way. I was thinking I would have to be reading from a data source that 
had used structured streaming with checkpointing in order for the offset to 
exist (committed).
   > 
   > Does this make sense? It seems like once you've written a checkpoint while 
writing to a stream from the readStream dataframe that's loading files, you 
would have clear context to apply offset-based semantics.
   
   Yeah, ideally that should be an "offset" instead of the file stream 
source's own metadata, but I'm not sure how far we want to go. I've probably 
been thinking too far ahead, but there's a real issue with file stream source 
metadata growing, and I support that approach because it also clearly fixes 
the issue. I'm worried that once we introduce a relevant option without enough 
consideration, it will come back to bite us because we have to respect 
backward compatibility.
   
   > With _startingOffsetByTimestamp_, you have the ability to indicate 
start/end offsets per topic such as TopicA or TopicB. If this concept were 
applied to a file data source with the underlying intent that each file name 
represented a topic, problems begin to emerge. For example, if there are 
multiple files, they would have different file names, different file names may 
imply a new topic.
   
   `underlying intent that each file name represented a topic` I don't think 
so. Remember that a file is the unit of processing in the file source: Spark 
doesn't process a file partially. In other words, Spark never splits a file 
across micro-batches. If you map the file source onto the Kafka data source, a 
record in the Kafka data source = a file in the file data source.
   
   So the timestamp can represent the offset, i.e. how far Spark has read so 
far, though there are some issues, listed below:
   
   1) We allow reading late files: a newly updated file may have its modified 
time set to a "previous time". I don't think that should be allowed, but the 
current file data source allows it, so that's an arguable topic.
   
   2) There can be lots of files with the same timestamp, and Spark has a max 
number of files to process in a batch. That means there are cases where only 
some of those files get processed. That's why storing only the timestamp 
doesn't work here, and we still need to store some processed files as well to 
mitigate this.
   
   3) This one is technically really odd, but consider eventual consistency as 
in S3... A listing operation may not show some of the available files. This may 
be a blocker for using the timestamp as an offset, but even with eventual 
consistency we don't expect a file to show up days later. (Do I understand the 
expectation for eventual consistency on S3 correctly?) That said, (timestamp - 
some margin) + (processed files within the margin) could be used instead of the 
plain timestamp. We can leverage the purge timestamp in `SeenFilesMap`, though 
I think 7 days is too long. And `latestFirst` should be replaced with something 
else, because it simply defeats `maxFileAge` and makes an offset-like approach 
impossible.
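
   The "(timestamp - some margin) + processed files within the margin" idea 
from 3) could look roughly like the Python sketch below. It is only an 
illustration, not Spark's implementation: `MarginOffset`, `is_new` and 
`mark_processed` are hypothetical names, and the 10 ms margin is an arbitrary 
value chosen to keep the example small.

```python
from typing import Dict


class MarginOffset:
    """Hypothetical offset: a low watermark (latest timestamp minus a margin)
    plus the processed files whose modified time is still inside the margin."""

    def __init__(self, margin_ms: int) -> None:
        self.margin_ms = margin_ms
        self.latest_ts = 0                 # newest modified time processed so far
        self.recent: Dict[str, int] = {}   # processed files inside the margin

    @property
    def low_watermark(self) -> int:
        return self.latest_ts - self.margin_ms

    def is_new(self, path: str, modified_ts: int) -> bool:
        # Files older than the watermark are treated as already handled;
        # files inside the margin are new only if not remembered as processed.
        return modified_ts >= self.low_watermark and path not in self.recent

    def mark_processed(self, path: str, modified_ts: int) -> None:
        self.recent[path] = modified_ts
        self.latest_ts = max(self.latest_ts, modified_ts)
        # Forget files that fell below the watermark; the watermark covers them.
        low = self.low_watermark
        self.recent = {p: ts for p, ts in self.recent.items() if ts >= low}


offset = MarginOffset(margin_ms=10)
offset.mark_processed("a", 100)
late_but_in_margin = offset.is_new("b", 95)   # listed late, still inside margin
too_old = offset.is_new("c", 80)              # below the watermark
```

With this shape, the state to persist stays bounded by the number of files 
inside the margin, while a file that a listing missed for a short while can 
still be picked up. How large the margin needs to be depends on how long a 
listing can lag, which is the open question about S3 above.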
   
   Does it make sense?


