Btw, the node only has 4 GB of memory, so does setting spark.executor.memory to 15g even make sense? Should I instead make it around 2-3 GB? Also, how is this parameter different from SPARK_MEM?
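Concretely, I was thinking of something along these lines (the 2g figure is just a guess that leaves headroom for the OS and the Spark master/worker daemons on the same box):

```scala
// Sketch, not a definitive config: sizing for a single 4 GB node.
// These properties must be set *before* the SparkContext is created,
// or they do not reach the executors.
System.setProperty("spark.executor.memory", "2g") // assumption: ~2 GB leaves room for OS + daemons
val sc = new SparkContext("spark://localhost:7077", "Simple App",
  "/spark-0.8.0-incubating-bin-cdh4",
  List("target/scala-2.9.3/simple-project_2.9.3-1.0.jar"))
```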
Thanks,
Saurabh

On Fri, Dec 6, 2013 at 8:26 AM, learner1014 all <[email protected]> wrote:

> Still see a whole lot of the following errors:
>
> java.lang.OutOfMemoryError: Java heap space
> 13/12/05 16:04:13 INFO executor.StandaloneExecutorBackend: Got assigned task 553
> 13/12/05 16:04:13 INFO executor.Executor: Running task ID 553
>
> The issue seems to be that the process hangs, as we are probably stuck in
> back-to-back full GC cycles:
>
> 1536.617: [Full GC 1536.617: [CMS: 707839K->707839K(707840K), 5.0507000 secs] 1014527K->1014527K(1014528K), [CMS Perm : 31955K->31955K(53572K)], 5.0507940 secs] [Times: user=4.94 sys=0.00, real=5.05 secs]
> 1541.669: [Full GC 1541.669: [CMS: 707840K->707839K(707840K), 4.5483600 secs] 1014527K->1014527K(1014528K), [CMS Perm : 31955K->31955K(53572K)], 4.5484390 secs] [Times: user=4.47 sys=0.00, real=4.55 secs]
> 1546.218: [Full GC 1546.218: [CMS: 707839K->707839K(707840K), 4.5937460 secs] 1014527K->1014527K(1014528K), [CMS Perm : 31955K->31955K(53572K)], 4.5938460 secs] [Times: user=4.59 sys=0.00, real=4.60 secs]
> 1550.812: [Full GC 1550.812: [CMS: 707839K->707839K(707840K), 5.3572370 secs] 1014527K->1014527K(1014528K), [CMS Perm : 31955K->31955K(53572K)], 5.3573840 secs] [Times: user=5.26 sys=0.01, real=5.35 secs]
> 1556.171: [Full GC 1556.171: [CMS: 707840K->694574K(707840K), 4.1462520 secs] 1014528K->860511K(1014528K), [CMS Perm : 31955K->31955K(53572K)], 4.1463350 secs] [Times: user=4.13 sys=0.00, real=4.15 secs]
> 1560.329: [GC [1 CMS-initial-mark: 694574K(707840K)] 874378K(1014528K), 0.4269160 secs] [Times: user=0.41 sys=0.00, real=0.43 secs]
>
> I tried the following parameters and they do not seem to help:
>
> System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
> System.setProperty("spark.akka.timeout", "30") // in seconds
> System.setProperty("spark.executor.memory", "15g")
> System.setProperty("spark.akka.frameSize", "2000") // in MB
> System.setProperty("spark.akka.threads", "8")
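Reading the quoted GC log again: the total heap is 1014528K (~1 GB) and the CMS old generation is pinned at 707839K of 707840K, so the "15g" request was evidently never honored; the executor JVM heap is fixed at launch time. One way to check what an executor really got (a sketch; it has to run inside a task so it reports the executor JVM rather than the driver):

```scala
// Sketch: report the executor's actual max heap from inside a task.
// The output appears in the executor's stdout log, not the driver console.
sc.parallelize(1 to 1).foreach { _ =>
  val mb = Runtime.getRuntime.maxMemory() / (1024 * 1024)
  println(s"executor max heap = $mb MB")
}
```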
> Thanks
>
> On Thu, Dec 5, 2013 at 11:31 PM, purav aggarwal <[email protected]> wrote:
>
>> Try allocating some more resources to your application. You seem to be
>> using 512 MB for your worker node (you can verify that from the master UI).
>>
>> Try putting the following settings into your code and see if it helps:
>>
>> System.setProperty("spark.executor.memory", "15g") // Will allocate more memory
>> System.setProperty("spark.akka.frameSize", "2000")
>> System.setProperty("spark.akka.threads", "16") // Depends on the number of cores on your worker machine
>>
>> On Fri, Dec 6, 2013 at 1:06 AM, learner1014 all <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> Trying to do a join operation on an RDD; my input is pipe-delimited data
>>> in 2 files. One file is 24 MB and the other is 285 MB.
>>> The setup being used is a single-node (server) setup, with SPARK_MEM set to 512m.
>>>
>>> Master
>>> /pkg/java/jdk1.7.0_11/bin/java -cp :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Dspark.boundedMemoryCache.memoryFraction=0.4 -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip localhost --port 7077 --webui-port 8080
>>>
>>> Worker
>>> /pkg/java/jdk1.7.0_11/bin/java -cp :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Dspark.boundedMemoryCache.memoryFraction=0.4 -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://localhost:7077
>>>
>>> App
>>> /pkg/java/jdk1.7.0_11/bin/java -cp :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar:/spark-0.8.0-incubating-bin-cdh4/core/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/repl/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/mllib/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/bagel/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/streaming/target/scala-2.9.3/test-classes -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Dspark.boundedMemoryCache.memoryFraction=0.4 -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC -Xms512M -Xmx512M org.apache.spark.executor.StandaloneExecutorBackend akka://spark@localhost:33024/user/StandaloneScheduler 1 localhost 4
>>>
>>> Here is the code:
>>>
>>> import org.apache.spark.SparkContext
>>> import org.apache.spark.SparkContext._
>>> import org.apache.spark.storage.StorageLevel
>>>
>>> object SimpleApp {
>>>   def main(args: Array[String]) {
>>>     System.setProperty("spark.local.dir", "/spark-0.8.0-incubating-bin-cdh4/tmp")
>>>     System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>     System.setProperty("spark.akka.timeout", "30") // in seconds
>>>
>>>     val dataFile2 = "/tmp_data/data1.txt"
>>>     val dataFile1 = "/tmp_data/data2.txt"
>>>     val sc = new SparkContext("spark://localhost:7077", "Simple App",
>>>       "/spark-0.8.0-incubating-bin-cdh4",
>>>       List("target/scala-2.9.3/simple-project_2.9.3-1.0.jar"))
>>>
>>>     val data10 = sc.textFile(dataFile1, 128)
>>>     val data11 = data10.map(x => x.split("|"))
>>>     val data12 = data11.map(x => (x(1).toInt -> x))
>>>
>>>     val data20 = sc.textFile(dataFile2, 128)
>>>     val data21 = data20.map(x => x.split("|"))
>>>     val data22 = data21.map(x => (x(1).toInt -> x))
>>>
>>>     val data3 = data12.join(data22, 128)
>>>     val data4 = data3.distinct(4)
>>>     val numAs = data10.count()
>>>     val numBs = data20.count()
>>>     val numCs = data3.count()
>>>     val numDs = data4.count()
>>>     println("Lines in 1: %s, Lines in 2: %s, Lines in 3: %s, Lines in 4: %s".format(numAs, numBs, numCs, numDs))
>>>     data4.foreach(println)
>>>   }
>>> }
>>>
>>> I see the following errors:
>>>
>>> 13/12/04 10:53:55 WARN storage.BlockManagerMaster: Error sending message to BlockManagerMaster in 1 attempts
>>> java.util.concurrent.TimeoutException: Futures timed out after [10000] milliseconds
>>>         at akka.dispatch.DefaultPromise.ready(Future.scala:870)
>>>         at akka.dispatch.DefaultPromise.result(Future.scala:874)
>>>         at akka.dispatch.Await$.result(Future.scala:74)
>>>
>>> and
>>>
>>> 13/12/04 10:53:55 ERROR executor.Executor: Exception in task ID 517
>>> java.lang.OutOfMemoryError: Java heap space
>>>         at com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
>>>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$StringArraySerializer.read(DefaultArraySerializers.java:282)
>>>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$StringArraySerializer.read(DefaultArraySerializers.java:262)
>>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>         at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
>>>         at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
>>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>         at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:106)
>>>         at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:101)
>>>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>
>>> Lots of them, actually...
>>>
>>> To give some additional information, I just added single columns in both
>>> files and passed them through this program, and encountered the same issue:
>>> out of memory and other errors.
>>>
>>> What did work was removal of the following lines:
>>>
>>> val data21 = data20.map(x => x.split("|"))
>>> val data22 = data21.map(x => (x(1).toInt -> x))
>>>
>>> which were replaced by:
>>>
>>> val data22 = data20.map(x => (x.toInt -> x))
>>>
>>> However, as soon as I add additional columns this is of course not going
>>> to work. So can someone explain this? Any suggestions are most welcome.
>>> Thanks
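One more thing worth checking in the quoted code: String.split takes a regular expression, and a bare "|" is regex alternation, which matches the empty string, so split("|") breaks the line between every character instead of on the pipes. A minimal sketch of the fix (line value is a made-up example row):

```scala
// split() takes a regex; "|" alone matches the empty string, so the
// line is split into per-character pieces (the exact leading element
// varies by JDK version). Escape the pipe, or pass a Char:
val line = "101|foo|bar"           // hypothetical pipe-delimited row
val broken = line.split("|")       // per-character pieces, not fields
val fields = line.split("\\|")     // Array("101", "foo", "bar")
// equivalently: line.split('|')
```

This may also matter for the memory problem: with the unescaped split, x(1) is always a single character, so every row keys to one of a handful of values and the join groups become enormous, which could plausibly contribute to the OOM in the join. It would likewise explain why the single-column variant without split behaved differently.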
