gaborgsomogyi commented on a change in pull request #26502: [SPARK-29876][SS] Delete/archive file source completed files in separate thread
URL: https://github.com/apache/spark/pull/26502#discussion_r346451378
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##########
 @@ -342,7 +344,14 @@ object FileStreamSource {
     def size: Int = map.size()
   }
 
-  private[sql] trait FileStreamSourceCleaner {
+  private[sql] abstract class FileStreamSourceCleaner {
+    protected val cleanThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+      "file-source-cleaner-threadpool",
+      SQLConf.get.getConf(SQLConf.FILE_SOURCE_CLEANER_NUM_THREADS)
+    )
+
+    def stop(): Unit = cleanThreadPool.shutdown()
 
 Review comment:
  +1 on doing more best-effort. I think `awaitTermination` can be added with
a timeout, but telling which file was not deleted/archived would require more
effort than it's worth. At the moment I don't see how printing the
`List<Runnable>` returned by `shutdownNow()` would help (not to mention that I
haven't found such a pattern in Spark).
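  A minimal sketch of the best-effort stop discussed above, written in plain Scala against `java.util.concurrent` rather than Spark's `ThreadUtils`. `BestEffortCleaner`, `cleanTask` and `stopTimeoutMs` are hypothetical names, not part of Spark, and a fixed-size daemon pool stands in for `newDaemonCachedThreadPool`. `stop()` calls `shutdown()`, waits up to a timeout with `awaitTermination`, then falls back to `shutdownNow()`. Instead of printing the returned `List<Runnable>`, it only reports how many queued tasks were dropped.

```scala
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for FileStreamSourceCleaner; not Spark's actual class.
abstract class BestEffortCleaner(numThreads: Int, stopTimeoutMs: Long) {
  private val threadId = new AtomicInteger(0)

  // Daemon threads, so pending cleanup work never keeps the JVM alive.
  protected val cleanThreadPool: ExecutorService =
    Executors.newFixedThreadPool(numThreads, new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"file-source-cleaner-${threadId.getAndIncrement()}")
        t.setDaemon(true)
        t
      }
    })

  // Deletes or archives a single completed file (left abstract here).
  protected def cleanTask(path: String): Unit

  // Hands the work to the pool so the caller does not block on file system calls.
  def clean(path: String): Unit =
    cleanThreadPool.execute(new Runnable {
      override def run(): Unit = cleanTask(path)
    })

  /** Best-effort stop; returns the number of queued tasks that never ran. */
  def stop(): Int = {
    cleanThreadPool.shutdown() // stop accepting new tasks, let queued ones finish
    try {
      if (cleanThreadPool.awaitTermination(stopTimeoutMs, TimeUnit.MILLISECONDS)) 0
      else cleanThreadPool.shutdownNow().size() // interrupt running, drop queued
    } catch {
      case _: InterruptedException =>
        val dropped = cleanThreadPool.shutdownNow().size()
        Thread.currentThread().interrupt() // preserve the caller's interrupt status
        dropped
    }
  }
}
```

  Only a count is kept because, as noted above, the dropped `Runnable`s carry no readable description of which file they were meant to clean.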

----------------------------------------------------------------
This is an automated message from the Apache Git Service.