Repository: spark
Updated Branches:
  refs/heads/master ee10ca7ec -> aa4cf2b19


[SPARK-22651][PYTHON][ML] Prevent initiating multiple Hive clients for ImageSchema.readImages

## What changes were proposed in this pull request?

Calling `ImageSchema.readImages` multiple times as below in PySpark shell:

```python
from pyspark.ml.image import ImageSchema
data_path = 'data/mllib/images/kittens'
_ = ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True).collect()
_ = ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True).collect()
```

throws an error as below:

```
...
org.datanucleus.exceptions.NucleusDataStoreException: Unable to open a test connection to the given database. JDBC url = jdbc:derby:;databaseName=metastore_db;create=true, username = APP. Terminating connection pool (set lazyInit to true if you expect to start your database after your app). Original Exception: ------
java.sql.SQLException: Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1742f639f, see the next exception for details.
...
        at org.apache.derby.jdbc.AutoloadedDriver.connect(Unknown Source)
...
        at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:5762)
...
        at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:180)
...
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply$mcZ$sp(HiveExternalCatalog.scala:195)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:195)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:195)
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
        at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:194)
        at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:100)
        at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:88)
        at org.apache.spark.sql.hive.HiveSessionStateBuilder.externalCatalog(HiveSessionStateBuilder.scala:39)
        at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog$lzycompute(HiveSessionStateBuilder.scala:54)
        at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog(HiveSessionStateBuilder.scala:52)
        at org.apache.spark.sql.hive.HiveSessionStateBuilder$$anon$1.<init>(HiveSessionStateBuilder.scala:69)
        at org.apache.spark.sql.hive.HiveSessionStateBuilder.analyzer(HiveSessionStateBuilder.scala:69)
        at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$build$2.apply(BaseSessionStateBuilder.scala:293)
        at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$build$2.apply(BaseSessionStateBuilder.scala:293)
        at org.apache.spark.sql.internal.SessionState.analyzer$lzycompute(SessionState.scala:79)
        at org.apache.spark.sql.internal.SessionState.analyzer(SessionState.scala:79)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:70)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:68)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:51)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:70)
        at org.apache.spark.sql.SparkSession.internalCreateDataFrame(SparkSession.scala:574)
        at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:593)
        at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:348)
        at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:348)
        at org.apache.spark.ml.image.ImageSchema$$anonfun$readImages$2$$anonfun$apply$1.apply(ImageSchema.scala:253)
...
Caused by: ERROR XJ040: Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1742f639f, see the next exception for details.
        at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)
        at org.apache.derby.impl.jdbc.SQLExceptionFactory.wrapArgsForTransportAcrossDRDA(Unknown Source)
        ... 121 more
Caused by: ERROR XSDB6: Another instance of Derby may have already booted the database /.../spark/metastore_db.
...
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/ml/image.py", line 190, in readImages
    dropImageFailures, float(sampleRatio), seed)
  File "/.../spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/.../spark/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;'
```

Each call wrapped the active `SparkContext` in a brand-new `SparkSession`, which initiated another Hive client; the embedded Derby metastore allows only a single such connection, hence the failure above. It seems better to stick to `SparkSession.builder.getOrCreate()`, which reuses the existing session, as done in:

https://github.com/apache/spark/blob/51620e288b5e0a7fffc3899c9deadabace28e6d7/python/pyspark/sql/streaming.py#L329

https://github.com/apache/spark/blob/dc5d34d8dcd6526d1dfdac8606661561c7576a62/python/pyspark/sql/column.py#L541

https://github.com/apache/spark/blob/33d43bf1b6f55594187066f0e38ba3985fa2542b/python/pyspark/sql/readwriter.py#L105
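
For illustration, a minimal sketch of the pattern change (the names match the diff below; `SparkContext._active_spark_context` is the internal handle the old code relied on):

```python
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Before: wrap the active SparkContext in a brand-new SparkSession.
# With Hive support enabled, each new session initiates its own Hive
# client, and the embedded Derby metastore rejects the second one.
ctx = SparkContext._active_spark_context
spark = SparkSession(ctx)

# After: reuse the existing session if one is active, so repeated
# readImages calls share a single Hive client.
spark = SparkSession.builder.getOrCreate()
```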

## How was this patch tested?

This was tested in the PySpark shell as below; a regression test was also added in `python/pyspark/ml/tests.py` (see the diff):

```python
from pyspark.ml.image import ImageSchema
data_path = 'data/mllib/images/kittens'
_ = ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True).collect()
_ = ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True).collect()
```
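
As a quick sanity check of the mechanism (a sketch assuming a live PySpark shell, not part of the patch), `getOrCreate()` hands back the same session object on repeated calls, so no second Hive client is initiated:

```python
from pyspark.sql import SparkSession

s1 = SparkSession.builder.getOrCreate()
s2 = SparkSession.builder.getOrCreate()
assert s1 is s2  # the existing session is reused, not rebuilt
```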

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #19845 from HyukjinKwon/SPARK-22651.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa4cf2b1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa4cf2b1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa4cf2b1

Branch: refs/heads/master
Commit: aa4cf2b19e4cf5588af7e2192e0e9f687cd84bc5
Parents: ee10ca7
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Sat Dec 2 11:55:43 2017 +0900
Committer: hyukjinkwon <gurwls...@gmail.com>
Committed: Sat Dec 2 11:55:43 2017 +0900

----------------------------------------------------------------------
 python/pyspark/ml/image.py |  5 ++---
 python/pyspark/ml/tests.py | 31 ++++++++++++++++++++++++++++++-
 2 files changed, 32 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/aa4cf2b1/python/pyspark/ml/image.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/image.py b/python/pyspark/ml/image.py
index 2b61aa9..384599d 100644
--- a/python/pyspark/ml/image.py
+++ b/python/pyspark/ml/image.py
@@ -201,9 +201,8 @@ class _ImageSchema(object):
         .. versionadded:: 2.3.0
         """
 
-        ctx = SparkContext._active_spark_context
-        spark = SparkSession(ctx)
-        image_schema = ctx._jvm.org.apache.spark.ml.image.ImageSchema
+        spark = SparkSession.builder.getOrCreate()
+        image_schema = spark._jvm.org.apache.spark.ml.image.ImageSchema
         jsession = spark._jsparkSession
         jresult = image_schema.readImages(path, jsession, recursive, numPartitions,
                                           dropImageFailures, float(sampleRatio), seed)

http://git-wip-us.apache.org/repos/asf/spark/blob/aa4cf2b1/python/pyspark/ml/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index 89ef555..3a0b816 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -67,7 +67,7 @@ from pyspark.ml.tuning import *
 from pyspark.ml.util import *
 from pyspark.ml.wrapper import JavaParams, JavaWrapper
 from pyspark.serializers import PickleSerializer
-from pyspark.sql import DataFrame, Row, SparkSession
+from pyspark.sql import DataFrame, Row, SparkSession, HiveContext
 from pyspark.sql.functions import rand
 from pyspark.sql.types import DoubleType, IntegerType
 from pyspark.storagelevel import *
@@ -1855,6 +1855,35 @@ class ImageReaderTest(SparkSessionTestCase):
                 lambda: ImageSchema.toImage("a"))
 
 
+class ImageReaderTest2(PySparkTestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        PySparkTestCase.setUpClass()
+        # Note that here we enable Hive's support.
+        try:
+            cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
+        except py4j.protocol.Py4JError:
+            cls.tearDownClass()
+            raise unittest.SkipTest("Hive is not available")
+        except TypeError:
+            cls.tearDownClass()
+            raise unittest.SkipTest("Hive is not available")
+        cls.spark = HiveContext._createForTesting(cls.sc)
+
+    @classmethod
+    def tearDownClass(cls):
+        PySparkTestCase.tearDownClass()
+        cls.spark.sparkSession.stop()
+
+    def test_read_images_multiple_times(self):
+        # This test case is to check if `ImageSchema.readImages` tries to
+        # initiate Hive client multiple times. See SPARK-22651.
+        data_path = 'data/mllib/images/kittens'
+        ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True)
+        ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True)
+
+
 class ALSTest(SparkSessionTestCase):
 
     def test_storage_levels(self):

