Hi Benyi,

Thanks for the reply. Yes, I process each Hive partition (one HDFS directory) in its own thread to speed things up; without threads the job is even slower. As I mentioned, I have to process 2000 Hive partitions, which means 2000 HDFS directories containing ORC files. Without threads, these 2000 directories would be processed one by one. With an ExecutorService thread pool of 20 threads, 20 jobs run at a time inside the one main job.
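The pooling described above can be sketched roughly as follows. This is only an illustration, not the actual job code: PartitionPool, processPartition, runAll and the partition paths are made-up names, and processPartition stands in for the real per-partition Spark work so that the sketch runs without a cluster.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PartitionPool {

    // Hypothetical stand-in for the real per-partition Spark job; it only
    // returns the directory it was given so the sketch is runnable on its own.
    static String processPartition(String hdfsDir) {
        return hdfsDir;
    }

    // Submits one task per directory to a fixed-size pool and waits for all of
    // them, so at most poolSize partitions are processed at the same time.
    static List<String> runAll(List<String> hdfsDirs, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (final String dir : hdfsDirs) {
                Callable<String> task = () -> processPartition(dir);
                futures.add(pool.submit(task));
            }
            List<String> finished = new ArrayList<>();
            for (Future<String> future : futures) {
                finished.add(future.get()); // blocks until that partition is done
            }
            return finished;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown(); // stop accepting tasks and let worker threads exit
        }
    }

    public static void main(String[] args) {
        List<String> dirs = new ArrayList<>();
        for (int i = 0; i < 2000; i++) {
            dirs.add("/path/in/hdfs/partition-" + i); // made-up paths
        }
        System.out.println(runAll(dirs, 20).size() + " partitions processed");
    }
}
```

The fixed pool only limits how many partitions are submitted concurrently from the driver; how many of those jobs actually run in parallel on the cluster still depends on the available executor cores.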
On Fri, Dec 11, 2015 at 12:49 AM, Benyi Wang <bewang.t...@gmail.com> wrote:

> I don't understand this: "I have the following method code which I call it
> from a thread spawn from spark driver. So in this case 2000 threads ..."
>
> Why do you call it from a thread?
> Are you process one partition in one thread?
>
> On Thu, Dec 10, 2015 at 11:13 AM, Benyi Wang <bewang.t...@gmail.com>
> wrote:
>
>> DataFrame filterFrame1 = sourceFrame.filter(col("col1").contains("xyz"));
>> DataFrame frameToProcess = sourceFrame.except(filterFrame1);
>>
>> except is really expensive. Do you actually want this:
>>
>> sourceFrame.filter(! col("col1").contains("xyz"))
>>
>> On Thu, Dec 10, 2015 at 9:57 AM, unk1102 <umesh.ka...@gmail.com> wrote:
>>
>>> Hi I have spark job which reads Hive-ORC data and processes and
>>> generates csv file in the end. Now this ORC files are hive partitions
>>> and I have around 2000 partitions to process every day. These hive
>>> partitions size is around 800 GB in HDFS. I have the following method
>>> code which I call it from a thread spawn from spark driver. So in this
>>> case 2000 threads gets processed and those runs painfully slow around
>>> 12 hours making huge data shuffling each executor shuffles around 50 GB
>>> of data. I am using 40 executors of 4 core and 30 GB memory each. I am
>>> using Hadoop 2.6 and Spark 1.5.2 release.
>>>
>>> public void callThisFromThread() {
>>>     DataFrame sourceFrame =
>>>         hiveContext.read().format("orc").load("/path/in/hdfs");
>>>     DataFrame filterFrame1 = sourceFrame.filter(col("col1").contains("xyz"));
>>>     DataFrame frameToProcess = sourceFrame.except(filterFrame1);
>>>     JavaRDD<Row> updatedRDD = frameToProcess.toJavaRDD().mapPartitions() {
>>>         .....
>>>     }
>>>     DataFrame updatedFrame =
>>>         hiveContext.createDataFrame(updatedRDD, sourceFrame.schema());
>>>     DataFrame selectFrame = updatedFrame.select("col1","col2...","col8");
>>>     DataFrame groupFrame =
>>>         selectFrame.groupBy("col1","col2....","col8").agg("......"); //8 column group by
>>>     groupFrame.coalesce(1).save(); //save as csv only one file so coalesce(1)
>>> }
>>>
>>> Please guide me how can I optimize above code I cant avoid group by
>>> which is evil I know I have to do group on 8 fields mentioned above.
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-this-Spark-1-5-2-code-fast-and-shuffle-less-data-tp25671.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.