No, I worked around the problem by not caching so many RDDs in my program.  For
example, when I don't cache col_values.distinct(), collect() then works as expected.
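Roughly, the variant that works is the following (just a sketch of what I mean,
reusing col_values from the map shown in my original mail below):

              // Sketch of the workaround: no .cache() on the distinct RDD.
              JavaRDD<String> distinct_col_values = col_values.distinct();
              List<String> list = distinct_col_values.collect();   // completes as expected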

I know this because I was tracking the invocation of each call with System.out,
so if FileUtil.extractValueForAPositionNo were failing it would show up in the
output.
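(What I mean by tracking, roughly -- this is only a sketch, and the printouts end
up in the stdout log of whichever worker runs the task:)

              // Sketch of the tracking: log each invocation, so if
              // FileUtil.extractValueForAPositionNo throws, the last printed
              // line in the worker's stdout shows which record was being processed.
              public String call(String line) throws Exception {
                     System.out.println("extractValueForAPositionNo(" + columnPosition + "): " + line);
                     return FileUtil.extractValueForAPositionNo(line, columnPosition);
              }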


Thanks,
Hussam
From: Mark Hamstra [mailto:[email protected]]
Sent: Wednesday, October 23, 2013 3:00 PM
To: user
Subject: Re: getting Caused by: org.apache.spark.SparkException: Job failed: 
Task 1.0:1 failed more than 4 times

When you say things like "apply map works" and then lay the blame for the job 
failure on collect(), that's not being fair to collect().  RDD transformations 
are lazy, so the code that you posted immediately after claiming that map works 
doesn't really do anything right then and there except to schedule the map 
transformation to be done at a later time when an RDD action is invoked -- 
collect() in this case.  Very likely FileUtil.extractValueForAPositionNo(line, 
columnPosition) is throwing an exception, which causes the job initiated by the 
collect() action to fail.
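To make the laziness concrete, here is a minimal sketch (not your code; it assumes
a JavaSparkContext named sc, and a thrown RuntimeException stands in for whatever
your extract call does on a bad line).  The map() line completes without complaint;
the failure only surfaces when collect() runs:

// Sketch: the exception lives inside the map function, but nothing executes here.
JavaRDD<String> lines = sc.parallelize(Arrays.asList("a", "b", "c"));
JavaRDD<String> mapped = lines.map(new Function<String, String>() {
    @Override
    public String call(String s) throws Exception {
        // stand-in for FileUtil.extractValueForAPositionNo throwing on a bad record
        throw new RuntimeException("simulated parse failure");
    }
});

// Only this action actually runs the map, so only this line fails the job.
List<String> result = mapped.collect();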

On Wed, Oct 23, 2013 at 2:02 PM, 
<[email protected]<mailto:[email protected]>> wrote:
I have Spark 0.8.0 running in a cluster with 2 workers, each set up with 16 cores
and 24 GB of memory, against Hadoop 1.2.1.

I have a CSV with over 1 million records.

My Spark Java program runs as expected with a smaller CSV but fails as
follows:

Loading the CSV as text works:
              JavaRDD<String> rawTable = sc.textFile(raw_file_path).cache();


Then applying a map works:
              JavaRDD<String> col_values = rawTable.map(
                     new Function<String, String>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public String call(String line) throws Exception {
                                   return FileUtil.extractValueForAPositionNo(line, columnPosition);
                            }
                     }).cache();



Then getting the distinct values also works:
              JavaRDD<String> distinct_col_values = col_values.distinct().cache();

But dumping the content of the distinct RDD into a List of Strings fails:
              List<String> list = distinct_col_values.collect();


Any help?

[WARNING]
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:297)
        at java.lang.Thread.run(Thread.java:724)
Caused by: org.apache.spark.SparkException: Job failed: Task 1.0:1 failed more than 4 times
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
        at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
