Re: [Spark SQL]: Does a union operation followed by dropDuplicates follow "keep first"?
Hi Abhinesh,

Since dropDuplicates keeps the first record, you can tag the rows of the 1st and 2nd dataframes with an id, then union -> sort on that id -> dropDuplicates. This will ensure that records from the 1st dataframe are kept and those from the 2nd are dropped.

Regards,
Dhaval
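A minimal PySpark sketch of Dhaval's tag-and-sort suggestion (the 'src' helper column name is illustrative, not from the thread; note also that Spark does not formally guarantee which duplicate dropDuplicates retains, so the keep-first behaviour here is observed rather than contractual):

    from pyspark.sql.functions import lit

    # tag each source, union, then sort so df1's rows come first per id
    tagged = df1.withColumn('src', lit(1)).union(df2.withColumn('src', lit(2)))

    result = (tagged
              .orderBy('src')           # rows tagged 1 (from df1) sort first
              .dropDuplicates(['id'])   # keeps the first row seen per id
              .drop('src'))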
Re: [Spark SQL]: Does a union operation followed by dropDuplicates follow "keep first"?
Hey Nathan,

Since the dataset is very large, I am looking for approaches that involve a minimum of joins. I will give your approach a try. Thanks a lot for your help.
Re: [Spark SQL]: Does a union operation followed by dropDuplicates follow "keep first"?
It's a bit of a pain, but you could just use an outer join (assuming there are no duplicates in the input datasets, of course):

import org.apache.spark.sql.test.SharedSparkSession
import org.scalatest.FunSpec

class QuestionSpec extends FunSpec with SharedSparkSession {
  describe("spark list question") {
    it("should join based on id with one row only per id, based on the first dataset") {
      import testImplicits._
      import org.apache.spark.sql.functions.when

      // Two test datasets whose ids partially overlap (0 and 6)
      val ds1 = spark.createDataFrame(Seq(
        QuestionRecord(0, "dataset 1 record 1"),
        QuestionRecord(2, "dataset 1 record 2"),
        QuestionRecord(4, "dataset 1 record 3"),
        QuestionRecord(6, "dataset 1 record 4"),
        QuestionRecord(8, "dataset 1 record 5")
      ))
      val ds2 = spark.createDataFrame(Seq(
        QuestionRecord(0, "dataset 2 record 1"),
        QuestionRecord(3, "dataset 2 record 2"),
        QuestionRecord(6, "dataset 2 record 3"),
        QuestionRecord(9, "dataset 2 record 4"),
        QuestionRecord(12, "dataset 2 record 5")
      ))

      val allColumns = ds1.columns

      // Merge the datasets
      val ds3 = ds1.join(ds2, ds1("id") === ds2("id"), "outer")

      // Form new columns with the required value: prefer ds1's value
      // whenever the row has a match in ds1, otherwise take ds2's
      val ds4 = allColumns.foldLeft(ds3) { case (ds, nextColName) =>
        ds.withColumn(s"new_$nextColName",
          when(ds1("id").isNotNull, ds1(nextColName)).otherwise(ds2(nextColName)))
      }

      // Drop old columns
      val ds5 = allColumns.foldLeft(ds4) { case (ds, nextColumnName) =>
        ds.drop(ds1(nextColumnName)).drop(ds2(nextColumnName))
      }.drop("id")

      // And get rid of our new_ marker
      val ds6 = allColumns.foldLeft(ds5) { case (ds, nextColumnName) =>
        ds.withColumnRenamed(s"new_$nextColumnName", nextColumnName)
      }

      ds6.show()
    }
  }
}

case class QuestionRecord(id: Int, payload: String)

On Fri, Sep 13, 2019 at 11:43 AM Abhinesh Hada wrote:
> Hi,
>
> I am trying to take the union of 2 dataframes and then drop duplicates
> based on the value of a specific column. But I want to make sure that
> while dropping duplicates, the rows from the first dataframe are kept.
>
> Example:
> df1 = df1.union(df2).dropDuplicates(['id'])
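For comparison, a rough PySpark translation of Nathan's outer-join idea, using the df1/df2 names from the original question (a hedged sketch, assuming both dataframes share the same columns and that neither contains duplicate ids on its own):

    from pyspark.sql.functions import when

    joined = df1.join(df2, df1['id'] == df2['id'], 'outer')

    # For every column, take df1's value whenever the row has a match in df1
    # (df1's id is non-null after the outer join), otherwise fall back to df2.
    result = joined.select(
        *[when(df1['id'].isNotNull(), df1[c]).otherwise(df2[c]).alias(c)
          for c in df1.columns]
    )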
Re: [Spark SQL]: Does a union operation followed by dropDuplicates follow "keep first"?
If you only care that you're deduping on one of the fields, you could add an index and count, like so:

from pyspark.sql.functions import lit, col, collect_set, size

df3 = (df1.withColumn('idx', lit(1))
       .union(df2.withColumn('idx', lit(2))))

remove_df = (df3
             .groupBy('id')
             .agg(collect_set('idx').alias('idx_set'))
             .filter(size(col('idx_set')) > 1)
             .select('id', lit(2).alias('idx')))

# the duplicated ids in the above are now coded for df2,
# so only those rows will be dropped
df3.join(remove_df, on=['id', 'idx'], how='left_anti')

--
Patrick McCarthy
Senior Data Scientist, Machine Learning Engineering
Dstillery
470 Park Ave South, 17th Floor, NYC 10016
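One small follow-up to Patrick's sketch, continuing with the same names ('idx' is the tag column from the code above): after the anti-join you would normally drop the helper column as well.

    result = df3.join(remove_df, on=['id', 'idx'], how='left_anti').drop('idx')

Unlike the sort-then-dedup route, this only removes df2's rows for ids that occur in both inputs, so no global ordering of the union is needed.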