Any help would be much appreciated :)

On Mon, Jan 29, 2018 at 6:25 PM, ayan guha <guha.a...@gmail.com> wrote:
> Hi
>
> I want to write something in Structured Streaming:
>
> 1. I have a dataset with 3 columns: id, last_update_timestamp, attribute
> 2. I am receiving the data through Kinesis
>
> I want to deduplicate records based on last_updated. In batch, it looks like:
>
> spark.sql("select * from (select *, row_number() over (partition by id order by last_updated desc) rank from table1) tmp where rank = 1")
>
> But now I would like to do it in Structured Streaming. I need to maintain the state of each id as per the highest last_updated, across triggers, for a certain period (24 hours).
>
> Questions:
>
> 1. Should I use mapGroupsWithState, or is there another (SQL?) solution? Can anyone help me write it?
> 2. Is mapGroupsWithState supported in Python?
>
> Just to make sure we cover all bases, I have already tried dropDuplicates, but it keeps the first record encountered for an id instead of updating the state:
>
> unpackedDF = kinesisDF.selectExpr("cast (data as STRING) jsonData")
> dataDF = unpackedDF.select(
>     get_json_object(unpackedDF.jsonData, '$.header.id').alias('id'),
>     get_json_object(unpackedDF.jsonData, '$.header.last_updated').cast('timestamp').alias('last_updated'),
>     unpackedDF.jsonData)
>
> dedupDF = dataDF.dropDuplicates(subset=['id']).withWatermark('last_updated', '24 hours')
>
> So it is not working. Any help is appreciated.
>
> --
> Best Regards,
> Ayan Guha

--
Best Regards,
Ayan Guha
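[Editor's note: the "keep the record with the highest last_updated per id, updated across triggers" state the question describes can be sketched in plain Python. This is an illustration of the state logic only, not Spark API — the dict here stands in for the per-key state that a stateful operator such as mapGroupsWithState would maintain between micro-batches; the function and field names are hypothetical.]

```python
from datetime import datetime

def update_latest(state, records):
    """Keep, per id, only the record with the highest last_updated.

    `state` maps id -> (last_updated, attribute). It plays the role of
    the per-key state a stateful streaming operator would carry across
    triggers: a record only replaces the stored one if it is newer.
    """
    for rec in records:
        rid, ts, attr = rec["id"], rec["last_updated"], rec["attribute"]
        if rid not in state or ts > state[rid][0]:
            state[rid] = (ts, attr)
    return state

# Simulate two micro-batch triggers, with the second trigger delivering
# an out-of-order (older) record for the same id.
state = {}
update_latest(state, [
    {"id": "a", "last_updated": datetime(2018, 1, 29, 10, 0), "attribute": "v1"},
    {"id": "a", "last_updated": datetime(2018, 1, 29, 12, 0), "attribute": "v2"},
])
update_latest(state, [
    {"id": "a", "last_updated": datetime(2018, 1, 29, 11, 0), "attribute": "stale"},
])
print(state["a"][1])  # the newest attribute wins; the late, older record is ignored
```

Note this is exactly the behavior dropDuplicates cannot give: dropDuplicates keeps the first record seen for a key, whereas this logic replaces stored state whenever a newer event time arrives.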