Hi, I have a data set with 5 million records and 300 columns. I am running a Spark job in yarn-cluster mode with the following arguments:

--driver-memory 11G --executor-memory 11G --executor-cores 16 --num-executors 500

The job replaces all categorical variables with integer codes. I get the error below when I try to save the transformed data set.
java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit exceeded)
    java.util.Arrays.copyOfRange(Arrays.java:3209)
    java.lang.String.<init>(String.java:215)
    java.lang.StringBuilder.toString(StringBuilder.java:430)
    java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3023)
    java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2819)
    java.io.ObjectInputStream.readString(ObjectInputStream.java:1598)
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1319)
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
    java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
    org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
    org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
    org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    scala.collection.Iterator$class.foreach(Iterator.scala:727)
    scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176)
    scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
    scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    scala.collection.AbstractIterator.to(Iterator.scala:1157)
    scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:257)
    scala.collection.AbstractIterator.toList(Iterator.scala:1157)
    scala.collection.immutable.List.$plus$plus(List.scala:193)
    DataProcessor.DataTypeConverter2$$anonfun$6.apply(DataTypeConverter2.scala:137)
    DataProcessor.DataTypeConverter2$$anonfun$6.apply(DataTypeConverter2.scala:137)

The code is as follows:

val transformedData = splitFileWithHeader.flatMap(rowArray => {
  try {
    if (rowArray.sameElements(header.value)) {
      None
    } else {
      val transformedArray: Array[String] = new Array[String](rowArray.length)
      for (i <- 0 until rowArray.length) {
        // Check 1: does this column have a replacement map?
        // Check 2: is the value null/empty? If so, we do not replace it.
        if (broadcastReplacements.value(i) != null && rowArray(i).trim.toString != "") {
          transformedArray.update(i, broadcastReplacements.value(i)(rowArray(i).trim.toString).toString)
        } else {
          transformedArray.update(i, rowArray(i).trim.toString)
        }
      }
      Array(transformedArray.deep.mkString(","))
    }
  } catch {
    case _: Throwable => {
      println("Failure in transforming the file, 1 line, Around Line 131")
      None
    }
  }
}).coalesce(1, true)
  .mapPartitions(it => (Seq(headerLine.value) ++ it).iterator, true)
  .coalesce(500)

// Save the transformed data file
transformedData.saveAsTextFile(outputFileLocation)

Any idea how I can resolve this error? The previous stages completed successfully. Thank you!
Vinay

Prior stages:

val dataFile = sc.textFile(args(1), 500)

// Get the first line, which is the header; it also contains the column types
val columnDefinition = dataFile.first
val headerLine = sc.broadcast(columnDefinition)
val header = sc.broadcast(columnDefinition.split(",", -1))

// Remove the header
val modifiedDataFile = dataFile.filter(line => line != headerLine.value)

val onlySplitFile = modifiedDataFile.flatMap(line => {
  try {
    // println(line.split(' ').length)
    // println(line.split(' '))
    if (line.split(',').length < 1 || line.split(',').sameElements(Array(""))) {
      None
    } else {
      Array(line.split(",", -1))
    }
  } catch {
    case _: Throwable => None
  }
})
modifiedDataFile.unpersist(true)

// This part runs once per column index i, collecting that column's distinct values
val currentColumn = sc.broadcast(i)
val distinctValues = onlySplitFile.flatMap(rowArray => {
  try {
    Array(rowArray(currentColumn.value).toString.trim.toString)
  } catch {
    case _: Throwable => {
      println("Failure in Finding the Map: Around Line 72")
      None
    }
  }
}).distinct(500).collect
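For context, broadcastReplacements (used in the failing stage above) is built from these per-column distinct values. The exact construction code is not shown here, but it looks roughly like the following sketch, which reuses sc, header, and onlySplitFile from the prior stages; in the actual job some non-categorical columns are presumably left as null entries, which is why the transformation code checks broadcastReplacements.value(i) != null.

// Rough sketch of how the per-column replacement maps are built and broadcast.
// For each column, every distinct value is assigned an integer code
// (its index in the collected distinct list).
val replacements: Array[Map[String, Int]] =
  (0 until header.value.length).map { i =>
    val currentColumn = sc.broadcast(i)
    val distinctValues = onlySplitFile
      .map(rowArray => rowArray(currentColumn.value).trim)
      .distinct(500)
      .collect()
    distinctValues.zipWithIndex.toMap
  }.toArray

// Broadcast the array of per-column maps so every task can do local lookups
val broadcastReplacements = sc.broadcast(replacements)

With this in place, the lookup broadcastReplacements.value(i)(rowArray(i).trim).toString in the transformation above resolves each categorical value to its integer code.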