[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/22295

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22295#discussion_r226184011

    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2713,6 +2713,25 @@ def from_csv(col, schema, options={}):
         return Column(jc)

    +@since(3.0)
    +def _getActiveSession():
    --- End diff --

    I mean the function itself ..
Github user huaxingao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22295#discussion_r226178191

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3863,6 +3863,145 @@ def test_jvm_default_session_already_set(self):
             spark.stop()

    +class SparkSessionTests2(unittest.TestCase):
    +
    +    def test_active_session(self):
    +        spark = SparkSession.builder \
    +            .master("local") \
    +            .getOrCreate()
    +        try:
    +            activeSession = SparkSession.getActiveSession()
    +            df = activeSession.createDataFrame([(1, 'Alice')], ['age', 'name'])
    +            self.assertEqual(df.collect(), [Row(age=1, name=u'Alice')])
    +        finally:
    +            spark.stop()
    +
    +    def test_get_active_session_when_no_active_session(self):
    +        active = SparkSession.getActiveSession()
    +        self.assertEqual(active, None)
    +        spark = SparkSession.builder \
    +            .master("local") \
    +            .getOrCreate()
    +        active = SparkSession.getActiveSession()
    +        self.assertEqual(active, spark)
    +        spark.stop()
    +        active = SparkSession.getActiveSession()
    +        self.assertEqual(active, None)
    +
    +    def test_SparkSession(self):
    +        spark = SparkSession.builder \
    +            .master("local") \
    +            .config("some-config", "v2") \
    +            .getOrCreate()
    +        try:
    +            self.assertEqual(spark.conf.get("some-config"), "v2")
    +            self.assertEqual(spark.sparkContext._conf.get("some-config"), "v2")
    +            self.assertEqual(spark.version, spark.sparkContext.version)
    +            spark.sql("CREATE DATABASE test_db")
    +            spark.catalog.setCurrentDatabase("test_db")
    +            self.assertEqual(spark.catalog.currentDatabase(), "test_db")
    +            spark.sql("CREATE TABLE table1 (name STRING, age INT) USING parquet")
    +            self.assertEqual(spark.table("table1").columns, ['name', 'age'])
    +            self.assertEqual(spark.range(3).count(), 3)
    +        finally:
    +            spark.stop()
    +
    +    def test_global_default_session(self):
    +        spark = SparkSession.builder \
    +            .master("local") \
    +            .getOrCreate()
    +        try:
    +            self.assertEqual(SparkSession.builder.getOrCreate(), spark)
    +        finally:
    +            spark.stop()
    +
    +    def test_default_and_active_session(self):
    +        spark = SparkSession.builder \
    +            .master("local") \
    +            .getOrCreate()
    +        activeSession = spark._jvm.SparkSession.getActiveSession()
    +        defaultSession = spark._jvm.SparkSession.getDefaultSession()
    +        try:
    +            self.assertEqual(activeSession, defaultSession)
    +        finally:
    +            spark.stop()
    +
    +    def test_config_option_propagated_to_existing_SparkSession(self):
    --- End diff --

    Will change. Thanks!
Github user huaxingao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22295#discussion_r226178127

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3863,6 +3863,145 @@ def test_jvm_default_session_already_set(self):
             spark.stop()

    [quoted SparkSessionTests2 diff repeated from the comment above, down to:]

    +    def test_config_option_propagated_to_existing_SparkSession(self):
    +        session1 = SparkSession.builder \
    +            .master("local") \
    +            .config("spark-config1", "a") \
    +            .getOrCreate()
    +        self.assertEqual(session1.conf.get("spark-config1"), "a")
    +        session2 = SparkSession.builder \
    +            .config("spark-config1", "b") \
    +            .getOrCreate()
    +        try:
    +            self.assertEqual(session1, session2)
    +            self.assertEqual(session1.conf.get("spark-config1"), "b")
    +        finally:
    +            session1.stop()
    +
    +    def test_new_session(self):
    +        session = SparkSession.builder \
    +            .master("local") \
    +            .getOrCreate()
    +        newSession = session.newSession()
    +        try:
    +            self.assertNotEqual(session, newSession)
    +        finally:
    +            session.stop()
    +            newSession.stop()
    +
    +    def test_create_new_session_if_old_session_stopped(self):
    +        session = SparkSession.builder \
    +            .master("local") \
    +            .getOrCreate()
    +        session.stop()
    +        newSession = SparkSession.builder \
    +            .master("local") \
    +            .getOrCreate()
    +        try:
    +            self.assertNotEqual(session, newSession)
    +        finally:
    +            newSession.stop()
    +
    +    def test_active_session_with_None_and_not_None_context(self):
    +        from pyspark.context import SparkContext
    +        from pyspark.conf import SparkConf
    +        sc = SparkContext._active_spark_context
    +        self.assertEqual(sc, None)
    +        activeSession = SparkSession.getActiveSession()
    +        self.assertEqual(activeSession, None)
    +        sparkConf = SparkConf()
    +        sc = SparkContext.getOrCreate(sparkConf)
    +        activeSession =
Github user huaxingao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22295#discussion_r226178054

    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2713,6 +2713,25 @@ def from_csv(col, schema, options={}):
         return Column(jc)

    +@since(3.0)
    +def _getActiveSession():
    --- End diff --

    Do you mean the _ prefix or the function itself?
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22295#discussion_r226166020

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3863,6 +3863,145 @@ def test_jvm_default_session_already_set(self):
             spark.stop()

    [quoted SparkSessionTests2 diff repeated from the comments above]
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22295#discussion_r226166057

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3863,6 +3863,145 @@ def test_jvm_default_session_already_set(self):
             spark.stop()

    [quoted SparkSessionTests2 diff repeated from the comments above, down to:]

    +    def test_config_option_propagated_to_existing_SparkSession(self):
    --- End diff --

    Let's rename `SparkSession` -> `spark_session` just above.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22295#discussion_r226165866

    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2713,6 +2713,25 @@ def from_csv(col, schema, options={}):
         return Column(jc)

    +@since(3.0)
    +def _getActiveSession():
    --- End diff --

    eh.. why is it in functions.py?
Github user huaxingao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22295#discussion_r225667299

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3654,6 +3654,109 @@ def test_jvm_default_session_already_set(self):
             spark.stop()

    [quoted SparkSessionTests2 diff repeated from the comments above, down to:]

    +    def test_get_active_session_when_no_active_session(self):
    +        active = SparkSession.getActiveSession()
    +        self.assertEqual(active, None)
    +        spark = SparkSession.builder \
    +            .master("local") \
    +            .getOrCreate()
    +        active = SparkSession.getActiveSession()
    +        self.assertEqual(active, spark)
    +        spark.stop()
    +        active = SparkSession.getActiveSession()
    +        self.assertEqual(active, None)
    --- End diff --

    Thanks @holdenk I will add a test for the above comment and also add a test
    for your comment regarding

        self._jvm.SparkSession.setActiveSession(self._jsparkSession)
Github user huaxingao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22295#discussion_r225667174

    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2633,6 +2633,23 @@ def sequence(start, stop, step=None):
             _to_java_column(start), _to_java_column(stop), _to_java_column(step)))

    +@since(3.0)
    +def getActiveSession():
    +    """
    +    Returns the active SparkSession for the current thread
    +    """
    +    from pyspark.sql import SparkSession
    +    sc = SparkContext._active_spark_context
    --- End diff --

    @holdenk @HyukjinKwon Thanks for the comments. I checked Scala's behavior:

        test("my test") {
          val cx = SparkContext.getActive
          val session = SparkSession.getActiveSession
          println(cx)
          println(session)
        }

    The result is

        None
        None

    So it returns None if sc is None. Actually my current code already returns
    None if sc is None, but I will change the code a bit to make it more obvious.
    I will also add a _ prefix to the function name and mention in the docstring
    that this function is not supposed to be called directly.
Github user huaxingao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22295#discussion_r225666954

    --- Diff: python/pyspark/sql/session.py ---
    @@ -231,6 +231,7 @@ def __init__(self, sparkContext, jsparkSession=None):
                     or SparkSession._instantiatedSession._sc._jsc is None:
                 SparkSession._instantiatedSession = self
                 self._jvm.SparkSession.setDefaultSession(self._jsparkSession)
    +            self._jvm.SparkSession.setActiveSession(self._jsparkSession)
    --- End diff --

    @holdenk @HyukjinKwon Thanks for the comments. I looked at the Scala code; it
    calls setActiveSession in createDataFrame:

        def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = {
          SparkSession.setActiveSession(this)
          ...
        }

    I will do the same for Python:

        def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=True):
            SparkSession._activeSession = self
            self._jvm.SparkSession.setActiveSession(self._jsparkSession)

    Will also add a test.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22295#discussion_r224983583

    --- Diff: python/pyspark/sql/session.py ---
    @@ -231,6 +231,7 @@ def __init__(self, sparkContext, jsparkSession=None):
                     or SparkSession._instantiatedSession._sc._jsc is None:
                 SparkSession._instantiatedSession = self
                 self._jvm.SparkSession.setDefaultSession(self._jsparkSession)
    +            self._jvm.SparkSession.setActiveSession(self._jsparkSession)
    --- End diff --

    Oh, okay. I had to be explicit. I meant:

        scala> import org.apache.spark.sql.SparkSession
        import org.apache.spark.sql.SparkSession

        scala> SparkSession.getActiveSession
        res0: Option[org.apache.spark.sql.SparkSession] = Some(org.apache.spark.sql.SparkSession@3ef4a8fb)

        scala> val session1 = spark
        session1: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@3ef4a8fb

        scala> val session2 = spark.newSession()
        session2: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@4b74a4d

        scala> SparkSession.getActiveSession
        res1: Option[org.apache.spark.sql.SparkSession] = Some(org.apache.spark.sql.SparkSession@3ef4a8fb)

        scala> session2.createDataFrame(Seq(Tuple1(1)))
        res2: org.apache.spark.sql.DataFrame = [_1: int]

        scala> SparkSession.getActiveSession
        res3: Option[org.apache.spark.sql.SparkSession] = Some(org.apache.spark.sql.SparkSession@4b74a4d)
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22295#discussion_r224983437

    --- Diff: python/pyspark/sql/session.py ---
    @@ -252,6 +253,22 @@ def newSession(self):
             """
             return self.__class__(self._sc, self._jsparkSession.newSession())

    +    @since(3.0)
    --- End diff --

    Yes, at that time, 2.5 was targeted. Now 3.0 is targeted per
    https://github.com/apache/spark/commit/9bf397c0e45cb161f3f12f09bd2bf14ff96dc823
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22295#discussion_r224983406

    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2633,6 +2633,23 @@ def sequence(start, stop, step=None):
             _to_java_column(start), _to_java_column(stop), _to_java_column(step)))

    +@since(3.0)
    +def getActiveSession():
    +    """
    +    Returns the active SparkSession for the current thread
    +    """
    +    from pyspark.sql import SparkSession
    +    sc = SparkContext._active_spark_context
    --- End diff --

    Yea, we should match the behaviour with the Scala side - that was my point
    essentially. The problem with the previous approach was that the session was
    being handled within Python - I believe we will basically reuse the JVM's
    session implementation rather than reimplementing separate Python session
    support on the PySpark side.

    > What about if sc is None we just return None since we can't have an
    > activeSession without an active SparkContext -- does that sound reasonable?

    In that case, I think we should follow Scala's behaviour.
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22295#discussion_r224860828

    --- Diff: python/pyspark/sql/session.py ---
    @@ -231,6 +231,7 @@ def __init__(self, sparkContext, jsparkSession=None):
                     or SparkSession._instantiatedSession._sc._jsc is None:
                 SparkSession._instantiatedSession = self
                 self._jvm.SparkSession.setDefaultSession(self._jsparkSession)
    +            self._jvm.SparkSession.setActiveSession(self._jsparkSession)
    --- End diff --

    If we're going to support this we should have a test for it, or if we aren't
    going to support this right now we should document the behaviour.
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22295#discussion_r224858616

    --- Diff: python/pyspark/sql/session.py ---
    @@ -252,6 +253,22 @@ def newSession(self):
             """
             return self.__class__(self._sc, self._jsparkSession.newSession())

    +    @since(3.0)
    --- End diff --

    @HyukjinKwon are you OK to mark this comment as resolved since we're now
    targeting `3.0`?
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22295#discussion_r224858233

    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2633,6 +2633,23 @@ def sequence(start, stop, step=None):
             _to_java_column(start), _to_java_column(stop), _to_java_column(step)))

    +@since(3.0)
    +def getActiveSession():
    +    """
    +    Returns the active SparkSession for the current thread
    +    """
    +    from pyspark.sql import SparkSession
    +    sc = SparkContext._active_spark_context
    --- End diff --

    If this is being done to simplify the implementation and we don't expect
    people to call it directly, we should mention that in the docstring and also
    use an _ prefix.

    I disagree with @HyukjinKwon about this behaviour being what people would
    expect -- it doesn't match the Scala behaviour, and one of the reasons to have
    something like `getActiveSession()` instead of `getOrCreate()` is to allow
    folks to do something if we have an active session, or do something else if we
    don't. What about if `sc` is `None` we just return `None`, since we can't have
    an `activeSession` without an active `SparkContext` -- does that sound
    reasonable?

    That being said, if folks feel strongly about this I'm _ok_ with us setting up
    a SparkContext, but we need to document that if that's the path we go.
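The distinction holdenk draws here -- `getActiveSession()` lets callers branch on whether a session already exists, while `getOrCreate()` always yields one -- can be shown with a small dependency-injected sketch. The helper name `attach_or_create` and its callable arguments are invented for illustration; in real user code they would be `SparkSession.getActiveSession` and `SparkSession.builder.getOrCreate`.

```python
def attach_or_create(get_active, create):
    """Reuse the active session if there is one; otherwise create one.

    get_active: callable returning the active session or None
    create:     callable that builds a new session
    """
    active = get_active()
    if active is not None:
        # "do something" with the existing session
        return ("reused", active)
    # "do something else": fall back to creating one
    return ("created", create())


print(attach_or_create(lambda: None, lambda: "new"))    # ('created', 'new')
print(attach_or_create(lambda: "old", lambda: "new"))   # ('reused', 'old')
```

The point is that a getActiveSession that silently spins up a SparkContext would make the first branch unreachable, which is why returning `None` is proposed.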
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22295#discussion_r224860350

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3654,6 +3654,109 @@ def test_jvm_default_session_already_set(self):
             spark.stop()

    [quoted SparkSessionTests2 diff repeated from the comments above, down to:]

    +        active = SparkSession.getActiveSession()
    +        self.assertEqual(active, None)
    --- End diff --

    Given the change for how we construct the SparkSession, can we add a test that
    makes sure we do whatever we decide to do with the SparkContext?
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22295#discussion_r223173806

    --- Diff: python/pyspark/sql/session.py ---
    @@ -252,6 +255,20 @@ def newSession(self):
             """
             return self.__class__(self._sc, self._jsparkSession.newSession())

    +    @classmethod
    +    @since(2.5)
    +    def getActiveSession(cls):
    +        """
    +        Returns the active SparkSession for the current thread, returned by the builder.
    +        >>> s = SparkSession.getActiveSession()
    +        >>> l = [('Alice', 1)]
    +        >>> rdd = s.sparkContext.parallelize(l)
    +        >>> df = s.createDataFrame(rdd, ['name', 'age'])
    +        >>> df.select("age").collect()
    +        [Row(age=1)]
    +        """
    +        return cls._activeSession
    --- End diff --

    Yea, it should look like that
Github user huaxingao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22295#discussion_r223165392

    --- Diff: python/pyspark/sql/session.py ---

    [quoted getActiveSession diff repeated from the comment above, ending:]

    +        return cls._activeSession
    --- End diff --

    @HyukjinKwon I am not sure if I follow your suggestion correctly. Does the
    following look right to you?

    session.py:

        @classmethod
        @since(3.0)
        def getActiveSession(cls):
            from pyspark.sql import functions
            return functions.getActiveSession()

    functions.py:

        @since(3.0)
        def getActiveSession():
            from pyspark.sql import SparkSession
            sc = SparkContext._active_spark_context
            if sc is None:
                sc = SparkContext()
            if sc._jvm.SparkSession.getActiveSession().isDefined():
                SparkSession(sc, sc._jvm.SparkSession.getActiveSession().get())
                return SparkSession._activeSession
            else:
                return None
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22295#discussion_r222959106

    --- Diff: python/pyspark/sql/session.py ---
    @@ -252,6 +255,20 @@ def newSession(self):
             """
             return self.__class__(self._sc, self._jsparkSession.newSession())

    +    @classmethod
    +    @since(2.5)
    --- End diff --

    Let's change this to 3.0. Per
    https://github.com/apache/spark/commit/9bf397c0e45cb161f3f12f09bd2bf14ff96dc823,
    looks like we are going ahead for 3.0 now.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22295#discussion_r222958780

    --- Diff: python/pyspark/sql/session.py ---

    [quoted getActiveSession diff repeated from the comments above, ending:]

    +        return cls._activeSession
    --- End diff --

    Yup.
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22295#discussion_r222700425

    --- Diff: python/pyspark/sql/session.py ---

    [quoted getActiveSession diff repeated from the comments above, ending:]

    +        return cls._activeSession
    --- End diff --

    Do you mean in a multi-language notebook environment?
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22295#discussion_r222699236

    --- Diff: python/pyspark/sql/session.py ---
    @@ -231,6 +231,7 @@ def __init__(self, sparkContext, jsparkSession=None):
                     or SparkSession._instantiatedSession._sc._jsc is None:
                 SparkSession._instantiatedSession = self
                 self._jvm.SparkSession.setDefaultSession(self._jsparkSession)
    +            self._jvm.SparkSession.setActiveSession(self._jsparkSession)
    --- End diff --

    So @HyukjinKwon in this code session1 and session2 are already equal:

        Welcome to
              ____              __
             / __/__  ___ _____/ /__
            _\ \/ _ \/ _ `/ __/  '_/
           /__ / .__/\_,_/_/ /_/\_\   version 2.3.1
              /_/

        Using Python version 3.6.5 (default, Apr 29 2018 16:14:56)
        SparkSession available as 'spark'.
        >>> session1 = SparkSession.builder.config("key1", "value1").getOrCreate()
        >>> session2 = SparkSession.builder.config("key2", "value2").getOrCreate()
        >>> session1
        >>> session2
        >>> session1 == session2
        True

    That being said, having multiple Spark sessions in Python is doable; you
    manually have to call the init, e.g.:

        >>> session3 = SparkSession(sc)
        >>> session3

    And supporting that is reasonable.
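The `session1 == session2` result above follows from `getOrCreate()` returning the already-instantiated session, while calling the constructor directly creates a distinct instance. A toy model of that singleton behaviour (the `Session` and `Builder` classes here are invented for the sketch and deliberately ignore config propagation):

```python
class Session:
    _instantiated = None  # stand-in for SparkSession._instantiatedSession

    def __init__(self):
        # Direct construction always records this (new) instance.
        Session._instantiated = self


class Builder:
    def config(self, key, value):
        # Real Spark would propagate this option to the existing session;
        # ignored in this sketch.
        self._pending = (key, value)
        return self

    def getOrCreate(self):
        # Reuse the existing session when there is one.
        if Session._instantiated is None:
            Session()
        return Session._instantiated


s1 = Builder().config("key1", "value1").getOrCreate()
s2 = Builder().config("key2", "value2").getOrCreate()
print(s1 is s2)    # True: the builder reused the one session

s3 = Session()     # direct init bypasses the singleton check, like SparkSession(sc)
print(s1 is s3)    # False: a genuinely separate session
```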
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r222392450

--- Diff: python/pyspark/sql/session.py ---
```
+        return cls._activeSession
```
--- End diff --

The problem here is when we share a single JVM, as in Zeppelin. It should get the session from the JVM.
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r221429353

--- Diff: python/pyspark/sql/session.py ---
```
@@ -252,6 +253,22 @@ def newSession(self):
         """
         return self.__class__(self._sc, self._jsparkSession.newSession())

+    @since(2.5)
+    def getActiveSession(self):
```
--- End diff --

I think the class method should initialize the JVM if it does not exist (see functions.py), and probably the Spark context too. If it exists, it should use the existing one. Also, let's define this as a property since that's closer to Scala's usage. I know it's difficult to define a static property; you can refer to https://github.com/graphframes/graphframes/pull/169/files#diff-e81e6b169c0aa35012a3263b2f31b330R381, or we should consider adding this as a function.
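The "static property" pattern referenced here (a property readable on the class itself, like Scala's `SparkSession.getActiveSession`) is typically built with a small descriptor. A sketch of the idea, using a dummy class rather than the real SparkSession; `classproperty` and `DummySession` are illustrative names:

```python
class classproperty(object):
    """Read-only descriptor combining @classmethod and @property."""
    def __init__(self, fget):
        self.fget = fget

    def __get__(self, obj, owner):
        # Invoked on both the class and its instances; always bind to the class,
        # so DummySession.activeSession works without an instance.
        return self.fget(owner)

class DummySession(object):
    _activeSession = None  # stand-in for SparkSession._activeSession

    @classproperty
    def activeSession(cls):
        return cls._activeSession

DummySession._activeSession = "session-1"
print(DummySession.activeSession)  # session-1
```

One caveat of this approach: it is a non-data descriptor, so assigning `DummySession.activeSession = ...` would silently replace the property, which is one reason a plain class method is often preferred.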
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r221429200

--- Diff: python/pyspark/sql/session.py ---
```
+    @since(2.5)
+    def getActiveSession(self):
```
--- End diff --

Wait, this should be a class method, since the Scala usage is `SparkSession.getActiveSession`.
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r221429170

--- Diff: python/pyspark/sql/session.py ---
```
+            self._jvm.SparkSession.setActiveSession(self._jsparkSession)
```
--- End diff --

Similar. I was expecting something like:

```python
session1 = SparkSession.builder.config("key1", "value1").getOrCreate()
session2 = SparkSession.builder.config("key2", "value2").getOrCreate()
assert(session2 == SparkSession.getActiveSession())
session1.createDataFrame([(1, 'Alice')], ['age', 'name'])
assert(session1 == SparkSession.getActiveSession())
```

Does this work?
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user huaxingao commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r221394694

--- Diff: python/pyspark/sql/session.py ---
```
+            self._jvm.SparkSession.setActiveSession(self._jsparkSession)
```
--- End diff --

@HyukjinKwon Do you mean something like this:

```python
def test_two_spark_session(self):
    session1 = None
    session2 = None
    try:
        session1 = SparkSession.builder.config("key1", "value1").getOrCreate()
        session2 = SparkSession.builder.config("key2", "value2").getOrCreate()
        self.assertEqual(session1, session2)
        df = session1.createDataFrame([(1, 'Alice')], ['age', 'name'])
        self.assertEqual(df.collect(), [Row(age=1, name=u'Alice')])
        activeSession1 = session1.getActiveSession()
        activeSession2 = session2.getActiveSession()
        self.assertEqual(activeSession1, activeSession2)
    finally:
        if session1 is not None:
            session1.stop()
        if session2 is not None:
            session2.stop()
```
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r221104383

--- Diff: python/pyspark/sql/session.py ---
```
+            self._jvm.SparkSession.setActiveSession(self._jsparkSession)
```
--- End diff --

> When createDataFrame, we already have a session

But wouldn't we fail to set the active session properly if session A sets an active session in `__init__`, then session B sets an active session in `__init__`, and then session A calls `createDataFrame`?
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user huaxingao commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r221089916

--- Diff: python/pyspark/sql/session.py ---
```
+            self._jvm.SparkSession.setActiveSession(self._jsparkSession)
```
--- End diff --

@HyukjinKwon Seems to me that the active session is set OK in `__init__`. When we `createDataFrame`, we already have a session, and the active session was already set in `__init__`.
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r221005236

--- Diff: python/pyspark/sql/session.py ---
```
+            self._jvm.SparkSession.setActiveSession(self._jsparkSession)
```
--- End diff --

@huaxingao, can you check if the active session is set, for instance when we `createDataFrame`? From a cursory look, we are not setting it.
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r220787563

--- Diff: python/pyspark/sql/session.py ---
```
+    @since(3.0)
```
--- End diff --

Let's change this to 2.5.
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r219551669

--- Diff: python/pyspark/sql/tests.py ---
```
+class SparkSessionTests2(ReusedSQLTestCase):
```
--- End diff --

@HyukjinKwon there's no strong need for it; it does mean that the first `getOrCreate` will already have a session it can use, but given that we set up and tear down the session anyway, this may be less than ideal.
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r219552522

--- Diff: python/pyspark/sql/tests.py ---
```
@@ -3654,6 +3654,107 @@ def test_jvm_default_session_already_set(self):
         spark.stop()

+class SparkSessionTests2(ReusedSQLTestCase):
+
+    def test_active_session(self):
+        spark = SparkSession.builder \
+            .master("local") \
+            .getOrCreate()
+        try:
+            activeSession = spark.getActiveSession()
+            df = activeSession.createDataFrame([(1, 'Alice')], ['age', 'name'])
+            self.assertEqual(df.collect(), [Row(age=1, name=u'Alice')])
+        finally:
+            spark.stop()
+
+    def test_SparkSession(self):
+        spark = SparkSession.builder \
+            .master("local") \
+            .config("some-config", "v2") \
+            .getOrCreate()
+        try:
+            self.assertEqual(spark.conf.get("some-config"), "v2")
+            self.assertEqual(spark.sparkContext._conf.get("some-config"), "v2")
+            self.assertEqual(spark.version, spark.sparkContext.version)
+            spark.sql("CREATE DATABASE test_db")
+            spark.catalog.setCurrentDatabase("test_db")
+            self.assertEqual(spark.catalog.currentDatabase(), "test_db")
+            spark.sql("CREATE TABLE table1 (name STRING, age INT) USING parquet")
+            self.assertEqual(spark.table("table1").columns, ['name', 'age'])
+            self.assertEqual(spark.range(3).count(), 3)
+        finally:
+            spark.stop()
+
+    def test_global_default_session(self):
+        spark = SparkSession.builder \
+            .master("local") \
+            .getOrCreate()
+        try:
+            self.assertEqual(SparkSession.builder.getOrCreate(), spark)
+        finally:
+            spark.stop()
+
+    def test_default_and_active_session(self):
+        spark = SparkSession.builder \
+            .master("local") \
+            .getOrCreate()
+        activeSession = spark._jvm.SparkSession.getActiveSession()
+        defaultSession = spark._jvm.SparkSession.getDefaultSession()
+        try:
+            self.assertEqual(activeSession, defaultSession)
+        finally:
+            spark.stop()
+
+    def test_config_option_propagated_to_existing_SparkSession(self):
+        session1 = SparkSession.builder \
+            .master("local") \
+            .config("spark-config1", "a") \
+            .getOrCreate()
+        self.assertEqual(session1.conf.get("spark-config1"), "a")
+        session2 = SparkSession.builder \
+            .config("spark-config1", "b") \
+            .getOrCreate()
+        try:
+            self.assertEqual(session1, session2)
+            self.assertEqual(session1.conf.get("spark-config1"), "b")
+        finally:
+            session1.stop()
+
+    def test_newSession(self):
+        session = SparkSession.builder \
+            .master("local") \
+            .getOrCreate()
+        newSession = session.newSession()
+        try:
+            self.assertNotEqual(session, newSession)
+        finally:
+            session.stop()
+            newSession.stop()
+
+    def test_create_new_session_if_old_session_stopped(self):
+        session = SparkSession.builder \
+            .master("local") \
+            .getOrCreate()
+        session.stop()
+        newSession = SparkSession.builder \
+            .master("local") \
+            .getOrCreate()
+        try:
+            self.assertNotEqual(session, newSession)
+        finally:
+            newSession.stop()
+
+    def test_create_SparkContext_then_SparkSession(self):
```
--- End diff --

I don't strongly agree here. I think given that the method names are camelCase in `SparkSession` & `SparkContext` in Python, this naming is perfectly reasonable.
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r219552270

--- Diff: python/pyspark/sql/session.py ---
```
+            self._jvm.SparkSession.setActiveSession(self._jsparkSession)
```
--- End diff --

Yes, this seems like the right path forward; thanks for figuring out that was missing as well.
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r219551059

--- Diff: python/pyspark/sql/session.py ---
```
+            self._jvm.SparkSession.setActiveSession(self._jsparkSession)
```
--- End diff --

Yes, that sounds like the right approach and I think we need that.
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r218371105

--- Diff: python/pyspark/sql/tests.py ---
```
+    def test_newSession(self):
```
--- End diff --

ditto for naming. Let's just follow Python's convention in those names.
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r218370828

--- Diff: python/pyspark/sql/tests.py ---
```
+class SparkSessionTests2(ReusedSQLTestCase):
```
--- End diff --

Do we need to extend `ReusedSQLTestCase`? Looks like we can just use `unittest.TestCase`.
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r218370450

--- Diff: python/pyspark/sql/tests.py ---
```
+    def test_create_SparkContext_then_SparkSession(self):
```
--- End diff --

nit: let's just name it `spark_context` and `spark_session`
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r218309924

--- Diff: python/pyspark/sql/session.py ---
```
+    @since(3.0)
+    def getActiveSession(self):
```
--- End diff --

cc @ueshin
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user huaxingao commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r218237306

--- Diff: python/pyspark/sql/session.py ---
```
+            self._jvm.SparkSession.setActiveSession(self._jsparkSession)
```
--- End diff --

Thank you very much for your comments. I have a question here: in the stop() method, shall we clear the active session too? Currently it has:

```python
def stop(self):
    """Stop the underlying :class:`SparkContext`.
    """
    self._jvm.SparkSession.clearDefaultSession()
    SparkSession._instantiatedSession = None
```

Do I need to add the following?

```python
self._jvm.SparkSession.clearActiveSession()
```

To test getActiveSession when there is no active session, I am thinking of adding:

```python
def test_get_active_session_when_no_active_session(self):
    spark = SparkSession.builder \
        .master("local") \
        .getOrCreate()
    spark.stop()
    active = spark.getActiveSession()
    self.assertEqual(active, None)
```

The test didn't pass, because stop() does not clear the active session.
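The lifecycle being discussed, where `stop()` must clear the active session alongside the default one or else `getActiveSession()` keeps returning a stopped session, can be modeled with class-level state. This is a toy mock, not the actual PySpark `stop()`; `FakeJVMSession` stands in for the JVM-side registry:

```python
class FakeJVMSession:
    """Toy stand-in for the JVM-side default/active session registry."""
    default = None
    active = None

class FakeSession:
    def start(self):
        # Mirrors what __init__ does: register as both default and active.
        FakeJVMSession.default = self
        FakeJVMSession.active = self
        return self

    def stop(self):
        # Mirrors clearDefaultSession() plus the proposed clearActiveSession();
        # without the second line, 'active' would keep pointing at a dead session.
        FakeJVMSession.default = None
        FakeJVMSession.active = None

s = FakeSession().start()
assert FakeJVMSession.active is s
s.stop()
print(FakeJVMSession.active)  # None
```

Dropping the `FakeJVMSession.active = None` line reproduces exactly the failure the test above observed: after `stop()`, the active slot still holds the old session.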
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r217771519

--- Diff: python/pyspark/sql/session.py ---
```
+            self._jvm.SparkSession.setActiveSession(self._jsparkSession)
```
--- End diff --

Thanks for catching this! Filed a follow-up: https://issues.apache.org/jira/browse/SPARK-25432
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r216553283

--- Diff: python/pyspark/sql/session.py ---
```
+    @since(2.4)
```
--- End diff --

@huaxingao, let's target this for 3.0.
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user huaxingao commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r216115581

--- Diff: python/pyspark/sql/session.py ---
```
@@ -252,6 +252,16 @@ def newSession(self):
         """
         return self.__class__(self._sc, self._jsparkSession.newSession())

+    @since(2.4)
+    def getActiveSession(self):
+        """
+        Returns the active SparkSession for the current thread, returned by the builder.
+        >>> s = spark.getActiveSession()
+        >>> spark._jsparkSession.getDefaultSession().get().equals(s.get())
+        True
+        """
+        return self._jsparkSession.getActiveSession()
```
--- End diff --

@HyukjinKwon I added a set of tests. Some of them are borrowed from `SparkSessionBuilderSuite.scala`.
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r215556683

--- Diff: python/pyspark/sql/session.py ---
```
+        return self._jsparkSession.getActiveSession()
```
--- End diff --

Yea, I think we should return the Python session. The JVM instance should not be exposed. I assume returning `None` is fine. The thing is, session support in PySpark is lacking: it's partially implemented but not very well tested, as far as I can tell. Can you add a set of tests for it, and manually test them as well? Actually, my gut says this is quite a big deal.
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user huaxingao commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r215022091

--- Diff: python/pyspark/sql/session.py ---
```
+        >>> spark._jsparkSession.getDefaultSession().get().equals(s.get())
```
--- End diff --

@holdenk @felixcheung Thanks for the review. I will change this.
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user huaxingao commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r215022059

--- Diff: python/pyspark/sql/session.py ---
```
+        return self._jsparkSession.getActiveSession()
```
--- End diff --

@HyukjinKwon Sorry for the late reply. Yes, this returns a JVM instance. In the Scala code, `SparkSession.getActiveSession` returns an `Option[SparkSession]`. I am not sure how to do a Python equivalent of Scala's `Option`. In the following code, is there a way to wrap the Python `session` in the else path into something equivalent to Scala's `Option`? If not, can I just return the Python `session`?

```python
if self._jsparkSession.getActiveSession() is None:
    return None
else:
    return self.__class__(self._sc, self._jsparkSession.getActiveSession().get())
```
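The idiomatic Python translation of Scala's `Option[SparkSession]` is exactly the shape sketched in the comment: there is no `Option` type, so an empty option maps to `None` and a defined one is unwrapped. A stand-in illustration, using a minimal `JavaOption` class rather than a live py4j handle (both names here are hypothetical):

```python
class JavaOption:
    """Minimal stand-in for the py4j view of a Scala Option."""
    def __init__(self, value=None):
        self._value = value

    def isDefined(self):
        return self._value is not None

    def get(self):
        # Scala's Option.get throws on empty; mirror that here.
        if self._value is None:
            raise ValueError("Option is empty")
        return self._value

def unwrap_active_session(jopt):
    # Value-or-None is the usual Python rendering of Option:
    # callers then write `if session is not None: ...`
    return jopt.get() if jopt.isDefined() else None

print(unwrap_active_session(JavaOption()))        # None
print(unwrap_active_session(JavaOption("sess")))  # sess
```

So returning the plain Python `session` (or `None`) is the natural answer to the question; wrapping it in an Option-like object would be unidiomatic for Python callers.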
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r214530177

--- Diff: python/pyspark/sql/session.py ---
```
+        >>> spark._jsparkSession.getDefaultSession().get().equals(s.get())
```
--- End diff --

...and probably shouldn't access `_jsparkSession`.
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r214410416

--- Diff: python/pyspark/sql/session.py ---
```
+        >>> spark._jsparkSession.getDefaultSession().get().equals(s.get())
```
--- End diff --

So normally we try to have doctests like these be examples of how the user should use this. So I would consider getting the active session and then doing something a normal user would with it (like parallelizing a collection).
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22295#discussion_r214237818

--- Diff: python/pyspark/sql/session.py ---
```
+        return self._jsparkSession.getActiveSession()
```
--- End diff --

Does this return a JVM instance?
[GitHub] spark pull request #22295: [SPARK-25255][PYTHON]Add getActiveSession to Spar...
GitHub user huaxingao opened a pull request: https://github.com/apache/spark/pull/22295

[SPARK-25255][PYTHON] Add getActiveSession to SparkSession in PySpark

## What changes were proposed in this pull request?

Add getActiveSession in session.py.

## How was this patch tested?

Added a doctest.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/huaxingao/spark spark25255

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22295.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22295

commit e9885b38e35dcfd01a40e43cf442beeaea226b98
Author: Huaxin Gao
Date: 2018-08-31T00:50:23Z

    [SPARK-25255][PYTHON] Add getActiveSession to SparkSession in PySpark