This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fc6657624be9 [SPARK-46698][CORE] Replace Timer with single thread 
scheduled executor for ConsoleProgressBar
fc6657624be9 is described below

commit fc6657624be9caf98c50d3c95d11a7986c6eff0a
Author: beliefer <belie...@163.com>
AuthorDate: Fri Jan 12 03:07:02 2024 -0800

    [SPARK-46698][CORE] Replace Timer with single thread scheduled executor for 
ConsoleProgressBar
    
    ### What changes were proposed in this pull request?
    This PR proposes replacing `Timer` with a single-thread scheduled executor 
for `ConsoleProgressBar`.
    
    ### Why are the changes needed?
    The javadoc recommends `ScheduledThreadPoolExecutor` instead of `Timer`.
    ![屏幕快照 2024-01-12 下午12 47 
57](https://github.com/apache/spark/assets/8486025/4fc5ed61-6bb9-4768-915a-ad919a067d04)
    
    This change is based on the following two points.
    **System time sensitivity**
    
    Timer scheduling is based on the absolute time of the operating system and 
is sensitive to the operating system's time. Once the operating system's time 
changes, Timer scheduling is no longer precise.
    `ScheduledThreadPoolExecutor` scheduling is based on relative time and 
is not affected by changes in operating system time.
    
    **Exception handling**
    
    `Timer` does not catch exceptions thrown by a `TimerTask`, and it runs all 
tasks on a single thread. Once a scheduled task throws an exception, that 
thread terminates and no other scheduled tasks will be executed.
    `ScheduledThreadPoolExecutor` implements scheduling on top of a thread 
pool. After a task throws an exception, other tasks can still execute normally.
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    
    ### How was this patch tested?
    Manual tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    'No'.
    
    Closes #44701 from beliefer/replace-timer-with-scheduled-executor.
    
    Authored-by: beliefer <belie...@163.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../scala/org/apache/spark/ui/ConsoleProgressBar.scala  | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala 
b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
index b5473e076946..333b71df33c4 100644
--- a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
@@ -17,7 +17,9 @@
 
 package org.apache.spark.ui
 
-import java.util.{Timer, TimerTask}
+import java.util.concurrent.{Executors, TimeUnit}
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
@@ -46,12 +48,11 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) 
extends Logging {
   private var lastProgressBar = ""
 
   // Schedule a refresh thread to run periodically
-  private val timer = new Timer("refresh progress", true)
-  timer.schedule(new TimerTask {
-    override def run(): Unit = {
-      refresh()
-    }
-  }, firstDelayMSec, updatePeriodMSec)
+  private val threadFactory =
+    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("refresh 
progress").build()
+  private val timer = Executors.newSingleThreadScheduledExecutor(threadFactory)
+  timer.scheduleAtFixedRate(
+    () => refresh(), firstDelayMSec, updatePeriodMSec, TimeUnit.MILLISECONDS)
 
   /**
    * Try to refresh the progress bar in every cycle
@@ -123,5 +124,5 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) 
extends Logging {
    * Tear down the timer thread.  The timer thread is a GC root, and it 
retains the entire
    * SparkContext if it's not terminated.
    */
-  def stop(): Unit = timer.cancel()
+  def stop(): Unit = timer.shutdown()
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to