- I assume your Parquet files are compressed. Gzip or Snappy?
- Which Spark version did you use? It seems to be at least 1.4. If you use Spark SQL and Tungsten, you might get better performance, but Spark 1.5.2 gave me a wrong result on about 300~400 GB of data, just for a simple group-by and aggregate.
- Did you use Kryo serialization?
- You should have spark.shuffle.compress=true; verify it.
- How many tasks did you use? spark.default.parallelism=?
- What about this (a rough sketch follows below):
  - Read the data day by day.
  - Compute a bucket id from the timestamp, e.g. the date and hour.
  - Write into different buckets (you probably need a special writer to write the data efficiently without shuffling it).
  - Run distinct for each bucket. Because each bucket is small, Spark can get it done faster than having everything in one run.
- I think using groupBy(userId, timestamp) might be better than distinct. I guess distinct() will compare every field.
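A minimal sketch of the bucket-then-dedup idea above, using the Spark 1.5-era DataFrame API and the columns described in the thread (UserID, EventType, EventKey, TimeStamp, MetaData). Paths, the parallelism value, and the assumption that TimeStamp is epoch milliseconds are all illustrative, not the poster's actual setup:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.functions._

    // Settings mentioned above: Kryo, shuffle compression, explicit parallelism.
    // The parallelism value is a placeholder to tune for the cluster.
    val conf = new SparkConf()
      .setAppName("EventBucketDedup")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.shuffle.compress", "true")
      .set("spark.default.parallelism", "2000")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Read one day's events (each day is already its own Parquet file).
    val day = sqlContext.read.parquet("/events/day1.parquet")

    // Derive a bucket id from the timestamp, e.g. date and hour.
    // Assumes TimeStamp is epoch milliseconds; adjust if it is seconds.
    val bucketed = day.withColumn(
      "bucket", from_unixtime(col("TimeStamp") / 1000, "yyyy-MM-dd-HH"))

    // Write each bucket into its own directory so later dedup runs stay small.
    bucketed.write.partitionBy("bucket").parquet("/events/bucketed")

    // Dedup one bucket at a time, grouping on the key columns instead of a
    // full-row distinct() that compares every field.
    val oneBucket = sqlContext.read.parquet("/events/bucketed/bucket=2016-01-08-14")
    oneBucket
      .dropDuplicates(Seq("UserID", "TimeStamp"))
      .write.parquet("/events/deduped/2016-01-08-14")

Because partitionBy writes each task's rows straight into the per-bucket directories, no extra shuffle is needed at write time; the expensive comparison work then happens per bucket instead of over the whole dataset.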
On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

> And the most frequent operation I am going to do is find the UserIDs who have
> some events, then retrieve all the events associated with each UserID.
>
> In this case, how should I partition to speed up the process?
>
> Thanks.
>
> On Fri, Jan 8, 2016 at 2:29 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>
>> Hey Ted,
>>
>> The Event table is like this: UserID, EventType, EventKey, TimeStamp,
>> MetaData. I just parse it from JSON and save it as Parquet; I did not
>> change the partitioning.
>>
>> Annoyingly, every day's incoming Event data has duplicates. The same event
>> could show up on Day 1 and Day 2 and probably Day 3.
>>
>> I only want to keep a single Event table, and each day comes with so many
>> duplicates.
>>
>> Is there a way I could just insert into Parquet and, if a duplicate is
>> found, just ignore it?
>>
>> Thanks,
>> Gavin
>>
>> On Fri, Jan 8, 2016 at 2:18 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Is your Parquet data source partitioned by date?
>>>
>>> Can you dedup within partitions?
>>>
>>> Cheers
>>>
>>> On Fri, Jan 8, 2016 at 2:10 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>>>
>>>> I tried it on three days' data. The total input is only 980 GB, but the
>>>> shuffle write data is about 6.2 TB; then the job failed during the
>>>> shuffle read step, which should be another 6.2 TB of shuffle read.
>>>>
>>>> I think that to dedup, the shuffling cannot be avoided. Is there
>>>> anything I could do to stabilize this process?
>>>>
>>>> Thanks.
>>>>
>>>> On Fri, Jan 8, 2016 at 2:04 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>>>>
>>>>> Hey,
>>>>>
>>>>> I have every day's Event table and want to merge them into a single
>>>>> Event table, but there are so many duplicates among each day's data.
>>>>>
>>>>> I use Parquet as the data source. What I am doing now is
>>>>>
>>>>> EventDay1.unionAll(EventDay2).distinct().write.parquet("a new parquet file").
>>>>>
>>>>> Each day's Event data is stored in its own Parquet file.
>>>>>
>>>>> But it failed at stage 2, which keeps losing the connection to one
>>>>> executor. I guess this is due to a memory issue.
>>>>>
>>>>> Any suggestion on how to do this efficiently?
>>>>>
>>>>> Thanks,
>>>>> Gavin
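On the quoted merge question itself: Parquet has no "insert and ignore if duplicate" mode, so the dedup has to happen when the daily file is merged in. A hedged sketch of that merge, replacing the full-row distinct() with dropDuplicates on key columns and writing the result partitioned by date (paths, key columns, and the "date" partition column are assumptions, not the poster's actual layout):

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.functions._

    def mergeDay(sqlContext: SQLContext,
                 existingPath: String,
                 newDayPath: String,
                 outPath: String): Unit = {
      val existing = sqlContext.read.parquet(existingPath)
      val newDay   = sqlContext.read.parquet(newDayPath)

      existing
        .unionAll(newDay)                                        // Spark 1.x API; union() in 2.x
        .dropDuplicates(Seq("UserID", "EventKey", "TimeStamp"))  // dedup on keys, not every field
        .withColumn("date", from_unixtime(col("TimeStamp") / 1000, "yyyy-MM-dd"))
        .write
        .partitionBy("date")   // per-day directories keep later dedup and UserID scans smaller
        .parquet(outPath)
    }

Partitioning the output by date also lines up with Ted's suggestion: once the table is laid out per day, subsequent dedup runs and per-UserID scans only touch the relevant directories.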