Re: [Spark SQL]: Does Union operation followed by drop duplicate follows "keep first"

2019-09-14 Thread Dhaval Patel
Hi Abhinesh,

Since dropDuplicates keeps the first record, you can tag the rows of the 1st
and 2nd df with a source id and then
union -> sort on that id -> drop duplicates.
This ensures the records from the 1st df are kept and those from the 2nd are
dropped.
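The intent can be sketched in plain Python (this illustrates the keep-first
semantics only; it is not Spark code, and the row/column names are made up):

```python
def union_keep_first(rows1, rows2, key):
    """Union two row lists, then dedup on `key`, preferring rows1."""
    # Tag each row with a source id, sort so rows1 comes first,
    # then keep only the first occurrence of each key.
    tagged = [(1, r) for r in rows1] + [(2, r) for r in rows2]
    tagged.sort(key=lambda t: t[0])  # stable sort: df1 rows stay ahead
    seen, result = set(), []
    for _, row in tagged:
        if row[key] not in seen:
            seen.add(row[key])
            result.append(row)
    return result

df1 = [{"id": 0, "v": "df1-a"}, {"id": 2, "v": "df1-b"}]
df2 = [{"id": 0, "v": "df2-a"}, {"id": 3, "v": "df2-b"}]
print(union_keep_first(df1, df2, "id"))
```

For id 0, which appears in both inputs, the df1 row survives.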

Regards
Dhaval

On Sat, Sep 14, 2019 at 4:41 PM Abhinesh Hada  wrote:

> Hey Nathan,
>
> As the datasets are very large, I am looking for approaches that involve a
> minimum of joins. I will give your approach a try.
> Thanks a lot for your help.
>
> On Sat, Sep 14, 2019 at 12:58 AM Nathan Kronenfeld
>  wrote:
>
>> It's a bit of a pain, but you could just use an outer join (assuming
>> there are no duplicates in the input datasets, of course):
>>
>> [Nathan's code snipped; see his original message below.]
>>
>> On Fri, Sep 13, 2019 at 11:43 AM Abhinesh Hada 
>> wrote:
>>
>>> Hi,
>>>
>>> I am trying to take the union of 2 dataframes and then drop duplicates
>>> based on the value of a specific column. But I want to make sure that
>>> while dropping duplicates, the rows from the first dataframe are kept.
>>>
>>> Example:
>>> df1 = df1.union(df2).dropDuplicates(['id'])
>>>
>>>
>>>


Re: [Spark SQL]: Does Union operation followed by drop duplicate follows "keep first"

2019-09-14 Thread Abhinesh Hada
Hey Nathan,

As the datasets are very large, I am looking for approaches that involve a
minimum of joins. I will give your approach a try.
Thanks a lot for your help.

On Sat, Sep 14, 2019 at 12:58 AM Nathan Kronenfeld
 wrote:

> It's a bit of a pain, but you could just use an outer join (assuming there
> are no duplicates in the input datasets, of course):
>
> [Nathan's code snipped; see his original message below.]
>
> On Fri, Sep 13, 2019 at 11:43 AM Abhinesh Hada 
> wrote:
>
>> Hi,
>>
>> I am trying to take the union of 2 dataframes and then drop duplicates
>> based on the value of a specific column. But I want to make sure that
>> while dropping duplicates, the rows from the first dataframe are kept.
>>
>> Example:
>> df1 = df1.union(df2).dropDuplicates(['id'])
>>
>>
>>


Re: [Spark SQL]: Does Union operation followed by drop duplicate follows "keep first"

2019-09-13 Thread Nathan Kronenfeld
It's a bit of a pain, but you could just use an outer join (assuming there
are no duplicates in the input datasets, of course):

import org.apache.spark.sql.test.SharedSparkSession
import org.scalatest.FunSpec

class QuestionSpec extends FunSpec with SharedSparkSession {
  describe("spark list question") {
    it("should join based on id with one row only per id, based on the first dataset") {
      import testImplicits._
      import org.apache.spark.sql.functions.when

      val ds1 = spark.createDataFrame(Seq(
        QuestionRecord(0, "dataset 1 record 1"),
        QuestionRecord(2, "dataset 1 record 2"),
        QuestionRecord(4, "dataset 1 record 3"),
        QuestionRecord(6, "dataset 1 record 4"),
        QuestionRecord(8, "dataset 1 record 5")
      ))
      val ds2 = spark.createDataFrame(Seq(
        QuestionRecord(0, "dataset 2 record 1"),
        QuestionRecord(3, "dataset 2 record 2"),
        QuestionRecord(6, "dataset 2 record 3"),
        QuestionRecord(9, "dataset 2 record 4"),
        QuestionRecord(12, "dataset 2 record 5")
      ))

      val allColumns = ds1.columns

      // Merge the datasets with a full outer join on id
      val ds3 = ds1.join(ds2, ds1("id") === ds2("id"), "outer")

      // Form new columns with the required value: prefer ds1's value
      // whenever ds1 has a row for this id
      val ds4 = allColumns.foldLeft(ds3) { case (ds, nextColName) =>
        ds.withColumn(s"new_$nextColName",
          when(ds1("id").isNotNull, ds1(nextColName)).otherwise(ds2(nextColName)))
      }

      // Drop the old columns
      val ds5 = allColumns.foldLeft(ds4) { case (ds, nextColName) =>
        ds.drop(ds1(nextColName)).drop(ds2(nextColName))
      }.drop("id")

      // And get rid of our new_ marker
      val ds6 = allColumns.foldLeft(ds5) { case (ds, nextColName) =>
        ds.withColumnRenamed(s"new_$nextColName", nextColName)
      }

      ds6.show()
    }
  }
}

case class QuestionRecord (id: Int, payload: String)
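The effect of this outer-join trick can be mimicked in a few lines of plain
Python (an illustrative sketch only, with list-of-dict rows standing in for
DataFrames; the function name is made up):

```python
def outer_join_prefer_first(rows1, rows2, key):
    """Full outer 'join' on key, taking rows1's row whenever it exists."""
    by_key1 = {r[key]: r for r in rows1}
    by_key2 = {r[key]: r for r in rows2}
    # For every id seen on either side, prefer the first dataset's row
    return [by_key1.get(k, by_key2.get(k))
            for k in sorted(set(by_key1) | set(by_key2))]

ds1 = [{"id": 0, "payload": "dataset 1 record 1"},
       {"id": 2, "payload": "dataset 1 record 2"}]
ds2 = [{"id": 0, "payload": "dataset 2 record 1"},
       {"id": 3, "payload": "dataset 2 record 2"}]
print(outer_join_prefer_first(ds1, ds2, "id"))
```

Where an id occurs on both sides (0 here), the first dataset's payload wins;
ids unique to either side come through unchanged.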

On Fri, Sep 13, 2019 at 11:43 AM Abhinesh Hada 
wrote:

> Hi,
>
> I am trying to take the union of 2 dataframes and then drop duplicates
> based on the value of a specific column. But I want to make sure that while
> dropping duplicates, the rows from the first dataframe are kept.
>
> Example:
> df1 = df1.union(df2).dropDuplicates(['id'])
>
>
>


Re: [Spark SQL]: Does Union operation followed by drop duplicate follows "keep first"

2019-09-13 Thread Patrick McCarthy
If you only care that you're deduping on one of the fields, you could add an
index and count, like so:

from pyspark.sql.functions import col, collect_set, lit, size

df3 = (df1.withColumn('idx', lit(1))
       .union(df2.withColumn('idx', lit(2))))

remove_df = (df3
             .groupBy('id')
             .agg(collect_set('idx').alias('set_size'))
             .filter(size(col('set_size')) > 1)
             .select('id', lit(2).alias('idx')))

# the duplicated ids in the above are now coded for df2, so only those will
# be dropped
df3.join(remove_df, on=['id', 'idx'], how='leftanti')
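The group-and-anti-join logic can be illustrated in plain Python with the same
idx coding (a sketch of the semantics only, not Spark API; names are made up):

```python
from collections import defaultdict

def drop_df2_duplicates(rows1, rows2, key):
    """Tag rows by source, find keys present in both, drop the source-2 copies."""
    tagged = [dict(r, idx=1) for r in rows1] + [dict(r, idx=2) for r in rows2]
    idx_sets = defaultdict(set)      # like groupBy(key).agg(collect_set('idx'))
    for r in tagged:
        idx_sets[r[key]].add(r["idx"])
    # duplicated keys, coded for the second source
    remove = {(k, 2) for k, s in idx_sets.items() if len(s) > 1}
    # like the left-anti join on (key, idx)
    return [{c: v for c, v in r.items() if c != "idx"}
            for r in tagged if (r[key], r["idx"]) not in remove]

df1 = [{"id": 0, "v": "a1"}, {"id": 2, "v": "b1"}]
df2 = [{"id": 0, "v": "a2"}, {"id": 3, "v": "c2"}]
print(drop_df2_duplicates(df1, df2, "id"))
```

Only the df2 copy of id 0 is removed; rows unique to either source survive.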

On Fri, Sep 13, 2019 at 11:44 AM Abhinesh Hada 
wrote:

> Hi,
>
> I am trying to take the union of 2 dataframes and then drop duplicates
> based on the value of a specific column. But I want to make sure that while
> dropping duplicates, the rows from the first dataframe are kept.
>
> Example:
> df1 = df1.union(df2).dropDuplicates(['id'])
>
>
>

-- 


Patrick McCarthy

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016