Repository: carbondata
Updated Branches:
  refs/heads/branch-1.3 6493892af -> 25d9adb9c


[CARBONDATA-2363][branch-1.3] Add CarbonStreamingQueryListener to SparkSession

This closes #2189


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/25d9adb9
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/25d9adb9
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/25d9adb9

Branch: refs/heads/branch-1.3
Commit: 25d9adb9cc49c11f4085ca8fb729e47d9469a390
Parents: 6493892
Author: QiangCai <qiang...@qq.com>
Authored: Thu Apr 19 15:49:31 2018 +0800
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Fri Apr 20 00:24:01 2018 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/CarbonSource.scala | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/25d9adb9/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 7d70534..e9adb39 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.language.implicitConversions
 
 import org.apache.commons.lang.StringUtils
@@ -43,7 +44,7 @@ import org.apache.carbondata.core.util.{CarbonProperties, 
CarbonUtil}
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CarbonScalaUtil
-import org.apache.carbondata.streaming.{CarbonStreamException, 
StreamSinkFactory}
+import org.apache.carbondata.streaming.{CarbonStreamException, 
CarbonStreamingQueryListener, StreamSinkFactory}
 
 /**
  * Carbon relation provider compliant to data source api.
@@ -241,6 +242,19 @@ class CarbonSource extends CreatableRelationProvider with 
RelationProvider
                                       s"${carbonTable.getTableName} is not a 
streaming table")
     }
 
+    // CarbonSession has added CarbonStreamingQueryListener during the 
initialization.
+    // But other SparkSessions didn't, so here will add the listener once.
+    if (!"CarbonSession".equals(sparkSession.getClass.getSimpleName)) {
+      if (CarbonSource.listenerAdded.get(sparkSession.hashCode()).isEmpty) {
+        synchronized {
+          if (CarbonSource.listenerAdded.get(sparkSession.hashCode()).isEmpty) 
{
+            sparkSession.streams.addListener(new 
CarbonStreamingQueryListener(sparkSession))
+            CarbonSource.listenerAdded.put(sparkSession.hashCode(), true)
+          }
+        }
+      }
+    }
+
     // create sink
     StreamSinkFactory.createStreamTableSink(
       sqlContext.sparkSession,
@@ -253,6 +267,8 @@ class CarbonSource extends CreatableRelationProvider with 
RelationProvider
 
 object CarbonSource {
 
+  lazy val listenerAdded = new mutable.HashMap[Int, Boolean]()
+
   def createTableInfoFromParams(
       parameters: Map[String, String],
       dataSchema: StructType,

Reply via email to