cchighman commented on pull request #28841:
URL: https://github.com/apache/spark/pull/28841#issuecomment-650900831


   > > I wonder though if structured streaming always implied an event source, 
particularly when streaming from a file source?
   > 
   > Ideally it should be. It's not 100% true for the file stream source (it doesn't store an offset and it allows reading late files), but it partially holds, since the file stream source "sorts" files by their modified timestamp, in either direction, forward or backward. You're probably thinking of "offset" as an offset within a file, while I mean "offset" as a marker of how far we have read so far. In this case that would be the modified time, together with some of the processed files, because there could be new files with the same modified time (some files may not have been picked up yet due to the max number of files per trigger).
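   > 
   > Very roughly, the sort of thing I mean (just a sketch; the class and field names here are illustrative, not actual Spark code):
   > 
   > ```scala
   > // Illustrative only: a file source "offset" as the latest modified time seen,
   > // plus the files already processed at exactly that timestamp.
   > case class FileSourceOffsetSketch(
   >     lastModifiedTimeMs: Long,
   >     processedAtLastTimestamp: Set[String]) {
   > 
   >   // A newly listed file should be picked up if it is strictly newer than the
   >   // offset timestamp, or shares that timestamp but has not been processed yet.
   >   def shouldPickUp(path: String, modifiedTimeMs: Long): Boolean =
   >     modifiedTimeMs > lastModifiedTimeMs ||
   >       (modifiedTimeMs == lastModifiedTimeMs && !processedAtLastTimestamp.contains(path))
   > }
   > ```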
   > 
   > > For example, modifiedDateFilter applies specifically to a point in time when you begin structured streaming on a file data source. You would not have an offset yet in committedOffsets. The offset use case would imply you are restarting an existing, checkpointed stream or attempting to read from a checkpointed location, right?
   > 
   > Yes, but you'll also want to consider the case where end users "change" `modifiedDateFilter` before restarting the query from the checkpoint. Every option can be changed on restart, which makes things very complicated: you'll need to consider the combinations of `modifiedDateFilter`, `latestFirst`, and `maxFileAge`, and also make sure the behavior is intuitive to end users. Please refer to #28422 for what I proposed there and for the review comments it got.
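   > 
   > For example, an end user could start the query with one combination of options and then restart it from the same checkpoint with another. Here is a rough sketch of what I mean; `modifiedDateFilter` stands in for the option proposed in this PR (its final name and value format are not settled), while `latestFirst` and `maxFileAge` are existing file source options, and the paths are placeholders:
   > 
   > ```scala
   > import org.apache.spark.sql.SparkSession
   > 
   > val spark = SparkSession.builder().master("local[*]").appName("file-source-options").getOrCreate()
   > 
   > // First run: only pick up recently modified files, newest first.
   > // (The checkpoint location is configured on the query's writeStream, not shown here.)
   > val firstRun = spark.readStream
   >   .format("text")
   >   .option("latestFirst", "true")
   >   .option("maxFileAge", "7d")
   >   .option("modifiedDateFilter", "2020-06-01T00:00:00Z") // proposed option, name/format not final
   >   .load("/data/input")
   > 
   > // Restart from the same checkpoint with a different filter and ordering; the source
   > // has to define which of these settings take effect over the checkpointed state.
   > val restarted = spark.readStream
   >   .format("text")
   >   .option("latestFirst", "false")
   >   .option("modifiedDateFilter", "2020-07-01T00:00:00Z") // proposed option, name/format not final
   >   .load("/data/input")
   > ```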
   > 
   > > When using an offset with a Kafka data source, some write has already occurred, so a checkpoint exists. With the file data source, where the files have not yet been read or written, I'm curious how I would apply offset bounds in this way. I was thinking I would have to be reading from a data source that had already used structured streaming with checkpointing in order for a committed offset to exist.
   > > 
   > > Does this make sense? It seems like once you've written a checkpoint while writing out the stream from the readStream dataframe that loads the files, you would have clear context for applying offset-based semantics.
   > 
   > Yeah, ideally that should be an "offset" instead of the file stream source's own metadata, but I'm not sure how far we want to go. Probably I've been thinking too far ahead, but there's a real issue with the file stream source metadata growing, and I'm supportive since that would also clearly fix the issue. I'm worried that once we introduce a relevant option without enough consideration, it will come back to bite us because of backward compatibility.
   > 
   > > With `startingOffsetsByTimestamp`, you have the ability to indicate start/end offsets per topic, such as TopicA or TopicB. If this concept were applied to a file data source with the underlying intent that each file name represents a topic, problems begin to emerge. For example, if there are multiple files, they would have different file names, and each different file name may imply a new topic.
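   > > 
   > > (For reference, `startingOffsetsByTimestamp` takes a per-topic, per-partition timestamp map; roughly like this, with placeholder broker, topics, partitions, and timestamps:)
   > > 
   > > ```scala
   > > import org.apache.spark.sql.SparkSession
   > > 
   > > val spark = SparkSession.builder().master("local[*]").appName("kafka-offsets-by-timestamp").getOrCreate()
   > > 
   > > // startingOffsetsByTimestamp maps topic -> (partition -> timestamp in ms);
   > > // Kafka resolves each timestamp to the first offset at or after that time.
   > > val df = spark.readStream
   > >   .format("kafka")
   > >   .option("kafka.bootstrap.servers", "broker:9092")
   > >   .option("subscribe", "TopicA,TopicB")
   > >   .option("startingOffsetsByTimestamp",
   > >     """{"TopicA": {"0": 1591603200000, "1": 1591603200000}, "TopicB": {"0": 1591603200000, "1": 1591603200000}}""")
   > >   .load()
   > > ```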
   > 
   > `underlying intent that each file name represented a topic` - I don't think so (if you thought I proposed that, I didn't mean it). Remember that a file is the unit of processing in the file source? Spark doesn't process a file partially; in other words, Spark never splits a file across micro-batches. If you map the file source onto the Kafka data source, a record in the Kafka data source = a file in the file data source.
   > 
   > So the timestamp can represent the offset, i.e. how far Spark has read so far - though there are some issues, listed below:
   > 
   > 1) We allow reading late files - a newly added or updated file whose modified time is set to a "previous time". I don't think it should be allowed, but the current file data source allows it, so that's an arguable topic.
   > 
   > 2) There can be lots of files with the same timestamp, and Spark has a maximum number of files to process per batch. That means there's a case where only some of those files get processed. That's why storing only the timestamp doesn't work here, and we still need to store some of the processed files as well to mitigate that.
   > 
   > 3) This one is technically really odd, but consider eventual consistency, as on S3... a listing operation may not show some of the available files. This may be a blocker for using the timestamp as an offset, but even with eventual consistency we don't expect a file to show up days later. (Do I understand the expectation for eventual consistency on S3 correctly?) That said, ((timestamp - some margin) + processed files within that margin) could be used as the offset instead, roughly as in the sketch below. We can leverage the purge timestamp in SeenFilesMap, though I think 7 days is too long. And `latestFirst` should be replaced with something else, because it simply defeats `maxFileAge` and makes it impossible to apply an offset-like approach.
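   > 
   > A rough sketch of that margin idea (illustrative function and parameter names only, not actual Spark code):
   > 
   > ```scala
   > // Illustrative only: "(timestamp - margin) + processed files within margin" as an offset.
   > // The margin allows for files that show up late in the listing (e.g. eventual consistency).
   > def filesToProcess(
   >     listing: Seq[(String, Long)],      // (path, modified time in ms) from the latest listing
   >     offsetTimestampMs: Long,           // latest modified time recorded in the offset
   >     marginMs: Long,                    // safety margin subtracted from the offset timestamp
   >     processedWithinMargin: Set[String] // files already processed with modified time >= cutoff
   > ): Seq[(String, Long)] = {
   >   val cutoff = offsetTimestampMs - marginMs
   >   listing
   >     .filter { case (path, modTs) => modTs >= cutoff && !processedWithinMargin.contains(path) }
   >     .sortBy { case (_, modTs) => modTs } // forward order; a latestFirst-like option would invert this
   > }
   > ```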
   > 
   > Does it make sense?
   
   We were writing responses at the same time. Just saw this and will read it through.

