Hi Godfrey,
After enabling checkpointing it does work now. Could you explain the underlying mechanism in more detail? Does print() emit the results as a side effect of completing a snapshot? Is there any related documentation?
Best,
Junbao Zhang
________________________________
From: godfrey he <[email protected]>
Sent: July 23, 2020 19:24
To: user-zh <[email protected]>
Subject: Re: flink 1.11 executeSql query on a Kafka table produces no print output
In 1.11, the TableResult.collect() and TableResult.print() methods both
provide exactly-once semantics in streaming mode, so checkpointing must be
enabled before any results are produced.
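For example, checkpointing can be enabled on the StreamExecutionEnvironment before creating the table environment. This is a minimal sketch; the 10-second interval is an illustrative value, not a recommendation:

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CheckpointExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 10 seconds with exactly-once guarantees,
        // so that TableResult.print() can emit results in streaming mode.
        env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv =
                StreamTableEnvironment.create(env, settings);

        // ... register catalogs/tables, then executeSql(...).print() as before.
    }
}
```

If I remember correctly, the same effect can also be achieved through configuration via the `execution.checkpointing.interval` option.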
Best,
Godfrey
[email protected] <[email protected]> wrote on Thu, Jul 23, 2020 at 7:22 PM:
> Hi all,
>
> I am running Flink 1.11.0. After calling executeSql, print() writes nothing to the console, even though Kafka is continuously receiving data.
> The code is as follows:
>
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>
> Catalog catalog = new HiveCatalog("x", "default", "D:\\conf", "1.1.0");
> tEnv.registerCatalog("x", catalog);
>
> TableResult execute = tEnv.executeSql("select * from x.ods.ods_binlog_test_trip_create_t_order_1");
>
> execute.print();
>
> The table DDL is as follows:
>
> CREATE TABLE x.ods.ods_binlog_test_trip_create_t_order_1 (
>   _type STRING,
>   order_no STRING,
>   order_time STRING,
>   dt AS TO_TIMESTAMP(order_time),
>   proctime AS PROCTIME(),
>   WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.properties.bootstrap.servers' = '***',
>   'connector.properties.zookeeper.connect' = '****',
>   'connector.version' = 'universal',
>   'format.type' = 'json',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.startup-mode' = 'group-offsets',
>   'connector.topic' = 'test'
> )
>