Michel Lemay created SPARK-23961:
------------------------------------

             Summary: pyspark toLocalIterator throws an exception
                 Key: SPARK-23961
                 URL: https://issues.apache.org/jira/browse/SPARK-23961
             Project: Spark
          Issue Type: Improvement
          Components: PySpark
    Affects Versions: 2.2.1
            Reporter: Michel Lemay
Given a DataFrame, take its RDD and call toLocalIterator. If we do not consume all records, it throws:

{quote}
ERROR PythonRDD: Error while sending iterator
java.net.SocketException: Connection reset by peer: socket write error
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
	at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:497)
	at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
	at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
	at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:705)
	at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
	at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
	at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:706)
{quote}

To reproduce, here is a simple pyspark shell script that shows the error:

{quote}
import itertools
df = spark.read.parquet("large parquet folder")
cachedRDD = df.rdd.cache()
print(cachedRDD.count())  # materialize
b = cachedRDD.toLocalIterator()
print(len(list(itertools.islice(b, 20))))
b = None  # Let the iterator go out of scope. Throws here.
{quote}

Observations:
* Consuming all records does not throw; consuming only a subset of the partitions triggers the error.
* In another experiment, doing the same on a regular RDD works if we cache/materialize it first. If we do not cache the RDD, it throws similarly.
* The equivalent code works in the Scala shell.

The stack trace suggests the JVM-side serving thread is still writing rows to the local socket when the Python iterator is discarded, so the early close surfaces as a connection reset rather than a clean shutdown.
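Until this is fixed, one possible workaround is to drain the iterator before dropping it, so the JVM writer thread finishes cleanly instead of hitting a closed socket. A minimal sketch; take_local is a hypothetical helper (not part of the PySpark API), and it assumes the cost of pulling the remaining rows to the driver is acceptable:

{quote}
import itertools

def take_local(rdd, n):
    # Hypothetical helper: pull the first n rows from toLocalIterator,
    # then exhaust the iterator so the JVM side finishes writing and the
    # socket is not closed mid-write.
    it = rdd.toLocalIterator()
    head = list(itertools.islice(it, n))
    for _ in it:
        pass  # drain the remaining rows
    return head

rows = take_local(cachedRDD, 20)
print(len(rows))
{quote}

Draining defeats the laziness that makes toLocalIterator attractive for large datasets; when only the first few rows are needed, cachedRDD.take(20) sidesteps the issue entirely, since it transfers only the requested rows and consumes them fully.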