1.11的 TableResult.collect() 和 TableResult.print() 方法在流模式下, 都是exactly once语义,需要配置checkpoint才能得到结果。
Best, Godfrey [email protected] <[email protected]> 于2020年7月23日周四 下午7:22写道: > Hi, all: > > 本人当前使用flink版本1.11.0,但是在执行executeSql后,print时没有在console打印出结果(查看kafka是一直有数据产生的), > sql如下: > > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > EnvironmentSettings settings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); > > Catalog catalog = new HiveCatalog("x", "default", "D:\\conf", "1.1.0"); > tEnv.registerCatalog("x", catalog); > > TableResult execute = tEnv.executeSql("select * from > x.ods.ods_binlog_test_trip_create_t_order_1"); > > execute.print(); > > 建表语句如下: > > CREATE TABLE x.ods.ods_binlog_test_trip_create_t_order_1 ( > _type STRING, > order_no STRING, > order_time STRING, > dt as TO_TIMESTAMP(order_time), > proctime as PROCTIME(), > WATERMARK FOR dt AS dt - INTERVAL '5' SECOND > ) WITH ( > 'connector.type' = 'kafka', > 'connector.properties.bootstrap.servers' = '***', > 'connector.properties.zookeeper.connect' = '****', > 'connector.version' = 'universal', > 'format.type' = 'json', > 'connector.properties.group.id' = 'testGroup', > 'connector.startup-mode' = 'group-offsets', > 'connector.topic' = 'test' > ) >
