Any help would be much appreciated :)

On Mon, Jan 29, 2018 at 6:25 PM, ayan guha <guha.a...@gmail.com> wrote:

> Hi
>
> I want to write something in Structured streaming:
>
> 1. I have a dataset which has 3 columns: id, last_updated and
> attribute
> 2. I am receiving the data through Kinesis
>
> I want to deduplicate records per id, keeping the one with the highest
> last_updated. In batch, it looks like:
>
> spark.sql("""
>     SELECT * FROM (
>         SELECT *,
>                row_number() OVER (PARTITION BY id ORDER BY last_updated DESC) AS rank
>         FROM table1
>     ) tmp
>     WHERE rank = 1
> """)
>
> But now I would like to do it in Structured Streaming. I need to maintain the
> state of each id as per the highest last_updated, across triggers, for a
> certain period (24 hours).
>
> Questions:
>
> 1. Should I use mapGroupsWithState or is there any other (SQL?) solution?
> Can anyone help me to write it?
> 2. Is mapGroupsWithState supported in Python?
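For context, the per-group state update that mapGroupsWithState would run on each trigger can be sketched in plain Python (this is only an illustration of the intended logic, not the Spark API; the dict-of-rows state layout and the `update_state` helper are hypothetical):

```python
import datetime


def update_state(state, new_rows, now, ttl=datetime.timedelta(hours=24)):
    """Sketch of the per-id update: keep the row with the greatest
    last_updated seen so far, and expire state entries whose latest
    record is older than the 24-hour retention window.

    state: dict mapping id -> best row seen so far (hypothetical layout).
    """
    for row in new_rows:
        current = state.get(row["id"])
        if current is None or row["last_updated"] > current["last_updated"]:
            state[row["id"]] = row
    # Timeout analogue: drop ids not updated within the retention period.
    for key in [k for k, v in state.items()
                if now - v["last_updated"] > ttl]:
        del state[key]
    return state
```

In the real mapGroupsWithState API the framework partitions records by key and manages the state store and timeouts for you; this sketch only shows the reduction each group would perform.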
>
>  Just to ensure we cover all bases: I have already tried dropDuplicates,
> but it keeps the first record encountered for an id rather than updating the
> state:
>
> unpackedDF = kinesisDF.selectExpr("cast(data as STRING) jsonData")
> dataDF = unpackedDF.select(
>     get_json_object(unpackedDF.jsonData, '$.header.id').alias('id'),
>     get_json_object(unpackedDF.jsonData, '$.header.last_updated')
>         .cast('timestamp').alias('last_updated'),
>     unpackedDF.jsonData)
>
> dedupDF = dataDF.dropDuplicates(subset=['id']).withWatermark('last_updated', '24 hours')
>
>
> So it is not working. Any help is appreciated.
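The behaviour described above can be reproduced in plain Python (hypothetical rows stand in for the stream, and dicts for the state store): dropDuplicates is effectively first-record-wins per key, while the requirement here is greatest-last_updated-wins:

```python
def first_wins(rows):
    """dropDuplicates-like: the first record seen for an id is kept."""
    state = {}
    for row in rows:
        state.setdefault(row["id"], row)
    return state


def latest_wins(rows):
    """Desired behaviour: keep the record with the greatest
    last_updated per id."""
    state = {}
    for row in rows:
        current = state.get(row["id"])
        if current is None or row["last_updated"] > current["last_updated"]:
            state[row["id"]] = row
    return state
```

The gap between these two reductions is exactly why dropDuplicates alone cannot express "keep the latest record per id".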
>
> --
> Best Regards,
> Ayan Guha
>



-- 
Best Regards,
Ayan Guha
