close dictionary server on application end

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/43e06b65
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/43e06b65
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/43e06b65

Branch: refs/heads/branch-1.1
Commit: 43e06b65a7fbeaf35dced6ece4f8014015960ba2
Parents: 50da524
Author: kunal642 <kunal.kap...@knoldus.in>
Authored: Sun May 21 23:12:59 2017 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Thu Jun 15 12:58:11 2017 +0530

----------------------------------------------------------------------
 .../carbondata/core/dictionary/server/DictionaryServer.java   | 4 +---
 .../spark/sql/execution/command/carbonTableSchema.scala       | 6 ++++++
 .../spark/sql/execution/command/carbonTableSchema.scala       | 7 +++++++
 3 files changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/43e06b65/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
index f86cd6b..84f2a0d 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
@@ -135,11 +135,9 @@ public class DictionaryServer {
    * @throws Exception
    */
   public void shutdown() throws Exception {
+    LOGGER.info("Shutting down dictionary server");
     worker.shutdownGracefully();
     boss.shutdownGracefully();
-    // Wait until all threads are terminated.
-    boss.terminationFuture().sync();
-    worker.terminationFuture().sync();
   }
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/43e06b65/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 494beff..7258511 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 
 import org.apache.commons.lang3.StringUtils
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -525,6 +526,11 @@ case class LoadTable(
             val dictionaryServer = DictionaryServer
               .getInstance(dictionaryServerPort.toInt)
             carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
+            sqlContext.sparkContext.addSparkListener(new SparkListener() {
+              override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+                dictionaryServer.shutdown()
+              }
+            })
             Some(dictionaryServer)
           } else {
             None

http://git-wip-us.apache.org/repos/asf/carbondata/blob/43e06b65/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 09824d8..5dd6832 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 
 import org.apache.commons.lang3.StringUtils
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -533,10 +534,16 @@ case class LoadTable(
             val dictionaryServer = DictionaryServer
               .getInstance(dictionaryServerPort.toInt)
             carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
+            sparkSession.sparkContext.addSparkListener(new SparkListener() {
+              override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+                dictionaryServer.shutdown()
+              }
+            })
             Some(dictionaryServer)
           } else {
             None
           }
+
           CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
             carbonLoadModel,
             relation.tableMeta.storePath,

Reply via email to