I still see a lot of the following errors:

java.lang.OutOfMemoryError: Java heap space
13/12/05 16:04:13 INFO executor.StandaloneExecutorBackend: Got assigned task 553
13/12/05 16:04:13 INFO executor.Executor: Running task ID 553

The issue seems to be that the process hangs, probably because it keeps performing full GC cycles. The CMS old generation stays at about 707839K of its 707840K capacity, and each full GC takes 4 to 5 seconds while reclaiming almost nothing:

1536.617: [Full GC 1536.617: [CMS: 707839K->707839K(707840K), 5.0507000 secs] 1014527K->1014527K(1014528K), [CMS Perm : 31955K->31955K(53572K)], 5.0507940 secs] [Times: user=4.94 sys=0.00, real=5.05 secs]
1541.669: [Full GC 1541.669: [CMS: 707840K->707839K(707840K), 4.5483600 secs] 1014527K->1014527K(1014528K), [CMS Perm : 31955K->31955K(53572K)], 4.5484390 secs] [Times: user=4.47 sys=0.00, real=4.55 secs]
1546.218: [Full GC 1546.218: [CMS: 707839K->707839K(707840K), 4.5937460 secs] 1014527K->1014527K(1014528K), [CMS Perm : 31955K->31955K(53572K)], 4.5938460 secs] [Times: user=4.59 sys=0.00, real=4.60 secs]
1550.812: [Full GC 1550.812: [CMS: 707839K->707839K(707840K), 5.3572370 secs] 1014527K->1014527K(1014528K), [CMS Perm : 31955K->31955K(53572K)], 5.3573840 secs] [Times: user=5.26 sys=0.01, real=5.35 secs]
1556.171: [Full GC 1556.171: [CMS: 707840K->694574K(707840K), 4.1462520 secs] 1014528K->860511K(1014528K), [CMS Perm : 31955K->31955K(53572K)], 4.1463350 secs] [Times: user=4.13 sys=0.00, real=4.15 secs]
1560.329: [GC [1 CMS-initial-mark: 694574K(707840K)] 874378K(1014528K), 0.4269160 secs] [Times: user=0.41 sys=0.00, real=0.43 secs]

I tried the following parameters and they do not seem to help:

System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.akka.timeout", "30")       // in seconds
System.setProperty("spark.executor.memory", "15g")
System.setProperty("spark.akka.frameSize", "2000")   // in MB
System.setProperty("spark.akka.threads", "8")

Thanks

On Thu, Dec 5, 2013 at 11:31 PM, purav aggarwal <puravaggarwal...@gmail.com> wrote:

> Try allocating some more resources to your application.
> You seem to be using 512MB for your worker node (you can verify that from
> the master UI).
>
> Try putting the following settings into your code and see if it helps:
>
> System.setProperty("spark.executor.memory", "15g")   // Will allocate more memory
> System.setProperty("spark.akka.frameSize", "2000")
> System.setProperty("spark.akka.threads", "16")       // Depends on the number of cores on your worker machine
>
>
> On Fri, Dec 6, 2013 at 1:06 AM, learner1014 all <learner1...@gmail.com> wrote:
>
>> Hi,
>>
>> I am trying to do a join operation on an RDD. My input is pipe-delimited data,
>> and there are 2 files: one is 24MB and the other is 285MB.
>> The setup is a single-node (server) setup with SPARK_MEM set to 512m.
>>
>> Master
>> /pkg/java/jdk1.7.0_11/bin/java -cp
>> :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar
>> -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
>> -Dspark.boundedMemoryCache.memoryFraction=0.4
>> -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC
>> -Djava.library.path= -Xms512m -Xmx512m
>> org.apache.spark.deploy.master.Master --ip localhost --port 7077
>> --webui-port 8080
>>
>> Worker
>> /pkg/java/jdk1.7.0_11/bin/java -cp
>> :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar
>> -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
>> -Dspark.boundedMemoryCache.memoryFraction=0.4
>> -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC
>> -Djava.library.path= -Xms512m -Xmx512m
>> org.apache.spark.deploy.worker.Worker spark://localhost:7077
>>
>> App
>> /pkg/java/jdk1.7.0_11/bin/java -cp
>> :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar:/spark-0.8.0-incubating-bin-cdh4/core/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/repl/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/mllib/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/bagel/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/streaming/target/scala-2.9.3/test-classes
>> -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
>> -Dspark.boundedMemoryCache.memoryFraction=0.4
>> -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC
>> -Xms512M -Xmx512M org.apache.spark.executor.StandaloneExecutorBackend
>> akka://spark@localhost:33024/user/StandaloneScheduler 1 localhost 4
>>
>> Here is the code:
>>
>> import org.apache.spark.SparkContext
>> import org.apache.spark.SparkContext._
>> import org.apache.spark.storage.StorageLevel
>>
>> object SimpleApp {
>>
>>   def main(args: Array[String]) {
>>
>>     System.setProperty("spark.local.dir", "/spark-0.8.0-incubating-bin-cdh4/tmp")
>>     System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>     System.setProperty("spark.akka.timeout", "30") // in seconds
>>
>>     val dataFile2 = "/tmp_data/data1.txt"
>>     val dataFile1 = "/tmp_data/data2.txt"
>>     val sc = new SparkContext("spark://localhost:7077", "Simple App",
>>       "/spark-0.8.0-incubating-bin-cdh4",
>>       List("target/scala-2.9.3/simple-project_2.9.3-1.0.jar"))
>>
>>     val data10 = sc.textFile(dataFile1, 128)
>>     val data11 = data10.map(x => x.split("|"))
>>     val data12 = data11.map(x => (x(1).toInt -> x))
>>
>>     val data20 = sc.textFile(dataFile2, 128)
>>     val data21 = data20.map(x => x.split("|"))
>>     val data22 = data21.map(x => (x(1).toInt -> x))
>>
>>     val data3 = data12.join(data22, 128)
>>     val data4 = data3.distinct(4)
>>     val numAs = data10.count()
>>     val numBs = data20.count()
>>     val numCs = data3.count()
>>     val numDs = data4.count()
>>     println("Lines in 1: %s, Lines in 2: %s, Lines in 3: %s, Lines in 4: %s".format(numAs, numBs, numCs, numDs))
>>     data4.foreach(println)
>>   }
>> }
>>
>> I see the following errors:
>>
>> 13/12/04 10:53:55 WARN storage.BlockManagerMaster: Error sending message to BlockManagerMaster in 1 attempts
>> java.util.concurrent.TimeoutException: Futures timed out after [10000] milliseconds
>>         at akka.dispatch.DefaultPromise.ready(Future.scala:870)
>>         at akka.dispatch.DefaultPromise.result(Future.scala:874)
>>         at akka.dispatch.Await$.result(Future.scala:74)
>>
>> and
>>
>> 13/12/04 10:53:55 ERROR executor.Executor: Exception in task ID 517
>> java.lang.OutOfMemoryError: Java heap space
>>         at com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
>>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$StringArraySerializer.read(DefaultArraySerializers.java:282)
>>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$StringArraySerializer.read(DefaultArraySerializers.java:262)
>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>         at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
>>         at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>         at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:106)
>>         at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:101)
>>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>
>> Lots of them, actually...
>>
>> To give some additional information, I just added single columns in both
>> files and passed them through this program, and encountered the same issue:
>> out of memory and other errors.
>>
>> What did work was removing the following lines:
>>
>>     val data21 = data20.map(x => x.split("|"))
>>     val data22 = data21.map(x => (x(1).toInt -> x))
>>
>> and replacing them with:
>>
>>     val data22 = data20.map(x => (x.toInt -> x))
>>
>> However, as soon as I add additional columns, this of course is not going
>> to work.
>> Can someone explain this? Any suggestions are most welcome.
>> Thanks
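
A possible explanation, offered as a sketch rather than a confirmed diagnosis: in Scala, as in Java, String.split(String) interprets its argument as a regular expression, and the regex "|" (an alternation of two empty patterns) matches the empty string between every pair of characters. Each line is therefore split into single characters, x(1) is a one-character string, and, assuming that character is a digit, the join key can take at most ten distinct values. Joining two files on so few keys pairs every row with a large share of the other file's rows. That would fit the heap exhaustion and back-to-back full GCs in the logs, and it would also explain why the single-column variant that calls x.toInt on the whole line worked. Splitting on the character '|', or on the escaped regex "\\|", keeps the fields intact, so the question's code would presumably become data20.map(x => x.split('|')). The snippet below uses plain Scala collections instead of Spark and assumes a current Scala version rather than 2.9.3. The object PipeSplit, its helpers and the sample rows are hypothetical; joinSize simply counts the pairs an inner join on equal keys would produce.

```scala
// Hypothetical, self-contained illustration (plain Scala collections, no Spark)
// of why split("|") collapses the join key.
object PipeSplit {

  // What the question's code does: String.split(String) takes a regex, and "|"
  // (an alternation of two empty patterns) matches between every character.
  def fieldsRegex(line: String): Array[String] = line.split("|")

  // Literal split on the pipe character via the Char overload;
  // line.split("\\|") behaves the same way.
  def fields(line: String): Array[String] = line.split('|')

  // Key every line on its second field, like data12 and data22 in the question.
  def keys(lines: Seq[String], split: String => Array[String]): Seq[Int] =
    lines.map(line => split(line)(1).toInt)

  // Number of pairs an inner join on equal keys emits:
  // the sum over keys of (left count) * (right count).
  def joinSize[K](left: Seq[K], right: Seq[K]): Long = {
    val rightCounts: Map[K, Long] =
      right.groupBy(identity).map { case (k, ks) => k -> ks.size.toLong }
    left.map(k => rightCounts.getOrElse(k, 0L)).sum
  }

  // 1000 made-up rows whose second field is a unique key 0..999.
  val sample: Seq[String] = (0 until 1000).map(i => s"${1000 + i}|$i|row$i")

  def main(args: Array[String]): Unit = {
    // Roughly one element per character (JDK 7 also keeps a leading empty string).
    println(fieldsRegex("1001|42|foo").mkString("[", ", ", "]"))
    println(fields("1001|42|foo").mkString("[", ", ", "]")) // [1001, 42, foo]

    // Regex split: every key is a single digit, so each key matches many rows.
    println("pairs with split(\"|\"): " + joinSize(keys(sample, fieldsRegex), keys(sample, fieldsRegex)))
    // Literal split: each key matches exactly one row.
    println("pairs with split('|'): " + joinSize(keys(sample, fields), keys(sample, fields)))
  }
}
```

On JDK 8 or later the regex version should report 100000 join pairs against 1000 for the literal split. On JDK 7, as used in the thread, split("|") also keeps a leading empty string, so x(1) is the first character of every line and all of these sample rows share a single key, which is worse still.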