Repository: carbondata Updated Branches: refs/heads/branch-1.3 6493892af -> 25d9adb9c
[CARBONDATA-2363][branch-1.3] Add CarbonStreamingQueryListener to SparkSession This closes #2189 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/25d9adb9 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/25d9adb9 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/25d9adb9 Branch: refs/heads/branch-1.3 Commit: 25d9adb9cc49c11f4085ca8fb729e47d9469a390 Parents: 6493892 Author: QiangCai <qiang...@qq.com> Authored: Thu Apr 19 15:49:31 2018 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Fri Apr 20 00:24:01 2018 +0800 ---------------------------------------------------------------------- .../scala/org/apache/spark/sql/CarbonSource.scala | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/25d9adb9/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala index 7d70534..e9adb39 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.language.implicitConversions import org.apache.commons.lang.StringUtils @@ -43,7 +44,7 @@ import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.spark.CarbonOption import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.util.CarbonScalaUtil -import org.apache.carbondata.streaming.{CarbonStreamException, StreamSinkFactory} +import org.apache.carbondata.streaming.{CarbonStreamException, CarbonStreamingQueryListener, StreamSinkFactory} /** * Carbon relation provider compliant to data source api. @@ -241,6 +242,19 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider s"${carbonTable.getTableName} is not a streaming table") } + // CarbonSession has added CarbonStreamingQueryListener during the initialization. + // But other SparkSessions didn't, so here will add the listener once. + if (!"CarbonSession".equals(sparkSession.getClass.getSimpleName)) { + if (CarbonSource.listenerAdded.get(sparkSession.hashCode()).isEmpty) { + synchronized { + if (CarbonSource.listenerAdded.get(sparkSession.hashCode()).isEmpty) { + sparkSession.streams.addListener(new CarbonStreamingQueryListener(sparkSession)) + CarbonSource.listenerAdded.put(sparkSession.hashCode(), true) + } + } + } + } + // create sink StreamSinkFactory.createStreamTableSink( sqlContext.sparkSession, @@ -253,6 +267,8 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider object CarbonSource { + lazy val listenerAdded = new mutable.HashMap[Int, Boolean]() + def createTableInfoFromParams( parameters: Map[String, String], dataSchema: StructType,