Re: reduceByKey issue in example wordcount (scala)
Hi Ian,

When you run your packaged application, are you adding its jar file to the SparkContext (by calling the addJar() method)? That will distribute the code to all the worker nodes. The failure you're seeing seems to indicate the worker nodes do not have access to your code.

On Mon, Apr 14, 2014 at 9:17 AM, Ian Bonnycastle <ibo...@gmail.com> wrote:
> Good afternoon,
>
> I'm attempting to get the wordcount example working, and I keep getting
> an error in the reduceByKey(_ + _) call. I've scoured the mailing lists
> and haven't been able to find a sure-fire solution, unless I'm missing
> something big. I did find something close, but it didn't appear to work
> in my case. The error is:
>
> org.apache.spark.SparkException: Job aborted: Task 2.0:3 failed 4 times
> (most recent failure: Exception failure:
> java.lang.ClassNotFoundException: SimpleApp$$anonfun$3)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)

--
Marcelo
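[For context, a minimal sketch of the standalone wordcount app under discussion, in the style of the Spark 0.9 quick start. The input path is an assumption, not from the thread; the constructor arguments mirror the ones Ian posts later. The "_ + _" argument to reduceByKey is compiled into an anonymous closure class such as SimpleApp$$anonfun$3, which is exactly the class the workers fail to load.]

    import org.apache.spark.SparkContext

    object SimpleApp {
      def main(args: Array[String]) {
        val sc = new SparkContext("spark://masternodeip:7077", "Simple App",
          "/usr/local/pkg/spark",
          List("target/scala-2.10/simple-project_2.10-1.0.jar"))
        val logData = sc.textFile("/home/spark/workspace/SimpleApp/input.txt") // hypothetical input
        val counts = logData.flatMap(line => line.split(" "))
                            .map(word => (word, 1))
                            .reduceByKey(_ + _) // fails on the cluster with ClassNotFoundException
        counts.collect().foreach(println)
      }
    }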
Re: reduceByKey issue in example wordcount (scala)
Hi Marcelo, thanks for answering. That didn't seem to help. I have the following now:

    val sc = new SparkContext("spark://masternodeip:7077", "Simple App",
      "/usr/local/pkg/spark",
      List("target/scala-2.10/simple-project_2.10-1.0.jar"))
    sc.addJar("/home/spark/workspace/SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar")

I still get the error, though, with ClassNotFoundException, unless I'm not understanding how to run sc.addJar. I find it a little weird, too, that the Spark platform has trouble finding code that is part of itself. And why is it occurring only with the reduceByKey function? I have no problems with any other code running except for that. (BTW, I don't use "masternodeip" in my code above... I just removed it for security purposes.)

Thanks,
Ian

On Mon, Apr 14, 2014 at 12:45 PM, Marcelo Vanzin <van...@cloudera.com> wrote:
> When you run your packaged application, are you adding its jar file to
> the SparkContext (by calling the addJar() method)? That will distribute
> the code to all the worker nodes. The failure you're seeing seems to
> indicate the worker nodes do not have access to your code.
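[For reference, the jar path above is what "sbt package" produces from a quick-start-style build definition like the sketch below. The exact versions are assumptions; the Spark dependency should match the cluster. The project name "Simple Project" is normalized to the "simple-project_2.10-1.0.jar" artifact name seen in the thread.]

    // simple.sbt -- a sketch; values are illustrative, not from the thread.
    // sbt versions of this era require a blank line between settings.
    name := "Simple Project"

    version := "1.0"

    scalaVersion := "2.10.3"

    libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.1"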
Re: reduceByKey issue in example wordcount (scala)
Hi Ian,

On Mon, Apr 14, 2014 at 11:30 AM, Ian Bonnycastle <ibo...@gmail.com> wrote:
> val sc = new SparkContext("spark://masternodeip:7077", "Simple App",
>   "/usr/local/pkg/spark",
>   List("target/scala-2.10/simple-project_2.10-1.0.jar"))

Hmmm... does /usr/local/pkg/spark exist on all the worker nodes? (I haven't particularly tried using the sparkHome argument myself, nor have I traced through the code to see exactly what it does, but...) I'd try setting the sparkHome argument to null and seeing if that helps. (It has been working for me without it.) Since you're already listing your app's jar file there, you don't need to explicitly call addJar().

Note that the class that isn't being found is not a Spark class; it's a class from your app (SimpleApp$$anonfun$3). That's most probably the class that implements the closure you're passing as an argument to the reduceByKey() method. Although I can't really explain why the same isn't happening for the closure you're passing to map()... Sorry I can't be more helpful.

--
Marcelo
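[A minimal sketch of the change Marcelo is suggesting: pass null for sparkHome and rely on the jar list in the constructor, dropping the separate addJar() call. Paths are the ones from the thread; this would sit inside main as before.]

    import org.apache.spark.SparkContext

    val sc = new SparkContext(
      "spark://masternodeip:7077",
      "Simple App",
      null, // sparkHome omitted; each worker uses its own Spark installation
      List("target/scala-2.10/simple-project_2.10-1.0.jar"))
    // No sc.addJar(...) needed: the jar in the constructor list is already
    // shipped to the workers.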
Re: reduceByKey issue in example wordcount (scala)
Hi Marcelo,

Changing it to null didn't make any difference at all. /usr/local/pkg/spark is also on all the nodes... it has to be, in order to get all the nodes up and running in the cluster.

Also, I'm confused by what you mean by "That's most probably the class that implements the closure you're passing as an argument to the reduceByKey() method." The only thing I'm passing to it is "_ + _", and as you mentioned, it's pretty much the same as the map() method. If I run the following code, it runs 100% properly on the cluster:

    val numAs = logData.filter(line => line.contains("a")).count()

So this is a closure passed to the filter() method, and it doesn't have any problems at all. Also, if I run the reduceByKey in local mode, it runs perfectly. So, as you mentioned, it almost sounds like the code, or the closure, is not getting to all the nodes properly. But why reduceByKey is the only method affected is beyond me.

Ian
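[For comparison, a sketch of the local-mode run Ian says works perfectly; only the master URL changes, and the input path is again an assumption. In local mode the driver and executor share a single JVM and classpath, so no jar shipping is involved and a jar that never reaches the workers cannot surface as a ClassNotFoundException, which is consistent with the code-distribution theory.]

    import org.apache.spark.SparkContext

    object SimpleApp {
      def main(args: Array[String]) {
        // "local" runs everything in one JVM, so closure classes like
        // SimpleApp$$anonfun$3 are always loadable from the local classpath.
        val sc = new SparkContext("local", "Simple App")
        val logData = sc.textFile("/home/spark/workspace/SimpleApp/input.txt") // hypothetical input
        val counts = logData.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        counts.collect().foreach(println)
      }
    }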