??????
SQL??????????????PRODUCT_ID??????Group 
By??????????PG??????????????????????WINDOW_START /WINDOW_END??PRODUCT_ID????????????upinsert??????????????????????????????????????????????????????????RANK_ID
 +WINDOW_START /WINDOW_END????????????????Flink ??????????????????????



??????Top3 ????????


??????????
ORDER_ID,USER_ID,PRODUCT_ID,NUM,ORDER_TIME
1,34,6005,4,2019-09-01 00:10:00
2,34,6003,1,2019-09-01 00:20:00
3,34,6005,4,2019-09-01 00:30:00
4,34,6006,3,2019-09-01 00:40:00
5,34,6001,6,2019-09-01 00:51:00
6,34,6005,1,2019-09-01 01:11:00



SQL??????????
-- Source: order events read from the Kafka topic 'orderData' as CSV rows
-- (ORDER_ID, USER_ID, PRODUCT_ID, NUM, ORDER_TIME).
CREATE TABLE ORDER_DATA (
        ORDER_ID VARCHAR,
        USER_ID VARCHAR,
        PRODUCT_ID VARCHAR,
        NUM BIGINT,
        -- Event-time attribute; a watermark column must be TIMESTAMP(3).
        ORDER_TIME TIMESTAMP(3),
        -- Watermark equals the event time itself, i.e. no out-of-orderness
        -- is tolerated.
        WATERMARK FOR ORDER_TIME AS ORDER_TIME
) WITH (
        'connector.type'='kafka',
        'connector.version'='0.10',
        'connector.topic'='orderData',
        'connector.startup-mode'='latest-offset',
        'connector.properties.zookeeper.connect'='xxxx:2181',
        'connector.properties.bootstrap.servers'='xxxx:9092',
        'connector.properties.group.id'='flink_sql',
        'format.type'='csv',
        'format.derive-schema'='true'
);


-- Sink: per-window product ranking written to the PostgreSQL table
-- 'product_rank' through the JDBC connector.
CREATE TABLE PRODUCT_RANK (
        RANK_ID BIGINT,
        WINDOW_START TIMESTAMP(3),
        WINDOW_END TIMESTAMP(3),
        PRODUCT_ID VARCHAR,
        TOTAL_NUM BIGINT
) WITH (
        'connector.type'='jdbc',
        'connector.url'='jdbc:postgresql://localhost:5432/flink-test?characterEncoding=utf-8',
        'connector.driver'='org.postgresql.Driver',
        'connector.table'='product_rank',
        'connector.username'='xxxxx',
        'connector.password'='xxxx',
        -- Flush after every buffered row so each update reaches the database
        -- immediately.
        'connector.write.flush.max-rows'='1'
);


-- Top 3 products by total ordered quantity within each one-hour tumbling
-- window of ORDER_TIME.
-- NOTE(review): the JDBC sink's upsert key is derived by the planner from this
-- query; confirm it matches the unique constraint on the target table.
INSERT INTO PRODUCT_RANK
SELECT
    RANK_ID,
    WINDOW_START,
    WINDOW_END,
    PRODUCT_ID,
    TOTAL_NUM
FROM (
    -- Rank the products inside each window, highest total first.
    SELECT
        WINDOW_START,
        WINDOW_END,
        PRODUCT_ID,
        TOTAL_NUM,
        ROW_NUMBER() OVER (
            -- Both bounds identify the same tumbling window, so the rows are
            -- grouped exactly as with WINDOW_START alone.
            PARTITION BY WINDOW_START, WINDOW_END
            ORDER BY TOTAL_NUM DESC
        ) AS RANK_ID
    FROM (
        -- Total quantity per product per one-hour window.
        SELECT
            TUMBLE_START(ORDER_TIME, INTERVAL '1' HOUR) AS WINDOW_START,
            TUMBLE_END(ORDER_TIME, INTERVAL '1' HOUR) AS WINDOW_END,
            SUM(NUM) AS TOTAL_NUM,
            PRODUCT_ID
        FROM ORDER_DATA
        GROUP BY
            TUMBLE(ORDER_TIME, INTERVAL '1' HOUR),
            PRODUCT_ID
    )
)
WHERE RANK_ID <= 3;

&nbsp; &nbsp; &nbsp;

回复