Hi, I have the following Spark code which involves huge data shuffling, even though I call mapPartitionsWithIndex() with the shuffle flag set to false. I have 2 TB of skewed data to process; afterwards I convert the RDD into a DataFrame and use it as a table in hiveContext.sql().

I am using 60 executors with 20 GB memory and 4 cores each, and I set spark.yarn.executor.memoryOverhead to 8500, which should be high enough. I am using the default settings for spark.shuffle.memoryFraction and spark.storage.memoryFraction; I also tried changing them, but nothing helped. I am on Spark 1.4.0.

I am new to Spark, so please help me optimize the code below. Thanks in advance.
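For reference, this is roughly how the job is configured (a sketch of the setup described above; the app name is a placeholder, and on YARN the same values can equally be passed to spark-submit as --num-executors, --executor-memory and --executor-cores):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.hive.HiveContext;

// 60 executors, 20 GB heap and 4 cores each, plus the off-heap overhead mentioned above.
SparkConf sparkConf = new SparkConf()
    .setAppName("MyApp") // placeholder name
    .set("spark.executor.instances", "60")
    .set("spark.executor.memory", "20g")
    .set("spark.executor.cores", "4")
    .set("spark.yarn.executor.memoryOverhead", "8500");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
HiveContext hiveContext = new HiveContext(sc.sc());

Here is the code in question: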
JavaRDD<Row> indexedRdd = sourceRdd.cache().mapPartitionsWithIndex(
    new Function2<Integer, Iterator<Row>, Iterator<Row>>() {
      @Override
      public Iterator<Row> call(Integer ind, Iterator<Row> rowIterator) throws Exception {
        List<Row> rowList = new ArrayList<>();
        while (rowIterator.hasNext()) {
          Row row = rowIterator.next();
          List rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
          Row updatedRow = RowFactory.create(rowAsList.toArray());
          rowList.add(updatedRow);
        }
        return rowList.iterator();
      }
    }, false).union(remainingRdd);

DataFrame baseFrame = hiveContext.createDataFrame(indexedRdd, MySchema.class);
baseFrame.registerTempTable("baseTable");

hiveContext.sql("insert into abc bla bla using baseTable group by bla bla");
hiveContext.sql("insert into def bla bla using baseTable group by bla bla");
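One thing I noticed while re-reading my code: the ArrayList buffers every row of a partition before the iterator is returned, so with skewed data the largest partition has to fit in memory all at once. Would returning a lazy iterator help? A rough sketch of what I mean (same imports and the same iterate() helper as above; only the buffering changes):

JavaRDD<Row> indexedRdd = sourceRdd.cache().mapPartitionsWithIndex(
    new Function2<Integer, Iterator<Row>, Iterator<Row>>() {
      @Override
      public Iterator<Row> call(Integer ind, final Iterator<Row> rowIterator) throws Exception {
        // Transform rows one at a time instead of materializing the whole partition.
        return new Iterator<Row>() {
          @Override
          public boolean hasNext() {
            return rowIterator.hasNext();
          }

          @Override
          public Row next() {
            Row row = rowIterator.next();
            List rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
            return RowFactory.create(rowAsList.toArray());
          }

          @Override
          public void remove() {
            throw new UnsupportedOperationException();
          }
        };
      }
    }, false).union(remainingRdd);

Also, since baseTable feeds two separate insert statements, should I call hiveContext.cacheTable("baseTable") after registering it, so the second query does not recompute the whole lineage?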