Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19211#discussion_r139787945
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala ---
    @@ -0,0 +1,196 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler
    +
    +import java.util.concurrent.LinkedBlockingQueue
    +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
    +
    +import com.codahale.metrics.{Gauge, Timer}
    +
    +import org.apache.spark.{SparkConf, SparkContext}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.util.Utils
    +
    +/**
    + * An asynchronous queue for events. All events posted to this queue will be delivered to the child
    + * listeners in a separate thread.
    + *
    + * Delivery will only begin when the `start()` method is called. The `stop()` method should be
    + * called when no more events need to be delivered.
    + */
    +private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveListenerBusMetrics)
    +  extends SparkListenerBus
    +  with Logging {
    +
    +  import AsyncEventQueue._
    +
    +  // Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if
    +  // it's perpetually being added to more quickly than it's being drained.
    +  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
    +    conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
    +
    +  // Keep the event count separately, so that waitUntilEmpty() can be implemented properly;
    +  // this allows that method to return only when the events in the queue have been fully
    +  // processed (instead of just dequeued).
    +  private val eventCount = new AtomicLong()
    +
    +  /** A counter for dropped events. It will be reset every time we log it. */
    +  private val droppedEventsCounter = new AtomicLong(0L)
    +
    +  /** When `droppedEventsCounter` was logged last time in milliseconds. */
    +  @volatile private var lastReportTimestamp = 0L
    +
    +  private val logDroppedEvent = new AtomicBoolean(false)
    +
    +  private var sc: SparkContext = null
    +
    +  private val started = new AtomicBoolean(false)
    +  private val stopped = new AtomicBoolean(false)
    +
    +  private val droppedEvents = metrics.metricRegistry.counter(s"queue.$name.numDroppedEvents")
    +  private val processingTime = metrics.metricRegistry.timer(s"queue.$name.listenerProcessingTime")
    +
    +  // Remove the queue size gauge first, in case it was created by a previous incarnation of
    +  // this queue that was removed from the listener bus.
    +  metrics.metricRegistry.remove(s"queue.$name.size")
    +  metrics.metricRegistry.register(s"queue.$name.size", new Gauge[Int] {
    +    override def getValue: Int = eventQueue.size()
    +  })
    +
    +  private val dispatchThread = new Thread(s"spark-listener-group-$name") {
    +    setDaemon(true)
    +    override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
    +      dispatch()
    +    }
    +  }
    +
    +  private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
    +    try {
    +      var next: SparkListenerEvent = eventQueue.take()
    +      while (next != POISON_PILL) {
    +        val ctx = processingTime.time()
    +        try {
    +          super.postToAll(next)
    +        } finally {
    +          ctx.stop()
    +        }
    +        eventCount.decrementAndGet()
    +        next = eventQueue.take()
    +      }
    +      eventCount.decrementAndGet()
    +    } catch {
    +      case ie: InterruptedException =>
    +        logInfo(s"Stopping listener queue $name.", ie)
    +    }
    +  }
    +
    +  override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = {
    +    metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface]))
    +  }
    +
    +  /**
    +   * Start an asynchronous thread to dispatch events to the underlying listeners.
    +   *
    +   * @param sc Used to stop the SparkContext in case the async dispatcher fails.
    +   */
    +  private[scheduler] def start(sc: SparkContext): Unit = {
    +    if (started.compareAndSet(false, true)) {
    +      this.sc = sc
    +      dispatchThread.start()
    +    } else {
    +      throw new IllegalStateException(s"$name already started!")
    --- End diff --
    
    Seems fine to me -- it's only called in LiveListenerBus, where we guarantee this is true. Seems better to fail fast if it's messed up.
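
    For readers skimming this thread: the fail-fast idiom under discussion hinges on
    `AtomicBoolean.compareAndSet(false, true)`, which flips the flag atomically for exactly
    one caller; every later (or racing) caller falls through to the `else` branch. A minimal
    standalone sketch of the idiom -- the class and method names below are illustrative,
    not Spark's:

        import java.util.concurrent.atomic.AtomicBoolean

        // Illustrative only: a component that may be started at most once.
        class StartOnce(name: String) {
          private val started = new AtomicBoolean(false)

          def start(): Unit = {
            // compareAndSet succeeds for exactly one thread, even under a race;
            // everyone else sees the flag already set and takes the else branch.
            if (started.compareAndSet(false, true)) {
              // ... one-time initialization, e.g. spawning a dispatch thread
            } else {
              // Fail fast instead of silently ignoring a double start.
              throw new IllegalStateException(s"$name already started!")
            }
          }
        }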

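    Relatedly, the dispatch loop in the diff is a textbook poison-pill shutdown: the
    consumer blocks on `take()` and exits only once it dequeues a sentinel, so every event
    enqueued before the sentinel is fully processed first. A self-contained sketch of that
    pattern (the event type and sentinel here are stand-ins, not Spark's):

        import java.util.concurrent.LinkedBlockingQueue

        object PoisonPillDemo {
          private case class Event(payload: String)
          // Sentinel instance; reference equality (eq) distinguishes it from real events.
          private val POISON_PILL = Event("")

          private val queue = new LinkedBlockingQueue[Event]()

          private val consumer = new Thread("demo-consumer") {
            override def run(): Unit = {
              var next = queue.take()        // blocks until an event is available
              while (!(next eq POISON_PILL)) {
                println(s"processing ${next.payload}")
                next = queue.take()
              }
              // Reaching here means the sentinel was dequeued, so all prior
              // events were processed before shutdown.
            }
          }

          def main(args: Array[String]): Unit = {
            consumer.start()
            Seq("a", "b", "c").foreach(p => queue.put(Event(p)))
            queue.put(POISON_PILL)           // request shutdown after draining
            consumer.join()
          }
        }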

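    One more note on the capacity cap near the top of the diff: constructing
    `LinkedBlockingQueue` with a capacity makes the non-blocking `offer` return false when
    the queue is full, so a producer can count a dropped event instead of letting the heap
    grow until an OOM. A hedged sketch of that producer-side behavior (names are
    illustrative, and the actual drop-handling code is outside the lines shown above):

        import java.util.concurrent.LinkedBlockingQueue
        import java.util.concurrent.atomic.AtomicLong

        class BoundedEventQueue[E](capacity: Int) {
          private val queue = new LinkedBlockingQueue[E](capacity)
          private val dropped = new AtomicLong(0L)

          // offer is non-blocking: it returns false instead of blocking (or
          // exhausting memory) when the consumer cannot keep up.
          def post(event: E): Boolean =
            if (queue.offer(event)) {
              true
            } else {
              dropped.incrementAndGet()
              false
            }

          def droppedCount: Long = dropped.get()
        }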