Hi Hequn,
However is it semantically correct? because the sql result is not equal
to the bounded table.
> 在 2018年8月20日,下午8:34,Hequn Cheng <[email protected]> 写道:
>
> Hi Henry,
>
> Both sql output incrementally.
>
> However there are some problems if you use retract sink. You have to pay
> attention to the timestamp field since each time the value is different.
> For example, if the value is updated from 1 to 2,
> previous row: add (a, 1, 2018-08-20 20:18:10.286)
> retract row: delete (a, 1, 2018-08-20 20:18:10.386)
> new row: add (a, 2, 2018-08-20 20:18:10.486)
> The retract row is different from the previous row because of the time field.
>
> Of course, this problem should be fixed later.
>
> Best, Hequn
>
> On Mon, Aug 20, 2018 at 6:43 PM, 徐涛 <[email protected]
> <mailto:[email protected]>> wrote:
> Hi All,
> Like the following code,If I use retract stream, I think Flink is able
> to know which item is modified( if praise has 10000 items now, when one item
> comes to the stream, only very small amount of data is write to sink)
> var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as PU
> FROM praise group by article_id” )
> tableEnv.registerTable("finalTable", praiseAggr)
> tableEnv.sqlUpdate(s"insert into sinkTableName SELECT * from
> finalTable")
>
> But if I use the following sql, by adding a dynamic timestamp field:
> var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id,hll(uid)
> as PU,LOCALTIMESTAMP as update_timestamp FROM praise group by article_id” )
> Is the whole table flush to the sink? Or only the incremental value
> will flush to the sink? Why?
>
> Thanks,
> Henry
>
>