How much memory do you have per partition?

On Thu, Apr 4, 2019 at 7:49 AM Chetan Khatri <chetan.opensou...@gmail.com>
wrote:
> I will get the information and will share it with you.
>
> On Thu, Apr 4, 2019 at 5:03 PM Abdeali Kothari <abdealikoth...@gmail.com>
> wrote:
>
>> How long does the window solution take? (Also mention how many
>> executors your Spark application was using on average during that
>> time.) I am not aware of anything that is faster. When I ran it on my
>> data (~8-9 GB) I think it took less than 5 minutes (I don't remember
>> the exact time).
>>
>> On Thu, Apr 4, 2019 at 1:09 PM Chetan Khatri <chetan.opensou...@gmail.com>
>> wrote:
>>
>>> Thanks for the awesome clarification / explanation.
>>>
>>> I have cases where update_time can be the same.
>>> I am in need of suggestions: when I have very large data, like 5 GB,
>>> the window-based solution I mentioned takes a very long time.
>>>
>>> Thanks again.
>>>
>>> On Thu, Apr 4, 2019 at 12:11 PM Abdeali Kothari <
>>> abdealikoth...@gmail.com> wrote:
>>>
>>>> So, the above code using min() worked fine for me in general, but
>>>> there was one corner case where it failed.
>>>> That was when I had something like:
>>>> invoice_id=1, update_time=*2018-01-01 15:00:00.000*
>>>> invoice_id=1, update_time=*2018-01-01 15:00:00.000*
>>>> invoice_id=1, update_time=2018-02-03 14:00:00.000
>>>>
>>>> In this example, the update_time for 2 records is exactly the same.
>>>> So, filtering on the min() will result in 2 records for
>>>> invoice_id=1. This is avoided in your row_number snippet, because 2
>>>> rows will never both have row_num = 1.
>>>>
>>>> But note that there, row_num=1 and row_num=2 will be randomly
>>>> ordered (because the orderBy is on update_time and both rows have
>>>> the same value of update_time).
>>>> Hence dropDuplicates can be used there, because either one of those
>>>> rows is acceptable.
>>>>
>>>> Overall, dropDuplicates seems to be meant for cases where you
>>>> literally have redundant duplicated data, and not for filtering to
>>>> get the first/last record, etc.
>>>>
>>>>
>>>> On Thu, Apr 4, 2019 at 11:46 AM Chetan Khatri <
>>>> chetan.opensou...@gmail.com> wrote:
>>>>
>>>>> Hello Abdeali, thank you for your response.
>>>>>
>>>>> Can you please explain this line to me: "And the dropDuplicates at
>>>>> the end ensures records with two values for the same 'update_time'
>>>>> don't cause issues."
>>>>>
>>>>> Sorry, I didn't get it at first. :)
>>>>>
>>>>> On Thu, Apr 4, 2019 at 10:41 AM Abdeali Kothari <
>>>>> abdealikoth...@gmail.com> wrote:
>>>>>
>>>>>> I've faced this issue too, and a colleague pointed me to the
>>>>>> documentation:
>>>>>> https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
>>>>>> The dropDuplicates docs do not say that it guarantees returning
>>>>>> the "first" record (even if you sort your dataframe).
>>>>>> It will give you any record it finds and just ensure that
>>>>>> duplicates are not present.
>>>>>>
>>>>>> The only way I know of to do this is what you did, but you can
>>>>>> avoid the sorting inside the partition with something like (in
>>>>>> pyspark):
>>>>>>
>>>>>> from pyspark.sql import Window, functions as F
>>>>>>
>>>>>> df = df.withColumn(
>>>>>>     'wanted_time',
>>>>>>     F.min('update_time').over(Window.partitionBy('invoice_id')))
>>>>>> out_df = (df.filter(df['update_time'] == df['wanted_time'])
>>>>>>             .drop('wanted_time')
>>>>>>             .dropDuplicates(['invoice_id', 'update_time']))
>>>>>>
>>>>>> The min() is faster than doing an orderBy() and a row_number().
>>>>>> And the dropDuplicates at the end ensures records with two values
>>>>>> for the same 'update_time' don't cause issues.
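A minimal, runnable sketch of the approach above, including the
duplicate-update_time corner case discussed in the thread. The local
SparkSession, the toy data, and the 'payload' column are illustrative
assumptions, not from the thread:

from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.master("local[2]").getOrCreate()

df = spark.createDataFrame(
    [(1, "2018-01-01 15:00:00.000", "a"),   # these two rows tie on the
     (1, "2018-01-01 15:00:00.000", "b"),   # minimum update_time
     (1, "2018-02-03 14:00:00.000", "c")],
    ["invoice_id", "update_time", "payload"])

# min() over an unordered window avoids a sort within each partition.
df = df.withColumn(
    "wanted_time",
    F.min("update_time").over(Window.partitionBy("invoice_id")))

# The filter alone keeps BOTH tied rows; dropDuplicates collapses them
# to one arbitrary row, which is fine when the ties are truly redundant.
out_df = (df.filter(df["update_time"] == df["wanted_time"])
            .drop("wanted_time")
            .dropDuplicates(["invoice_id", "update_time"]))
out_df.show()  # one row for invoice_id=1, payload 'a' or 'b'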
>>>>>>
>>>>>> On Thu, Apr 4, 2019 at 10:22 AM Chetan Khatri <
>>>>>> chetan.opensou...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello Dear Spark Users,
>>>>>>>
>>>>>>> I am using dropDuplicates on a DataFrame generated from a large
>>>>>>> parquet file (from HDFS), deduplicating based on a timestamp
>>>>>>> column. Every time I run it, it drops different rows among those
>>>>>>> with the same timestamp.
>>>>>>>
>>>>>>> What I tried, and what worked:
>>>>>>>
>>>>>>> val wSpec = Window.partitionBy($"invoice_id")
>>>>>>>   .orderBy($"update_time".desc)
>>>>>>>
>>>>>>> val irqDistinctDF = irqFilteredDF
>>>>>>>   .withColumn("rn", row_number().over(wSpec))
>>>>>>>   .where($"rn" === 1)
>>>>>>>   .drop("rn").drop("update_time")
>>>>>>>
>>>>>>> But this is damn slow...
>>>>>>>
>>>>>>> Can someone please throw some light on this?
>>>>>>>
>>>>>>> Thanks
>>>>>>>

--
Thanks,
Jason
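For comparison, a PySpark sketch of the row_number() approach from the
original question (irq_filtered_df mirrors the thread's irqFilteredDF).
The extra 'invoice_line_no' sort key is a hypothetical tie-breaker, not
from the thread; any unique column would do, and it makes the kept row
deterministic when update_time values tie:

from pyspark.sql import Window, functions as F

w_spec = (Window.partitionBy("invoice_id")
                .orderBy(F.col("update_time").desc(),
                         F.col("invoice_line_no")))  # hypothetical tie-breaker

irq_distinct_df = (irq_filtered_df
                   .withColumn("rn", F.row_number().over(w_spec))
                   .where(F.col("rn") == 1)          # newest row per invoice
                   .drop("rn", "update_time"))

Because this sorts every partition, it is typically slower than the
min()-over-window approach shown earlier in the thread.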