HeartSaVioR opened a new pull request #28422:
URL: https://github.com/apache/spark/pull/28422


   ### What changes were proposed in this pull request?
   
   This PR introduces a new option, `inputRetention`, which provides a way to specify a retention period for input files.
   
   `maxAgeMs` acts as a soft limit: it isn't applied under some conditions (e.g. the first batch), and it is applied relative to the modification time of the input files rather than the current time. Because it isn't applied consistently across all combinations of configurations, Spark cannot purge entries based on it. (A streaming query can change its configuration and be relaunched.)
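   To see why purging based on `maxAgeMs` would be unsafe, here is a toy Python model (not Spark's code; `max_age_threshold` and `new_files` are hypothetical helpers). It assumes, per the description above, that the threshold is measured from the newest file's modification time and is skipped for the first batch. An entry purged under a small `maxAgeMs` becomes "new" again once the query is relaunched with a larger one, so the file would be processed twice.

```python
from typing import Dict, Optional, Set

def max_age_threshold(latest_mtime_ms: int, max_age_ms: int,
                      is_first_batch: bool) -> Optional[int]:
    # Soft limit: measured from the newest file's modification time,
    # and (in this model) not applied at all in the first batch.
    if is_first_batch:
        return None
    return latest_mtime_ms - max_age_ms

def new_files(files: Dict[str, int], seen: Set[str],
              threshold: Optional[int]) -> Set[str]:
    # A file is new if it was never seen and is not older than the threshold.
    return {path for path, mtime in files.items()
            if path not in seen and (threshold is None or mtime >= threshold)}

files = {"a.json": 1_000, "b.json": 9_000, "c.json": 10_000}  # path -> mtime (ms)
latest = max(files.values())

# First run, first batch: maxAgeMs = 2000 is ignored, so every file is processed.
seen = new_files(files, set(), max_age_threshold(latest, 2_000, is_first_batch=True))

# Hypothetical purge of seen entries using maxAgeMs = 2000 (threshold 8000): drops "a.json".
purge_threshold = max_age_threshold(latest, 2_000, is_first_batch=False)
seen = {path for path in seen if files[path] >= purge_threshold}

# Relaunch from the same checkpoint with maxAgeMs = 20000: "a.json" passes again.
relaunch_threshold = max_age_threshold(latest, 20_000, is_first_batch=False)
reprocessed = new_files(files, seen, relaunch_threshold)
print(reprocessed)  # {'a.json'}
```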
   
   `inputRetention` acts as a hard limit: Spark will not include files older than the retention as input files, and it also tries to exclude file entries older than the retention from the metadata (this actually happens on compaction, as that is the only phase that removes entries).
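   A minimal sketch of these hard-limit semantics, assuming times in milliseconds and treating `select_input_files` and `compact` as hypothetical helpers rather than Spark APIs. Both apply the same cutoff, `now - retention`, so an entry dropped during compaction is exactly one that would be rejected as input anyway.

```python
from typing import Dict, List, Set, Tuple

Entry = Tuple[str, int]  # (path, modification time in ms)

def select_input_files(files: Dict[str, int], seen: Set[str],
                       now_ms: int, retention_ms: int) -> List[str]:
    # Hard limit: anything modified before now - retention is never an input,
    # regardless of batch id or other options.
    cutoff = now_ms - retention_ms
    return sorted(p for p, mtime in files.items() if p not in seen and mtime >= cutoff)

def compact(entries: List[Entry], now_ms: int, retention_ms: int) -> List[Entry]:
    # Compaction is the only phase that rewrites the log, so outdated entries
    # are dropped here rather than on every batch.
    cutoff = now_ms - retention_ms
    return [(p, mtime) for p, mtime in entries if mtime >= cutoff]

DAY_MS = 24 * 60 * 60 * 1000
now = 30 * DAY_MS
files = {"old.json": 1 * DAY_MS, "recent.json": 29 * DAY_MS}

print(select_input_files(files, set(), now, retention_ms=7 * DAY_MS))  # ['recent.json']

log = [("old.json", 1 * DAY_MS), ("recent.json", 29 * DAY_MS)]
print(compact(log, now, retention_ms=7 * DAY_MS))  # [('recent.json', 2505600000)]
```

   Because the cutoff only moves forward (given a correctly set clock), forgetting an entry older than it cannot make that file reappear as input, which is what makes purging safe here, unlike with `maxAgeMs`.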
   
   Unlike `maxAgeMs`, `inputRetention` is relative to the system timestamp, which is easier for end users to reason about. This requires end users to set the nodes' clocks correctly, but in most cases they would do so for other reasons anyway. Note also that this filters out old files when the query intends to replay from input files, so that case should be considered as well.
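   The difference in reference points can be illustrated with another toy model (hypothetical helpers, times in milliseconds). When no new files arrive, the `maxAgeMs` cutoff stays pinned to the newest file, while the `inputRetention` cutoff keeps advancing with the system clock; a query started later to replay the same directory would therefore see none of the old files.

```python
from typing import List

def max_age_cutoff(file_mtimes_ms: List[int], max_age_ms: int) -> int:
    # maxAgeMs: measured from the newest modification time among the files.
    return max(file_mtimes_ms) - max_age_ms

def retention_cutoff(now_ms: int, retention_ms: int) -> int:
    # inputRetention: measured from the system clock of the node running the query.
    return now_ms - retention_ms

HOUR_MS = 60 * 60 * 1000
mtimes = [0, 10 * HOUR_MS]  # two files; nothing new arrives after hour 10

for now in (10 * HOUR_MS, 100 * HOUR_MS):
    print(f"now={now // HOUR_MS}h",
          f"maxAge cutoff={max_age_cutoff(mtimes, 5 * HOUR_MS) // HOUR_MS}h",
          f"retention cutoff={retention_cutoff(now, 5 * HOUR_MS) // HOUR_MS}h")
# now=10h maxAge cutoff=5h retention cutoff=5h
# now=100h maxAge cutoff=5h retention cutoff=95h

# A query started at hour 100 to replay the directory would pick up nothing.
replayable = [m for m in mtimes if m >= retention_cutoff(100 * HOUR_MS, 5 * HOUR_MS)]
print(replayable)  # []
```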
   
   ### Why are the changes needed?
   
   Growing metadata has been a pain point in both the file stream source and the file stream sink. The file stream source tracks all processed input files, so its metadata grows continuously, and there is no way to reduce its size or number of entries. A compact batch reads the entries for all previous input files to write the new compact file, which introduces major latency.
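   To illustrate the latency problem, this toy model counts how many entries each compact batch must read and rewrite. It assumes a compact batch occurs when `(batch_id + 1) % compact_interval == 0`, which is my understanding of how Spark's compactible metadata logs pick compact batches, and it measures retention in batches rather than milliseconds purely for simplicity; all names are hypothetical.

```python
from typing import List, Optional

def is_compaction_batch(batch_id: int, compact_interval: int) -> bool:
    # Assumed rule: every compact_interval-th batch (0-based ids) is a compact batch.
    return (batch_id + 1) % compact_interval == 0

def compact_sizes(num_batches: int, files_per_batch: int, compact_interval: int,
                  retention_batches: Optional[int] = None) -> List[int]:
    """Return the number of entries written by each compact batch.

    Time is measured in batches here: a file added in batch b is
    outdated once b < current_batch - retention_batches.
    """
    entries: List[int] = []  # batch id in which each tracked file was added
    sizes: List[int] = []
    for batch_id in range(num_batches):
        entries.extend([batch_id] * files_per_batch)
        if is_compaction_batch(batch_id, compact_interval):
            if retention_batches is not None:
                entries = [b for b in entries if b >= batch_id - retention_batches]
            # The compact batch has to read and rewrite every remaining entry.
            sizes.append(len(entries))
    return sizes

print(compact_sizes(40, 100, 10))                        # [1000, 2000, 3000, 4000]
print(compact_sizes(40, 100, 10, retention_batches=15))  # [1000, 1600, 1600, 1600]
```

   Without retention, the work per compact batch grows linearly with the lifetime of the query; with retention it levels off at roughly `retention_batches + 1` batches' worth of entries.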
   
   The issue has also been reported on the user mailing list; see: https://lists.apache.org/thread.html/r897771f5526d10d0b13da9177a6b7d2e378888b22823c839cceea457%40%3Cuser.spark.apache.org%3E
   
   ### Does this PR introduce _any_ user-facing change?
   
   This doesn't change any behavior by default, as the new configuration is optional. (The default value is set to an unrealistic value, which effectively disables the retention.)
   
   This adds a new configuration, whose behavior is described in the previous sections.
   
   ### How was this patch tested?
   
   New UTs, each verifying the following two behaviors:
   
   1) Old files should not be included as input files if input retention is specified.
   2) When compacting, outdated entries should be filtered out.
   
   I've also manually tested the above two behaviors.

