I have Spark 0.8.0 running on a cluster with 2 workers, each set up with 16 cores and 24 GB of memory, against Hadoop 1.2.1.
I have a CSV file with over 1 million records.
My Spark Java program runs as expected on a smaller CSV file but fails on this one, as follows:
Loading the CSV as text works:
JavaRDD<String> rawTable = sc.textFile(raw_file_path).cache();
Then applying a map works:
JavaRDD<String> col_values = rawTable.map(
    new Function<String, String>() {
        private static final long serialVersionUID = 1L;

        @Override
        public String call(String line) throws Exception {
            return FileUtil.extractValueForAPositionNo(line, columnPosition);
        }
    }).cache();
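The body of `FileUtil.extractValueForAPositionNo` isn't shown, so it's unknown whether it plays a part here. One thing worth ruling out is a line in the large file that the helper can't handle (too few columns, say): an exception thrown inside `call()` fails the task, and every retry fails the same way. Below is a minimal stdlib sketch of such a helper, assuming plain comma-separated lines with no quoted fields and a zero-based `columnPosition`; the class name `CsvColumn` and the empty-string fallback are hypothetical choices, not the poster's code. With no Spark dependency, it can be tested on sample lines before being wrapped in the `Function`.

```java
import java.util.regex.Pattern;

// Hypothetical stand-in for the poster's FileUtil.extractValueForAPositionNo.
// Assumes plain comma-separated lines with no quoted or escaped commas.
final class CsvColumn {
    private static final Pattern COMMA = Pattern.compile(",");

    private CsvColumn() {}

    // Returns the trimmed zero-based column at `position`, or "" when the
    // line has too few columns (an assumed fallback instead of throwing).
    static String extractValueForAPositionNo(String line, int position) {
        // limit -1 keeps trailing empty fields, so "a,b," yields 3 columns
        String[] fields = COMMA.split(line, -1);
        if (position < 0 || position >= fields.length) {
            return "";
        }
        return fields[position].trim();
    }
}
```

Returning a default keeps one bad line from killing the job; logging or counting such lines instead would make them easier to find.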
Getting the distinct values also works:
JavaRDD<String> distinct_col_values = col_values.distinct().cache();
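`distinct()` keeps one copy of each value, so what `collect()` later returns scales with the number of distinct values in the column, not with the number of input rows. A local analogue over a plain `List<String>` can help check expectations on a sample of the data; this is a sketch for illustration (the class name `LocalDistinct` is hypothetical), not how Spark implements `distinct()`, which shuffles values across partitions and does not preserve order.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

// Single-machine analogue of JavaRDD.distinct() for sanity checks on a sample.
// Unlike Spark's version, it keeps values in first-seen order.
final class LocalDistinct {
    private LocalDistinct() {}

    static List<String> distinct(List<String> values) {
        // LinkedHashSet drops duplicates while preserving insertion order
        return new ArrayList<>(new LinkedHashSet<>(values));
    }
}
```

On the cluster, calling `count()` on the distinct RDD before collecting is one way to see how many values `collect()` would have to bring back.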
But dumping the contents of the distinct RDD into a List<String> fails:
List<String> list = distinct_col_values.collect();
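Note that `textFile`, `map`, `distinct`, and `cache` are lazy: none of them runs until an action such as `collect()` is called. So the earlier steps "working" only means the RDDs were defined, and the failing task could come from any of them. `collect()` is also the step that ships every partition's results back to a single driver JVM, so a large distinct result can run into the driver's memory and, depending on Spark version and configuration, transport limits on result size. As a rough way to reason about the volume, this stdlib sketch (class name hypothetical) adds up the UTF-8 size of the values; it is only a lower bound, since Java object and serialization overhead come on top.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Rough lower bound on the bytes collect() must move to the driver:
// the UTF-8 size of the strings alone, ignoring object and serialization overhead.
final class CollectSizeEstimate {
    private CollectSizeEstimate() {}

    static long utf8Bytes(List<String> values) {
        long total = 0;
        for (String v : values) {
            total += v.getBytes(StandardCharsets.UTF_8).length;
        }
        return total;
    }
}
```

If the result turns out to be large, `take(n)` can inspect part of it, and `saveAsTextFile(path)` writes it out from the workers without pulling everything through the driver.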
Any help? Here is the stack trace:
[WARNING]
java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:297)
	at java.lang.Thread.run(Thread.java:724)
Caused by: org.apache.spark.SparkException: Job failed: Task 1.0:1 failed more than 4 times
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
	at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
	at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
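The trace is wrapped: exec-maven-plugin reports an `InvocationTargetException` whose cause is the `SparkException`, and that message only says the task failed repeatedly. The exception that actually killed the task is usually not in this driver-side trace; it is more likely to show up in the driver's log output from the task scheduler or in the executor's stderr on the worker, so those logs are worth checking. When handling the failure in code, a small helper that follows `getCause()` to the innermost throwable at least strips the reflection wrapper. This is a sketch; the name `RootCause` and the depth cap are assumptions.

```java
// Follows the cause chain to the innermost Throwable, e.g. from the
// InvocationTargetException added by exec-maven-plugin down to the SparkException.
final class RootCause {
    private static final int MAX_DEPTH = 100; // guards against cyclic cause chains

    private RootCause() {}

    static Throwable of(Throwable t) {
        Throwable current = t;
        int depth = 0;
        while (current.getCause() != null && depth < MAX_DEPTH) {
            current = current.getCause();
            depth++;
        }
        return current;
    }
}
```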