IGNITE-9180: IgniteSparkSession Should Copy State on cloneSession - Fixes #4531.
Signed-off-by: Nikolay Izhikov <nizhi...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/18524159 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/18524159 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/18524159 Branch: refs/heads/ignite-9273 Commit: 18524159d11d9c74606983eb3cc0d258017eedfd Parents: b6f67f5 Author: Stuart Macdonald <29892836+stuartm...@users.noreply.github.com> Authored: Thu Aug 23 08:19:26 2018 +0300 Committer: Nikolay Izhikov <nizhi...@apache.org> Committed: Thu Aug 23 08:20:54 2018 +0300 ---------------------------------------------------------------------- .../spark/sql/ignite/IgniteSparkSession.scala | 31 ++++++-- .../ignite/spark/IgniteDataFrameSuite.scala | 4 +- .../sql/ignite/IgniteSparkSessionSpec.scala | 79 ++++++++++++++++++++ 3 files changed, 106 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/18524159/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala index 1cc63ed..9bf6017 100644 --- a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala +++ b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala @@ -43,9 +43,16 @@ import org.apache.spark.util.Utils /** * Implementation of Spark Session for Ignite. 
*/ -class IgniteSparkSession private(ic: IgniteContext, proxy: SparkSession) extends SparkSession(proxy.sparkContext) { +class IgniteSparkSession private( + ic: IgniteContext, + proxy: SparkSession, + existingSharedState: Option[SharedState], + parentSessionState: Option[SessionState]) extends SparkSession(proxy.sparkContext) { self â + private def this(ic: IgniteContext, proxy: SparkSession) = + this(ic, proxy, None, None) + private def this(proxy: SparkSession) = this(new IgniteContext(proxy.sparkContext, IgnitionEx.DFLT_CFG), proxy) @@ -63,16 +70,20 @@ class IgniteSparkSession private(ic: IgniteContext, proxy: SparkSession) extends /** @inheritdoc */ @transient override lazy val sharedState: SharedState = - new IgniteSharedState(ic, sparkContext) + existingSharedState.getOrElse(new IgniteSharedState(ic, sparkContext)) /** @inheritdoc */ @transient override lazy val sessionState: SessionState = { - val sessionState = new SessionStateBuilder(self, None).build() + parentSessionState + .map(_.clone(this)) + .getOrElse { + val sessionState = new SessionStateBuilder(self, None).build() - sessionState.experimentalMethods.extraOptimizations = - sessionState.experimentalMethods.extraOptimizations :+ IgniteOptimization + sessionState.experimentalMethods.extraOptimizations = + sessionState.experimentalMethods.extraOptimizations :+ IgniteOptimization - sessionState + sessionState + } } /** @inheritdoc */ @@ -172,7 +183,13 @@ class IgniteSparkSession private(ic: IgniteContext, proxy: SparkSession) extends } /** @inheritdoc */ - override private[sql] def cloneSession() = new IgniteSparkSession(ic, proxy.cloneSession()) + override private[sql] def cloneSession(): IgniteSparkSession = { + val session = new IgniteSparkSession(ic, proxy.cloneSession(), Some(sharedState), Some(sessionState)) + + session.sessionState // Force copy of SessionState. 
+ + session + } /** @inheritdoc */ @transient override private[sql] val extensions = http://git-wip-us.apache.org/repos/asf/ignite/blob/18524159/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala index e1bb7ff..728cde6 100644 --- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala +++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala @@ -17,6 +17,7 @@ package org.apache.ignite.spark +import org.apache.spark.sql.ignite.IgniteSparkSessionSpec import org.scalatest.Suites /** @@ -36,5 +37,6 @@ class IgniteDataFrameSuite extends Suites ( new IgniteOptimizationSystemFuncSpec, new IgniteOptimizationJoinSpec, new IgniteOptimizationDateFuncSpec, - new IgniteOptimizationDisableEnableSpec + new IgniteOptimizationDisableEnableSpec, + new IgniteSparkSessionSpec ) http://git-wip-us.apache.org/repos/asf/ignite/blob/18524159/modules/spark/src/test/scala/org/apache/spark/sql/ignite/IgniteSparkSessionSpec.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/scala/org/apache/spark/sql/ignite/IgniteSparkSessionSpec.scala b/modules/spark/src/test/scala/org/apache/spark/sql/ignite/IgniteSparkSessionSpec.scala new file mode 100644 index 0000000..fa9d2ee --- /dev/null +++ b/modules/spark/src/test/scala/org/apache/spark/sql/ignite/IgniteSparkSessionSpec.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.ignite + +import org.apache.ignite.internal.IgnitionEx +import org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath +import org.apache.ignite.spark.AbstractDataFrameSpec +import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, TEST_CONFIG_FILE, enclose} +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner + +/** + * Tests to check Spark Session implementation. 
+ */ +@RunWith(classOf[JUnitRunner]) +class IgniteSparkSessionSpec extends AbstractDataFrameSpec { + var igniteSession: IgniteSparkSession = _ + + describe("Ignite Spark Session Implementation") { + it("should keep session state after session clone") { + val dfProvider = (s: IgniteSparkSession) => { + s.read.json(resolveIgnitePath("modules/spark/src/test/resources/cities.json").getAbsolutePath) + .filter("name = 'Denver'") + } + + var df = dfProvider(igniteSession).cache() + + val cachedData = igniteSession.sharedState.cacheManager.lookupCachedData(df) + + cachedData shouldBe defined + + val otherSession = igniteSession.cloneSession() + + df = dfProvider(otherSession) + + val otherCachedData = otherSession.sharedState.cacheManager.lookupCachedData(df) + + otherCachedData shouldBe defined + + cachedData shouldEqual otherCachedData + } + } + + override protected def beforeAll(): Unit = { + super.beforeAll() + + createCityTable(client, DEFAULT_CACHE) + + val configProvider = enclose(null)(_ ⇒ () ⇒ { + val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1() + + cfg.setClientMode(true) + + cfg.setIgniteInstanceName("client-2") + + cfg + }) + + igniteSession = IgniteSparkSession.builder() + .config(spark.sparkContext.getConf) + .igniteConfigProvider(configProvider) + .getOrCreate() + } +}