Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6380#discussion_r34516555
  
    --- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -538,6 +540,54 @@ class StreamingContext private[streaming] (
         scheduler.listenerBus.addListener(streamingListener)
       }
     
    +  /**
    +   * Registers streamingListeners specified in 
spark.streaming.extraListeners
    +   */
    +  private def setupStreamingListeners(): Unit = {
    +    // Use reflection to instantiate listeners specified via 
`spark.extraListeners`
    +    try {
    +      val listenerClassNames: Seq[String] =
    +        conf.get("spark.streaming.extraListeners", 
"").split(',').map(_.trim).filter(_ != "")
    +      for (className <- listenerClassNames) {
    +        // Use reflection to find the right constructor
    +        val constructors = {
    +          val listenerClass = 
org.apache.spark.util.Utils.classForName(className)
    +          listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ 
<: StreamingListener]]]
    +        }
    +        val constructorTakingSparkConf = constructors.find { c =>
    +          c.getParameterTypes.sameElements(Array(classOf[SparkConf]))
    +        }
    +        lazy val zeroArgumentConstructor = constructors.find { c =>
    +          c.getParameterTypes.isEmpty
    +        }
    +        val listener: StreamingListener = {
    +          if (constructorTakingSparkConf.isDefined) {
    +            constructorTakingSparkConf.get.newInstance(conf)
    +          } else if (zeroArgumentConstructor.isDefined) {
    +            zeroArgumentConstructor.get.newInstance()
    +          } else {
    +            throw new SparkException(
    +              s"$className did not have a zero-argument constructor or a" +
    +                " single-argument constructor that accepts SparkConf. 
Note: if the class is" +
    +                " defined inside of another Scala class, then its 
constructors may accept an" +
    +                " implicit parameter that references the enclosing class; 
in this case, you must" +
    +                " define the listener as a top-level class in order to 
prevent this extra" +
    +                " parameter from breaking Spark's ability to find a valid 
constructor.")
    +          }
    +        }
    +        addStreamingListener(listener)
    +        logInfo(s"Registered StreamingListener $className")
    +      }
    +    } catch {
    +      case e: Exception =>
    +        try {
    --- End diff --
    
    This `stop()` is not needed as the StreamingContext has not been started 
yet at this point. And in that case the whole `try ... catch` is not needed, 
and the whole things can be simplified to a single `throw new 
SparkException("Exception when registering StreamingListener: $className did 
not have a zero-argument constructor or a ..."


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to