Hello list,

I am trying to run a very simple program with Spark on a very small cluster (10 nodes). Each node has 8 cores and 16 GB of RAM. The program is the JavaWordCount provided on Spark's website:
https://github.com/apache/incubator-spark/blob/master/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java

I also ran the JavaWordCount program provided in the Spark tarball (./run-example ...). That program fails (runs out of heap memory) when the input files are relatively big -- in this case 151 GB, which is not that big, actually; Hadoop's WordCount handles the same file without trouble. I suspect the problem lies with some Spark configuration options. That is why I am trying to run my own version of JavaWordCount: to learn how I can tune memory usage to my advantage (and whether this simple word count example can be run at all).

For now I am just using a very small file to run it. When I do, I get the following exception:

13/10/03 13:15:44 INFO cluster.ClusterTaskSetManager: Lost TID 0 (task 1.0:0)
13/10/03 13:15:44 INFO cluster.ClusterTaskSetManager: Loss was due to java.lang.AbstractMethodError
java.lang.AbstractMethodError: org.apache.spark.api.java.function.WrappedFunction1.call(Ljava/lang/Object;)Ljava/lang/Object;
    at org.apache.spark.api.java.function.WrappedFunction1.apply(WrappedFunction1.scala:31)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:90)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:90)
    at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
    at scala.collection.Iterator$class.foreach(Iterator.scala:772)
    at scala.collection.Iterator$$anon$19.foreach(Iterator.scala:399)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:37)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:89)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:89)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
    at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)

When I googled this error, I found that, apparently, there is a bug in FlatMapFunction:

https://spark-project.atlassian.net/browse/SPARK-902

However, when I apply the suggested changes I get the following exception instead:

13/10/03 13:27:44 INFO cluster.ClusterTaskSetManager: Lost TID 0 (task 1.0:0)
13/10/03 13:27:44 INFO cluster.ClusterTaskSetManager: Loss was due to java.io.InvalidClassException
java.io.InvalidClassException: org.apache.spark.api.java.function.FlatMapFunction; local class incompatible: stream classdesc serialVersionUID = -1748278142466443391, local class serialVersionUID = 2220150375729402137
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:604)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1970)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1894)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1777)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1970)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1894)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1777)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1970)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1894)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1777)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:435)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1872)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1777)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1970)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1894)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1777)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1970)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1894)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1777)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:435)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1872)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1777)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1970)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1894)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1777)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
    at org.apache.spark.scheduler.ShuffleMapTask$.deserializeInfo(ShuffleMapTask.scala:67)
    at org.apache.spark.scheduler.ShuffleMapTask.readExternal(ShuffleMapTask.scala:124)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1816)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1775)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:61)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:153)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)

So at this point I am a little stuck.
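To be explicit about what my program computes: it is the standard word-count pipeline (flatMap each line into words, pair each word with a count of 1, sum the counts per word). The same logic in plain Java, with no Spark involved, does what I expect -- this is just a local sketch of the computation, not my actual Spark code:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LocalWordCount {
    // Same pipeline as the Spark job, but in plain Java:
    // split each line into words ("flatMap"), then accumulate a
    // count of 1 per word ("reduceByKey" with +).
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                Integer c = counts.get(word);
                counts.put(word, c == null ? 1 : c + 1);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count(Arrays.asList("to be or not to be")));
    }
}
```

So I am fairly confident the logic itself is fine and the problem is in how I am building or configuring the Spark job.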
What am I doing wrong?

Thanks,
Eduardo
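P.S. In case it is relevant, this is how I am currently setting memory on the workers. The variable names below are my reading of the standalone-mode docs -- if any of them are wrong, or there are better knobs for a job like this, that may well be my problem:

```shell
# conf/spark-env.sh on every node (my current settings; corrections welcome)
export SPARK_WORKER_MEMORY=14g   # total memory a worker may give to executors
export SPARK_MEM=8g              # memory requested per application
```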
