Re: spark.akka.frameSize stalls job in 1.1.0
Is it because countByValue or toArray puts too much stress on the driver if there are many unique words? To me it is a typical word count problem, so you can solve it as follows (correct me if I am wrong):

val textFile = sc.textFile("file")
val counts = textFile.flatMap(line => line.split(" "))
                     .map(word => (word, 1))
                     .reduceByKey((a, b) => a + b)
counts.saveAsTextFile("file") // you don't want to collect results to the master anyway; write them to a file instead

Thanks.

Zhan Zhang

On Aug 16, 2014, at 9:18 AM, Jerry Ye jerr...@gmail.com wrote:

The job ended up running overnight with no progress. :-(
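For anyone following along, the shape of that pipeline can be checked without a cluster. This is a plain-Scala equivalent over a local collection (a sketch only; groupBy stands in for reduceByKey, and the sample lines are made up):

```scala
// Word count over a local collection, mirroring the RDD pipeline above:
// flatMap -> map to (word, 1) -> reduce by key.
val lines = Seq("foo bar", "bar baz bar")
val counts: Map[String, Int] = lines
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .groupBy { case (word, _) => word }
  .map { case (word, pairs) => (word, pairs.map(_._2).sum) }
// counts == Map("foo" -> 1, "bar" -> 3, "baz" -> 1)
```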
Re: spark.akka.frameSize stalls job in 1.1.0
Hi Zhan,

Thanks for looking into this. I'm actually using the hash map as an example of the simplest snippet of code that fails for me; I know it is just word count. In my actual problem I'm using a Trie data structure to find substring matches.
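Jerry's actual structure isn't shown in the thread; as a concrete reference point, here is a minimal, hypothetical trie in plain Scala for exactly this kind of task (finding dictionary entries, such as location names, as substrings of free text):

```scala
import scala.collection.mutable

// A minimal trie that finds every dictionary word occurring as a
// substring of a line, by attempting a match from each start position.
final class Trie {
  private val children = mutable.Map.empty[Char, Trie]
  private var isWord = false

  def insert(word: String): Unit = {
    var node = this
    for (c <- word) node = node.children.getOrElseUpdate(c, new Trie)
    node.isWord = true
  }

  // Dictionary words beginning exactly at position i of `text`.
  private def matchesAt(text: String, i: Int): List[String] = {
    val found = mutable.ListBuffer.empty[String]
    var node = this
    var j = i
    while (j < text.length && node.children.contains(text(j))) {
      node = node.children(text(j))
      j += 1
      if (node.isWord) found += text.substring(i, j)
    }
    found.toList
  }

  // All dictionary words found anywhere in `text`.
  def findAll(text: String): List[String] =
    (0 until text.length).toList.flatMap(i => matchesAt(text, i))
}
```

For example, after `insert("par")` and `insert("paris")`, `findAll("in paris")` returns both entries. The structure is read-only after construction, which matters for the broadcast discussion later in the thread.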
Re: spark.akka.frameSize stalls job in 1.1.0
Not sure exactly how you use it. My understanding is that in Spark it is better to keep the overhead on the driver as low as possible. Is it possible to broadcast the trie to the executors, do the computation there, and then aggregate the counters in the reduce phase?

Thanks.

Zhan Zhang
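Zhan's suggestion, sketched in plain Scala with no Spark dependency: a hypothetical stand-in for the trie is shared read-only by each "partition", every partition emits partial counts, and the partials are merged in a reduce step. In real Spark code the shared structure would go through sc.broadcast and the merge through reduceByKey; all names and data here are illustrative.

```scala
// Stand-in for the broadcast trie: a read-only set of location names,
// matched as substrings of each input line.
val locations = Set("london", "paris", "tokyo")

// Simulated RDD partitions.
val partitions: Seq[Seq[String]] = Seq(
  Seq("flight to london", "hotel in paris"),
  Seq("london calling", "no location here")
)

// "Map" side: each executor scans its own partition against the shared
// structure and emits partial counts, so per-record results never hit the driver.
val partials: Seq[Map[String, Int]] = partitions.map { part =>
  part.flatMap(line => locations.filter(loc => line.contains(loc)))
    .groupBy(identity)
    .map { case (loc, hits) => (loc, hits.size) }
}

// "Reduce" side: merge the partial maps, as reduceByKey would.
val merged: Map[String, Int] = partials.flatten
  .groupBy { case (loc, _) => loc }
  .map { case (loc, kvs) => (loc, kvs.map(_._2).sum) }
```

Only the small per-partition maps travel in the merge step, which is the point of the suggestion: the driver never holds per-record results.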
Re: spark.akka.frameSize stalls job in 1.1.0
Hi Xiangrui,

I actually tried branch-1.1 and master and it resulted in the job being stuck at the TaskSetManager:

14/08/16 06:55:48 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
14/08/16 06:55:48 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 2 on executor 8: ip-10-226-199-225.us-west-2.compute.internal (PROCESS_LOCAL)
14/08/16 06:55:48 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 28055875 bytes in 162 ms
14/08/16 06:55:48 INFO scheduler.TaskSetManager: Starting task 1.0:1 as TID 3 on executor 0: ip-10-249-53-62.us-west-2.compute.internal (PROCESS_LOCAL)
14/08/16 06:55:48 INFO scheduler.TaskSetManager: Serialized task 1.0:1 as 28055875 bytes in 178 ms

It's been 10 minutes with no progress on relatively small data. I'll let it run overnight and update in the morning. Is there some place I should look to see what is happening? I tried to ssh into the executor and look at /root/spark/logs, but there wasn't anything informative there.

I'm sure using countByValue works fine, but my use of a HashMap is only an example. In my actual task, I'm loading a Trie data structure to perform efficient string matching between a dataset of locations and strings possibly containing mentions of locations. This seems like a common thing, to process input with a relatively memory-intensive object like a Trie. I hope I'm not missing something obvious. Do you know of any example code like my use case?

Thanks!

- jerry

On Fri, Aug 15, 2014 at 10:02 PM, Xiangrui Meng men...@gmail.com wrote:

Just saw you used toArray on an RDD. That copies all data to the driver, and it is deprecated. countByValue is what you need:

val samples = sc.textFile("s3n://geonames")
val counts = samples.countByValue()
val result = samples.map(l => (l, counts.getOrElse(l, 0L)))

Could you also try the latest branch-1.1 or master with the default akka.frameSize setting? The serialized task size should be small because we now use broadcast RDD objects.

-Xiangrui

On Fri, Aug 15, 2014 at 5:11 PM, jerryye jerr...@gmail.com wrote:

Hi Xiangrui, You were right, I had to use --driver-memory instead of setting it in spark-defaults.conf. However, now my job just hangs with the following message:

14/08/15 23:54:46 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 29433434 bytes in 202 ms
14/08/15 23:54:46 INFO scheduler.TaskSetManager: Starting task 1.0:1 as TID 3 on executor 1: ip-10-226-198-31.us-west-2.compute.internal (PROCESS_LOCAL)
14/08/15 23:54:46 INFO scheduler.TaskSetManager: Serialized task 1.0:1 as 29433434 bytes in 203 ms

Any ideas on where else to look?

On Fri, Aug 15, 2014 at 3:29 PM, Xiangrui Meng wrote:

Did you verify the driver memory in the Executor tab of the WebUI? I think you need `--driver-memory 8g` with spark-shell or spark-submit instead of setting it in spark-defaults.conf.
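Xiangrui's fix keeps the per-record work distributed and ships only a map of counts back to the driver. The semantics of that snippet can be checked locally, Spark-free (a sketch; a foldLeft over a local Seq stands in for RDD.countByValue):

```scala
// Local model of RDD.countByValue: a Map of value -> occurrence count.
val samples = Seq("london", "paris", "london")
val counts: Map[String, Long] =
  samples.foldLeft(Map.empty[String, Long]) { (acc, v) =>
    acc.updated(v, acc.getOrElse(v, 0L) + 1L)
  }

// Pairing each record with its count, as in the snippet above:
val result = samples.map(l => (l, counts.getOrElse(l, 0L)))
```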
Re: spark.akka.frameSize stalls job in 1.1.0
Hi Xiangrui,

I wasn't setting spark.driver.memory. I'll try that and report back. In terms of this running on the cluster, my assumption was that calling foreach on an array (I converted samples using toArray) would mean counts is populated locally; the object would then be serialized to the executors fully populated. Is this correct? I'm actually trying to load a trie and used the hashmap as an example of loading data into an object that needs to be serialized. Is there a better way of doing this?

- jerry

On Aug 15, 2014, at 8:36 AM, Xiangrui Meng wrote:

Did you set driver memory? You can confirm it in the Executors tab of the WebUI. Btw, the code may only work in local mode. In cluster mode, counts will be serialized to remote workers and the result is not fetched by the driver after foreach. You can use RDD.countByValue instead.

-Xiangrui

On Fri, Aug 15, 2014 at 8:18 AM, jerryye wrote:

Hi All, I'm not sure if I should file a JIRA or if I'm missing something obvious, since the test code I'm trying is so simple. I've isolated the problem I'm seeing to a memory issue, but I don't know what parameter I need to tweak; it does seem related to spark.akka.frameSize. If I sample my RDD with 35% of the data, everything runs to completion; with more than 35%, it fails. In standalone mode, I can run on the full RDD without any problems.

// works
val samples = sc.textFile("s3n://geonames").sample(false, 0.35) // 64MB, 2849439 lines

// fails
val samples = sc.textFile("s3n://geonames").sample(false, 0.4) // 64MB, 2849439 lines

Any ideas?

1) RDD size is causing the problem. The code below as-is fails, but if I swap smallSample for samples, the code runs end to end on both cluster and standalone.
2) The error I get is:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 3.0:1 failed 4 times, most recent failure: TID 12 on host ip-10-251-14-74.us-west-2.compute.internal failed for unknown reason
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)

3) Using the 1.1.0 branch, the driver freezes instead of aborting with the previous error in #2.

4) In 1.1.0, changing spark.akka.frameSize also has the effect of no progress in the driver.

Code:

val smallSample = sc.parallelize(Array("foo word", "bar word", "baz word"))
val samples = sc.textFile("s3n://geonames") // 64MB, 2849439 lines of short strings
val counts = new collection.mutable.HashMap[String, Int].withDefaultValue(0)
samples.toArray.foreach(counts(_) += 1)
val result = samples.map(l => (l, counts.get(l)))
result.count

Settings (with or without Kryo doesn't matter):

export SPARK_JAVA_OPTS="-Xms5g -Xmx10g -XX:MaxPermSize=10g"
export SPARK_MEM=10g

spark.akka.frameSize 40
#spark.serializer org.apache.spark.serializer.KryoSerializer
#spark.kryoserializer.buffer.mb 1000
spark.executor.memory 58315m
spark.executor.extraLibraryPath /root/ephemeral-hdfs/lib/native/
spark.executor.extraClassPath /root/ephemeral-hdfs/conf
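The ~28 MB serialized tasks in the logs line up with this pattern: the closure passed to map captures `counts`, so every task carries the whole map. A standalone sketch of the effect in plain Scala, no Spark needed (names and the 100,000-entry map are illustrative, not from the thread):

```scala
object TaskSizeDemo {
  import java.io.{ByteArrayOutputStream, ObjectOutputStream}

  // Java-serialize an object and return its size in bytes, roughly what
  // happens to a task's closure before it is shipped to an executor.
  def serializedSize(obj: AnyRef): Int = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)
    oos.close()
    bos.toByteArray.length
  }

  def sizes(): (Int, Int) = {
    // Stand-in for the driver-side `counts` map built via toArray + foreach.
    val counts = (1 to 100000).map(i => (s"word$i", i)).toMap
    val light: String => Int = s => s.length                 // captures nothing
    val heavy: String => Int = s => counts.getOrElse(s, 0)   // drags `counts` along
    (serializedSize(light), serializedSize(heavy))
  }
}
```

`heavy` serializes orders of magnitude larger than `light`, which is why countByValue (or a broadcast variable for the trie) is the usual fix.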
Re: spark.akka.frameSize stalls job in 1.1.0
Setting spark.driver.memory has no effect. It's still hanging trying to compute result.count when I'm sampling greater than 35%, regardless of what value of spark.driver.memory I set.

Here are my settings:

export SPARK_JAVA_OPTS="-Xms5g -Xmx10g -XX:MaxPermSize=10g"
export SPARK_MEM=10g

in conf/spark-defaults:

spark.driver.memory 1500
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.mb 500
spark.executor.memory 58315m
spark.executor.extraLibraryPath /root/ephemeral-hdfs/lib/native/
spark.executor.extraClassPath /root/ephemeral-hdfs/conf