[GitHub] [spark] cchighman commented on pull request #28841: [SPARK-31962][SQL][SS] Provide option to load files after a specified date when reading from a folder path

2020-07-02 Thread GitBox


cchighman commented on pull request #28841:
URL: https://github.com/apache/spark/pull/28841#issuecomment-652860425


   I appreciate your thoughtful feedback.  I will open the issue as requested 
and remove the streaming references in this PR. 






[GitHub] [spark] cchighman commented on pull request #28841: [SPARK-31962][SQL][SS] Provide option to load files after a specified date when reading from a folder path

2020-07-02 Thread GitBox


cchighman commented on pull request #28841:
URL: https://github.com/apache/spark/pull/28841#issuecomment-652853468


   > @cchighman Thanks for reading through the huge wall of text!
   > 
   > I agree the option can be provided to the batch query only, and we can consider how to apply it to structured streaming later (as we don't have a solid idea yet). Just to reiterate, I guess we may still want to discuss lower bound only vs. lower & upper bound, even in the batch case.
   > 
   > I also agree the option could simply be applied to structured streaming (lower bound only) on top of the current options. That would act as a "filter". As I mentioned in #28422, I already proposed a similar thing, though the purpose there was "retention", hence dynamically changing instead of static.
   > 
   > That said, the problem is that this approach doesn't help cut down the file stream source metadata log, which is another known major issue in the file stream source. The file stream source remembers every file you processed and never drops anything. My viewpoint is focused on how we can minimize the entries to remember across a long query run. We have the maxFileAge option, but due to some issues it doesn't help minimize the entries. It wouldn't be good to introduce another similar option while leaving the major issue behind.
   > 
   > That's why I cannot simply agree to apply the option to SS. It deserves another level of consideration.
   
   In terms of cutting down the file stream source metadata log, if we apply this option as a filter, it means the excluded files are never available to be added to the metadata log.  If we apply this option with offsets, we effectively filter out files whose timestamps occur before a given starting offset and/or outside a specified offset range, which likewise results in those files not being added to the metadata log, correct?
   
   Considering that right now, when streaming from a folder path, you have to consider all files in the path and all of those have to be added to the log, both approaches seem like they would cut back on the log size in some shape or form.  Is my thinking correct here?
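
   To make that concrete, here is a rough sketch of the two variants being compared, assuming hypothetical lower/upper bound values in epoch millis (the actual option handling in this PR may differ):

```scala
import org.apache.hadoop.fs.FileStatus

// Variant 1: pure lower-bound filter. Files older than the bound are never
// listed, so they can never be recorded in the file stream source metadata log.
def lowerBoundFilter(files: Seq[FileStatus], lowerMs: Long): Seq[FileStatus] =
  files.filter(_.getModificationTime >= lowerMs)

// Variant 2: offset-style range. Files whose modified time falls outside
// [lowerMs, upperMs] are dropped; again, excluded files never enter the log.
def rangeFilter(files: Seq[FileStatus], lowerMs: Long, upperMs: Long): Seq[FileStatus] =
  files.filter { f =>
    val ts = f.getModificationTime
    ts >= lowerMs && ts <= upperMs
  }
```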






[GitHub] [spark] cchighman commented on pull request #28841: [SPARK-31962][SQL][SS] Provide option to load files after a specified date when reading from a folder path

2020-07-02 Thread GitBox


cchighman commented on pull request #28841:
URL: https://github.com/apache/spark/pull/28841#issuecomment-652843996


   @HeartSaVioR I still think implementing this at the _PartitioningAwareFileIndex_ level makes a lot of sense and bypasses all the complexities you mentioned above.  There can be cases where you begin streaming from a file source that has hundreds of thousands of files, many with the same timestamp, and you want to start the process at a specified point.  _PartitioningAwareFileIndex_ is processed before any other options for structured streaming are considered during _fetchMaxOffset_.  I believe _modifiedDateFilter_ is a great way to determine where you want to start streaming from, and it is limited to that use case.  I believe the offset semantics still fully apply, but they would apply to the files returned from _InMemoryFileIndex_ or _MetadataLogFileIndex_.
   
   This option is very intuitive for the consumer because, for any given path, they can explicitly set the population of files that would be considered for structured streaming.  `allFiles` in `fetchMaxOffset` would then return the starting population that determines the earliest/latest offsets.  Do you see the difference?
   
   Granted, I can see how this could be implemented in _FileStreamSource_ instead.  It seems, though, like the problems you're describing shouldn't affect how we ultimately filter files based on parameters that aim to bound what is currently an unbounded problem.  I'm asking just to understand whether the complexity really reduces to adding an extra layer of filtering when the options are specified.
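
   As a purely conceptual illustration of the `allFiles` point (not the actual Spark internals): if the filter is applied at the index level, whatever _fetchMaxOffset_ derives as the earliest/latest offsets is bounded by the filtered population by construction:

```scala
import org.apache.hadoop.fs.FileStatus

// The index only ever returns the filtered population, so the earliest/latest
// candidates seen downstream are constrained to that population.
def earliestAndLatest(filteredFiles: Seq[FileStatus]): Option[(Long, Long)] =
  if (filteredFiles.isEmpty) None
  else {
    val times = filteredFiles.map(_.getModificationTime)
    Some((times.min, times.max))
  }
```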
   
   






[GitHub] [spark] cchighman commented on pull request #28841: [SPARK-31962][SQL][SS] Provide option to load files after a specified date when reading from a folder path

2020-07-02 Thread GitBox


cchighman commented on pull request #28841:
URL: https://github.com/apache/spark/pull/28841#issuecomment-652807186


   @HeartSaVioR Thank you for your detailed comments.  I've been digging into 
the PR you mentioned along with the associated Kafka Batch sources, etc.  I'm 
leaning towards separating the PRs mainly to reduce complexity in any one PR.  
I have a few questions.
   
   1.) By separating these PRs, the offset-based semantics would just apply to structured streaming, correct?  Meaning, _modifiedDateFilter_ would just be used for the batch case?  The Kafka example uses batch reading with offset-based semantics, but that seems unintuitive for the file data source use case.
   
   2.) _startingOffsetsByTimestamp_ and the associated semantics refer to _the start point of timestamp when a query is started_.  In the file stream source use case, there seems to be a distinct difference between the file's _modified date_ and when the query itself is started.  From what I'm gathering, because an offset represents a file itself, the language in this sense would actually relate to the modified timestamp on the file as opposed to when the query itself was started?  In effect, the file stream offset is abstracted over the modified time of the file itself?
   
   3.) If a file exists in SeenFilesMap but is subsequently modified, I'm guessing the entire file will be reconsumed, since we don't consider partial files, correct?
   
   4.) Is there an ideal way to exclude the streaming use case from _PartitioningAwareFileIndex_?
   
   Thank you






[GitHub] [spark] cchighman commented on pull request #28841: [SPARK-31962][SQL][SS] Provide option to load files after a specified date when reading from a folder path

2020-06-28 Thread GitBox


cchighman commented on pull request #28841:
URL: https://github.com/apache/spark/pull/28841#issuecomment-650900831


   > > I wonder though if structured streaming always implied an event source, 
particularly when streaming from a file source?
   > 
   > Ideally it should be. It's not 100% true for the file stream source (as it doesn't store an offset and allows reading late files), but it partially qualifies, since the file stream source "sorts" the files by their timestamp, in both directions, forward and backward. You're probably thinking of "offset" as a file offset, while I mean "offset" as a mark of where we have read so far. In this case that's the modified time, together with some processed files, since there could be new files having the same modified time (some files may not be picked up due to the max number of files per trigger).
   > 
   > > For example, modifiedDateFilter applies specifically to a point in time when you begin structured streaming on a file data source. You would not have an offset yet in committedOffsets. The offset use case would imply you are restarting an existing, checkpointed stream or attempting to read from a checkpointed location, right?
   > 
   > Yes, but you'll want to consider the case where end users "change" modifiedDateFilter before restarting the query from the checkpoint. Every option can be changed during restart, which makes things very complicated. You'll need to consider combinations of `modifiedDateFilter`, `latestFirst`, and `maxFileAge`, and also make sure the behavior is intuitive to end users. Please refer to #28422 for what I proposed and which review comments it got.
   > 
   > > When using an offset with a Kafka data source, some write has occurred 
by which a written checkpoint exists. With the file data source for files that 
have not yet been read or written, I'm curious how I would apply offset bounds 
in this way. I was thinking I would have to be reading from a data source that 
had used structured streaming with checkpointing in order for the offset to 
exist (committed).
   > > 
   > > Does this make sense? It seems like once you've written a checkpoint 
while writing to a stream from the readStream dataframe that's loading files, 
you would have clear context to apply offset-based semantics.
   > 
   > Yeah, ideally that should be an "offset" instead of the file stream source's own metadata, but I'm not sure how far we want to go. Probably I've been thinking too far ahead, but there's a real issue with the file stream source metadata growing, and I support an approach that also clearly fixes that issue. I'm worried that once we introduce a relevant option without enough consideration, it will come back to bite us via backward compatibility.
   > 
   > > With _startingOffsetsByTimestamp_, you have the ability to indicate start/end offsets per topic, such as TopicA or TopicB. If this concept were applied to a file data source with the underlying intent that each file name represented a topic, problems begin to emerge. For example, if there are multiple files, they would have different file names, and different file names may imply new topics.
   > 
   > `underlying intent that each file name represented a topic` - I don't think so (if you thought I proposed that, I didn't mean it). Remember that a file is the unit of processing in the file source? Spark doesn't process a file partially; in other words, Spark never splits a file across micro-batches. If you map the file source onto the Kafka data source, a record in the Kafka data source = a file in the file data source.
   > 
   > So the timestamp can represent the offset of how far Spark has read so far - though there are some issues below:
   > 
   > 1) We allow reading late files - there can be a newly updated file whose modified time is set to a "previous time". I don't think it should be allowed, but the current file data source allows it, so that's an arguable topic.
   > 
   > 2) There can be lots of files with the same timestamp, and Spark has a max number of files to process per batch. That means there's a case where only some of the files get processed. That's why storing only the timestamp doesn't work here, and we still need to store some processed files as well to mitigate this.
   > 
   > 3) This is technically really odd, but consider eventual consistency on something like S3... a listing operation may not show some of the available files. This may be a blocker for using the timestamp as an offset, but even with eventual consistency we don't expect files to show up days later. (Do I understand the expectation for eventual consistency on S3 correctly?) That said, ((timestamp - some margin) + the processed files within that margin) could be used as the offset instead. We can leverage the purge timestamp in SeenFilesMap, though I think 7 days is too long. And latestFirst should be replaced with something else, because it simply defeats maxFileAge and makes an offset-like approach impossible.
   > 
   > Does it make sense?
   
   We were writing responses at the same time.  Just saw this and will read through it.
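
   (For reference, a purely illustrative sketch of the offset shape described in point 3 above - a modified-time watermark minus a safety margin, plus the files already processed within that margin. All names are invented for illustration; this is not Spark's API.)

```scala
// Invented names, purely to illustrate the quoted idea; not Spark internals.
case class FileStreamOffsetSketch(
    watermarkMs: Long,                  // modified-time high watermark minus margin
    processedWithinMargin: Set[String]  // paths already processed inside the margin
)

// A file still needs processing if its modified time is above the watermark
// and it hasn't already been processed within the margin window.
def isUnprocessed(path: String, modifiedMs: Long, o: FileStreamOffsetSketch): Boolean =
  modifiedMs > o.watermarkMs && !o.processedWithinMargin.contains(path)
```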

[GitHub] [spark] cchighman commented on pull request #28841: [SPARK-31962][SQL][SS] Provide option to load files after a specified date when reading from a folder path

2020-06-28 Thread GitBox


cchighman commented on pull request #28841:
URL: https://github.com/apache/spark/pull/28841#issuecomment-650899177


   @HeartSaVioR 
   
   It's in effect no different from a path glob filter, except that instead of my wildcard specifying a file extension, it's a wildcard on other metadata, the modified date.  `pathGlobFilter` doesn't use offset-based semantics.
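
   For illustration, a hedged sketch of how the two options would sit side by side on a batch read (`modifiedDateFilter` is the option proposed in this PR; its exact name and value format are still under discussion):

```scala
val df = spark.read
  .format("parquet")
  // existing option: filter listed files by file name pattern
  .option("pathGlobFilter", "*.parquet")
  // proposed option: filter listed files by modified date (hypothetical value format)
  .option("modifiedDateFilter", "2019-01-01")
  .load("/some/folder/path")
```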
   
   What it sounds like, though, is the ability to use a timestamp so that you can replay some segment of an event-sourced stream that's acting as an append-only transaction log.  That would allow much better control over playing back streaming data from files.  I believe that would be an awesome feature, but it's not what this PR is trying to achieve.
   
   Here's a clear example of the difference: suppose I'm reading from a folder path containing files from 2008.  If I were using offset by timestamp, the timestamp would refer to a point in time when I had consumed a particular file, with no context about when the file itself was last modified.  So if my goal were to begin streaming only with files modified after 2019, I'd still end up consuming older files.
   
   Let me know if my train of thought here is off; I appreciate your patience.
   
   @gengliangwang for comment, as the current implementation followed the guidance used for `pathGlobFilter`.
   






[GitHub] [spark] cchighman commented on pull request #28841: [SPARK-31962][SQL][SS] Provide option to load files after a specified date when reading from a folder path

2020-06-28 Thread GitBox


cchighman commented on pull request #28841:
URL: https://github.com/apache/spark/pull/28841#issuecomment-650728661


   @HeartSaVioR 
   With _startingOffsetsByTimestamp_, you have the ability to indicate start/end offsets per topic, such as TopicA or TopicB.  If this concept were applied to a file data source with the underlying intent that each file name represented a topic, problems begin to emerge.  For example, if there are multiple files, they would have different file names, and different file names may imply new topics.
   
   This would mean a naming convention would have to be followed if you were reading from a file data source by _path_, since that path could contain different file names...or topics...and you couldn't consider the whole as one stream.
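
   For comparison, this is roughly how the Kafka source expresses per-topic timestamp offsets today (topic names and timestamp values below are made-up examples):

```scala
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "TopicA,TopicB")
  // JSON maps topic -> partition -> epoch-millis timestamp
  .option("startingOffsetsByTimestamp",
    """{"TopicA": {"0": 1561600000000}, "TopicB": {"0": 1561600000000}}""")
  .load()
```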
   
   
   Thoughts?
   
   






[GitHub] [spark] cchighman commented on pull request #28841: [SPARK-31962][SQL][SS] Provide option to load files after a specified date when reading from a folder path

2020-06-28 Thread GitBox


cchighman commented on pull request #28841:
URL: https://github.com/apache/spark/pull/28841#issuecomment-650726701


   > Please take a look at how the Kafka data source options apply to both batch and streaming queries. The semantics of the option should be applied differently.
   > 
   > 
http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries
   > 
   > `startingOffsetsByTimestamp`, `startingOffsets`, 
`endingOffsetsByTimestamp`, `endingOffsets`
   > 
   > If we are not fully sure about how to do it, let's only apply the option to the batch query, and file an issue to address the streaming query.
   > 
   > Btw, that said, I prefer to have lower bound + upper bound instead of only a lower bound, as I commented earlier in review.
   
   @HeartSaVioR 
   
   Hmm, I'm wondering if this isn't a different feature.  The goal of this feature is to begin reading from a file data source with files that have a particular modified date.  Its key value is really the ability to _start_ at a particular _physical_ location.
   
   With a Kafka data source, you're exclusively dealing with an event stream where event-sourcing patterns leveraging offsets are at play.  I wonder, though, whether structured streaming always implies an event source, particularly when streaming from a file source?
   
   For example, _modifiedDateFilter_ applies specifically to a point in time 
when you begin structured streaming on a file data source.  You would not have 
an offset yet in _availableOffsets_.  The offset use case would imply you are 
restarting an existing, checkpointed stream.
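
   A minimal sketch of that stream-start scenario, assuming the proposed option name and a hypothetical `eventSchema` (file stream sources require a user-defined schema):

```scala
val stream = spark.readStream
  .format("json")
  .schema(eventSchema)  // hypothetical, user-defined schema
  // proposed option: only consider files modified on/after this date at stream start
  .option("modifiedDateFilter", "2020-06-01")
  .load("/some/folder/path")
```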
   
   When using an offset with a Kafka data source, some write has occurred by which a written checkpoint exists.  With the file data source, for files that have not yet been read or written, I'm curious how I would apply offset bounds in this way.  I was thinking I would have to be reading from a data source that had used structured streaming with checkpointing in order for the offset to exist (committed).
   
   Does this make sense?  It seems like once you've written a checkpoint while writing to a stream from the readStream DataFrame that's loading files, you would have clear context to apply offset-based semantics.






[GitHub] [spark] cchighman commented on pull request #28841: [SPARK-31962][SQL][SS] Provide option to load files after a specified date when reading from a folder path

2020-06-27 Thread GitBox


cchighman commented on pull request #28841:
URL: https://github.com/apache/spark/pull/28841#issuecomment-650518946


   @HeartSaVioR The three files that contained indentation-only changes have now been corrected and removed from this PR.






[GitHub] [spark] cchighman commented on pull request #28841: [SPARK-31962][SQL][SS] Provide option to load files after a specified date when reading from a folder path

2020-06-26 Thread GitBox


cchighman commented on pull request #28841:
URL: https://github.com/apache/spark/pull/28841#issuecomment-650478832


   > The PR has lots of changed lines which are actually not changed (indentation only). Indentation is one part of the style guide, and the original lines didn't seem to violate it (whereas these changed lines do violate the guide).
   > 
   > https://github.com/databricks/scala-style-guide
   > 
   > Could you please go through the PR changes and revert them? Thanks in advance!
   
   Very good point.  I will correct.


