IGNITE-9180: IgniteSparkSession Should Copy State on cloneSession - Fixes #4531.

Signed-off-by: Nikolay Izhikov <nizhi...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/18524159
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/18524159
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/18524159

Branch: refs/heads/ignite-9273
Commit: 18524159d11d9c74606983eb3cc0d258017eedfd
Parents: b6f67f5
Author: Stuart Macdonald <29892836+stuartm...@users.noreply.github.com>
Authored: Thu Aug 23 08:19:26 2018 +0300
Committer: Nikolay Izhikov <nizhi...@apache.org>
Committed: Thu Aug 23 08:20:54 2018 +0300

----------------------------------------------------------------------
 .../spark/sql/ignite/IgniteSparkSession.scala   | 31 ++++++--
 .../ignite/spark/IgniteDataFrameSuite.scala     |  4 +-
 .../sql/ignite/IgniteSparkSessionSpec.scala     | 79 ++++++++++++++++++++
 3 files changed, 106 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/18524159/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala
----------------------------------------------------------------------
diff --git 
a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala
 
b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala
index 1cc63ed..9bf6017 100644
--- 
a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala
+++ 
b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala
@@ -43,9 +43,16 @@ import org.apache.spark.util.Utils
 /**
   * Implementation of Spark Session for Ignite.
   */
-class IgniteSparkSession private(ic: IgniteContext, proxy: SparkSession) 
extends SparkSession(proxy.sparkContext) {
+class IgniteSparkSession private(
+    ic: IgniteContext,
+    proxy: SparkSession,
+    existingSharedState: Option[SharedState],
+    parentSessionState: Option[SessionState]) extends 
SparkSession(proxy.sparkContext) {
     self ⇒
 
+    private def this(ic: IgniteContext, proxy: SparkSession) =
+        this(ic, proxy, None, None)
+
     private def this(proxy: SparkSession) =
         this(new IgniteContext(proxy.sparkContext, IgnitionEx.DFLT_CFG), proxy)
 
@@ -63,16 +70,20 @@ class IgniteSparkSession private(ic: IgniteContext, proxy: 
SparkSession) extends
 
     /** @inheritdoc */
     @transient override lazy val sharedState: SharedState =
-        new IgniteSharedState(ic, sparkContext)
+        existingSharedState.getOrElse(new IgniteSharedState(ic, sparkContext))
 
     /** @inheritdoc */
     @transient override lazy val sessionState: SessionState = {
-        val sessionState = new SessionStateBuilder(self, None).build()
+        parentSessionState
+            .map(_.clone(this))
+            .getOrElse {
+                val sessionState = new SessionStateBuilder(self, None).build()
 
-        sessionState.experimentalMethods.extraOptimizations =
-            sessionState.experimentalMethods.extraOptimizations :+ 
IgniteOptimization
+                sessionState.experimentalMethods.extraOptimizations =
+                    sessionState.experimentalMethods.extraOptimizations :+ 
IgniteOptimization
 
-        sessionState
+                sessionState
+          }
     }
 
     /** @inheritdoc */
@@ -172,7 +183,13 @@ class IgniteSparkSession private(ic: IgniteContext, proxy: 
SparkSession) extends
     }
 
     /** @inheritdoc */
-    override private[sql] def cloneSession() = new IgniteSparkSession(ic, 
proxy.cloneSession())
+    override private[sql] def cloneSession(): IgniteSparkSession = {
+        val session = new IgniteSparkSession(ic, proxy.cloneSession(), 
Some(sharedState), Some(sessionState))
+
+        session.sessionState // Force copy of SessionState.
+
+        session
+    }
 
     /** @inheritdoc */
     @transient override private[sql] val extensions =

http://git-wip-us.apache.org/repos/asf/ignite/blob/18524159/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala
----------------------------------------------------------------------
diff --git 
a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala
 
b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala
index e1bb7ff..728cde6 100644
--- 
a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala
+++ 
b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.spark
 
+import org.apache.spark.sql.ignite.IgniteSparkSessionSpec
 import org.scalatest.Suites
 
 /**
@@ -36,5 +37,6 @@ class IgniteDataFrameSuite extends Suites (
     new IgniteOptimizationSystemFuncSpec,
     new IgniteOptimizationJoinSpec,
     new IgniteOptimizationDateFuncSpec,
-    new IgniteOptimizationDisableEnableSpec
+    new IgniteOptimizationDisableEnableSpec,
+    new IgniteSparkSessionSpec
 )

http://git-wip-us.apache.org/repos/asf/ignite/blob/18524159/modules/spark/src/test/scala/org/apache/spark/sql/ignite/IgniteSparkSessionSpec.scala
----------------------------------------------------------------------
diff --git 
a/modules/spark/src/test/scala/org/apache/spark/sql/ignite/IgniteSparkSessionSpec.scala
 
b/modules/spark/src/test/scala/org/apache/spark/sql/ignite/IgniteSparkSessionSpec.scala
new file mode 100644
index 0000000..fa9d2ee
--- /dev/null
+++ 
b/modules/spark/src/test/scala/org/apache/spark/sql/ignite/IgniteSparkSessionSpec.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.ignite
+
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath
+import org.apache.ignite.spark.AbstractDataFrameSpec
+import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, 
TEST_CONFIG_FILE, enclose}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+
+/**
+  * Tests to check Spark Session implementation.
+  */
+@RunWith(classOf[JUnitRunner])
+class IgniteSparkSessionSpec extends AbstractDataFrameSpec {
+    var igniteSession: IgniteSparkSession = _
+
+    describe("Ignite Spark Session Implementation") {
+        it("should keep session state after session clone") {
+            val dfProvider = (s: IgniteSparkSession) => {
+                
s.read.json(resolveIgnitePath("modules/spark/src/test/resources/cities.json").getAbsolutePath)
+                    .filter("name = 'Denver'")
+            }
+
+            var df = dfProvider(igniteSession).cache()
+
+            val cachedData = 
igniteSession.sharedState.cacheManager.lookupCachedData(df)
+
+            cachedData shouldBe defined
+
+            val otherSession = igniteSession.cloneSession()
+
+            df = dfProvider(otherSession)
+
+            val otherCachedData = 
otherSession.sharedState.cacheManager.lookupCachedData(df)
+
+            otherCachedData shouldBe defined
+
+            cachedData shouldEqual otherCachedData
+        }
+    }
+
+    override protected def beforeAll(): Unit = {
+        super.beforeAll()
+
+        createCityTable(client, DEFAULT_CACHE)
+
+        val configProvider = enclose(null)(_ ⇒ () ⇒ {
+            val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1()
+
+            cfg.setClientMode(true)
+
+            cfg.setIgniteInstanceName("client-2")
+
+            cfg
+        })
+
+        igniteSession = IgniteSparkSession.builder()
+            .config(spark.sparkContext.getConf)
+            .igniteConfigProvider(configProvider)
+            .getOrCreate()
+    }
+}

Reply via email to