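Flink SQL example: hourly Top-3 products written to PostgreSQL
NOTE: the original (Chinese) text of this description was lost to a character-encoding error; the text below is reconstructed from the surviving fragments.
In this SQL, the inner query aggregates with GROUP BY over a one-hour tumbling window and PRODUCT_ID, so each aggregate row is identified by WINDOW_START / WINDOW_END and PRODUCT_ID. The ROW_NUMBER() Top-N query on top of it updates rankings it has already emitted, so the result is written to the PostgreSQL (PG) sink in upsert mode, keyed by RANK_ID + WINDOW_START / WINDOW_END. The PG table must have a unique constraint (for example a primary key) on exactly those key columns, because Flink performs the PostgreSQL upsert with INSERT ... ON CONFLICT on them.
Requirement: compute the Top 3 products per hour.
Test data (CSV records on the Kafka topic):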
ORDER_ID,USER_ID,PRODUCT_ID,NUM,ORDER_TIME
1,34,6005,4,2019-09-01 00:10:00
2,34,6003,1,2019-09-01 00:20:00
3,34,6005,4,2019-09-01 00:30:00
4,34,6006,3,2019-09-01 00:40:00
5,34,6001,6,2019-09-01 00:51:00
6,34,6005,1,2019-09-01 01:11:00
SQL statements (Flink SQL):
--source
-- Kafka source: one CSV record per order (ORDER_ID,USER_ID,PRODUCT_ID,NUM,ORDER_TIME).
-- Written with the legacy 'connector.type'-style properties (the form used before
-- Flink 1.11 introduced 'connector' = '...' options).
CREATE TABLE ORDER_DATA (
    ORDER_ID   VARCHAR,
    USER_ID    VARCHAR,
    PRODUCT_ID VARCHAR,
    NUM        BIGINT,
    -- Event-time attribute: Flink requires TIMESTAMP(3) for a column with a watermark.
    ORDER_TIME TIMESTAMP(3),
    -- Zero-delay watermark: assumes orders arrive in ORDER_TIME order. Add a delay
    -- (e.g. ORDER_TIME - INTERVAL '5' SECOND) if the topic can be out of order.
    WATERMARK FOR ORDER_TIME AS ORDER_TIME
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = '0.10',
    'connector.topic' = 'orderData',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.zookeeper.connect' = 'xxxx:2181',
    'connector.properties.bootstrap.servers' = 'xxxx:9092',
    'connector.properties.group.id' = 'flink_sql',
    'format.type' = 'csv',
    -- Map CSV fields to the table columns above, in declaration order.
    'format.derive-schema' = 'true'
);
--sink
-- JDBC sink (PostgreSQL table product_rank) receiving the hourly Top-3 ranking.
-- The query that feeds this table is an updating Top-N, so rows are upserted. The
-- PostgreSQL table needs a unique constraint on the upsert key Flink derives from that
-- query (the ROW_NUMBER() PARTITION BY columns plus RANK_ID).
CREATE TABLE PRODUCT_RANK (
    RANK_ID      BIGINT,
    WINDOW_START TIMESTAMP(3),
    WINDOW_END   TIMESTAMP(3),
    PRODUCT_ID   VARCHAR,
    TOTAL_NUM    BIGINT
) WITH (
    'connector.type' = 'jdbc',
    -- NOTE(review): characterEncoding is a MySQL Connector/J option; the PostgreSQL
    -- driver does not use it. Consider dropping it.
    'connector.url' = 'jdbc:postgresql://localhost:5432/flink-test?characterEncoding=utf-8',
    'connector.driver' = 'org.postgresql.Driver',
    'connector.table' = 'product_rank',
    'connector.username' = 'xxxxx',
    'connector.password' = 'xxxx',
    -- Flush after every buffered row so each ranking change reaches PostgreSQL immediately.
    'connector.write.flush.max-rows' = '1'
);
-- Hourly Top-3 products by total quantity ordered (SUM of NUM).
INSERT INTO PRODUCT_RANK
SELECT
    RANK_ID,
    WINDOW_START,
    WINDOW_END,
    PRODUCT_ID,
    TOTAL_NUM
FROM (
    -- Rank products within each one-hour window. Flink's Top-N uses the PARTITION BY
    -- columns plus RANK_ID as the upsert key, so both window bounds are partitioned on
    -- to make the key RANK_ID + WINDOW_START + WINDOW_END.
    SELECT
        WINDOW_START,
        WINDOW_END,
        PRODUCT_ID,
        TOTAL_NUM,
        ROW_NUMBER() OVER (
            PARTITION BY WINDOW_START, WINDOW_END
            -- PRODUCT_ID breaks ties on TOTAL_NUM so rankings are deterministic.
            ORDER BY TOTAL_NUM DESC, PRODUCT_ID
        ) AS RANK_ID
    FROM (
        -- Total quantity per product per one-hour tumbling event-time window.
        SELECT
            TUMBLE_START(ORDER_TIME, INTERVAL '1' HOUR) AS WINDOW_START,
            TUMBLE_END(ORDER_TIME, INTERVAL '1' HOUR) AS WINDOW_END,
            PRODUCT_ID,
            SUM(NUM) AS TOTAL_NUM
        FROM ORDER_DATA
        GROUP BY TUMBLE(ORDER_TIME, INTERVAL '1' HOUR), PRODUCT_ID
    ) AS WINDOW_TOTALS
) AS RANKED
WHERE RANK_ID <= 3;