Hi Jeremy,

If you look at the stdout and stderr files on that worker, do you see any 
earlier errors? I wonder if one of the Python workers crashed earlier.

It would also be good to run “top” and see if more memory is used during the 
computation. I guess the cached RDD itself fits in less than 50% of the RAM, as 
you said?
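
In case it helps, something like the sketch below could log memory on each 
worker while the job runs, instead of watching top by hand. This is just an 
illustration; psutil, the interval, and the log path are my own placeholders, 
not anything built into Spark:

    # Hypothetical helper: append overall system memory usage to a file
    # every few seconds while the job is running (assumes psutil is installed).
    import time
    import psutil

    def log_memory(path="/tmp/mem_usage.log", interval=5, duration=600):
        with open(path, "a") as f:
            end = time.time() + duration
            while time.time() < end:
                mem = psutil.virtual_memory()
                f.write("%d used_mb=%d percent=%.1f\n"
                        % (time.time(), mem.used // (1024 * 1024), mem.percent))
                f.flush()
                time.sleep(interval)

Running that on the worker executing the stuck task would show whether usage 
spikes during the computation compared to what the cached RDD occupies.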

Matei


On Jan 12, 2014, at 8:45 PM, Jeremy Freeman <[email protected]> wrote:

> I'm reliably hitting a bug in PySpark where jobs with many iterative
> calculations on cached data stall out.
> 
> The data is a folder of ~40 text files, each with 2 million rows and 360
> entries per row; the total size is ~250 GB.
> 
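> (For reference, the load/cache step is roughly the sketch below; the path and
> parsing are simplified placeholders, and sc is the usual SparkContext.)
> 
>     import numpy as np
> 
>     lines = sc.textFile("hdfs:///path/to/data")  # folder of ~40 text files
>     data = lines.map(lambda l: np.array([float(x) for x in l.split()])).cache()
>     data.count()  # materialize the cache before iterating
> 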
> I'm testing with the KMeans analyses included as examples (though I see the
> same error with my own iterative algorithms). The Scala version completes 50+
> iterations fine. In PySpark, it successfully completes 9 iterations and then
> stalls. On the driver, I get this:
> 
> java.net.NoRouteToHostException: Cannot assign requested address
>         at java.net.PlainSocketImpl.socketConnect(Native Method)
>         at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
>         at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
>         at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
>         at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>         at java.net.Socket.connect(Socket.java:579)
>         at java.net.Socket.connect(Socket.java:528)
>         at java.net.Socket.<init>(Socket.java:425)
>         at java.net.Socket.<init>(Socket.java:208)
>         at org.apache.spark.api.python.PythonAccumulatorParam.addInPlace(PythonRDD.scala:328)
>         at org.apache.spark.api.python.PythonAccumulatorParam.addInPlace(PythonRDD.scala:311)
>         at org.apache.spark.Accumulable.$plus$plus$eq(Accumulators.scala:70)
>         at org.apache.spark.Accumulators$$anonfun$add$2.apply(Accumulators.scala:253)
>         at org.apache.spark.Accumulators$$anonfun$add$2.apply(Accumulators.scala:251)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>         at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
>         at org.apache.spark.Accumulators$.add(Accumulators.scala:251)
>         at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:598)
>         at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:376)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>         at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
> 
> But the job continues after delivering this error, appearing to finish the
> remaining tasks, until it displays:
> 
> INFO ClusterScheduler: Remove TaskSet 18.0 from pool 
> 
> And then just stalls. 
> 
> The web UI shows a subset of tasks completed; the number of the current task
> matches the number displayed on the driver around the time the error message
> appeared. I don't see any errors in the stdout or stderr on the worker
> executing that task, just on the driver. Memory usage on all workers and the
> driver is well below 50%.
> 
> Other observations: 
> - It's data-size dependent: if I load ~50 GB, it finishes 20 iterations
> before stalling; if I load ~10 GB, it finishes 35.
> - It's not due to the multiple files; I see the same error on a single large
> file. 
> - I always get the error with 30 or 60 nodes, but I don't see it when using
> 20. 
> - For a given cluster/data size, it stalls at the same point on every run. 
> 
> I was going to test all this on EC2, in case it's something specific to our
> setup (a private HPC cluster running Spark in standalone mode, with 16 cores
> and 100 GB used per node). But it'd be great if anyone had ideas in the
> meantime.
> 
> Thanks!
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Stalling-during-large-iterative-PySpark-jobs-tp492.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
