I would just map to (id, record) pairs keyed by the id, then do a reduceByKey where you compare the scores and keep the highest, then take .values — that should do it.
Sent from my iPhone

> On Jan 11, 2020, at 11:14 AM, Rishi Shah <rishishah.s...@gmail.com> wrote:
>
> Thanks everyone for your contribution on this topic. I wanted to check in to
> see if anyone has discovered a different approach, or has an opinion on a
> better approach, to deduplicating data using PySpark. I would really
> appreciate any further insight on this.
>
> Thanks,
> -Rishi
>
>> On Wed, Jun 12, 2019 at 4:21 PM Yeikel <em...@yeikel.com> wrote:
>> Nicholas, thank you for your explanation.
>>
>> I am also interested in the example that Rishi is asking for. I am sure
>> mapPartitions may work, but as Vladimir suggests it may not be the best
>> option in terms of performance.
>>
>> @Vladimir Prus, are you aware of any example of writing a "custom
>> physical exec operator"?
>>
>> If anyone needs a further explanation of the follow-up question Rishi
>> posted, please see the example below:
>>
>> import org.apache.spark.sql.types._
>> import org.apache.spark.sql.Row
>>
>> val someData = Seq(
>>   Row(1, 10),
>>   Row(1, 20),
>>   Row(1, 11)
>> )
>>
>> val schema = List(
>>   StructField("id", IntegerType, true),
>>   StructField("score", IntegerType, true)
>> )
>>
>> val df = spark.createDataFrame(
>>   spark.sparkContext.parallelize(someData),
>>   StructType(schema)
>> )
>>
>> // Goal: drop duplicates using "id" as the primary key and keep the highest "score".
>>
>> df.sort($"score".desc).dropDuplicates("id").show
>>
>> == Physical Plan ==
>> *(2) HashAggregate(keys=[id#191], functions=[first(score#192, false)])
>> +- Exchange hashpartitioning(id#191, 200)
>>    +- *(1) HashAggregate(keys=[id#191], functions=[partial_first(score#192, false)])
>>       +- *(1) Sort [score#192 DESC NULLS LAST], true, 0
>>          +- Exchange rangepartitioning(score#192 DESC NULLS LAST, 200)
>>             +- Scan ExistingRDD[id#191,score#192]
>>
>> This seems to work, but I don't know what the implications are if we use
>> this approach with a bigger dataset, or what the alternatives are. From the
>> explain output I can see the two Exchanges, so it may not be the best
>> approach?

> --
> Regards,
>
> Rishi Shah