It's a bit of a pain, but you could just use an outer join and prefer the first dataset's values wherever it has a row for a given id (assuming there are no duplicates within either input dataset, of course):
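Stripped of the test harness, the core of the idea is the outer join plus a per-column when/otherwise that prefers the first dataset. A condensed sketch of just that step (assuming df1 and df2 are your two DataFrames with identical schemas and an "id" column; not tested in exactly this form):

import org.apache.spark.sql.functions.when

// Outer join so every id from either side shows up exactly once,
// then for each column take df1's value whenever df1 has that id.
val joined = df1.join(df2, df1("id") === df2("id"), "outer")
val result = joined.select(
  df1.columns.map { c =>
    when(df1("id").isNotNull, df1(c)).otherwise(df2(c)).as(c)
  }: _*
)

Here is the same approach spelled out as a self-contained test (it leans on Spark's SharedSparkSession test helper):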
import org.apache.spark.sql.test.SharedSparkSession
import org.scalatest.FunSpec

class QuestionSpec extends FunSpec with SharedSparkSession {

  describe("spark list question") {

    it("should join based on id with one row only per id, based on the first dataset") {
      import testImplicits._
      import org.apache.spark.sql.functions.when

      val ds1 = spark.createDataFrame(Seq(
        QuestionRecord(0, "dataset 1 record 1"),
        QuestionRecord(2, "dataset 1 record 2"),
        QuestionRecord(4, "dataset 1 record 3"),
        QuestionRecord(6, "dataset 1 record 4"),
        QuestionRecord(8, "dataset 1 record 5")
      ))

      val ds2 = spark.createDataFrame(Seq(
        QuestionRecord(0, "dataset 2 record 1"),
        QuestionRecord(3, "dataset 2 record 2"),
        QuestionRecord(6, "dataset 2 record 3"),
        QuestionRecord(9, "dataset 2 record 4"),
        QuestionRecord(12, "dataset 2 record 5")
      ))

      val allColumns = ds1.columns

      // Merge the datasets
      val ds3 = ds1.join(ds2, ds1("id") === ds2("id"), "outer")

      // Form new columns with the required value
      val ds4 = allColumns.foldLeft(ds3) { case (ds, nextColName) =>
        ds.withColumn(s"new_$nextColName",
          when(ds1("id").isNotNull, ds1(nextColName)).otherwise(ds2(nextColName)))
      }

      // Drop old columns
      val ds5 = allColumns.foldLeft(ds4) { case (ds, nextColumnName) =>
        ds.drop(ds1(nextColumnName)).drop(ds2(nextColumnName))
      }.drop("id")

      // And get rid of our new_ marker
      val ds6 = allColumns.foldLeft(ds5) { case (ds, nextColumnName) =>
        ds.withColumnRenamed(s"new_$nextColumnName", nextColumnName)
      }

      ds6.show()
    }
  }
}

case class QuestionRecord(id: Int, payload: String)

On Fri, Sep 13, 2019 at 11:43 AM Abhinesh Hada <abhinesh...@gmail.com> wrote:

> Hi,
>
> I am trying to take union of 2 dataframes and then drop duplicate based on
> the value of a specific column. But, I want to make sure that while
> dropping duplicates, the rows from first data frame are kept.
>
> Example:
> df1 = df1.union(df2).dropDuplicates(['id'])