zsxwing commented on a change in pull request #28363:
URL: https://github.com/apache/spark/pull/28363#discussion_r532122181



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
##########
@@ -96,6 +97,19 @@ class FileStreamSinkLog(
   require(defaultCompactInterval > 0,
     s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $defaultCompactInterval) " +
       "to a positive value.")
+
+  private val ttlMs = outputTimeToLiveMs.getOrElse(Long.MaxValue)
+
+  override def shouldRetain(log: SinkFileStatus): Boolean = {
+    val curTime = System.currentTimeMillis()

Review comment:
       It would be great to avoid calling `System.currentTimeMillis()` when the 
option is not set, since otherwise we make this call (a JNI call) once for 
each log entry.
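
A minimal sketch of the idea, not the PR's actual code: `Entry` and `RetentionFilter` are hypothetical stand-ins for `SinkFileStatus` and the retention logic in `FileStreamSinkLog`, the injected `clock` exists only so the behavior can be checked, and the `<=` comparison is an assumption.

```scala
// Hypothetical stand-in for SinkFileStatus; only the field needed here.
final case class Entry(path: String, modificationTime: Long)

// Hypothetical helper. `clock` defaults to the system clock and is injectable
// so that a test can count how often it is read.
class RetentionFilter(
    retentionMs: Option[Long],
    clock: () => Long = () => System.currentTimeMillis()) {

  def shouldRetain(entry: Entry): Boolean = retentionMs match {
    // Option not set: keep every entry without reading the clock at all.
    case None => true
    // Option set: keep the entry only while it is within the retention window.
    case Some(ttl) => clock() - entry.modificationTime <= ttl
  }
}
```

With `retentionMs = None`, `shouldRetain` returns `true` for every entry and never touches the clock, so the per-entry cost only applies to queries that opt in.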

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
##########
@@ -136,8 +136,9 @@ class FileStreamSink(
   private val basePath = new Path(path)
   private val logPath = getMetadataLogPath(basePath.getFileSystem(hadoopConf), basePath,
     sparkSession.sessionState.conf)
-  private val fileLog =
-    new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toString)
+  private val outputTimeToLive = options.get("outputRetentionMs").map(_.toLong)

Review comment:
       Can we use `Utils.timeStringAsMs` to parse this? Users will likely set 
this to multiple days, and asking them to calculate milliseconds is not 
user-friendly.
   
   Nit: regarding the option name, can we call it `retention`? It's obvious 
that the query is outputting files, so `output` sounds redundant to me.
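
To illustrate what a time-string option buys the user, here is a self-contained sketch. `RetentionParser` is a hypothetical helper written for this comment; it is not Spark's `Utils.timeStringAsMs` and handles only a few suffixes. Treating a bare number as milliseconds is an assumption made for the sketch.

```scala
import java.util.Locale
import java.util.concurrent.TimeUnit

// Hypothetical helper, NOT Spark's implementation: parses values such as
// "7d", "12h", "30min" or "1500" into milliseconds.
object RetentionParser {
  private val Pattern = """(\d+)\s*(ms|s|m|min|h|d)?""".r

  private val units: Map[String, TimeUnit] = Map(
    "ms" -> TimeUnit.MILLISECONDS,
    "s" -> TimeUnit.SECONDS,
    "m" -> TimeUnit.MINUTES,
    "min" -> TimeUnit.MINUTES,
    "h" -> TimeUnit.HOURS,
    "d" -> TimeUnit.DAYS)

  def toMs(value: String): Long = value.trim.toLowerCase(Locale.ROOT) match {
    case Pattern(num, unit) =>
      // `unit` is null when the optional suffix group did not match.
      val timeUnit = Option(unit).map(units).getOrElse(TimeUnit.MILLISECONDS)
      timeUnit.toMillis(num.toLong)
    case other =>
      throw new NumberFormatException(s"Invalid time string: $other")
  }
}
```

A user could then write `.option("retention", "7d")` instead of computing `604800000` by hand, and the sink would read it with something like `options.get("retention").map(RetentionParser.toMs)`.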

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
##########
@@ -136,8 +136,9 @@ class FileStreamSink(
   private val basePath = new Path(path)
   private val logPath = getMetadataLogPath(basePath.getFileSystem(hadoopConf), basePath,
     sparkSession.sessionState.conf)
-  private val fileLog =
-    new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toString)
+  private val outputTimeToLive = options.get("outputRetentionMs").map(_.toLong)

Review comment:
       Nit: any reason to use a different name, `outputTimeToLive`? Using the 
same name as the option would make the code easier for other people to read.





