hi,
之前查看邮件列表确实有看到很多地方提到executeSql是一个异步接口.但是我对这部分还是有一些疑惑
1.当inset into 的逻辑是简单逻辑的时候可以看到代码有输出,但替换为我最初发的有聚合逻辑的insert into sql
就无法显示输出了,为什么?
代码
...
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
tEnv.executeSql("INSERT INTO print_sink SELECT user_id
,item_id,category_id ,behavior ,ts,proctime FROM user_behavior");
...
控制台
3>
+I(1014646,2869046,4022701,pv,2017-11-27T00:38:15,2020-08-14T08:20:23.847)
3> +I(105950,191177,3975787,pv,2017-11-27T00:38:15,2020-08-14T08:20:23.847)
3>
+I(128322,5013356,4066962,buy,2017-11-27T00:38:15,2020-08-14T08:20:23.847)
3> +I(225652,3487948,2462567,pv,2017-11-27T00:38:15,2020-08-14T08:20:23.847)
聚合逻辑代码(source不变,sink 对应变更列)
> String transformationDDL= "INSERT INTO buy_cnt_per_hour\n" +
> "SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)) as
> hour_of_day , COUNT(*) as buy_cnt\n" +
> "FROM user_behavior\n" +
> "WHERE behavior = 'buy'\n" +
> "GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)";
>
>
>
> //注册source和sink
> tEnv.executeSql(sourceDDL);
> tEnv.executeSql(sinkDDL);
> // tableResult.print();
>
> tEnv.executeSql(transformationDDL);
2.没有太理解您说的 手动拿到那个executeSql的返回的TableResult,然后去 .... wait job finished
代码修改为如下 运行控制台还是没有结果打印
//注册source和sink
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
TableResult tableResult = tEnv.executeSql(transformationDDL);
tableResult.getJobClient()
.get()
.getJobExecutionResult(Thread.currentThread().getContextClassLoader())
.get().wait();
Best,
DanielGu
--
Sent from: http://apache-flink.147419.n8.nabble.com/