Hi Dillon,

Thank you for your reply. mapToPair uses a PairFunction to transform the records into a particular Parquet format. I have tried replacing the mapToPair() step with other operators such as count() or collect(), but the huge shuffle write still appeared. So I guess the shuffle write explosion has nothing to do with mapToPair().
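For reference, the count() variant I tried looks roughly like this (sketch only; sc, inputPath and partitionNum are the same as in my original mail below). It forces the repartition shuffle without running mapToPair() or writing any output:

    JavaPairRDD<Text, Text> inputRDD =
        sc.sequenceFile(inputPath, Text.class, Text.class);

    long n = inputRDD
        .repartition(partitionNum)   // this alone triggers a full shuffle of the input
        .count();                    // action only; no Parquet output involved

    System.out.println("records after repartition: " + n);

Even this version shows the same huge shuffle write in the UI, which is why I suspect the repartition/shuffle itself rather than the transformation.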
Best Regards,
Yichen

Dillon Dukek <dillon.du...@placed.com> wrote on Tue, Nov 6, 2018 at 7:21 AM:

> What is your function in mapToPair doing?
>
> -Dillon
>
> On Mon, Nov 5, 2018 at 1:41 PM Taylor Cox <taylor....@microsoft.com.invalid> wrote:
>
>> At first glance, I wonder if your tables are partitioned? There may not
>> be enough parallelism happening. You can also pass in the number of
>> partitions and/or a custom partitioner to help Spark "guess" how to
>> organize the shuffle.
>>
>> Have you seen any of these docs?
>>
>> https://people.eecs.berkeley.edu/~kubitron/courses/cs262a-F13/projects/reports/project16_report.pdf
>> https://spark.apache.org/docs/latest/tuning.html
>>
>> Taylor
>>
>> From: Yichen Zhou <zhouy...@gmail.com>
>> Sent: Sunday, November 4, 2018 11:42 PM
>> To: user@spark.apache.org
>> Subject: Shuffle write explosion
>>
>> Hi All,
>>
>> When running a Spark job, I have 100MB+ of input and get more than 700GB of
>> shuffle write, which is really weird. And the job finally ends up with an
>> OOM error. Does anybody know why this happened?
>>
>> [image: Screen Shot 2018-11-05 at 15.20.35.png]
>>
>> My code is like:
>>
>> JavaPairRDD<Text, Text> inputRDD = sc.sequenceFile(inputPath, Text.class, Text.class);
>>
>> inputRDD.repartition(partitionNum).mapToPair(...).saveAsNewAPIHadoopDataset(job.getConfiguration());
>>
>> Environment:
>>
>> CPU 32 cores; Memory 256G; Storage 7.5G; CentOS 7.5
>> java version "1.8.0_162"
>> Spark 2.1.2
>>
>> Any help is greatly appreciated.
>>
>> Regards,
>> Yichen
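P.S. My rough reading of Taylor's custom-partitioner suggestion quoted above, in case it helps anyone following the thread (sketch only, not tested; HashPartitioner is just one possible choice, and sc, inputPath, partitionNum, job and the mapToPair body are exactly as in my original mail):

    import org.apache.spark.HashPartitioner;

    JavaPairRDD<Text, Text> inputRDD =
        sc.sequenceFile(inputPath, Text.class, Text.class);

    inputRDD
        .partitionBy(new HashPartitioner(partitionNum))  // explicit partitioner instead of repartition()
        .mapToPair(...)                                   // same transformation as in the original job
        .saveAsNewAPIHadoopDataset(job.getConfiguration());

I have not verified yet whether this changes the shuffle write for my job.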