zsxwing commented on a change in pull request #28363:
URL: https://github.com/apache/spark/pull/28363#discussion_r532122181
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
##########
@@ -96,6 +97,19 @@ class FileStreamSinkLog(
require(defaultCompactInterval > 0,
s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $defaultCompactInterval) " +
"to a positive value.")
+
+ private val ttlMs = outputTimeToLiveMs.getOrElse(Long.MaxValue)
+
+ override def shouldRetain(log: SinkFileStatus): Boolean = {
+ val curTime = System.currentTimeMillis()
Review comment:
It would be great to skip the `System.currentTimeMillis()` call when the
option is not set, since `shouldRetain` runs once per log entry and each
invocation would otherwise make that call (a JNI call).
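
A minimal sketch of one way to do this, not the PR's actual code. `FileEntry` is a simplified stand-in for `SinkFileStatus` (the `modificationTime` field is an assumption here), and `RetentionFilter` with its injectable `clock` is a hypothetical helper used only so the number of clock reads can be checked:

```scala
// Simplified stand-in for SinkFileStatus; only the field this sketch needs.
final case class FileEntry(path: String, modificationTime: Long)

// Hypothetical helper: the clock is a parameter so callers (and tests) can
// observe whether it is consulted at all.
class RetentionFilter(
    retentionMs: Option[Long],
    clock: () => Long = () => System.currentTimeMillis()) {

  def shouldRetain(entry: FileEntry): Boolean = retentionMs match {
    // Option not set: keep every entry and never read the clock.
    case None => true
    // Option set: read the clock and drop entries at least `ttl` ms old.
    case Some(ttl) => clock() - entry.modificationTime < ttl
  }
}
```

Another option with the same effect is to read the time once per compaction and pass it into `shouldRetain`, so the cost no longer scales with the number of log entries.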
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
##########
@@ -136,8 +136,9 @@ class FileStreamSink(
private val basePath = new Path(path)
private val logPath = getMetadataLogPath(basePath.getFileSystem(hadoopConf),
basePath,
sparkSession.sessionState.conf)
- private val fileLog =
new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toString)
+ private val outputTimeToLive = options.get("outputRetentionMs").map(_.toLong)
Review comment:
Can we use `Utils.timeStringAsMs` to parse this? Users will likely set this
to multiple days, and asking them to calculate milliseconds is not user-friendly.
Nit: regarding the option name, can we call it `retention`? It's obvious
that the query is outputting files, so `output` sounds redundant to me.
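
To illustrate the difference for users, here is a rough sketch that runs without Spark. `parseRetentionMs` is a hypothetical stand-in for `Utils.timeStringAsMs` and covers only a few suffixes; it assumes a bare number means milliseconds. `RetentionOption` and `fromOptions` are likewise illustrative names, not part of the PR:

```scala
import java.util.Locale
import java.util.concurrent.TimeUnit

object RetentionOption {
  // An integer followed by an optional lowercase unit suffix, e.g. "7d" or "100".
  private val Pattern = """(\d+)\s*([a-z]*)""".r

  // Assumed suffix table for this sketch only.
  private val units: Map[String, TimeUnit] = Map(
    "" -> TimeUnit.MILLISECONDS,
    "ms" -> TimeUnit.MILLISECONDS,
    "s" -> TimeUnit.SECONDS,
    "m" -> TimeUnit.MINUTES,
    "min" -> TimeUnit.MINUTES,
    "h" -> TimeUnit.HOURS,
    "d" -> TimeUnit.DAYS)

  // Hypothetical stand-in for Utils.timeStringAsMs.
  def parseRetentionMs(value: String): Long =
    value.trim.toLowerCase(Locale.ROOT) match {
      case Pattern(num, suffix) if units.contains(suffix) =>
        units(suffix).toMillis(num.toLong)
      case other =>
        throw new IllegalArgumentException(s"Invalid retention value: $other")
    }

  // Reads the option under the shorter name suggested above.
  def fromOptions(options: Map[String, String]): Option[Long] =
    options.get("retention").map(parseRetentionMs)
}
```

With this shape a user could write `.option("retention", "7d")` instead of `.option("outputRetentionMs", "604800000")`.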
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
##########
@@ -136,8 +136,9 @@ class FileStreamSink(
private val basePath = new Path(path)
private val logPath = getMetadataLogPath(basePath.getFileSystem(hadoopConf),
basePath,
sparkSession.sessionState.conf)
- private val fileLog =
new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toString)
+ private val outputTimeToLive = options.get("outputRetentionMs").map(_.toLong)
Review comment:
Nit: is there any reason to use a different name, `outputTimeToLive`? Using the
same name as the option would make the code easier for other people to read.