Hi all:
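I am using Flink 1.13.2 and computing statistics with a window TVF. I found that the tumbling-window results do not match the daily totals computed offline from the detail records. Here is my SQL: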
String message = " CREATE TABLE test(\n" +
" gid VARCHAR COMMENT 'UUID, unique identifier',\n" +
" ip VARCHAR COMMENT 'IP address',\n" +
" business_no VARCHAR COMMENT 'merchant number',\n" +
" rtime BIGINT ,\n" +
" event_time as TO_TIMESTAMP_LTZ(rtime,3) ,\n" +
" WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE, \n" +
" ts AS PROCTIME () , \n"+
" `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' \n"+
" ) \n" +
" WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'test',\n" +
" 'properties.group.id' = 'consumer-02',\n" +
" 'properties.bootstrap.servers' = 'XXX:9092',\n" +
" 'properties.security.protocol' = 'SASL_PLAINTEXT',\n" +
" 'properties.sasl.mechanism' = 'GSSAPI',\n" +
" 'properties.sasl.kerberos.service.name' = 'kafka',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'json'\n" +
" )";
// Per 10-minute tumbling window on event_time, count distinct gid for each (business_no, ip).
String message_cnts = "SELECT\n" +
" ip,\n" +
" business_no,\n" +
" MIN(record_time) AS record_time,\n" +
" COUNT(DISTINCT gid) AS total_call_num,\n" +
" window_start,\n" +
" window_end\n" +
" FROM TABLE(\n" +
" TUMBLE(TABLE test, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))\n" +
" GROUP BY window_start, window_end, GROUPING SETS ((business_no, ip))";
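
As a reading aid for the query above, the following is a minimal sketch in plain Java (not Flink's implementation) of how a 10-minute tumbling window would bucket an rtime value. It assumes rtime is epoch milliseconds (as TO_TIMESTAMP_LTZ(rtime, 3) implies), that windows are aligned to the epoch with no offset, and that the session time zone is UTC. The class name, method names, and sample timestamp are hypothetical.

```java
import java.time.Instant;

public class TumbleWindowSketch {
    // Window size matching INTERVAL '10' MINUTES in the query.
    static final long WINDOW_SIZE_MS = 10L * 60L * 1000L;

    // Inclusive start of the epoch-aligned tumbling window containing rtimeMs.
    static long windowStart(long rtimeMs) {
        return rtimeMs - Math.floorMod(rtimeMs, WINDOW_SIZE_MS);
    }

    // Exclusive end of that same window.
    static long windowEnd(long rtimeMs) {
        return windowStart(rtimeMs) + WINDOW_SIZE_MS;
    }

    public static void main(String[] args) {
        // Made-up sample value, only for illustration.
        long rtime = Instant.parse("2021-09-01T00:07:30Z").toEpochMilli();
        System.out.println(Instant.ofEpochMilli(windowStart(rtime))
                + " .. " + Instant.ofEpochMilli(windowEnd(rtime)));
    }
}
```

Each row falls into exactly one such window, so total_call_num is a distinct count of gid within one 10-minute bucket per (business_no, ip).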