I'm reliably hitting a bug in PySpark where jobs with many iterative
calculations on cached data stall out.
The data is a folder of ~40 text files, each with ~2 million rows and 360
entries per row, ~250 GB in total.
I'm testing with the KMeans example included with Spark (though I see the
same error with my own iterative algorithms); a sketch of the loop is
included below, after the list of observations. The Scala version completes
50+ iterations fine. In PySpark, it completes 9 iterations successfully and
then stalls. On the driver, I get this:
java.net.NoRouteToHostException: Cannot assign requested address
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at java.net.Socket.connect(Socket.java:528)
    at java.net.Socket.<init>(Socket.java:425)
    at java.net.Socket.<init>(Socket.java:208)
    at org.apache.spark.api.python.PythonAccumulatorParam.addInPlace(PythonRDD.scala:328)
    at org.apache.spark.api.python.PythonAccumulatorParam.addInPlace(PythonRDD.scala:311)
    at org.apache.spark.Accumulable.$plus$plus$eq(Accumulators.scala:70)
    at org.apache.spark.Accumulators$$anonfun$add$2.apply(Accumulators.scala:253)
    at org.apache.spark.Accumulators$$anonfun$add$2.apply(Accumulators.scala:251)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
    at scala.collection.Iterator$class.foreach(Iterator.scala:772)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
    at org.apache.spark.Accumulators$.add(Accumulators.scala:251)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:598)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:376)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
    at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
But the job continues after delivering this error, appearing to finish the
remaining tasks, until it displays:
INFO ClusterScheduler: Remove TaskSet 18.0 from pool
And then just stalls.
The web UI shows only a subset of tasks completed; the index of the current
task matches the number shown on the driver around the time the error
appeared. I don't see any errors in stdout or stderr on the worker executing
that task, only on the driver. Memory usage on all workers and on the driver
is well below 50%.
Other observations:
- It's data-size dependent: if I load ~50 GB, it finishes 20 iterations
before stalling; with ~10 GB, it finishes 35.
- It's not due to the multiple files; I see the same error on a single large
file.
- I always get the error with 30 or 60 nodes, but I don't see it when using
20.
- For a given cluster/data size, it stalls at the same point on every run.
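
For reference, here is roughly the loop I'm running: a trimmed-down sketch of
the kmeans.py example that ships with Spark, with the input path, K, the
convergence threshold, and the master URL replaced by placeholders.

import numpy as np
from pyspark import SparkContext

def closest_point(p, centers):
    # index of the center nearest to p (squared Euclidean distance)
    best_index, best_dist = 0, float("inf")
    for i, c in enumerate(centers):
        dist = np.sum((p - c) ** 2)
        if dist < best_dist:
            best_dist, best_index = dist, i
    return best_index

sc = SparkContext("spark://<master-host>:7077", "PythonKMeans")

# ~40 text files, ~2 million rows each, 360 values per row (~250 GB total)
data = (sc.textFile("hdfs:///path/to/data/*")
          .map(lambda line: np.array([float(x) for x in line.split()]))
          .cache())

K, converge_dist = 10, 0.1               # placeholder values
k_points = data.takeSample(False, K, 1)  # initial centers
temp_dist = float("inf")

while temp_dist > converge_dist:  # stalls after ~9 iterations on the full data
    closest = data.map(lambda p: (closest_point(p, k_points), (p, 1)))
    point_stats = closest.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    new_points = point_stats.map(lambda kv: (kv[0], kv[1][0] / kv[1][1])).collect()
    temp_dist = sum(np.sum((k_points[i] - p) ** 2) for (i, p) in new_points)
    for (i, p) in new_points:
        k_points[i] = p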
I was going to test all this on EC2 in case it's something specific to our
setup (a private HPC cluster running Spark in standalone mode, with 16 cores
and 100 GB used per node), but it'd be great if anyone had ideas in the
meantime.
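
In case the cluster configuration matters, the relevant part of each node's
conf/spark-env.sh is roughly this (paraphrased; I can post the exact file if
useful):

# conf/spark-env.sh on every worker (paraphrased)
SPARK_WORKER_CORES=16     # 16 cores per node
SPARK_WORKER_MEMORY=100g  # 100 GB available to executors per node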
Thanks!