This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c212c9d  [SPARK-28574][CORE] Allow to config different sizes for event queues
c212c9d is described below

commit c212c9d9ed7375cd1ea16c118733edd84037ec0d
Author: yunzoud <yun....@databricks.com>
AuthorDate: Fri Aug 2 15:27:33 2019 -0700

    [SPARK-28574][CORE] Allow to config different sizes for event queues
    
    ## What changes were proposed in this pull request?
    Add configuration spark.scheduler.listenerbus.eventqueue.${name}.capacity to allow configuration of different event queue sizes.
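
    For example, a minimal sketch of how a user might set these options
    (queue names such as "appStatus" and "eventLog", and the global key
    spark.scheduler.listenerbus.eventqueue.capacity, are assumed here to
    match the constants defined in LiveListenerBus; the capacity values
    are purely illustrative):

        import org.apache.spark.SparkConf

        val conf = new SparkConf()
          // Fallback capacity for any queue without a per-queue setting.
          .set("spark.scheduler.listenerbus.eventqueue.capacity", "10000")
          // Per-queue overrides, keyed by queue name.
          .set("spark.scheduler.listenerbus.eventqueue.appStatus.capacity", "20000")
          .set("spark.scheduler.listenerbus.eventqueue.eventLog.capacity", "5000")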
    
    ## How was this patch tested?
    Unit test in core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
    
    Closes #25307 from yunzoud/SPARK-28574.
    
    Authored-by: yunzoud <yun....@databricks.com>
    Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
---
 .../apache/spark/scheduler/AsyncEventQueue.scala   | 14 +++++++++--
 .../apache/spark/scheduler/LiveListenerBus.scala   |  4 ++++
 .../spark/scheduler/SparkListenerSuite.scala       | 28 ++++++++++++++++++++++
 3 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
index 7cd2b86..11e2c47 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
@@ -46,8 +46,18 @@ private class AsyncEventQueue(
 
  // Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if
   // it's perpetually being added to more quickly than it's being drained.
-  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
-    conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
+  // The capacity can be configured by spark.scheduler.listenerbus.eventqueue.${name}.capacity,
+  // if no such conf is specified, use the value specified in
+  // LISTENER_BUS_EVENT_QUEUE_CAPACITY
+  private[scheduler] def capacity: Int = {
+    val queuesize = conf.getInt(s"spark.scheduler.listenerbus.eventqueue.${name}.capacity",
+                                conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
+    assert(queuesize > 0, s"capacity for event queue $name must be greater than 0, " +
+      s"but $queuesize is configured.")
+    queuesize
+  }
+
+  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](capacity)
 
  // Keep the event count separately, so that waitUntilEmpty() can be implemented properly;
  // this allows that method to return only when the events in the queue have been fully
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index d135190..302ebd3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -236,6 +236,10 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
     queues.asScala.map(_.name).toSet
   }
 
+  // For testing only.
+  private[scheduler] def getQueueCapacity(name: String): Option[Int] = {
+    queues.asScala.find(_.name == name).map(_.capacity)
+  }
 }
 
 private[spark] object LiveListenerBus {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index a7869d3..8903e10 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -532,6 +532,34 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     }
   }
 
+  test("event queue size can be configued through spark conf") {
+    // configure the shared queue size to be 1, event log queue size to be 2,
+    // and listener bus event queue size to be 5
+    val conf = new SparkConf(false)
+      .set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5)
+      .set(s"spark.scheduler.listenerbus.eventqueue.${SHARED_QUEUE}.capacity", 
"1")
+      .set(s"spark.scheduler.listenerbus.eventqueue.${EVENT_LOG_QUEUE}.capacity", "2")
+
+    val bus = new LiveListenerBus(conf)
+    val counter1 = new BasicJobCounter()
+    val counter2 = new BasicJobCounter()
+    val counter3 = new BasicJobCounter()
+
+    // add a new shared, status and event log queue
+    bus.addToSharedQueue(counter1)
+    bus.addToStatusQueue(counter2)
+    bus.addToEventLogQueue(counter3)
+
+    assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE, EVENT_LOG_QUEUE))
+    // check the size of shared queue is 1 as configured
+    assert(bus.getQueueCapacity(SHARED_QUEUE) == Some(1))
+    // no specific size for the status queue is configured,
+    // so it should use the LISTENER_BUS_EVENT_QUEUE_CAPACITY
+    assert(bus.getQueueCapacity(APP_STATUS_QUEUE) == Some(5))
+    // check the size of event log queue is 2 as configured
+    assert(bus.getQueueCapacity(EVENT_LOG_QUEUE) == Some(2))
+  }
+
   /**
   * Assert that the given list of numbers has an average that is greater than zero.
    */

