Hi Marcelo,

Changing it to null didn't make any difference at all. /usr/local/pkg/spark
is also on all the nodes... it has to be, in order to get all the nodes up
and running in the cluster. Also, I'm confused by what you mean by "That's
most probably the class that implements the closure you're passing as an
argument to the reduceByKey() method". The only thing I'm passing to it is
"_ + _"... and, as you mentioned, it's pretty much the same kind of closure
as the one I pass to map().
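
For context, the reduceByKey call in question looks roughly like this
(sketching from memory rather than pasting the exact source, so treat it as
illustrative; logData is the same text-file RDD I use elsewhere):

    // assumes "import org.apache.spark.SparkContext._" for the pair-RDD implicits
    val counts = logData.flatMap(line => line.split(" "))  // split each line into words
                        .map(word => (word, 1))            // pair every word with a 1
                        .reduceByKey(_ + _)                // the only closure here is "_ + _"
    counts.count()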

If I run the following code, it runs 100% properly on the cluster:

    val numAs = logData.filter(line => line.contains("a")).count()

So that's a closure passed to the filter() method, and it doesn't have any
problems at all. Also, if I run the reduceByKey call in local mode, it runs
perfectly. So, as you mentioned, it almost sounds like the code, or the
closure, isn't getting to all the nodes properly. But why reduceByKey is
the only method affected is beyond me.
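
In case it clarifies what I've been comparing, one of the variants I've
tried looks roughly like this (reconstructing from memory, so the exact
call may not match what's in my code):

    // sparkHome set to null and no jars list in the constructor...
    val sc = new SparkContext("spark://<masternodeip>:7077", "Simple App", null, Nil)
    // ...shipping the application jar to the workers explicitly instead
    sc.addJar("target/scala-2.10/simple-project_2.10-1.0.jar")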

Ian


On Mon, Apr 14, 2014 at 2:45 PM, Marcelo Vanzin <van...@cloudera.com> wrote:

> Hi Ian,
>
> On Mon, Apr 14, 2014 at 11:30 AM, Ian Bonnycastle <ibo...@gmail.com> wrote:
> >     val sc = new SparkContext("spark://<masternodeip>:7077",
> >                               "Simple App", "/usr/local/pkg/spark",
> >              List("target/scala-2.10/simple-project_2.10-1.0.jar"))
>
> Hmmm... does /usr/local/pkg/spark exist on all the worker nodes? (I
> haven't particularly tried using the sparkHome argument myself, nor
> have I traced through the code to see exactly what it does, but...).
> I'd try setting the "sparkHome" argument to null and seeing if that
> helps. (It has been working for me without it.) Since you're already
> listing your app's jar file there, you don't need to explicitly call
> addJar().
>
> Note that the class that isn't being found is not a Spark class, it's
> a class from your app (SimpleApp$$anonfun$3). That's most probably the
> class that implements the closure you're passing as an argument to the
> reduceByKey() method. Although I can't really explain why the same
> isn't happening for the closure you're passing to map()...
>
> Sorry I can't be more helpful.
>
> > I still get the error, though, with ClassNotFoundException, unless I'm not
> > understanding how to run sc.addJar. I find it a little weird, too, that
> > the Spark platform has trouble finding its own code. And why is it
> > occurring only with the reduceByKey function? I have no problems with any
> > other code running except for that. (BTW, I don't use <masternodeip> in my
> > code above... I just removed it for security purposes.)
> >
> > Thanks,
> >
> > Ian
> >
> >
> >
> > On Mon, Apr 14, 2014 at 12:45 PM, Marcelo Vanzin <van...@cloudera.com>
> > wrote:
> >>
> >> Hi Ian,
> >>
> >> When you run your packaged application, are you adding its jar file to
> >> the SparkContext (by calling the addJar() method)?
> >>
> >> That will distribute the code to all the worker nodes. The failure
> >> you're seeing seems to indicate the worker nodes do not have access to
> >> your code.
> >>
> >> On Mon, Apr 14, 2014 at 9:17 AM, Ian Bonnycastle <ibo...@gmail.com> wrote:
> >> > Good afternoon,
> >> >
> >> > I'm attempting to get the wordcount example working, and I keep getting
> >> > an error in the "reduceByKey(_ + _)" call. I've scoured the mailing
> >> > lists, and haven't been able to find a sure fire solution, unless I'm
> >> > missing something big. I did find something close, but it didn't appear
> >> > to work in my case. The error is:
> >> >
> >> > org.apache.spark.SparkException: Job aborted: Task 2.0:3 failed 4 times
> >> > (most recent failure: Exception failure: java.lang.ClassNotFoundException:
> >> > SimpleApp$$anonfun$3)
> >> >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
> >>
> >> --
> >> Marcelo
> >
> >
>
>
>
> --
> Marcelo
>
