Where can I find your blog?

Thanks



> On Apr 29, 2015, at 7:14 PM, Ulanov, Alexander <alexander.ula...@hp.com> 
> wrote:
> 
> After a day of debugging (actually, more), I can answer my own question:
> the problem is that the default value 200 of “spark.sql.shuffle.partitions” 
> is too small for sorting 2B rows. It was hard to diagnose because the Spark 
> executors just crash with various exceptions, one by one. The other takeaway 
> is that DataFrame “order by” and RDD.sortBy are implemented in different 
> ways. BTW, why?
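> 
> For reference, the setting can be raised before running the sort; a minimal 
> sketch using the Spark 1.3 SQLContext API (the value 2000 here simply 
> matches the number of input partitions in the test below):
> sqlContext.setConf("spark.sql.shuffle.partitions", "2000")
> // the same setting can also be changed through a SQL statement:
> sqlContext.sql("SET spark.sql.shuffle.partitions=2000")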
>  
> Small synthetic test (copied from my blog):
> Create 2B rows of MyRecord within 2000 partitions, so each partition will 
> have 1M rows.
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> import sqlContext.implicits._
> case class MyRecord(time: Double, id: String)
> val rdd = sc.parallelize(1 to 2000, 2000).flatMap(x => 
> Seq.fill(1000000)(MyRecord(util.Random.nextDouble, "xxx")))
>  
> Let's sort this RDD by time:
> val sorted = rdd.sortBy(x => x.time)
> sorted.count
>  
> It finished in about 8 minutes on my cluster of 8 nodes. Everything's fine. 
> You can also check the completed tasks in the Spark web UI. The number of 
> reducers was equal to the number of partitions, i.e. 2000.
>  
> Let's convert the original RDD to a DataFrame and sort again:
> val df = sqlContext.createDataFrame(rdd)
> df.registerTempTable("data")
> val result = sqlContext.sql("select * from data order by time")
> result.count
>  
> It will run for a while and then crash. If you check the tasks in the Spark 
> web UI, you will see that some of them were cancelled due to lost executors 
> (ExecutorLost) with various strange exceptions. It is really hard to trace 
> back which executor was the first to be lost; the others follow it like a 
> house of cards. What's the problem? The number of reducers. For the first 
> sort it is equal to the number of partitions, i.e. 2000, but for the second 
> it drops to 200, the default value of “spark.sql.shuffle.partitions”.
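> 
> A quick way to see the two reducer counts from the shell (a sketch; it 
> assumes that the partitioning of the DataFrame's underlying RDD, exposed via 
> .rdd, reflects the shuffle that the sort introduces):
> sorted.partitions.length      // 2000: RDD.sortBy keeps the input partitioning by default
> result.rdd.partitions.length  // 200: the default of spark.sql.shuffle.partitions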
>  
> From: Ulanov, Alexander 
> Sent: Wednesday, April 29, 2015 1:08 PM
> To: user@spark.apache.org
> Subject: Sort (order by) of the big dataset
>  
> Hi,
>  
> I have a dataset of 2 billion records with the schema <eventId: String, time: 
> Double, value: Double>. It is stored in Parquet format on HDFS, 23GB in size. 
> Specs: Spark 1.3, Hadoop 1.2.1, 8 nodes with Xeon CPUs and 16GB RAM, 1TB disk 
> space; each node has 3 workers with 3GB of memory.
>  
> I keep failing to sort the mentioned dataset in Spark. I do the following:
> val pf = sqlContext.parquetFile("hdfs://my.net/data.parquet")
> pf.registerTempTable("data")
> val sorted = sqlContext.sql("select * from data order by time")
> sorted.saveAsParquetFile("hdfs://my.net/data-sorted.parquet")
>  
> Spark starts to execute tasks, and then errors like “Executor Lost” pop up in 
> the web UI (for the tasks mapPartitions at Exchange.scala and runJob at 
> newParquet.scala), with different types of exceptions given as the reason. My 
> thinking is that the main problem is the “GC overhead limit exceeded” 
> exception, however I also observe exceptions related to connection timeouts 
> and shuffle writes (java.lang.IllegalStateException: Shutdown in progress; 
> org.apache.spark.shuffle.FetchFailedException: 
> java.io.FileNotFoundException).
>  
> What I tried:
> 1) Tried “order by eventId” and “order by eventId, time”, with the same 
> result.
> 2) Looked at the shuffle parameters, but the defaults do make sense (see the 
> sketch after this list).
> 3) Tried to repartition the data I am loading from Parquet: val pf3000 = 
> pf.repartition(3000), in order to get smaller chunks of data passed to the 
> executors (originally there are 300 partitions). It did not help either. 
> Surprisingly, this dataset takes 50GB on HDFS versus the 23GB of the 
> original.
> 4) Tried to have 8 workers instead of 24 and gave them 10GB of memory. It did 
> not help.
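> 
> (Re item 2, a minimal sketch of inspecting the relevant settings from the 
> shell; the keys and defaults below are the standard Spark 1.3 configuration 
> properties, not values reported in this thread:)
> sc.getConf.getOption("spark.shuffle.memoryFraction")       // None means the default (0.2)
> sc.getConf.getOption("spark.shuffle.spill")                // None means the default (true)
> sqlContext.getConf("spark.sql.shuffle.partitions", "200")  // the one that matters here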
>  
> Could you suggest what might be the problem and what is the workaround? Just 
> in case, I cannot have more RAM or more machines :)
>  
> Best regards, Alexander
