spark git commit: [SPARK-6194] [SPARK-677] [PySpark] fix memory leak in collect()
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 dbee7e16c -> 170af49bb

[SPARK-6194] [SPARK-677] [PySpark] fix memory leak in collect()

Because of a circular reference between JavaObject and JavaMember, a Java object cannot be released until the Python GC kicks in. This causes a memory leak in collect(), which may consume lots of memory in the JVM. This PR changes the way collected data is sent back to Python, from a local file to a socket, which avoids any disk IO during collect and also avoids keeping any Python-side references to the Java object.

cc JoshRosen

Author: Davies Liu <dav...@databricks.com>

Closes #4923 from davies/fix_collect and squashes the following commits:

d730286 [Davies Liu] address comments
24c92a4 [Davies Liu] fix style
ba54614 [Davies Liu] use socket to transfer data from JVM
9517c8f [Davies Liu] fix memory leak in collect()

(cherry picked from commit 8767565cef01d847f57b7293d8b63b2422009b90)
Signed-off-by: Josh Rosen <joshro...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/170af49b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/170af49b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/170af49b

Branch: refs/heads/branch-1.3
Commit: 170af49bb0b183b2f4cb3ebbb3e9ab5327f907c9
Parents: dbee7e1
Author: Davies Liu <dav...@databricks.com>
Authored: Mon Mar 9 16:24:06 2015 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Fri Mar 13 10:44:10 2015 -0700

--
 .../org/apache/spark/api/python/PythonRDD.scala | 76 +++-
 python/pyspark/context.py                       | 13 ++--
 python/pyspark/rdd.py                           | 30
 python/pyspark/sql/dataframe.py                 | 14 +---
 4 files changed, 82 insertions(+), 51 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/170af49b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index fd89669..4c71b69 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -19,26 +19,27 @@ package org.apache.spark.api.python
 import java.io._
 import java.net._
-import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, UUID, Collections}
-
-import org.apache.spark.input.PortableDataStream
+import java.util.{Collections, ArrayList => JArrayList, List => JList, Map => JMap}
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.language.existentials
 import com.google.common.base.Charsets.UTF_8
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.hadoop.mapred.{InputFormat, OutputFormat, JobConf}
+import org.apache.hadoop.mapred.{InputFormat, JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, OutputFormat => NewOutputFormat}
+
 import org.apache.spark._
-import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.input.PortableDataStream
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
+import scala.util.control.NonFatal
+
 private[spark] class PythonRDD(
     @transient parent: RDD[_],
     command: Array[Byte],
@@ -344,21 +345,33 @@ private[spark] object PythonRDD extends Logging {
   /**
    * Adapter for calling SparkContext#runJob from Python.
    *
-   * This method will return an iterator of an array that contains all elements in the RDD
+   * This method will serve an iterator of an array that contains all elements in the RDD
    * (effectively a collect()), but allows you to run on a certain subset of partitions,
    * or to enable local execution.
+   *
+   * @return the port number of a local socket which serves the data collected from this job.
    */
   def runJob(
       sc: SparkContext,
       rdd: JavaRDD[Array[Byte]],
       partitions: JArrayList[Int],
-      allowLocal: Boolean): Iterator[Array[Byte]] = {
+      allowLocal: Boolean): Int = {
     type ByteArray = Array[Byte]
     type UnrolledPartition = Array[ByteArray]
     val allPartitions: Array[UnrolledPartition] =
       sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions, allowLocal)
     val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*)
-    flattenedPartition.iterator
+    serveIterator(flattenedPartition.iterator,
+      s"serve RDD ${rdd.id} with partitions ${partitions.mkString(",")}")
+  }
+
+  /**
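The serveIterator helper that the new runJob calls is only partially visible in this excerpt (the diff is truncated). As a rough, hypothetical sketch of the pattern the commit message describes, serving collected byte arrays over a short-lived localhost socket instead of writing them to a file, the following standalone Scala code is illustrative only: the object and method names, the 3-second accept timeout, and the exact framing are assumptions rather than the actual Spark implementation.

// Illustrative sketch only (names and details are assumptions, not Spark's code):
// serve an iterator of byte arrays over a localhost socket and return the port,
// so the Python side can pull the collected data without any intermediate file.
import java.io.{BufferedOutputStream, DataOutputStream}
import java.net.{InetAddress, ServerSocket}

object ServeSketch {
  def serveByteArrays(items: Iterator[Array[Byte]], threadName: String): Int = {
    // Ephemeral port, bound to localhost only, backlog of 1: a single reader is expected.
    val server = new ServerSocket(0, 1, InetAddress.getByName("localhost"))
    server.setSoTimeout(3000) // give up if the Python side never connects (assumed timeout)

    new Thread(threadName) {
      setDaemon(true)
      override def run(): Unit = {
        val sock = server.accept()
        try {
          val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream))
          // Simple length-prefixed framing chosen for this sketch:
          // 4-byte big-endian length, then the payload bytes.
          items.foreach { bytes =>
            out.writeInt(bytes.length)
            out.write(bytes)
          }
          out.flush()
        } finally {
          sock.close()
          server.close()
        }
      }
    }.start()

    // The Python caller receives this port number and connects to read the stream.
    server.getLocalPort
  }
}

Returning only a port number (an Int) to Python, rather than an iterator backed by a JVM object, is the point of the fix: the Python process holds no reference to any Java object for the collected data, so nothing waits on the Python GC to be released.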
spark git commit: [SPARK-6194] [SPARK-677] [PySpark] fix memory leak in collect()
Repository: spark
Updated Branches:
  refs/heads/master 3cac1991a -> 8767565ce

[SPARK-6194] [SPARK-677] [PySpark] fix memory leak in collect()

Because of a circular reference between JavaObject and JavaMember, a Java object cannot be released until the Python GC kicks in. This causes a memory leak in collect(), which may consume lots of memory in the JVM. This PR changes the way collected data is sent back to Python, from a local file to a socket, which avoids any disk IO during collect and also avoids keeping any Python-side references to the Java object.

cc JoshRosen

Author: Davies Liu <dav...@databricks.com>

Closes #4923 from davies/fix_collect and squashes the following commits:

d730286 [Davies Liu] address comments
24c92a4 [Davies Liu] fix style
ba54614 [Davies Liu] use socket to transfer data from JVM
9517c8f [Davies Liu] fix memory leak in collect()

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8767565c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8767565c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8767565c

Branch: refs/heads/master
Commit: 8767565cef01d847f57b7293d8b63b2422009b90
Parents: 3cac199
Author: Davies Liu <dav...@databricks.com>
Authored: Mon Mar 9 16:24:06 2015 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Mon Mar 9 16:24:06 2015 -0700

--
 .../org/apache/spark/api/python/PythonRDD.scala | 76 +++-
 python/pyspark/context.py                       | 13 ++--
 python/pyspark/rdd.py                           | 30
 python/pyspark/sql/dataframe.py                 | 14 +---
 4 files changed, 82 insertions(+), 51 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/8767565c/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index b1cec0f..8d4a53b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -19,26 +19,27 @@ package org.apache.spark.api.python
 import java.io._
 import java.net._
-import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, UUID, Collections}
-
-import org.apache.spark.input.PortableDataStream
+import java.util.{Collections, ArrayList => JArrayList, List => JList, Map => JMap}
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.language.existentials
 import com.google.common.base.Charsets.UTF_8
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.hadoop.mapred.{InputFormat, OutputFormat, JobConf}
+import org.apache.hadoop.mapred.{InputFormat, JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, OutputFormat => NewOutputFormat}
+
 import org.apache.spark._
-import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.input.PortableDataStream
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
+import scala.util.control.NonFatal
+
 private[spark] class PythonRDD(
     @transient parent: RDD[_],
     command: Array[Byte],
@@ -341,21 +342,33 @@ private[spark] object PythonRDD extends Logging {
   /**
    * Adapter for calling SparkContext#runJob from Python.
    *
-   * This method will return an iterator of an array that contains all elements in the RDD
+   * This method will serve an iterator of an array that contains all elements in the RDD
    * (effectively a collect()), but allows you to run on a certain subset of partitions,
    * or to enable local execution.
+   *
+   * @return the port number of a local socket which serves the data collected from this job.
    */
   def runJob(
       sc: SparkContext,
       rdd: JavaRDD[Array[Byte]],
       partitions: JArrayList[Int],
-      allowLocal: Boolean): Iterator[Array[Byte]] = {
+      allowLocal: Boolean): Int = {
     type ByteArray = Array[Byte]
     type UnrolledPartition = Array[ByteArray]
     val allPartitions: Array[UnrolledPartition] =
       sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions, allowLocal)
     val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*)
-    flattenedPartition.iterator
+    serveIterator(flattenedPartition.iterator,
+      s"serve RDD ${rdd.id} with partitions ${partitions.mkString(",")}")
+  }
+
+  /**
+   * A helper function to collect an RDD as an iterator, then serve it via socket.
+   *
+   * @return the port number of a
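Per the diffstat, the consuming side of this socket lives in python/pyspark/rdd.py and python/pyspark/context.py, neither of which is shown in this excerpt. Purely to illustrate what a reader of the length-prefixed framing from the earlier sketch could look like (again in Scala, and again an assumption rather than part of the patch):

// Illustrative counterpart to the earlier sketch: connect to the returned port and
// read length-prefixed records until the server closes the connection.
import java.io.{BufferedInputStream, DataInputStream, EOFException}
import java.net.Socket
import scala.collection.mutable.ArrayBuffer

object ReadSketch {
  def readByteArrays(port: Int): Seq[Array[Byte]] = {
    val sock = new Socket("localhost", port)
    val in = new DataInputStream(new BufferedInputStream(sock.getInputStream))
    val records = ArrayBuffer.empty[Array[Byte]]
    try {
      while (true) {
        val length = in.readInt()      // 4-byte big-endian length prefix (sketch framing)
        val payload = new Array[Byte](length)
        in.readFully(payload)          // then exactly `length` bytes of data
        records += payload
      }
    } catch {
      case _: EOFException => // server closed the socket: end of stream
    } finally {
      sock.close()
    }
    records.toSeq
  }
}

In the real patch the reader is PySpark itself, which deserializes the stream with its existing serializers; the sketch only shows the shape of a one-shot, connection-terminated transfer.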
spark git commit: [SPARK-6194] [SPARK-677] [PySpark] fix memory leak in collect()
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 e753f9c9b -> d7c359b49

[SPARK-6194] [SPARK-677] [PySpark] fix memory leak in collect()

Because of a circular reference between JavaObject and JavaMember, a Java object cannot be released until the Python GC kicks in. This causes a memory leak in collect(), which may consume lots of memory in the JVM. This PR changes the way collected data is sent back to Python, from a local file to a socket, which avoids any disk IO during collect and also avoids keeping any Python-side references to the Java object.

cc JoshRosen

Author: Davies Liu <dav...@databricks.com>

Closes #4923 from davies/fix_collect and squashes the following commits:

d730286 [Davies Liu] address comments
24c92a4 [Davies Liu] fix style
ba54614 [Davies Liu] use socket to transfer data from JVM
9517c8f [Davies Liu] fix memory leak in collect()

(cherry picked from commit 8767565cef01d847f57b7293d8b63b2422009b90)
Signed-off-by: Josh Rosen <joshro...@databricks.com>

Conflicts:
  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
  python/pyspark/rdd.py
  python/pyspark/sql/dataframe.py

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d7c359b4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d7c359b4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d7c359b4

Branch: refs/heads/branch-1.2
Commit: d7c359b495a10484e7240eae491d00e67e2dee2d
Parents: e753f9c
Author: Davies Liu <dav...@databricks.com>
Authored: Mon Mar 9 16:24:06 2015 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Mon Mar 9 18:19:46 2015 -0700

--
 .../org/apache/spark/api/python/PythonRDD.scala | 74 +++-
 python/pyspark/context.py                       | 13 ++--
 python/pyspark/rdd.py                           | 43 +++-
 python/pyspark/sql.py                           |  8 ++-
 .../scala/org/apache/spark/sql/SchemaRDD.scala  | 12
 5 files changed, 96 insertions(+), 54 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/d7c359b4/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index bfd36c7..2715722 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -19,27 +19,28 @@ package org.apache.spark.api.python
 import java.io._
 import java.net._
-import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, UUID, Collections}
-
-import org.apache.spark.input.PortableDataStream
+import java.util.{Collections, ArrayList => JArrayList, List => JList, Map => JMap}
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.language.existentials
 import com.google.common.base.Charsets.UTF_8
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.hadoop.mapred.{InputFormat, OutputFormat, JobConf}
+import org.apache.hadoop.mapred.{InputFormat, JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, OutputFormat => NewOutputFormat}
+
 import org.apache.spark._
 import org.apache.spark.SparkContext._
 import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.input.PortableDataStream
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
+import scala.util.control.NonFatal
+
 private[spark] class PythonRDD(
     @transient parent: RDD[_],
     command: Array[Byte],
@@ -331,21 +332,33 @@ private[spark] object PythonRDD extends Logging {
   /**
    * Adapter for calling SparkContext#runJob from Python.
    *
-   * This method will return an iterator of an array that contains all elements in the RDD
+   * This method will serve an iterator of an array that contains all elements in the RDD
    * (effectively a collect()), but allows you to run on a certain subset of partitions,
    * or to enable local execution.
+   *
+   * @return the port number of a local socket which serves the data collected from this job.
    */
   def runJob(
       sc: SparkContext,
      rdd: JavaRDD[Array[Byte]],
       partitions: JArrayList[Int],
-      allowLocal: Boolean): Iterator[Array[Byte]] = {
+      allowLocal: Boolean): Int = {
     type ByteArray = Array[Byte]
     type UnrolledPartition = Array[ByteArray]
     val allPartitions: Array[UnrolledPartition] =
       sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions, allowLocal)
     val flattenedPartition: UnrolledPartition =