I think I ran into a similar problem before. As far as I know, there is no setting that caps the memory of each Python process on the worker nodes; their memory consumption is determined by Spark itself. I tried setting SPARK_DAEMON_MEMORY and SPARK_DAEMON_JAVA_OPTS, since the Python processes on the worker nodes are named daemon.py, but those settings do not control the Python processes' memory either. I agree with what Matei has said, and I think you could also increase the memory for the JVM. In addition, since you have four workers per node, I believe SPARK_WORKER_MEMORY applies to each worker, so the total memory consumption should be 512 MB + 8 GB (4 workers x 2 GB), not 512 MB + 2 GB.
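For reference, here is a rough sketch of the parallelism change Matei suggests below, against the 0.8-era PySpark API. The master URL, input path, and the task counts 100 and 20 are just placeholders, not values I have tested:

    from operator import add
    from pyspark import SparkContext

    # Placeholder master URL and app name
    sc = SparkContext("spark://master:7077", "PythonWordCount")

    # Ask for at least 100 map tasks (minSplits) so each Python worker
    # only holds a small slice of the 11GB file at a time.
    lines = sc.textFile("hdfs:///path/to/input.txt", 100)

    counts = lines.flatMap(lambda line: line.split(' ')) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(add, 20)  # 20 reduce tasks, as Matei suggests

    for (word, count) in counts.collect():
        print("%s: %i" % (word, count))

With more, smaller tasks, each of the four Python processes per node should hold less data in memory at once.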
2013/10/18 eshishki <[email protected]>

>> I'm not sure I understand your problem -- is it that Spark used *less*
>> memory than the 2 GB?
>
> The jvm used memory as expected - 512mb,
> but the python workers were not bounded by the 2GB limit - they grew in RES
> size until the OOM killer came into play.
>
> Yes, I can change the parallelism level for map and reduce, but think about
> it - I cannot have one script for every file. I would have to know its size
> in advance, so I could estimate the memory consumption per worker and adjust
> the parallelism level accordingly.
> What is the worker memory limit for, then?
>
>
> On Thu, Oct 17, 2013 at 10:15 PM, Matei Zaharia <[email protected]> wrote:
>
>> Hi there,
>>
>> I'm not sure I understand your problem -- is it that Spark used *less*
>> memory than the 2 GB? That out of memory message seems to be from your
>> operating system, so maybe there were other things using RAM on that
>> machine, or maybe Linux is configured to kill tasks quickly when the memory
>> gets full.
>>
>> When you're running PySpark, the underlying Spark process is unlikely to
>> use a ton of memory unless you cache stuff, because it just pipes data to
>> Python. However, it does launch one Python process per core, and those may
>> be using a fair amount of RAM. If you'd like to decrease the memory usage
>> per process, try changing the reduceByKey(add) in wordcount.py to use more
>> reduce tasks by passing a second parameter to it (for example,
>> reduceByKey(add, 20) will have it use 20 parallel tasks). Likewise you can
>> set a "minimum number of tasks" value on the textFile call; it's 1 by
>> default but you can increase it to, say, 100, to make sure that there are
>> at least 100 map tasks. This will make the load per task smaller.
>>
>> Matei
>>
>> On Oct 15, 2013, at 7:29 AM, eshishki <[email protected]> wrote:
>>
>> Hello,
>>
>> I set up spark-0.8.0-incubating-bin-cdh4 on a 5 node cluster.
>>
>> I limited SPARK_WORKER_MEMORY to 2g and there are 4 cores per node, so I
>> expected total memory consumption by spark to be 512mb + 2gb.
>> The Spark webui shows *Memory:* 10.0 GB Total, 0.0 B Used.
>>
>> Then I tried to run the simple wordcount.py from the examples on an hdfs
>> file whose size is 11GB.
>>
>> Spark launched 4 workers per node and did not limit their total size to
>> 2gb - top showed RES consumption of about 750mb per process, and then:
>>
>> Out of memory: Kill process 26336 (python) score 97 or sacrifice child
>> Killed process 26336, UID 500, (python) total-vm:969696kB,
>> anon-rss:782976kB, file-rss:196kB
>>
>> and in the logs:
>>
>> INFO cluster.ClusterTaskSetManager: Loss was due to
>> org.apache.spark.SparkException
>> org.apache.spark.SparkException: Python worker exited unexpectedly
>> (crashed)
>>         at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:167)
>>         at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:173)
>>         at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:116)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>         at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:193)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>         at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
>>         at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>         at java.lang.Thread.run(Thread.java:662)
>>
>> So I could not finish the job. Yes, spark resubmitted the tasks, but they
>> kept getting OOM killed.
>>
>> Against a smaller file spark was doing fine.
>>
>> So the question is - why does spark not limit its memory accordingly, and
>> how can I analyze files larger than ram with it?
>>
>> Thanks.
>>
>
>
> --
> Евгений
>

--
--
Shangyu, Luo
Department of Computer Science
Rice University

--
Not Just Think About It, But Do It!
--
Success is never final.
--
Losers always whine about their best
