spark git commit: [SPARK-18274][ML][PYSPARK] Memory leak in PySpark JavaWrapper

2016-12-01 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 729cadb6b -> 254e33f4b


[SPARK-18274][ML][PYSPARK] Memory leak in PySpark JavaWrapper

## What changes were proposed in this pull request?
In `JavaWrapper`'s destructor, make the Java gateway detach (dereference) the companion Java object, using
`SparkContext._active_spark_context._gateway.detach`.
Also fix the parameter-copying bug by moving the `copy` method from `JavaModel`
to `JavaParams`.

## How was this patch tested?
```python
import random, string
from pyspark.ml.feature import StringIndexer

# 700,000 random strings of 10 uppercase characters each
l = [(''.join(random.choice(string.ascii_uppercase) for _ in range(10)), )
     for _ in range(int(7e5))]
df = spark.createDataFrame(l, ['string'])

for i in range(50):
    indexer = StringIndexer(inputCol='string', outputCol='index')
    indexer.fit(df)
```
* Before: a strong reference to each `StringIndexer`'s Java object was kept, so it
could not be garbage collected; the loop above ran out of memory and halted midway.
* After: the Java object is detached when the Python wrapper is garbage collected,
and the computation completes.
* Memory footprint verified with a profiler.
* Added a parameter-copy test that failed before this change.
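The mechanism behind the fix can be sketched without a running Spark cluster. In the sketch below, `FakeGateway`, `LeakyWrapper`, and `FixedWrapper` are hypothetical stand-ins (not real pyspark or py4j classes) that mimic how the py4j gateway pins server-side objects until `detach` is called from `__del__`:

```python
# Minimal sketch of the SPARK-18274 fix. FakeGateway is a hypothetical
# stand-in for the py4j JavaGateway: it keeps a registry of "Java" objects,
# just as the real gateway pins server-side objects until detached.

class FakeGateway:
    def __init__(self):
        self.registry = {}   # object id -> object, mimics py4j's pinning
        self._next_id = 0

    def new_object(self, payload):
        self._next_id += 1
        self.registry[self._next_id] = payload
        return self._next_id

    def detach(self, obj_id):
        # Real py4j: gateway.detach(java_object) releases the server-side ref.
        self.registry.pop(obj_id, None)


class LeakyWrapper:
    """Pre-fix behaviour: the gateway entry is never released."""
    def __init__(self, gateway):
        self.gateway = gateway
        self._java_obj = gateway.new_object("big model")


class FixedWrapper(LeakyWrapper):
    """Post-fix behaviour: __del__ detaches the server-side object."""
    def __del__(self):
        if self.gateway is not None:
            self.gateway.detach(self._java_obj)


gateway = FakeGateway()
for _ in range(50):
    LeakyWrapper(gateway)    # registry keeps growing, like the StringIndexer loop
leaked = len(gateway.registry)

gateway = FakeGateway()
for _ in range(50):
    FixedWrapper(gateway)    # each entry is removed when the wrapper is collected
remaining = len(gateway.registry)

print(leaked, remaining)     # 50 entries leaked vs. 0 left behind
```

In CPython each unreferenced `FixedWrapper` is collected as soon as the statement finishes, so its registry entry disappears immediately; the real fix relies on the same reference-counting behaviour to call `gateway.detach` promptly.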

Author: Sandeep Singh 
Author: jkbradley 

Closes #15843 from techaddict/SPARK-18274.

(cherry picked from commit 78bb7f8071379114314c394e0167c4c5fd8545c5)
Signed-off-by: Joseph K. Bradley 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/254e33f4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/254e33f4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/254e33f4

Branch: refs/heads/branch-2.0
Commit: 254e33f4b2db14fe9438b67023a0b721f9f61a3f
Parents: 729cadb
Author: Sandeep Singh 
Authored: Thu Dec 1 13:22:40 2016 -0800
Committer: Joseph K. Bradley 
Committed: Thu Dec 1 13:23:10 2016 -0800

--
 python/pyspark/ml/tests.py   | 18 +
 python/pyspark/ml/wrapper.py | 41 ++-
 2 files changed, 41 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/254e33f4/python/pyspark/ml/tests.py
--
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index de95a47..ae95f17 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -379,6 +379,24 @@ class ParamTests(PySparkTestCase):
         self.assertEqual(model.getWindowSize(), 6)
 
 
+class EvaluatorTests(SparkSessionTestCase):
+
+    def test_java_params(self):
+        """
+        This tests a bug fixed by SPARK-18274 which causes multiple copies
+        of a Params instance in Python to be linked to the same Java instance.
+        """
+        evaluator = RegressionEvaluator(metricName="r2")
+        df = self.spark.createDataFrame([Row(label=1.0, prediction=1.1)])
+        evaluator.evaluate(df)
+        self.assertEqual(evaluator._java_obj.getMetricName(), "r2")
+        evaluatorCopy = evaluator.copy({evaluator.metricName: "mae"})
+        evaluator.evaluate(df)
+        evaluatorCopy.evaluate(df)
+        self.assertEqual(evaluator._java_obj.getMetricName(), "r2")
+        self.assertEqual(evaluatorCopy._java_obj.getMetricName(), "mae")
+
+
 class FeatureTests(SparkSessionTestCase):
 
     def test_binarizer(self):

http://git-wip-us.apache.org/repos/asf/spark/blob/254e33f4/python/pyspark/ml/wrapper.py
--
diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py
index 25c44b7..13b75e9 100644
--- a/python/pyspark/ml/wrapper.py
+++ b/python/pyspark/ml/wrapper.py
@@ -71,6 +71,10 @@ class JavaParams(JavaWrapper, Params):
 
     __metaclass__ = ABCMeta
 
+    def __del__(self):
+        if SparkContext._active_spark_context:
+            SparkContext._active_spark_context._gateway.detach(self._java_obj)
+
     def _make_java_param_pair(self, param, value):
         """
         Makes a Java parm pair.
@@ -180,6 +184,25 @@ class JavaParams(JavaWrapper, Params):
                           % stage_name)
         return py_stage
 
+    def copy(self, extra=None):
+        """
+        Creates a copy of this instance with the same uid and some
+        extra params. This implementation first calls Params.copy and
+        then make a copy of the companion Java pipeline component with
+        extra params. So both the Python wrapper and the Java pipeline
+        component get copied.
+
+        :param extra: Extra parameters to copy to the new instance
+        :return: Copy of this instance
+        """
+        if extra is None:
+            extra = dict()
+        that = super(JavaParams, self).copy(extra)
+        if self._java_obj is not None:
+            that._java_obj = self._java_obj.copy(self._empty_java_param_map())
+            that._transfer_params_to_java()
+        return that
+

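The second half of the fix, moving `copy` so that the companion Java object is copied along with the Python wrapper, can likewise be sketched with hypothetical stand-in classes (`JavaSide` and `Wrapper` below are illustrations, not the real pyspark types):

```python
# Sketch of the copy() bug fixed here: before the fix, copying a Python
# wrapper left both copies pointing at the same companion "Java" object,
# so setting a param on the copy also changed the original.
import copy as _copy

class JavaSide:
    """Hypothetical stand-in for the JVM-side params object."""
    def __init__(self, params=None):
        self.params = dict(params or {})

    def copy(self):
        return JavaSide(self.params)


class Wrapper:
    def __init__(self):
        self._java_obj = JavaSide({"metricName": "r2"})

    def buggy_copy(self):
        # Pre-fix: shallow copy shares _java_obj with the original.
        return _copy.copy(self)

    def fixed_copy(self, extra):
        # Post-fix: the companion object is copied too, and the extra
        # params are pushed only to the new copy.
        that = _copy.copy(self)
        that._java_obj = self._java_obj.copy()
        that._java_obj.params.update(extra)
        return that


orig = Wrapper()
bad = orig.buggy_copy()
bad._java_obj.params["metricName"] = "mae"
shared = orig._java_obj.params["metricName"]        # "mae": original mutated!

orig2 = Wrapper()
good = orig2.fixed_copy({"metricName": "mae"})
independent = orig2._java_obj.params["metricName"]  # still "r2"
print(shared, independent, good._java_obj.params["metricName"])
```

This mirrors what the new `EvaluatorTests.test_java_params` test asserts: after `evaluator.copy({evaluator.metricName: "mae"})`, the original's Java-side metric stays `"r2"` while the copy's is `"mae"`.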
spark git commit: [SPARK-18274][ML][PYSPARK] Memory leak in PySpark JavaWrapper

2016-12-01 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master e65348471 -> 78bb7f807


[SPARK-18274][ML][PYSPARK] Memory leak in PySpark JavaWrapper

## What changes were proposed in this pull request?
In `JavaWrapper`'s destructor, make the Java gateway detach (dereference) the companion Java object, using
`SparkContext._active_spark_context._gateway.detach`.
Also fix the parameter-copying bug by moving the `copy` method from `JavaModel`
to `JavaParams`.

## How was this patch tested?
```python
import random, string
from pyspark.ml.feature import StringIndexer

# 700,000 random strings of 10 uppercase characters each
l = [(''.join(random.choice(string.ascii_uppercase) for _ in range(10)), )
     for _ in range(int(7e5))]
df = spark.createDataFrame(l, ['string'])

for i in range(50):
    indexer = StringIndexer(inputCol='string', outputCol='index')
    indexer.fit(df)
```
* Before: a strong reference to each `StringIndexer`'s Java object was kept, so it
could not be garbage collected; the loop above ran out of memory and halted midway.
* After: the Java object is detached when the Python wrapper is garbage collected,
and the computation completes.
* Memory footprint verified with a profiler.
* Added a parameter-copy test that failed before this change.

Author: Sandeep Singh 
Author: jkbradley 

Closes #15843 from techaddict/SPARK-18274.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78bb7f80
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78bb7f80
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78bb7f80

Branch: refs/heads/master
Commit: 78bb7f8071379114314c394e0167c4c5fd8545c5
Parents: e653484
Author: Sandeep Singh 
Authored: Thu Dec 1 13:22:40 2016 -0800
Committer: Joseph K. Bradley 
Committed: Thu Dec 1 13:22:40 2016 -0800

--
 python/pyspark/ml/tests.py   | 18 +
 python/pyspark/ml/wrapper.py | 41 ++-
 2 files changed, 41 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/78bb7f80/python/pyspark/ml/tests.py
--
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index a0c288a..68f5bc3 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -390,6 +390,24 @@ class ParamTests(PySparkTestCase):
         self.assertEqual(model.getWindowSize(), 6)
 
 
+class EvaluatorTests(SparkSessionTestCase):
+
+    def test_java_params(self):
+        """
+        This tests a bug fixed by SPARK-18274 which causes multiple copies
+        of a Params instance in Python to be linked to the same Java instance.
+        """
+        evaluator = RegressionEvaluator(metricName="r2")
+        df = self.spark.createDataFrame([Row(label=1.0, prediction=1.1)])
+        evaluator.evaluate(df)
+        self.assertEqual(evaluator._java_obj.getMetricName(), "r2")
+        evaluatorCopy = evaluator.copy({evaluator.metricName: "mae"})
+        evaluator.evaluate(df)
+        evaluatorCopy.evaluate(df)
+        self.assertEqual(evaluator._java_obj.getMetricName(), "r2")
+        self.assertEqual(evaluatorCopy._java_obj.getMetricName(), "mae")
+
+
 class FeatureTests(SparkSessionTestCase):
 
     def test_binarizer(self):

http://git-wip-us.apache.org/repos/asf/spark/blob/78bb7f80/python/pyspark/ml/wrapper.py
--
diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py
index 25c44b7..13b75e9 100644
--- a/python/pyspark/ml/wrapper.py
+++ b/python/pyspark/ml/wrapper.py
@@ -71,6 +71,10 @@ class JavaParams(JavaWrapper, Params):
 
     __metaclass__ = ABCMeta
 
+    def __del__(self):
+        if SparkContext._active_spark_context:
+            SparkContext._active_spark_context._gateway.detach(self._java_obj)
+
     def _make_java_param_pair(self, param, value):
         """
         Makes a Java parm pair.
@@ -180,6 +184,25 @@ class JavaParams(JavaWrapper, Params):
                           % stage_name)
         return py_stage
 
+    def copy(self, extra=None):
+        """
+        Creates a copy of this instance with the same uid and some
+        extra params. This implementation first calls Params.copy and
+        then make a copy of the companion Java pipeline component with
+        extra params. So both the Python wrapper and the Java pipeline
+        component get copied.
+
+        :param extra: Extra parameters to copy to the new instance
+        :return: Copy of this instance
+        """
+        if extra is None:
+            extra = dict()
+        that = super(JavaParams, self).copy(extra)
+        if self._java_obj is not None:
+            that._java_obj = self._java_obj.copy(self._empty_java_param_map())
+            that._transfer_params_to_java()
+        return that
+
 
 @inherit_doc
 class JavaEstimator(JavaParams, Estimator):
@@ -256,21 +279,3 @@ class Ja

spark git commit: [SPARK-18274][ML][PYSPARK] Memory leak in PySpark JavaWrapper

2016-12-01 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 6916ddc38 -> 4c673c656


[SPARK-18274][ML][PYSPARK] Memory leak in PySpark JavaWrapper

## What changes were proposed in this pull request?
In `JavaWrapper`'s destructor, make the Java gateway detach (dereference) the companion Java object, using
`SparkContext._active_spark_context._gateway.detach`.
Also fix the parameter-copying bug by moving the `copy` method from `JavaModel`
to `JavaParams`.

## How was this patch tested?
```python
import random, string
from pyspark.ml.feature import StringIndexer

# 700,000 random strings of 10 uppercase characters each
l = [(''.join(random.choice(string.ascii_uppercase) for _ in range(10)), )
     for _ in range(int(7e5))]
df = spark.createDataFrame(l, ['string'])

for i in range(50):
    indexer = StringIndexer(inputCol='string', outputCol='index')
    indexer.fit(df)
```
* Before: a strong reference to each `StringIndexer`'s Java object was kept, so it
could not be garbage collected; the loop above ran out of memory and halted midway.
* After: the Java object is detached when the Python wrapper is garbage collected,
and the computation completes.
* Memory footprint verified with a profiler.
* Added a parameter-copy test that failed before this change.

Author: Sandeep Singh 
Author: jkbradley 

Closes #15843 from techaddict/SPARK-18274.

(cherry picked from commit 78bb7f8071379114314c394e0167c4c5fd8545c5)
Signed-off-by: Joseph K. Bradley 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c673c65
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c673c65
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c673c65

Branch: refs/heads/branch-2.1
Commit: 4c673c656d52d29813979e942851b9205e4ace06
Parents: 6916ddc
Author: Sandeep Singh 
Authored: Thu Dec 1 13:22:40 2016 -0800
Committer: Joseph K. Bradley 
Committed: Thu Dec 1 13:22:55 2016 -0800

--
 python/pyspark/ml/tests.py   | 18 +
 python/pyspark/ml/wrapper.py | 41 ++-
 2 files changed, 41 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4c673c65/python/pyspark/ml/tests.py
--
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index a0c288a..68f5bc3 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -390,6 +390,24 @@ class ParamTests(PySparkTestCase):
         self.assertEqual(model.getWindowSize(), 6)
 
 
+class EvaluatorTests(SparkSessionTestCase):
+
+    def test_java_params(self):
+        """
+        This tests a bug fixed by SPARK-18274 which causes multiple copies
+        of a Params instance in Python to be linked to the same Java instance.
+        """
+        evaluator = RegressionEvaluator(metricName="r2")
+        df = self.spark.createDataFrame([Row(label=1.0, prediction=1.1)])
+        evaluator.evaluate(df)
+        self.assertEqual(evaluator._java_obj.getMetricName(), "r2")
+        evaluatorCopy = evaluator.copy({evaluator.metricName: "mae"})
+        evaluator.evaluate(df)
+        evaluatorCopy.evaluate(df)
+        self.assertEqual(evaluator._java_obj.getMetricName(), "r2")
+        self.assertEqual(evaluatorCopy._java_obj.getMetricName(), "mae")
+
+
 class FeatureTests(SparkSessionTestCase):
 
     def test_binarizer(self):

http://git-wip-us.apache.org/repos/asf/spark/blob/4c673c65/python/pyspark/ml/wrapper.py
--
diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py
index 25c44b7..13b75e9 100644
--- a/python/pyspark/ml/wrapper.py
+++ b/python/pyspark/ml/wrapper.py
@@ -71,6 +71,10 @@ class JavaParams(JavaWrapper, Params):
 
     __metaclass__ = ABCMeta
 
+    def __del__(self):
+        if SparkContext._active_spark_context:
+            SparkContext._active_spark_context._gateway.detach(self._java_obj)
+
     def _make_java_param_pair(self, param, value):
         """
         Makes a Java parm pair.
@@ -180,6 +184,25 @@ class JavaParams(JavaWrapper, Params):
                           % stage_name)
         return py_stage
 
+    def copy(self, extra=None):
+        """
+        Creates a copy of this instance with the same uid and some
+        extra params. This implementation first calls Params.copy and
+        then make a copy of the companion Java pipeline component with
+        extra params. So both the Python wrapper and the Java pipeline
+        component get copied.
+
+        :param extra: Extra parameters to copy to the new instance
+        :return: Copy of this instance
+        """
+        if extra is None:
+            extra = dict()
+        that = super(JavaParams, self).copy(extra)
+        if self._java_obj is not None:
+            that._java_obj = self._java_obj.copy(self._empty_java_param_map())
+            that._transfer_params_to_java()
+        return that
+