????flink sql 1.12 ????????????minibatch??????
val config = tConfig.getConfiguration()
config.setString("table.exec.mini-batch.enabled", "true") // mini-batch is
enabled
config.setString("table.exec.mini-batch.allow-latency", "true")
config.setString("table.exec.mini-batch.size", 100)
config.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE") // enable
two-phase, i.e. local-global aggregation
config.setString("table.optimizer.distinct-agg.split.enabled", "true") //not
support user defined AggregateFunctionsql??tableEnv.executeSql(
s"""insert into event_test
|select
| date_format(create_time,'yyyyMMdd') dt,
| uid,
| count(distinct fid) text_feed_count,
| max(event_time) event_time
| from basic_fd_base where ftype <>'0' and uid is not null
| group by
| date_format(create_time,'yyyyMMdd'),
| uid
""".stripMargin).print()event_test ????print connectorbasic_fd_base ????kafka
connector????????????timestamp????????????????????eventtime????????????null????????????????????????????????????null??2>
+I(20210427,747873111868904192,1,2021-04-27T14:03:10)
2> +I(20210427,709531067945685120,1,null)
2> +I(20210427,759213633292150016,1,2021-04-27T13:59:01.923)
2> +I(20210427,758340406550406272,4,2021-04-27T14:02:14.553)
2> +I(20210427,759658063329437312,1,2021-04-27T14:02:18.305)
2> +I(20210427,737415823706231680,1,2021-04-27T14:02:11.061)
2> +I(20210427,[email protected],1,2021-04-27T14:05:37)
2> +I(20210427,759219266892539008,1,null)
2> +I(20210427,758349976605763328,1,2021-04-27T14:02:24.184)
2> -U(20210427,709531067945685120,1,null)
2> +U(20210427,709531067945685120,1,2021-04-27T14:09:27.156)
2> +I(20210427,751664239562922752,1,2021-04-27T14:16:51.133)
2> -U(20210427,759219266892539008,1,null)
2> +U(20210427,759219266892539008,1,2021-04-27T14:12:40.692)
2> +I(20210427,745540385069273984,1,2021-04-27T14:23:34)
2> +I(20210427,745399833011098240,1,2021-04-27T14:20:32.870)
2> +I(20210427,714590484395398016,1,2021-04-27T14:19:06)
2> +I(20210427,747859173236216832,1,2021-04-27T14:28:21.864)
2> +I(20210427,746212052309316608,1,null)
2> +I(20210427,666839205279797376,1,2021-04-27T14:26:36.743)
2> +I(20210427,758334362541565568,3,2021-04-27T14:18:58.396)
2> +I(20210427,758325137706788480,1,2021-04-27T14:01:09.053)
2> +I(20210427,747837209624908800,1,2021-04-27T13:59:44.193)
2> -U(20210427,758388594254750720,1,2021-04-27T14:00:44.212)
2> +U(20210427,758388594254750720,4,2021-04-27T14:14:55)
2> +I(20210427,759466217777079296,1,2021-04-27T14:25:59.019)
2> -U(20210427,762769243539450496,1,2021-04-27T14:04:29)
2> +U(20210427,762769243539450496,2,2021-04-27T14:04:29)
2> +I(20210427,720648040456852096,1,2021-04-27T14:19:38.680)
2> +I(20210427,750144041584368000,1,2021-04-27T14:29:25.621)
2> +I(20210427,713108045701517952,1,null)
??????minibatch????????????????????????????????????????????????????????????????null??????????????????????????????