Where can I find your blog? Thanks.
> On Apr 29, 2015, at 7:14 PM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:
>
> After a day of debugging (actually, more), I can answer my own question:
> the problem is that the default value of 200 for "spark.sql.shuffle.partitions"
> is too small for sorting 2B rows. It was hard to pin down because the Spark
> executors just crash with various exceptions, one by one. The other takeaway
> is that DataFrame "order by" and RDD.sortBy are implemented in different
> ways. BTW, why?
>
> Small synthetic test (copied from my blog):
> Create 2B rows of MyRecord in 2000 partitions, so each partition holds 1M rows.
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> import sqlContext.implicits._
> case class MyRecord(time: Double, id: String)
> val rdd = sc.parallelize(1 to 2000, 2000).flatMap(x =>
>   Seq.fill(1000000)(MyRecord(util.Random.nextDouble, "xxx")))
>
> Let's sort this RDD by time:
>
> val sorted = rdd.sortBy(x => x.time)
> sorted.count
>
> It finished in about 8 minutes on my cluster of 8 nodes. Everything's fine.
> You can also check the completed tasks in the Spark web UI. The number of
> reducers was equal to the number of partitions, i.e. 2000.
>
> Let's convert the original RDD to a DataFrame and sort again:
>
> val df = sqlContext.createDataFrame(rdd)
> df.registerTempTable("data")
> val result = sqlContext.sql("select * from data order by time")
> result.count
>
> It will run for a while and then crash. If you check the tasks in the Spark
> web UI, you will see that some of them were cancelled due to lost executors
> (ExecutorLost), with assorted strange exceptions. It is really hard to trace
> back which executor was the first to be lost; the others follow it as in a
> house of cards. What's the problem? The number of reducers: for the first job
> it is equal to the number of partitions, i.e. 2000, but for the second it
> falls back to the default of 200.
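For anyone who lands on this thread later: the fix that follows from the above is to raise that setting before running the query. A minimal sketch, assuming Spark 1.3's SQLContext API (the value 2000 simply mirrors the partition count of the synthetic test, not a general recommendation):

// Raise the reducer count for SQL shuffles before sorting;
// the default of 200 is what caused the crashes described above.
sqlContext.setConf("spark.sql.shuffle.partitions", "2000")
val result = sqlContext.sql("select * from data order by time")
result.count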
> From: Ulanov, Alexander
> Sent: Wednesday, April 29, 2015 1:08 PM
> To: user@spark.apache.org
> Subject: Sort (order by) of the big dataset
>
> Hi,
>
> I have a 2 billion record dataset with schema <eventId: String, time: Double,
> value: Double>. It is stored in Parquet format in HDFS and takes 23GB. Specs:
> Spark 1.3, Hadoop 1.2.1, 8 nodes with Xeon CPUs, 16GB RAM and 1TB of disk
> space each; each node runs 3 workers with 3GB of memory.
>
> I keep failing to sort the mentioned dataset in Spark. I do the following:
>
> val pf = sqlContext.parquetFile("hdfs://my.net/data.parquet")
> pf.registerTempTable("data")
> val sorted = sqlContext.sql("select * from data order by time")
> sorted.saveAsParquetFile("hdfs://my.net/data-sorted.parquet")
>
> Spark starts to execute tasks, and then errors like "Executor Lost" pop up in
> the web UI (for the tasks "mapPartitions at Exchange.scala" and "runJob at
> newParquet.scala"), giving different types of exceptions in explanation. My
> thinking is that the main problem is the "GC overhead limit exceeded"
> exception; however, I also observe exceptions related to connection timeouts
> and shuffle writes (java.lang.IllegalStateException: Shutdown in progress;
> org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException).
>
> What I tried:
> 1) Tried "order by eventId" and "order by eventId, time", with the same result.
> 2) Looked at the shuffle parameters, but the defaults make sense.
> 3) Tried to repartition the data I am loading from Parquet, val pf3000 =
> pf.repartition(3000), in order to pass smaller chunks of data to the
> executors (originally there are 300 partitions). It did not help either.
> Surprisingly, this dataset takes 50GB on HDFS versus the original's 23GB.
> 4) Tried to run 8 workers instead of 24 and gave them 10GB of memory each.
> It did not help.
>
> Could you suggest what the problem might be and what the workaround is? Just
> in case: I cannot have more RAM or more machines :)
>
> Best regards, Alexander
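A workaround consistent with the diagnosis in the follow-up above, besides raising spark.sql.shuffle.partitions, is to run the sort itself through the RDD API, which takes the reducer count explicitly. A rough, untested sketch against the Parquet dataset, assuming Spark 1.3's DataFrame/RDD APIs (field index 1 is the time column in the <eventId, time, value> schema, and 2000 partitions is just an illustration):

val pf = sqlContext.parquetFile("hdfs://my.net/data.parquet")
// RDD.sortBy lets us pick the number of reduce partitions directly,
// sidestepping the spark.sql.shuffle.partitions default of 200.
val sortedRows = pf.rdd.sortBy(row => row.getDouble(1), ascending = true, numPartitions = 2000)
// Re-wrap the sorted rows with the original schema to write Parquet back out.
sqlContext.createDataFrame(sortedRows, pf.schema).saveAsParquetFile("hdfs://my.net/data-sorted.parquet")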