[jira] [Updated] (SPARK-23961) pyspark toLocalIterator throws an exception
[ https://issues.apache.org/jira/browse/SPARK-23961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-23961:
----------------------------------
    Affects Version/s: 2.0.2
                       2.1.2
                       2.3.0

> pyspark toLocalIterator throws an exception
> -------------------------------------------
>
>                 Key: SPARK-23961
>                 URL: https://issues.apache.org/jira/browse/SPARK-23961
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.0.2, 2.1.2, 2.2.1, 2.3.0
>            Reporter: Michel Lemay
>            Priority: Minor
>              Labels: DataFrame, pyspark
>
> Given a DataFrame, call toLocalIterator. If we do not consume all records,
> it throws:
> {quote}ERROR PythonRDD: Error while sending iterator
> java.net.SocketException: Connection reset by peer: socket write error
>     at java.net.SocketOutputStream.socketWrite0(Native Method)
>     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
>     at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
>     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
>     at java.io.DataOutputStream.write(DataOutputStream.java:107)
>     at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
>     at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:497)
>     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
>     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
>     at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:705)
>     at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
>     at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
>     at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:706)
> {quote}
>
> To reproduce, here is a simple pyspark shell script that shows the error:
> {quote}import itertools
> df = spark.read.parquet("large parquet folder").cache()
> print(df.count())
> b = df.toLocalIterator()
> print(len(list(itertools.islice(b, 20))))
> b = None # Make the iterator go out of scope. Throws here.
> {quote}
>
> Observations:
> * Consuming all records does not throw. Consuming only a subset of the
> partitions triggers the error.
> * In another experiment, doing the same on a regular RDD works if we
> cache/materialize it. If we do not cache the RDD, it throws similarly.
> * It works in the Scala shell.
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

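The first observation in the report (consuming all records does not throw) suggests a possible stopgap: read the rows you need, then drain whatever is left before dropping the iterator. The sketch below is only an illustration of that idea, not a fix from the Spark project; `take_then_drain` is a hypothetical helper name, and it works on any Python iterator, so it can be tried without a cluster.

```python
import itertools


def take_then_drain(iterator, n):
    """Return the first n items of an iterator, then exhaust the remainder.

    Hypothetical helper: the idea is that a fully consumed iterator leaves
    nothing for the producer to write after the consumer goes away.
    """
    head = list(itertools.islice(iterator, n))
    for _ in iterator:
        # Discard the remaining items so the iterator ends normally.
        pass
    return head


if __name__ == "__main__":
    rows = iter(range(100))  # stand-in for df.toLocalIterator()
    print(take_then_drain(rows, 20))
```

In a pyspark shell this would be called as `take_then_drain(df.toLocalIterator(), 20)`. Draining still pulls every partition to the driver, so when only the first rows are needed, `df.take(20)` or `df.limit(20).collect()` avoids the local iterator altogether.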
[jira] [Updated] (SPARK-23961) pyspark toLocalIterator throws an exception
[ https://issues.apache.org/jira/browse/SPARK-23961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michel Lemay updated SPARK-23961:
---------------------------------
    Description:
Given a DataFrame, call toLocalIterator. If we do not consume all records, it throws:
{quote}ERROR PythonRDD: Error while sending iterator
java.net.SocketException: Connection reset by peer: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:497)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
    at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:705)
    at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
    at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:706)
{quote}

To reproduce, here is a simple pyspark shell script that shows the error:
{quote}import itertools
df = spark.read.parquet("large parquet folder").cache()
print(df.count())
b = df.toLocalIterator()
print(len(list(itertools.islice(b, 20))))
b = None # Make the iterator go out of scope. Throws here.
{quote}

Observations:
* Consuming all records does not throw. Consuming only a subset of the partitions triggers the error.
* In another experiment, doing the same on a regular RDD works if we cache/materialize it. If we do not cache the RDD, it throws similarly.
* It works in the Scala shell.

  was:
Given a DataFrame, take its RDD and call toLocalIterator. If we do not consume all records, it throws:
{quote}ERROR PythonRDD: Error while sending iterator
java.net.SocketException: Connection reset by peer: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:497)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
    at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:705)
    at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
    at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:706)
{quote}

To reproduce, here is a simple pyspark shell script that shows the error:
{quote}import itertools
df = spark.read.parquet("large parquet folder")
cachedRDD = df.rdd.cache()
print(cachedRDD.count()) # materialize
b = cachedRDD.toLocalIterator()
print(len(list(itertools.islice(b, 20))))
b = None # Make the iterator go out of scope. Throws here.
{quote}

Observations:
* Consuming all records does not throw. Consuming only a subset of the partitions triggers the error.
* In another experiment, doing the same on a regular RDD works if we cache/materialize it. If we do not cache the RDD, it throws similarly.
* It works in the Scala shell.

> pyspark toLocalIterator throws an exception
> -------------------------------------------
>
>                 Key: SPARK-23961
>                 URL: https://issues.apache.org/jira/browse/SPARK-23961
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.2.1
>

[jira] [Updated] (SPARK-23961) pyspark toLocalIterator throws an exception
[ https://issues.apache.org/jira/browse/SPARK-23961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michel Lemay updated SPARK-23961:
---------------------------------
    Issue Type: Bug  (was: Improvement)

> pyspark toLocalIterator throws an exception
> -------------------------------------------
>
>                 Key: SPARK-23961
>                 URL: https://issues.apache.org/jira/browse/SPARK-23961
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.2.1
>            Reporter: Michel Lemay
>            Priority: Minor
>              Labels: DataFrame, pyspark
>
> Given a DataFrame, take its RDD and call toLocalIterator. If we do not
> consume all records, it throws:
> {quote}ERROR PythonRDD: Error while sending iterator
> java.net.SocketException: Connection reset by peer: socket write error
>     at java.net.SocketOutputStream.socketWrite0(Native Method)
>     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
>     at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
>     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
>     at java.io.DataOutputStream.write(DataOutputStream.java:107)
>     at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
>     at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:497)
>     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
>     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
>     at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:705)
>     at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
>     at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
>     at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:706)
> {quote}
>
> To reproduce, here is a simple pyspark shell script that shows the error:
> {quote}import itertools
> df = spark.read.parquet("large parquet folder")
> cachedRDD = df.rdd.cache()
> print(cachedRDD.count()) # materialize
> b = cachedRDD.toLocalIterator()
> print(len(list(itertools.islice(b, 20))))
> b = None # Make the iterator go out of scope. Throws here.
> {quote}
>
> Observations:
> * Consuming all records does not throw. Consuming only a subset of the
> partitions triggers the error.
> * In another experiment, doing the same on a regular RDD works if we
> cache/materialize it. If we do not cache the RDD, it throws similarly.
> * It works in the Scala shell.
>
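The `SocketOutputStream` and `PythonRDD.writeIteratorToStream` frames in the trace suggest that the JVM streams rows to the Python process over a local socket, and that the error appears when the reader goes away while rows are still being written. That reading is an assumption drawn from the trace, not something the report confirms. The following Spark-free sketch reproduces the same class of failure with plain sockets; every name in it (`serve_rows`, `read_some_then_close`, `simulate`) is hypothetical.

```python
import socket
import threading


def serve_rows(server_sock, n_rows, errors):
    """Stand-in for the writing side: send n_rows 1 KiB records to one client."""
    conn, _ = server_sock.accept()
    try:
        with conn:
            for _ in range(n_rows):
                conn.sendall(b"x" * 1024)
    except OSError as exc:
        # Typically ConnectionResetError or BrokenPipeError once the reader
        # has closed its end while data is still being written.
        errors.append(exc)


def read_some_then_close(port, n_bytes):
    """Stand-in for the reading side: read at most n_bytes, then close."""
    with socket.create_connection(("127.0.0.1", port)) as client:
        data = client.recv(n_bytes)
    return len(data)


def simulate(n_rows=100_000, n_bytes=1024):
    """Run one writer and one early-closing reader; return the writer's errors."""
    errors = []
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
        server.listen(1)
        port = server.getsockname()[1]
        writer = threading.Thread(target=serve_rows, args=(server, n_rows, errors))
        writer.start()
        read_some_then_close(port, n_bytes)
        writer.join()
    return errors


if __name__ == "__main__":
    for exc in simulate():
        print(type(exc).__name__, exc)
```

The writer sends far more data than the socket buffers can hold, so it is still writing when the reader closes, which mirrors the partially consumed iterator in the reproduction script.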