[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...

2018-06-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21504


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...

2018-06-12 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21504#discussion_r194950151
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.language.reflectiveCalls
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.streaming.StreamingQueryListener._
+
+
+class StreamingQueryListenersConfSuite extends StreamTest with BeforeAndAfter {
+
+  import testImplicits._
+
+
+  override protected def sparkConf: SparkConf =
+    super.sparkConf.set("spark.sql.streamingQueryListeners",
--- End diff --

This seems to be a mistake.


---




[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...

2018-06-12 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21504#discussion_r194817758
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala ---
@@ -96,6 +96,14 @@ object StaticSQLConf {
     .toSequence
     .createOptional
 
+  val STREAMING_QUERY_LISTENERS = buildStaticConf("spark.sql.streamingQueryListeners")
--- End diff --

OK, makes sense. Renamed.
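
For readers following the thread, here is a minimal sketch of how a listener might be registered through this config. It assumes the rename adopted the suggested key `spark.sql.streaming.streamingQueryListeners`, that `spark-sql` is on the classpath, and that the listener has a no-arg constructor. `LoggingListener` and `ListenerConfSketch` are hypothetical names used only for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Hypothetical listener, not part of the PR. It only prints the events it sees.
class LoggingListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Query started: ${event.id}")

  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"Batch ${event.progress.batchId} processed")

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"Query terminated: ${event.id}")
}

object ListenerConfSketch {
  def main(args: Array[String]): Unit = {
    // The config is static, so it has to be set before the session is created.
    // The key below is an assumption based on the rename suggested in this thread.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("listener-conf-sketch")
      .config("spark.sql.streaming.streamingQueryListeners",
        classOf[LoggingListener].getName)
      .getOrCreate()

    // Accessing the StreamingQueryManager; per the diff in this PR, its
    // initialization is where the configured listeners get registered.
    val manager = spark.streams
    println(s"Active streaming queries: ${manager.active.length}")

    spark.stop()
  }
}
```

The value is a comma-separated list of class names, since the config is declared with `.toSequence`.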


---




[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...

2018-06-12 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21504#discussion_r194688537
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala ---
@@ -96,6 +96,14 @@ object StaticSQLConf {
     .toSequence
     .createOptional
 
+  val STREAMING_QUERY_LISTENERS = buildStaticConf("spark.sql.streamingQueryListeners")
--- End diff --

Maybe rename this to `spark.sql.streaming.streamingQueryListeners` for consistency.


---




[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...

2018-06-08 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21504#discussion_r194101270
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
@@ -55,6 +57,19 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
   @GuardedBy("awaitTerminationLock")
   private var lastTerminatedQuery: StreamingQuery = null
 
+  try {
+    sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
+      Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
+        sparkSession.sparkContext.conf).foreach(listener => {
+        addListener(listener)
+        logInfo(s"Registered listener ${listener.getClass.getName}")
+      })
+    }
+  } catch {
+    case e: Exception =>
+      throw new SparkException(s"Exception when registering StreamingQueryListener", e)
--- End diff --

Addressed


---




[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...

2018-06-08 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21504#discussion_r194100709
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
@@ -55,6 +57,19 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
   @GuardedBy("awaitTerminationLock")
   private var lastTerminatedQuery: StreamingQuery = null
 
+  try {
+    sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
+      Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
+        sparkSession.sparkContext.conf).foreach(listener => {
+        addListener(listener)
+        logInfo(s"Registered listener ${listener.getClass.getName}")
--- End diff --

Since it is logged only once and gives the user useful information, I think info level is fine. There is a similar pattern here:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L2359


---




[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...

2018-06-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21504#discussion_r193945288
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
@@ -55,6 +57,19 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
   @GuardedBy("awaitTerminationLock")
   private var lastTerminatedQuery: StreamingQuery = null
 
+  try {
+    sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
+      Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
+        sparkSession.sparkContext.conf).foreach(listener => {
+        addListener(listener)
+        logInfo(s"Registered listener ${listener.getClass.getName}")
--- End diff --

Either debug or info is fine with me, since it would add just a couple of log lines, and only once.


---




[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...

2018-06-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21504#discussion_r193936818
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
@@ -55,6 +57,19 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
   @GuardedBy("awaitTerminationLock")
   private var lastTerminatedQuery: StreamingQuery = null
 
+  try {
+    sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
+      Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
+        sparkSession.sparkContext.conf).foreach(listener => {
+        addListener(listener)
+        logInfo(s"Registered listener ${listener.getClass.getName}")
+      })
+    }
+  } catch {
+    case e: Exception =>
+      throw new SparkException(s"Exception when registering StreamingQueryListener", e)
--- End diff --

nit: the `s` interpolator does not seem to be needed here, since the string has no placeholders.
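
A small illustration of the nit, as a sketch in plain Scala (`InterpolatorSketch` is a hypothetical name): without any `$` placeholders, the `s` prefix produces the same string as a plain literal, so dropping it changes nothing at runtime.

```scala
object InterpolatorSketch {
  // No placeholders, so the interpolator has nothing to substitute.
  val interpolated: String = s"Exception when registering StreamingQueryListener"
  val plain: String = "Exception when registering StreamingQueryListener"

  // The prefix is only needed once a value is spliced into the message.
  def withClass(className: String): String =
    s"Exception when registering $className"

  def main(args: Array[String]): Unit = {
    println(interpolated == plain)
    println(withClass("StreamingQueryListener"))
  }
}
```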


---




[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...

2018-06-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21504#discussion_r193936698
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
@@ -55,6 +57,19 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
   @GuardedBy("awaitTerminationLock")
   private var lastTerminatedQuery: StreamingQuery = null
 
+  try {
+    sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
+      Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
+        sparkSession.sparkContext.conf).foreach(listener => {
+        addListener(listener)
+        logInfo(s"Registered listener ${listener.getClass.getName}")
--- End diff --

I would log this at debug level.


---




[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...

2018-06-07 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21504#discussion_r193923588
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
@@ -55,6 +56,11 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
   @GuardedBy("awaitTerminationLock")
   private var lastTerminatedQuery: StreamingQuery = null
 
+  sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
+    Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
+      sparkSession.sparkContext.conf).foreach(addListener)
+  }
+
--- End diff --

Good point. Addressed, please check.


---




[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...

2018-06-07 Thread merlintang
Github user merlintang commented on a diff in the pull request:

https://github.com/apache/spark/pull/21504#discussion_r193911087
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
@@ -55,6 +56,11 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
   @GuardedBy("awaitTerminationLock")
   private var lastTerminatedQuery: StreamingQuery = null
 
+  sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
+    Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
+      sparkSession.sparkContext.conf).foreach(addListener)
+  }
+
--- End diff --

Two comments here:
1. We need to log the registration here.
2. We need to wrap this in a try/catch. Registration can fail, and that would break the job.
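
The two points above can be sketched in plain Scala without Spark. This is only an illustration of the pattern (load by class name, log each registration, wrap failures in one descriptive exception), not the PR's code: `Listener`, `ConsoleListener`, `RegistrationException`, and `ListenerRegistry` are hypothetical stand-ins for `StreamingQueryListener`, a user listener, `SparkException`, and the manager, and `println` stands in for `logInfo`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for StreamingQueryListener.
trait Listener

// Hypothetical listener with the no-arg constructor that reflection needs.
class ConsoleListener extends Listener

// Hypothetical stand-in for SparkException.
class RegistrationException(message: String, cause: Throwable)
  extends RuntimeException(message, cause)

object ListenerRegistry {
  private val listeners = ArrayBuffer.empty[Listener]

  def registered: Seq[Listener] = listeners.toSeq

  // Instantiate each class by name, register it, and log the registration.
  // Any failure (unknown class, wrong type, no usable constructor) is
  // rethrown with a message that names the registration step.
  def registerAll(classNames: Seq[String]): Unit = {
    try {
      classNames.foreach { className =>
        val listener = Class.forName(className)
          .getDeclaredConstructor()
          .newInstance()
          .asInstanceOf[Listener]
        listeners += listener
        println(s"Registered listener ${listener.getClass.getName}")
      }
    } catch {
      case e: Exception =>
        throw new RegistrationException("Exception when registering listener", e)
    }
  }

  def main(args: Array[String]): Unit = {
    registerAll(Seq(classOf[ConsoleListener].getName))
    try {
      registerAll(Seq("no.such.ListenerClass"))
    } catch {
      case e: RegistrationException =>
        println(s"${e.getMessage} (cause: ${e.getCause.getClass.getSimpleName})")
    }
  }
}
```

Note that the wrapper still fails fast. It does not swallow the error; it only makes the cause easier to identify from the message.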


---
