close dictionary server on application end
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/43e06b65 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/43e06b65 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/43e06b65 Branch: refs/heads/branch-1.1 Commit: 43e06b65a7fbeaf35dced6ece4f8014015960ba2 Parents: 50da524 Author: kunal642 <kunal.kap...@knoldus.in> Authored: Sun May 21 23:12:59 2017 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Jun 15 12:58:11 2017 +0530 ---------------------------------------------------------------------- .../carbondata/core/dictionary/server/DictionaryServer.java | 4 +--- .../spark/sql/execution/command/carbonTableSchema.scala | 6 ++++++ .../spark/sql/execution/command/carbonTableSchema.scala | 7 +++++++ 3 files changed, 14 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/43e06b65/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java index f86cd6b..84f2a0d 100644 --- a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java +++ b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java @@ -135,11 +135,9 @@ public class DictionaryServer { * @throws Exception */ public void shutdown() throws Exception { + LOGGER.info("Shutting down dictionary server"); worker.shutdownGracefully(); boss.shutdownGracefully(); - // Wait until all threads are terminated. - boss.terminationFuture().sync(); - worker.terminationFuture().sync(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/43e06b65/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 494beff..7258511 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._ import scala.language.implicitConversions import org.apache.commons.lang3.StringUtils +import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute @@ -525,6 +526,11 @@ case class LoadTable( val dictionaryServer = DictionaryServer .getInstance(dictionaryServerPort.toInt) carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort) + sqlContext.sparkContext.addSparkListener(new SparkListener() { + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { + dictionaryServer.shutdown() + } + }) Some(dictionaryServer) } else { None http://git-wip-us.apache.org/repos/asf/carbondata/blob/43e06b65/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 09824d8..5dd6832 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._ import scala.language.implicitConversions import org.apache.commons.lang3.StringUtils +import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute @@ -533,10 +534,16 @@ case class LoadTable( val dictionaryServer = DictionaryServer .getInstance(dictionaryServerPort.toInt) carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort) + sparkSession.sparkContext.addSparkListener(new SparkListener() { + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { + dictionaryServer.shutdown() + } + }) Some(dictionaryServer) } else { None } + CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext, carbonLoadModel, relation.tableMeta.storePath,