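Hi Longfei, I'm very sorry, but this is indeed not supported at the moment. However, the problem should be solved by the new TableSink interface introduced in FLIP-95, which is expected to land in 1.11.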
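As a rough sketch, assuming Flink 1.11 or later (the FLIP-95 sink with the new JDBC connector options) and not verified against this exact query, the sink could declare the business key directly in its DDL instead of relying on the key derived from the query:

```sql
-- Sketch only: assumes Flink 1.11+ and the FLIP-95 JDBC connector ('connector' = 'jdbc').
-- The declared primary key is used as the upsert key when writing to PostgreSQL.
-- NOT ENFORCED means Flink itself does not check uniqueness of the key.
CREATE TABLE PRODUCT_RANK (
  RANK_ID BIGINT,
  WINDOW_START TIMESTAMP(3),
  WINDOW_END TIMESTAMP(3),
  PRODUCT_ID VARCHAR,
  TOTAL_NUM BIGINT,
  PRIMARY KEY (RANK_ID, WINDOW_START, WINDOW_END) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/flink-test?characterEncoding=utf-8',
  'driver' = 'org.postgresql.Driver',
  'table-name' = 'product_rank',
  'username' = 'xxxxx',
  'password' = 'xxxx',
  'sink.buffer-flush.max-rows' = '1'
);
```

The PostgreSQL table product_rank would also need a primary key or unique constraint on (rank_id, window_start, window_end), because the connector's PostgreSQL upsert is issued as INSERT ... ON CONFLICT on the key columns.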
Best,
Jark

On Wed, 1 Apr 2020 at 19:12, Longfei Zhou <[email protected]> wrote:

> Question:
> The SQL performs a GROUP BY aggregation on the time window and PRODUCT_ID, so the primary key of the PG table must be set to WINDOW_START/WINDOW_END and PRODUCT_ID; otherwise the data cannot be written in upsert mode. However, this does not meet the business requirement: the business needs RANK_ID + WINDOW_START/WINDOW_END as the primary key. How can this be implemented in Flink?
>
> Scenario: Top 3 popular products
>
> Sample data:
> ORDER_ID,USER_ID,PRODUCT_ID,NUM,ORDER_TIME
> 1,34,6005,4,2019-09-01 00:10:00
> 2,34,6003,1,2019-09-01 00:20:00
> 3,34,6005,4,2019-09-01 00:30:00
> 4,34,6006,3,2019-09-01 00:40:00
> 5,34,6001,6,2019-09-01 00:51:00
> 6,34,6005,1,2019-09-01 01:11:00
>
> The SQL logic is as follows:
>
> --source
> CREATE TABLE ORDER_DATA (
>   ORDER_ID VARCHAR,
>   USER_ID VARCHAR,
>   PRODUCT_ID VARCHAR,
>   NUM BIGINT,
>   ORDER_TIME TIMESTAMP(3),  -- a rowtime attribute must be TIMESTAMP(3)
>   WATERMARK FOR ORDER_TIME AS ORDER_TIME
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = '0.10',
>   'connector.topic' = 'orderData',
>   'connector.startup-mode' = 'latest-offset',
>   'connector.properties.zookeeper.connect' = 'xxxx:2181',
>   'connector.properties.bootstrap.servers' = 'xxxx:9092',
>   'connector.properties.group.id' = 'flink_sql',
>   'format.type' = 'csv',
>   'format.derive-schema' = 'true'
> );
>
> --sink
> CREATE TABLE PRODUCT_RANK (
>   RANK_ID BIGINT,
>   WINDOW_START TIMESTAMP(3),
>   WINDOW_END TIMESTAMP(3),
>   PRODUCT_ID VARCHAR,
>   TOTAL_NUM BIGINT
> ) WITH (
>   'connector.type' = 'jdbc',
>   'connector.url' = 'jdbc:postgresql://localhost:5432/flink-test?characterEncoding=utf-8',
>   'connector.driver' = 'org.postgresql.Driver',
>   'connector.table' = 'product_rank',
>   'connector.username' = 'xxxxx',
>   'connector.password' = 'xxxx',
>   'connector.write.flush.max-rows' = '1'
> );
>
> -- Top 3 products per 1-hour tumbling window
> INSERT INTO PRODUCT_RANK
> SELECT RANK_ID, WINDOW_START, WINDOW_END, PRODUCT_ID, TOTAL_NUM
> FROM (
>   SELECT *,
>     ROW_NUMBER() OVER (PARTITION BY WINDOW_START ORDER BY TOTAL_NUM DESC) AS RANK_ID
>   FROM (
>     SELECT
>       TUMBLE_START(ORDER_TIME, INTERVAL '1' HOUR) AS WINDOW_START,
>       TUMBLE_END(ORDER_TIME, INTERVAL '1' HOUR) AS WINDOW_END,
>       SUM(NUM) AS TOTAL_NUM,
>       PRODUCT_ID
>     FROM ORDER_DATA
>     GROUP BY TUMBLE(ORDER_TIME, INTERVAL '1' HOUR), PRODUCT_ID
>   )
> ) WHERE RANK_ID <= 3;
