spark git commit: [SPARK-6194] [SPARK-677] [PySpark] fix memory leak in collect()

2015-03-13 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 dbee7e16c -> 170af49bb


[SPARK-6194] [SPARK-677] [PySpark] fix memory leak in collect()

Because of a circular reference between JavaObject and JavaMember, a Java object 
cannot be released until Python's GC kicks in, which causes a memory leak in 
collect() and may consume a lot of memory in the JVM.

This PR changes the way collected data is sent back to Python, from a local 
file to a socket, which avoids any disk IO during collect() and also avoids 
keeping any referrers to the Java object in Python.
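
As a rough sketch of the socket-based approach described above (not the code 
from this patch: the helper name serveBytes, the length-prefixed framing, and 
the daemon-thread setup are illustrative assumptions), a JVM-side helper could 
serve an iterator of byte arrays over an ephemeral loopback socket and return 
the port:

import java.io.{BufferedOutputStream, DataOutputStream}
import java.net.{InetAddress, ServerSocket}

object SocketServeSketch {
  // Hypothetical helper, loosely modeled on the serveIterator call added by
  // this patch: serve the records over a one-shot loopback socket and return
  // the port for the client to connect to.
  def serveBytes(items: Iterator[Array[Byte]], threadName: String): Int = {
    // Bind to an ephemeral port (0) on the loopback interface only.
    val server = new ServerSocket(0, 1, InetAddress.getByName("localhost"))
    new Thread(threadName) {
      setDaemon(true)
      override def run(): Unit = {
        val socket = server.accept()
        val out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream))
        try {
          // Assumed framing: 4-byte length prefix, then the record bytes.
          items.foreach { bytes =>
            out.writeInt(bytes.length)
            out.write(bytes)
          }
          out.flush()
        } finally {
          socket.close()
          server.close()
        }
      }
    }.start()
    server.getLocalPort
  }
}

The client then connects to the returned port and reads the stream, so no 
temporary file is written and no long-lived Java object needs to be referenced 
from Python.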

cc JoshRosen

Author: Davies Liu dav...@databricks.com

Closes #4923 from davies/fix_collect and squashes the following commits:

d730286 [Davies Liu] address comments
24c92a4 [Davies Liu] fix style
ba54614 [Davies Liu] use socket to transfer data from JVM
9517c8f [Davies Liu] fix memory leak in collect()

(cherry picked from commit 8767565cef01d847f57b7293d8b63b2422009b90)
Signed-off-by: Josh Rosen joshro...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/170af49b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/170af49b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/170af49b

Branch: refs/heads/branch-1.3
Commit: 170af49bb0b183b2f4cb3ebbb3e9ab5327f907c9
Parents: dbee7e1
Author: Davies Liu dav...@databricks.com
Authored: Mon Mar 9 16:24:06 2015 -0700
Committer: Josh Rosen joshro...@databricks.com
Committed: Fri Mar 13 10:44:10 2015 -0700

--
 .../org/apache/spark/api/python/PythonRDD.scala | 76 +++-
 python/pyspark/context.py   | 13 ++--
 python/pyspark/rdd.py   | 30 
 python/pyspark/sql/dataframe.py | 14 +---
 4 files changed, 82 insertions(+), 51 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/170af49b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index fd89669..4c71b69 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -19,26 +19,27 @@ package org.apache.spark.api.python
 
 import java.io._
 import java.net._
-import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, UUID, Collections}
-
-import org.apache.spark.input.PortableDataStream
+import java.util.{Collections, ArrayList => JArrayList, List => JList, Map => JMap}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.language.existentials
 
 import com.google.common.base.Charsets.UTF_8
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.hadoop.mapred.{InputFormat, OutputFormat, JobConf}
+import org.apache.hadoop.mapred.{InputFormat, JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, OutputFormat => NewOutputFormat}
+
 import org.apache.spark._
-import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.input.PortableDataStream
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
 
+import scala.util.control.NonFatal
+
 private[spark] class PythonRDD(
 @transient parent: RDD[_],
 command: Array[Byte],
@@ -344,21 +345,33 @@ private[spark] object PythonRDD extends Logging {
   /**
* Adapter for calling SparkContext#runJob from Python.
*
-   * This method will return an iterator of an array that contains all elements in the RDD
+   * This method will serve an iterator of an array that contains all elements in the RDD
* (effectively a collect()), but allows you to run on a certain subset of partitions,
* or to enable local execution.
+   *
+   * @return the port number of a local socket which serves the data collected from this job.
*/
   def runJob(
   sc: SparkContext,
   rdd: JavaRDD[Array[Byte]],
   partitions: JArrayList[Int],
-  allowLocal: Boolean): Iterator[Array[Byte]] = {
+  allowLocal: Boolean): Int = {
 type ByteArray = Array[Byte]
 type UnrolledPartition = Array[ByteArray]
 val allPartitions: Array[UnrolledPartition] =
   sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions, allowLocal)
 val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*)
-flattenedPartition.iterator
+serveIterator(flattenedPartition.iterator,
+  s"serve RDD ${rdd.id} with partitions ${partitions.mkString(",")}")
+  }
+
+  /**

spark git commit: [SPARK-6194] [SPARK-677] [PySpark] fix memory leak in collect()

2015-03-09 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master 3cac1991a -> 8767565ce


[SPARK-6194] [SPARK-677] [PySpark] fix memory leak in collect()

Because of a circular reference between JavaObject and JavaMember, a Java object 
cannot be released until Python's GC kicks in, which causes a memory leak in 
collect() and may consume a lot of memory in the JVM.

This PR changes the way collected data is sent back to Python, from a local 
file to a socket, which avoids any disk IO during collect() and also avoids 
keeping any referrers to the Java object in Python.

cc JoshRosen

Author: Davies Liu dav...@databricks.com

Closes #4923 from davies/fix_collect and squashes the following commits:

d730286 [Davies Liu] address comments
24c92a4 [Davies Liu] fix style
ba54614 [Davies Liu] use socket to transfer data from JVM
9517c8f [Davies Liu] fix memory leak in collect()


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8767565c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8767565c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8767565c

Branch: refs/heads/master
Commit: 8767565cef01d847f57b7293d8b63b2422009b90
Parents: 3cac199
Author: Davies Liu dav...@databricks.com
Authored: Mon Mar 9 16:24:06 2015 -0700
Committer: Josh Rosen joshro...@databricks.com
Committed: Mon Mar 9 16:24:06 2015 -0700

--
 .../org/apache/spark/api/python/PythonRDD.scala | 76 +++-
 python/pyspark/context.py   | 13 ++--
 python/pyspark/rdd.py   | 30 
 python/pyspark/sql/dataframe.py | 14 +---
 4 files changed, 82 insertions(+), 51 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8767565c/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index b1cec0f..8d4a53b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -19,26 +19,27 @@ package org.apache.spark.api.python
 
 import java.io._
 import java.net._
-import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, UUID, Collections}
-
-import org.apache.spark.input.PortableDataStream
+import java.util.{Collections, ArrayList => JArrayList, List => JList, Map => JMap}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.language.existentials
 
 import com.google.common.base.Charsets.UTF_8
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.hadoop.mapred.{InputFormat, OutputFormat, JobConf}
+import org.apache.hadoop.mapred.{InputFormat, JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, OutputFormat => NewOutputFormat}
+
 import org.apache.spark._
-import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.input.PortableDataStream
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
 
+import scala.util.control.NonFatal
+
 private[spark] class PythonRDD(
 @transient parent: RDD[_],
 command: Array[Byte],
@@ -341,21 +342,33 @@ private[spark] object PythonRDD extends Logging {
   /**
* Adapter for calling SparkContext#runJob from Python.
*
-   * This method will return an iterator of an array that contains all elements in the RDD
+   * This method will serve an iterator of an array that contains all elements in the RDD
* (effectively a collect()), but allows you to run on a certain subset of partitions,
* or to enable local execution.
+   *
+   * @return the port number of a local socket which serves the data collected from this job.
*/
   def runJob(
   sc: SparkContext,
   rdd: JavaRDD[Array[Byte]],
   partitions: JArrayList[Int],
-  allowLocal: Boolean): Iterator[Array[Byte]] = {
+  allowLocal: Boolean): Int = {
 type ByteArray = Array[Byte]
 type UnrolledPartition = Array[ByteArray]
 val allPartitions: Array[UnrolledPartition] =
   sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions, allowLocal)
 val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*)
-flattenedPartition.iterator
+serveIterator(flattenedPartition.iterator,
+  s"serve RDD ${rdd.id} with partitions ${partitions.mkString(",")}")
+  }
+
+  /**
+   * A helper function to collect an RDD as an iterator, then serve it via socket.
+   *
+   * @return the port number of a 

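To show how the port returned by runJob above is meant to be consumed, here is 
a minimal Scala sketch of the client side (the real consumer is PySpark's 
Python code, which is not shown in this excerpt; the helper name readBytes and 
the length-prefixed framing are assumptions carried over from the sketch 
earlier):

import java.io.{BufferedInputStream, DataInputStream, EOFException}
import java.net.Socket
import scala.collection.mutable.ArrayBuffer

object SocketReadSketch {
  // Hypothetical client: connect to the local port and read length-prefixed
  // records until the sender closes the stream.
  def readBytes(port: Int): Seq[Array[Byte]] = {
    val socket = new Socket("localhost", port)
    val in = new DataInputStream(new BufferedInputStream(socket.getInputStream))
    val records = ArrayBuffer.empty[Array[Byte]]
    try {
      while (true) {
        val length = in.readInt()        // throws EOFException at end of stream
        val bytes = new Array[Byte](length)
        in.readFully(bytes)
        records += bytes
      }
    } catch {
      case _: EOFException => // sender finished and closed the connection
    } finally {
      socket.close()
    }
    records.toSeq
  }
}

Keeping the transfer on a loopback socket rather than a temporary file is what 
lets collect() avoid disk IO and avoid holding Python-side references to the 
Java result object.
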
spark git commit: [SPARK-6194] [SPARK-677] [PySpark] fix memory leak in collect()

2015-03-09 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 e753f9c9b -> d7c359b49


[SPARK-6194] [SPARK-677] [PySpark] fix memory leak in collect()

Because of a circular reference between JavaObject and JavaMember, a Java object 
cannot be released until Python's GC kicks in, which causes a memory leak in 
collect() and may consume a lot of memory in the JVM.

This PR changes the way collected data is sent back to Python, from a local 
file to a socket, which avoids any disk IO during collect() and also avoids 
keeping any referrers to the Java object in Python.

cc JoshRosen

Author: Davies Liu dav...@databricks.com

Closes #4923 from davies/fix_collect and squashes the following commits:

d730286 [Davies Liu] address comments
24c92a4 [Davies Liu] fix style
ba54614 [Davies Liu] use socket to transfer data from JVM
9517c8f [Davies Liu] fix memory leak in collect()

(cherry picked from commit 8767565cef01d847f57b7293d8b63b2422009b90)
Signed-off-by: Josh Rosen joshro...@databricks.com

Conflicts:
core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
python/pyspark/rdd.py
python/pyspark/sql/dataframe.py


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d7c359b4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d7c359b4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d7c359b4

Branch: refs/heads/branch-1.2
Commit: d7c359b495a10484e7240eae491d00e67e2dee2d
Parents: e753f9c
Author: Davies Liu dav...@databricks.com
Authored: Mon Mar 9 16:24:06 2015 -0700
Committer: Josh Rosen joshro...@databricks.com
Committed: Mon Mar 9 18:19:46 2015 -0700

--
 .../org/apache/spark/api/python/PythonRDD.scala | 74 +++-
 python/pyspark/context.py   | 13 ++--
 python/pyspark/rdd.py   | 43 +++-
 python/pyspark/sql.py   |  8 ++-
 .../scala/org/apache/spark/sql/SchemaRDD.scala  | 12 
 5 files changed, 96 insertions(+), 54 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d7c359b4/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index bfd36c7..2715722 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -19,27 +19,28 @@ package org.apache.spark.api.python
 
 import java.io._
 import java.net._
-import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, UUID, Collections}
-
-import org.apache.spark.input.PortableDataStream
+import java.util.{Collections, ArrayList => JArrayList, List => JList, Map => JMap}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.language.existentials
 
 import com.google.common.base.Charsets.UTF_8
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.hadoop.mapred.{InputFormat, OutputFormat, JobConf}
+import org.apache.hadoop.mapred.{InputFormat, JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, OutputFormat => NewOutputFormat}
+
 import org.apache.spark._
 import org.apache.spark.SparkContext._
 import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.input.PortableDataStream
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
 
+import scala.util.control.NonFatal
+
 private[spark] class PythonRDD(
 @transient parent: RDD[_],
 command: Array[Byte],
@@ -331,21 +332,33 @@ private[spark] object PythonRDD extends Logging {
   /**
* Adapter for calling SparkContext#runJob from Python.
*
-   * This method will return an iterator of an array that contains all elements in the RDD
+   * This method will serve an iterator of an array that contains all elements in the RDD
* (effectively a collect()), but allows you to run on a certain subset of partitions,
* or to enable local execution.
+   *
+   * @return the port number of a local socket which serves the data collected from this job.
*/
   def runJob(
   sc: SparkContext,
   rdd: JavaRDD[Array[Byte]],
   partitions: JArrayList[Int],
-  allowLocal: Boolean): Iterator[Array[Byte]] = {
+  allowLocal: Boolean): Int = {
 type ByteArray = Array[Byte]
 type UnrolledPartition = Array[ByteArray]
 val allPartitions: Array[UnrolledPartition] =
   sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions, allowLocal)
 val flattenedPartition: UnrolledPartition =