Re: [PR] [SPARK-47253][CORE] Allow LiveEventBus to stop without the completely draining of event queue [spark]

2024-04-12 Thread via GitHub


TakawaAkirayo commented on PR #45367:
URL: https://github.com/apache/spark/pull/45367#issuecomment-2053512982

   @mridulm @beliefer @LuciferYang Thanks for your review and your guidance in improving the PR :-)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47253][CORE] Allow LiveEventBus to stop without the completely draining of event queue [spark]

2024-04-12 Thread via GitHub


mridulm commented on PR #45367:
URL: https://github.com/apache/spark/pull/45367#issuecomment-2053492389

   I have updated the description, and merged to master.
   Thanks for fixing this @TakawaAkirayo !
   Thanks for the review @beliefer and @LuciferYang :-)





Re: [PR] [SPARK-47253][CORE] Allow LiveEventBus to stop without the completely draining of event queue [spark]

2024-04-12 Thread via GitHub


mridulm closed pull request #45367: [SPARK-47253][CORE] Allow LiveEventBus to stop without the completely draining of event queue
URL: https://github.com/apache/spark/pull/45367





Re: [PR] [SPARK-47253][CORE] Allow LiveEventBus to stop without the completely draining of event queue [spark]

2024-04-12 Thread via GitHub


mridulm commented on PR #45367:
URL: https://github.com/apache/spark/pull/45367#issuecomment-2053490996

   The test failures are unrelated to this PR.





Re: [PR] [SPARK-47253][CORE] Allow LiveEventBus to stop without the completely draining of event queue [spark]

2024-04-12 Thread via GitHub


TakawaAkirayo commented on code in PR #45367:
URL: https://github.com/apache/spark/pull/45367#discussion_r1563578933


##
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala:
##
@@ -176,6 +176,56 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     assert(drained)
   }
 
+  test("allow bus.stop() to not wait for the event queue to completely drain") {
+    @volatile var drained = false
+
+    // When the listener has started
+    val listenerStarted = new Semaphore(0)
+
+    // Tells the listener to stop blocking
+    val listenerWait = new Semaphore(0)
+
+    // Make sure the event is drained
+    val drainWait = new Semaphore(0)
+
+    class BlockingListener extends SparkListener {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+        listenerStarted.release()
+        listenerWait.acquire()
+        drained = true
+        drainWait.release()
+      }
+    }
+
+    val sparkConf = new SparkConf().set(LISTENER_BUS_EXIT_TIMEOUT, 100L)
+    val bus = new LiveListenerBus(sparkConf)
+    val blockingListener = new BlockingListener
+
+    bus.addToSharedQueue(blockingListener)
+    bus.start(mockSparkContext, mockMetricsSystem)
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+
+    listenerStarted.acquire()
+    // if we reach here, the dispatch thread should be blocked at onJobEnd
+
+    // stop the bus now; the queue will wait for the event to drain up to the specified timeout
+    bus.stop()
+    // if we reach here, the bus has exited without draining completely,
+    // otherwise it would hang here forever.
+
+    // the event dispatch thread should remain blocked after the bus has stopped,
+    // which means the bus exited upon reaching the timeout
+    // without all the events being completely drained
+    assert(!drained)
+
+    // unblock the dispatch thread
+    listenerWait.release()
+
+    // let the event drain now
+    drainWait.acquire()
+    assert(drained)

Review Comment:
   @mridulm Yes, this is less relevant to the main change; it only checks that the dispatch thread gets unblocked. I have removed those two lines now.
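The test above relies on a semaphore handshake that pins the dispatch thread inside `onJobEnd` before `stop()` is called. The same choreography can be reproduced without Spark. The sketch below assumes a hypothetical `MiniEventQueue` (it is not Spark's `AsyncEventQueue`) that only mimics the poison-pill-then-`join(waitMs)` shutdown discussed in this PR; `MiniEventQueueDemo.run` is likewise an illustrative name.

```scala
import java.util.concurrent.{LinkedBlockingQueue, Semaphore}

// Hypothetical toy stand-in for AsyncEventQueue (not Spark's class): a single
// dispatch thread drains a blocking queue until it takes the poison pill (None).
final class MiniEventQueue(handler: String => Unit) {
  private val queue = new LinkedBlockingQueue[Option[String]]()
  private val dispatchThread = new Thread(() => {
    var next = queue.take()
    while (next.isDefined) {
      handler(next.get)
      next = queue.take()
    }
  })
  dispatchThread.setDaemon(true)

  def start(): Unit = dispatchThread.start()

  def post(event: String): Unit = queue.put(Some(event))

  // Enqueue the poison pill, then wait at most `waitMs` for the dispatch thread
  // to exit (0 means wait until every queued event has been handled).
  def stop(waitMs: Long): Unit = {
    queue.put(None)
    if (Thread.currentThread() != dispatchThread) dispatchThread.join(waitMs)
  }
}

object MiniEventQueueDemo {
  // Same semaphore choreography as the quoted test; returns
  // (drained when stop() returned, drained after the listener was released).
  // Only call with waitMs > 0: join(0) would wait forever on the blocked handler.
  def run(waitMs: Long): (Boolean, Boolean) = {
    @volatile var drained = false
    val listenerStarted = new Semaphore(0)
    val listenerWait = new Semaphore(0)
    val drainWait = new Semaphore(0)

    val queue = new MiniEventQueue(_ => {
      listenerStarted.release()
      listenerWait.acquire()
      drained = true
      drainWait.release()
    })
    queue.start()
    queue.post("jobEnd")

    listenerStarted.acquire() // the dispatch thread is now blocked inside the handler
    queue.stop(waitMs)        // returns after roughly waitMs instead of hanging
    val drainedAtStop = drained

    listenerWait.release()    // unblock the handler
    drainWait.acquire()       // wait until it has finished
    (drainedAtStop, drained)
  }
}
```

With `run(100L)`, `stop` returns while the handler is still blocked, so the first flag is `false` and the second becomes `true` only after the handler is released. Calling `run(0L)` would hang, which is exactly the pre-PR behavior the new timeout avoids.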






Re: [PR] [SPARK-47253][CORE] Allow LiveEventBus to stop without the completely draining of event queue [spark]

2024-04-12 Thread via GitHub


mridulm commented on code in PR #45367:
URL: https://github.com/apache/spark/pull/45367#discussion_r1563152817


##
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala:
##
@@ -176,6 +176,56 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     assert(drained)
   }
 
+  test("allow bus.stop() to not wait for the event queue to completely drain") {
+    @volatile var drained = false
+
+    // When the listener has started
+    val listenerStarted = new Semaphore(0)
+
+    // Tells the listener to stop blocking
+    val listenerWait = new Semaphore(0)
+
+    // Make sure the event is drained
+    val drainWait = new Semaphore(0)
+
+    class BlockingListener extends SparkListener {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+        listenerStarted.release()
+        listenerWait.acquire()
+        drained = true
+        drainWait.release()
+      }
+    }
+
+    val sparkConf = new SparkConf().set(LISTENER_BUS_EXIT_TIMEOUT, 100L)
+    val bus = new LiveListenerBus(sparkConf)
+    val blockingListener = new BlockingListener
+
+    bus.addToSharedQueue(blockingListener)
+    bus.start(mockSparkContext, mockMetricsSystem)
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+
+    listenerStarted.acquire()
+    // if we reach here, the dispatch thread should be blocked at onJobEnd
+
+    // stop the bus now; the queue will wait for the event to drain up to the specified timeout
+    bus.stop()
+    // if we reach here, the bus has exited without draining completely,
+    // otherwise it would hang here forever.
+
+    // the event dispatch thread should remain blocked after the bus has stopped,
+    // which means the bus exited upon reaching the timeout
+    // without all the events being completely drained
+    assert(!drained)
+
+    // unblock the dispatch thread
+    listenerWait.release()
+
+    // let the event drain now
+    drainWait.acquire()
+    assert(drained)

Review Comment:
   Are these two lines testing any behavior?
   It is a consequence of `onJobEnd` concluding, right?






Re: [PR] [SPARK-47253][CORE] Allow LiveEventBus to stop without the completely draining of event queue [spark]

2024-04-12 Thread via GitHub


mridulm commented on code in PR #45367:
URL: https://github.com/apache/spark/pull/45367#discussion_r1562045723


##
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##
@@ -1014,6 +1014,16 @@ package object config {
   .timeConf(TimeUnit.NANOSECONDS)
   .createWithDefaultString("1s")
 
+  private[spark] val LISTENER_BUS_EVENT_QUEUE_EVENT_DISPATCH_EXIT_WAITING_TIME_ON_STOP =
+    ConfigBuilder("spark.scheduler.listenerbus.eventqueue.eventDispatchExitWaitingTimeOnStop")

Review Comment:
   ```suggestion
   ConfigBuilder("spark.scheduler.listenerbus.exitTimeout")
   ```



##
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##
@@ -1014,6 +1014,16 @@ package object config {
   .timeConf(TimeUnit.NANOSECONDS)
   .createWithDefaultString("1s")
 
+  private[spark] val LISTENER_BUS_EVENT_QUEUE_EVENT_DISPATCH_EXIT_WAITING_TIME_ON_STOP =

Review Comment:
   ```suggestion
 private[spark] val LISTENER_BUS_EXIT_TIMEOUT =
   ```



##
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##
@@ -1014,6 +1014,16 @@ package object config {
   .timeConf(TimeUnit.NANOSECONDS)
   .createWithDefaultString("1s")
 
+  private[spark] val LISTENER_BUS_EVENT_QUEUE_EVENT_DISPATCH_EXIT_WAITING_TIME_ON_STOP =
+    ConfigBuilder("spark.scheduler.listenerbus.eventqueue.eventDispatchExitWaitingTimeOnStop")
+      .doc("The time that event queue waits until the dispatch thread exits " +
+        "when stop is invoked. " +
+        "This is set to 0 by default for graceful shutdown of the event queue, " +
+        "but allow the user to configure the waiting time.")
+      .version("4.0.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(0)

Review Comment:
   ```suggestion
      .checkValue(_ >= 0, "Listener bus exit timeout must be non-negative duration")
      .createWithDefault(0)
   ```



##
core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala:
##
@@ -142,10 +142,17 @@ private class AsyncEventQueue(
       eventCount.incrementAndGet()
       eventQueue.put(POISON_PILL)
     }
-    // this thread might be trying to stop itself as part of error handling -- we can't join
-    // in that case.
-    if (Thread.currentThread() != dispatchThread) {
-      dispatchThread.join()
+    // 1. This thread might be trying to stop itself as part of error handling -- we can't join
+    //    in that case.
+    // 2. If users don't want to wait for the dispatch to end until all events are drained,
+    //    they can control the waiting time by themselves
+    //    or omit the thread join by setting the waiting time to a negative value.
+    val waitingTimeMs =
+      conf.get(LISTENER_BUS_EVENT_QUEUE_EVENT_DISPATCH_EXIT_WAITING_TIME_ON_STOP)
+    if (waitingTimeMs >= 0 && Thread.currentThread() != dispatchThread) {
+      // By default, the `waitingTimeMs` is set to 0,
+      // which means it will wait until all events are drained.
+      dispatchThread.join(waitingTimeMs)
     }

Review Comment:
   ```suggestion
    if (Thread.currentThread() != dispatchThread) {
      // By default, the `waitingTimeMs` is set to 0,
      // which means it will wait until all events are drained.
      dispatchThread.join(waitingTimeMs)
    }
   ```
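Dropping the `waitingTimeMs >= 0` guard relies on a negative value never reaching `join`: the suggested `.checkValue(_ >= 0, ...)` rejects it, and the JDK's `Thread.join(long)` would itself throw `IllegalArgumentException` for a negative timeout. A small sketch of both checks; `validateExitTimeout` is a hypothetical helper that only mirrors the validation and is not Spark's `ConfigBuilder` API.

```scala
// Hypothetical helper: mirrors the validation that the suggested
// `.checkValue(_ >= 0, ...)` would apply; it is not Spark's ConfigBuilder API.
object ExitTimeoutSketch {
  def validateExitTimeout(ms: Long): Long = {
    require(ms >= 0, "Listener bus exit timeout must be non-negative duration")
    ms
  }

  // Shows the JDK behavior the removed guard used to avoid:
  // Thread#join(long) throws IllegalArgumentException for a negative timeout.
  def joinRejectsNegativeTimeout(): Boolean = {
    val t = new Thread(() => ())
    t.start()
    try {
      t.join(-1L)
      false
    } catch {
      case _: IllegalArgumentException => true
    } finally {
      t.join() // the worker does nothing, so this returns promptly
    }
  }
}
```

Validating once at the config layer keeps the hot `stop()` path to a single thread-identity check.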






Re: [PR] [SPARK-47253][CORE] Allow LiveEventBus to stop without the completely draining of event queue [spark]

2024-04-11 Thread via GitHub


TakawaAkirayo commented on code in PR #45367:
URL: https://github.com/apache/spark/pull/45367#discussion_r1561947997


##
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##
@@ -1014,6 +1014,15 @@ package object config {
   .timeConf(TimeUnit.NANOSECONDS)
   .createWithDefaultString("1s")
 
+  private[spark] val LISTENER_BUS_EVENT_QUEUE_WAIT_FOR_EVENT_DISPATCH_EXIT_ON_STOP =
+    ConfigBuilder("spark.scheduler.listenerbus.eventqueue.waitForEventDispatchExitOnStop")
+      .doc("Whether wait until the dispatch thread exit when stop invoked. " +
+        "This is set to true by default for graceful shutdown of the event queue, " +
+        "but allow user to configure the behavior if they don't need.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(true)

Review Comment:
   @mridulm Got it, thanks for your suggestion. I have just committed the change; please take a look.






Re: [PR] [SPARK-47253][CORE] Allow LiveEventBus to stop without the completely draining of event queue [spark]

2024-04-11 Thread via GitHub


mridulm commented on code in PR #45367:
URL: https://github.com/apache/spark/pull/45367#discussion_r1561552345


##
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##
@@ -1014,6 +1014,15 @@ package object config {
   .timeConf(TimeUnit.NANOSECONDS)
   .createWithDefaultString("1s")
 
+  private[spark] val LISTENER_BUS_EVENT_QUEUE_WAIT_FOR_EVENT_DISPATCH_EXIT_ON_STOP =
+    ConfigBuilder("spark.scheduler.listenerbus.eventqueue.waitForEventDispatchExitOnStop")
+      .doc("Whether wait until the dispatch thread exit when stop invoked. " +
+        "This is set to true by default for graceful shutdown of the event queue, " +
+        "but allow user to configure the behavior if they don't need.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(true)

Review Comment:
   To clarify, I am not suggesting using an int to represent a boolean.
   Rather, use it to set the wait time, where `0` (the default value) means wait until stopped, which preserves the current behavior.
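The int-valued setting proposed here lines up with the JDK's `Thread.join(long millis)`, whose documented contract is that a timeout of 0 means wait forever, while a positive value returns after at most about that many milliseconds. A minimal sketch using only the standard library (the object and method names are illustrative, not from Spark):

```scala
import java.util.concurrent.CountDownLatch

// Sketch (plain JDK, not Spark code) of the java.lang.Thread#join(long) contract
// that a wait-time setting can build on: join(0) waits until the thread exits,
// join(n) with n > 0 gives up after about n milliseconds.
object JoinTimeoutSketch {
  // Joins a worker blocked on a latch and reports whether it was still alive
  // when join(waitMs) returned.
  def stillAliveAfterJoin(waitMs: Long): Boolean = {
    val release = new CountDownLatch(1)
    val worker = new Thread(() => release.await())
    worker.setDaemon(true)
    worker.start()
    if (waitMs == 0L) {
      // join(0) never times out, so release the worker from another thread.
      new Thread(() => { Thread.sleep(50L); release.countDown() }).start()
    }
    worker.join(waitMs)
    val alive = worker.isAlive
    release.countDown() // let the worker finish in every case
    alive
  }
}
```

Because 0 already means "no timeout" in `join`, making it the default keeps the existing graceful-shutdown behavior without a separate boolean flag.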






Re: [PR] [SPARK-47253][CORE] Allow LiveEventBus to stop without the completely draining of event queue [spark]

2024-04-10 Thread via GitHub


TakawaAkirayo commented on code in PR #45367:
URL: https://github.com/apache/spark/pull/45367#discussion_r1560378130


##
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##
@@ -1014,6 +1014,15 @@ package object config {
   .timeConf(TimeUnit.NANOSECONDS)
   .createWithDefaultString("1s")
 
+  private[spark] val LISTENER_BUS_EVENT_QUEUE_WAIT_FOR_EVENT_DISPATCH_EXIT_ON_STOP =
+    ConfigBuilder("spark.scheduler.listenerbus.eventqueue.waitForEventDispatchExitOnStop")
+      .doc("Whether wait until the dispatch thread exit when stop invoked. " +
+        "This is set to true by default for graceful shutdown of the event queue, " +
+        "but allow user to configure the behavior if they don't need.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(true)

Review Comment:
   Hi @mridulm, I checked other configurations, and it seems the Boolean type is used for configs that enable/disable something, so I would like to follow that style. Here is one example:
   `
     private[spark] val LISTENER_BUS_LOG_SLOW_EVENT_ENABLED =
       ConfigBuilder("spark.scheduler.listenerbus.logSlowEvent")
         .internal()
         .doc("When enabled, log the event that takes too much time to process. This helps us " +
           "discover the event types that cause performance bottlenecks. The time threshold is " +
           "controlled by spark.scheduler.listenerbus.logSlowEvent.threshold.")
         .version("3.0.0")
         .booleanConf
         .createWithDefault(true)
   `






Re: [PR] [SPARK-47253][CORE] Allow LiveEventBus to stop without the completely draining of event queue [spark]

2024-04-10 Thread via GitHub


mridulm commented on code in PR #45367:
URL: https://github.com/apache/spark/pull/45367#discussion_r1559712537


##
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##
@@ -1014,6 +1014,15 @@ package object config {
   .timeConf(TimeUnit.NANOSECONDS)
   .createWithDefaultString("1s")
 
+  private[spark] val LISTENER_BUS_EVENT_QUEUE_WAIT_FOR_EVENT_DISPATCH_EXIT_ON_STOP =
+    ConfigBuilder("spark.scheduler.listenerbus.eventqueue.waitForEventDispatchExitOnStop")
+      .doc("Whether wait until the dispatch thread exit when stop invoked. " +
+        "This is set to true by default for graceful shutdown of the event queue, " +
+        "but allow user to configure the behavior if they don't need.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(true)

Review Comment:
   Instead of making this a Boolean, I would suggest making it an int, with the default set to 0 to preserve the existing behavior.






Re: [PR] [SPARK-47253][CORE] Allow LiveEventBus to stop without the completely draining of event queue [spark]

2024-04-10 Thread via GitHub


TakawaAkirayo commented on PR #45367:
URL: https://github.com/apache/spark/pull/45367#issuecomment-2047258464

   Hi @LuciferYang @beliefer, just following up on this: what are the next action items? Do any other reviewers need to be involved?





Re: [PR] [SPARK-47253][CORE] Allow LiveEventBus to stop without the completely draining of event queue [spark]

2024-03-05 Thread via GitHub


TakawaAkirayo commented on code in PR #45367:
URL: https://github.com/apache/spark/pull/45367#discussion_r1512354669


##
core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala:
##
@@ -142,9 +142,11 @@ private class AsyncEventQueue(
       eventCount.incrementAndGet()
       eventQueue.put(POISON_PILL)
     }
-    // this thread might be trying to stop itself as part of error handling -- we can't join
+    // 1. If the user does not want to wait for the dispatch to end,
+    // they can omit the thread join.
+    // 2. This thread might be trying to stop itself as part of error handling -- we can't join
     // in that case.
-    if (Thread.currentThread() != dispatchThread) {
+    if (waitForEventDispatchExit() && Thread.currentThread() != dispatchThread) {

Review Comment:
   @beliefer Sure, I have already committed it; please take a look.






Re: [PR] [SPARK-47253][CORE] Allow LiveEventBus to stop without the completely draining of event queue [spark]

2024-03-05 Thread via GitHub


beliefer commented on code in PR #45367:
URL: https://github.com/apache/spark/pull/45367#discussion_r1512329502


##
core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala:
##
@@ -142,9 +142,11 @@ private class AsyncEventQueue(
       eventCount.incrementAndGet()
       eventQueue.put(POISON_PILL)
     }
-    // this thread might be trying to stop itself as part of error handling -- we can't join
+    // 1. If the user does not want to wait for the dispatch to end,
+    // they can omit the thread join.
+    // 2. This thread might be trying to stop itself as part of error handling -- we can't join
     // in that case.
-    if (Thread.currentThread() != dispatchThread) {
+    if (waitForEventDispatchExit() && Thread.currentThread() != dispatchThread) {

Review Comment:
   The name `LISTENER_BUS_EVENT_QUEUE_WAIT_FOR_EVENT_DISPATCH_EXIT_ON_STOP` is more readable.
   Also, it's not worth extracting a method that is used only once.






Re: [PR] [SPARK-47253][CORE] Allow LiveEventBus to stop without the completely draining of event queue [spark]

2024-03-04 Thread via GitHub


TakawaAkirayo commented on code in PR #45367:
URL: https://github.com/apache/spark/pull/45367#discussion_r1512107482


##
core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala:
##
@@ -142,9 +142,11 @@ private class AsyncEventQueue(
       eventCount.incrementAndGet()
       eventQueue.put(POISON_PILL)
     }
-    // this thread might be trying to stop itself as part of error handling -- we can't join
+    // 1. If the user does not want to wait for the dispatch to end,
+    // they can omit the thread join.
+    // 2. This thread might be trying to stop itself as part of error handling -- we can't join
     // in that case.
-    if (Thread.currentThread() != dispatchThread) {
+    if (waitForEventDispatchExit() && Thread.currentThread() != dispatchThread) {

Review Comment:
   @beliefer Thanks for the advice! I think using `waitForEventDispatchExit` is more readable here, since the name explains what it does. With `LISTENER_BUS_EVENT_QUEUE_WAIT_FOR_EVENT_DISPATCH_EXIT_ON_STOP`, you have to jump to where the conf is defined to figure out its meaning.






Re: [PR] [SPARK-47253][CORE] Allow LiveEventBus to stop without the completely draining of event queue [spark]

2024-03-04 Thread via GitHub


beliefer commented on code in PR #45367:
URL: https://github.com/apache/spark/pull/45367#discussion_r1512093558


##
core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala:
##
@@ -142,9 +142,11 @@ private class AsyncEventQueue(
       eventCount.incrementAndGet()
       eventQueue.put(POISON_PILL)
     }
-    // this thread might be trying to stop itself as part of error handling -- we can't join
+    // 1. If the user does not want to wait for the dispatch to end,
+    // they can omit the thread join.
+    // 2. This thread might be trying to stop itself as part of error handling -- we can't join
     // in that case.
-    if (Thread.currentThread() != dispatchThread) {
+    if (waitForEventDispatchExit() && Thread.currentThread() != dispatchThread) {

Review Comment:
   Shall we remove `waitForEventDispatchExit()` and inline `conf.get(LISTENER_BUS_EVENT_QUEUE_WAIT_FOR_EVENT_DISPATCH_EXIT_ON_STOP)`?






Re: [PR] [SPARK-47253][CORE] Allow LiveEventBus to stop without the completely draining of event queue [spark]

2024-03-04 Thread via GitHub


TakawaAkirayo commented on PR #45367:
URL: https://github.com/apache/spark/pull/45367#issuecomment-1975954383

   @LuciferYang I believe it's an improvement rather than a bug fix. If it's not backported to version 3.5, will this change be included in the next release? I'm okay with that for now, because I can apply the change to our internal Spark build. Once it is available in a public release, I can switch to using the public Spark binary.

