When you say "apply map works" and then lay the blame for the job failure on collect(), that's not being fair to collect(). RDD transformations are lazy, so the map code you posted doesn't actually do anything at that point except record the transformation to be run later, when an RDD action is invoked -- collect() in this case. Very likely FileUtil.extractValueForAPositionNo(line, columnPosition) is throwing an exception on some line of the larger file, and that is what causes the job triggered by the collect() action to fail.
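One quick way to confirm this is to guard the extraction inside call(). Below is a rough sketch, reusing your own identifiers (rawTable, columnPosition, FileUtil.extractValueForAPositionNo) and assuming columnPosition is declared final so the anonymous class can capture it -- adapt it to however your program is actually structured. Nothing executes until collect(); if a malformed line is the culprit, tagging it instead of throwing lets the job finish so you can see which records are bad.

    import java.util.List;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;

    // Declaring the map does not run anything; Spark only records it in the lineage.
    JavaRDD<String> col_values = rawTable.map(
            new Function<String, String>() {
                private static final long serialVersionUID = 1L;

                @Override
                public String call(String line) throws Exception {
                    try {
                        return FileUtil.extractValueForAPositionNo(line, columnPosition);
                    } catch (Exception e) {
                        // Tag the malformed record instead of letting the task die,
                        // so the offending input lines can be inspected afterwards.
                        return "PARSE_ERROR: " + line;
                    }
                }
            }).cache();

    // Only here does the job actually run; any exception thrown inside call()
    // would otherwise surface at this point and abort the stage.
    List<String> distinctValues = col_values.distinct().collect();

Also, the SparkException on the driver only tells you the task failed more than 4 times; the underlying exception should show up in the worker/executor stderr logs, which is worth checking.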
On Wed, Oct 23, 2013 at 2:02 PM, <[email protected]> wrote:

> I have Spark 0.8.0 running in a cluster with 2 workers, each set up with 16
> cores and 24GB memory, against Hadoop 1.2.1.
>
> I have a csv with over 1 million records.
>
> My Spark Java program runs as expected with a smaller csv but fails as
> follows:
>
> Loading the csv as text works:
>
>     JavaRDD<String> rawTable = sc.textFile(raw_file_path).cache();
>
> Then applying map works:
>
>     JavaRDD<String> col_values = rawTable.map(
>             new Function<String, String>() {
>                 private static final long serialVersionUID = 1L;
>
>                 @Override
>                 public String call(String line) throws Exception {
>                     return FileUtil.extractValueForAPositionNo(line,
>                             columnPosition);
>                 }
>             }).cache();
>
> Then getting distinct works also:
>
>     JavaRDD<String> distinct_col_values = col_values.distinct().cache();
>
> But dumping the content of the distinct RDD into a List of String fails:
>
>     List<String> list = distinct_col_values.collect();
>
> Any help?
>
> [WARNING]
> java.lang.reflect.InvocationTargetException
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:297)
>         at java.lang.Thread.run(Thread.java:724)
> Caused by: org.apache.spark.SparkException: Job failed: Task 1.0:1 failed more than 4 times
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>         at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>         at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
