Hello Abdeali,

Thank you for your response. Can you please explain this line to me: "And the dropDuplicates at the end ensures records with two values for the same 'update_time' don't cause issues."
Sorry, I didn't get it right away. :)

On Thu, Apr 4, 2019 at 10:41 AM Abdeali Kothari <abdealikoth...@gmail.com> wrote:

> I've faced this issue too, and a colleague pointed me to the
> documentation:
> https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
> The dropDuplicates docs do not guarantee that it will return the
> "first" record (even if you sort your DataFrame). It gives you any
> record it finds and just ensures that duplicates are not present.
>
> The only way I know of to do this is what you did, but you can avoid
> the sorting inside the partition with something like (in PySpark):
>
> from pyspark.sql import Window, functions as F
>
> df = df.withColumn(
>     'wanted_time',
>     F.min('update_time').over(Window.partitionBy('invoice_id')))
> out_df = (df.filter(df['update_time'] == df['wanted_time'])
>             .drop('wanted_time')
>             .dropDuplicates(['invoice_id', 'update_time']))
>
> The min() is faster than doing an orderBy() and a row_number().
> And the dropDuplicates at the end ensures records with two values for
> the same 'update_time' don't cause issues.
>
> On Thu, Apr 4, 2019 at 10:22 AM Chetan Khatri <chetan.opensou...@gmail.com> wrote:
>
>> Hello Dear Spark Users,
>>
>> I am using dropDuplicates on a DataFrame generated from a large
>> parquet file (from HDFS), deduplicating on a timestamp column, and
>> every time I run it, it drops different rows for the same timestamp.
>>
>> What I tried, and what worked:
>>
>> val wSpec = Window.partitionBy($"invoice_id")
>>   .orderBy($"update_time".desc)
>>
>> val irqDistinctDF = irqFilteredDF
>>   .withColumn("rn", row_number.over(wSpec))
>>   .where($"rn" === 1)
>>   .drop("rn")
>>   .drop("update_time")
>>
>> But this is damn slow...
>>
>> Can someone please throw some light on this?
>>
>> Thanks
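
For reference, a minimal sketch of the tie case that the final dropDuplicates() handles. The toy rows below are made up; only the invoice_id and update_time column names come from the thread:

# Sketch of why the final dropDuplicates() is needed (toy data, hypothetical rows).
from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.appName("dedup-sketch").getOrCreate()

df = spark.createDataFrame(
    [
        (1, "2019-04-01 10:00:00", "a"),  # earliest update_time for invoice 1
        (1, "2019-04-01 10:00:00", "b"),  # tie: same invoice, same update_time
        (1, "2019-04-02 09:00:00", "c"),
        (2, "2019-04-03 08:00:00", "d"),
    ],
    ["invoice_id", "update_time", "payload"],
)

# Tag each row with the earliest update_time seen for its invoice_id.
df = df.withColumn(
    "wanted_time",
    F.min("update_time").over(Window.partitionBy("invoice_id")),
)

# Keeping only rows at the minimum still leaves BOTH tied rows for invoice 1...
filtered = df.filter(df["update_time"] == df["wanted_time"]).drop("wanted_time")

# ...so dropDuplicates on the key collapses the tie to a single row per invoice.
out_df = filtered.dropDuplicates(["invoice_id", "update_time"])
out_df.show()
# Invoice 1 now contributes exactly one row (payload "a" or "b", chosen
# arbitrarily); invoice 2 contributes its single row.

Note that F.min() keeps the earliest 'update_time' per invoice, whereas the original row_number() solution ordered by $"update_time".desc and so kept the latest; F.max() would match that behavior. Either way, when two rows tie on the chosen timestamp, which one survives dropDuplicates is arbitrary.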