Re: flink 1.11 SQL: no data and no errors when debugging in IDEA
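> GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)

This only emits a result once the one-hour window has ended. To see data earlier, you can shrink the window or enable early-fire:

table.exec.emit.early-fire.enabled=true
table.exec.emit.early-fire.delay=xx

> manually get the TableResult returned by executeSql, then wait for the job to finish

This is to prevent the job from being forcibly terminated: when running in a local IDE, the process exits as soon as executeSql returns, and the job is killed with it.

DanielGu <610493...@qq.com> wrote on Mon, Aug 17, 2020 at 4:04 PM:

> hi,
> Is anyone else familiar with debugging Flink 1.11 SQL in IDEA? I would appreciate any advice.
> I have been stuck on this lately and cannot find a way out.
> Many thanks
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/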
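To make the point about the window concrete, here is a minimal sketch in plain Java (no Flink dependency) of when a one-hour tumbling window would emit under the 5-second watermark delay declared in the source DDL. It is a simplified model, not Flink's implementation: the class and method names are hypothetical, timestamps are treated as UTC, and the firing rule (watermark reaches the last millisecond of the window) is the usual event-time rule, stated here as an assumption.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper, for illustration only; it does not use any Flink API.
final class TumbleWindowSketch {
    // Window size from GROUP BY TUMBLE(ts, INTERVAL '1' HOUR).
    static final long WINDOW_MS = Duration.ofHours(1).toMillis();
    // Delay from WATERMARK FOR ts AS ts - INTERVAL '5' SECOND.
    static final long DELAY_MS = Duration.ofSeconds(5).toMillis();

    // Parse an ISO local date-time and interpret it as UTC epoch milliseconds.
    static long millis(String isoLocalDateTime) {
        return LocalDateTime.parse(isoLocalDateTime).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Start of the tumbling window that contains the given event time.
    static long windowStart(long eventTsMs) {
        return eventTsMs - Math.floorMod(eventTsMs, WINDOW_MS);
    }

    // Exclusive end of that window.
    static long windowEnd(long eventTsMs) {
        return windowStart(eventTsMs) + WINDOW_MS;
    }

    // Watermark derived from the largest event time seen so far.
    static long watermark(long maxSeenTsMs) {
        return maxSeenTsMs - DELAY_MS;
    }

    // The window holding eventTsMs emits once the watermark reaches its last millisecond.
    static boolean fires(long eventTsMs, long maxSeenTsMs) {
        return watermark(maxSeenTsMs) >= windowEnd(eventTsMs) - 1;
    }

    public static void main(String[] args) {
        long event = millis("2017-11-27T00:38:15");
        // Only records from the same instant have been seen: the window is still open.
        System.out.println(fires(event, event));
        // A record at least 5 seconds past the hour has been seen: the window can close.
        System.out.println(fires(event, millis("2017-11-27T01:00:05")));
    }
}
```

Under this model, a record stamped 00:38:15 produces no output until some later record with a timestamp of 01:00:05 or beyond has been read, which would match seeing raw rows from the simple INSERT but nothing from the hourly aggregation.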
Re: flink 1.11 SQL: no data and no errors when debugging in IDEA
hi,

Is anyone else familiar with debugging Flink 1.11 SQL in IDEA? I would appreciate any advice.
I have been stuck on this lately and cannot find a way out.

Many thanks
Re: flink 1.11 SQL: no data and no errors when debugging in IDEA
hi,

When I looked through the mailing list earlier, I did see many posts mentioning that executeSql is an asynchronous interface, but I still have some questions about it.

1. When the INSERT INTO logic is simple, I can see the code produce output, but once I replace it with the aggregating INSERT INTO SQL I originally posted, no output is shown. Why?

Code

    ...
    tEnv.executeSql(sourceDDL);
    tEnv.executeSql(sinkDDL);
    tEnv.executeSql("INSERT INTO print_sink SELECT user_id ,item_id,category_id ,behavior ,ts,proctime FROM user_behavior");
    ...

Console

    3> +I(1014646,2869046,4022701,pv,2017-11-27T00:38:15,2020-08-14T08:20:23.847)
    3> +I(105950,191177,3975787,pv,2017-11-27T00:38:15,2020-08-14T08:20:23.847)
    3> +I(128322,5013356,4066962,buy,2017-11-27T00:38:15,2020-08-14T08:20:23.847)
    3> +I(225652,3487948,2462567,pv,2017-11-27T00:38:15,2020-08-14T08:20:23.847)

Aggregation code (source unchanged, sink columns changed accordingly)

> String transformationDDL= "INSERT INTO buy_cnt_per_hour\n" +
>         "SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)) as hour_of_day , COUNT(*) as buy_cnt\n" +
>         "FROM user_behavior\n" +
>         "WHERE behavior = 'buy'\n" +
>         "GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)";
>
> // Register source and sink
> tEnv.executeSql(sourceDDL);
> tEnv.executeSql(sinkDDL);
> //tableResult.print();
>
> tEnv.executeSql(transformationDDL);

2. I did not quite understand what you meant by "manually get the TableResult returned by executeSql, then wait for the job to finish". I changed the code as follows, but running it still prints no results to the console.

    // Register source and sink
    tEnv.executeSql(sourceDDL);
    tEnv.executeSql(sinkDDL);
    TableResult tableResult = tEnv.executeSql(transformationDDL);
    tableResult.getJobClient()
            .get()
            .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
            .get().wait();

Best,
DanielGu
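A note on the trailing .wait() in the snippet above, sketched with plain java.util.concurrent rather than Flink. getJobExecutionResult returns a CompletableFuture, and it is the future's get() that blocks until completion; wait() is the unrelated Object monitor method and throws IllegalMonitorStateException when called outside a synchronized block. The submitAsync method below is a hypothetical stand-in for an asynchronous job submission, not a Flink call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical illustration; none of these names come from Flink.
final class AwaitJobSketch {
    // Stand-in for an async submit: returns at once, completes after runMillis.
    static CompletableFuture<String> submitAsync(long runMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(runMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "FINISHED";
        });
    }

    // Blocks the calling thread until the future completes; get() is the blocking call.
    static String awaitResult(CompletableFuture<String> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // Object.wait() called without holding the object's monitor does not block; it throws.
    static boolean waitWithoutMonitorThrows(Object target) {
        try {
            target.wait(1);
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> job = submitAsync(200);
        System.out.println(job.isDone());      // state right after submission
        System.out.println(awaitResult(job));  // blocks here until completion
        System.out.println(waitWithoutMonitorThrows(new Object()));
    }
}
```

If the same reasoning carries over, the chain ending in .get() should already keep the IDE process alive, and with an unbounded Kafka source it would never return, so the .wait() would not be reached. The missing output would then come from the window not having closed yet rather than from the process exiting.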
Re: flink 1.11 SQL: no data and no errors when debugging in IDEA
Hi,

This happens because executeSql in Flink 1.11 is an asynchronous interface. When you run it in IDEA, the program exits right away. You need to manually take the TableResult returned by executeSql and then call

    tableResult.getJobClient().get()
            .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
            .get()

to wait for the job to finish.

Best,
Xingbo

DanielGu <610493...@qq.com> wrote on Fri, Aug 14, 2020 at 11:45 AM:

> I have run into a problem and would like to ask for help:
> Environment: 1.11, IDEA
> Following wuchong's demo, I wanted to turn the client into a Java one. First example: counting purchases per hour
> The data can be read but no results are output. Writing to ES did nothing, and after switching to the print sink there was still nothing
> https://github.com/wuchong/flink-sql-demo/tree/v1.11-CN
> Any help is appreciated
flink 1.11 SQL: no data and no errors when debugging in IDEA
I have run into a problem and would like to ask for help:

Environment: 1.11, IDEA
Following wuchong's demo, I wanted to turn the client into a Java one. First example: counting purchases per hour.
The data can be read but no results are output. Writing to ES did nothing, and after switching to the print sink there was still nothing.
https://github.com/wuchong/flink-sql-demo/tree/v1.11-CN
Any help is appreciated.

Below are the pom, the code, and the run output.

    // Create the execution environment
    StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    bsEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
    // Set the StateBackend
    bsEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/chkdir"));
    EnvironmentSettings bsSettings = EnvironmentSettings
            .newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

    // Kafka
    String sourceDDL = "CREATE TABLE user_behavior (" +
            "user_id BIGINT," +
            "item_id BIGINT," +
            "category_id BIGINT," +
            "behavior STRING," +
            "ts TIMESTAMP (3)," +
            "proctime AS PROCTIME ()," +
            "WATERMARK FOR ts AS ts-INTERVAL '5' SECOND) " +
            "WITH (" +
            "'connector'='kafka'," +
            "'topic'='user_behavior'," +
            "'scan.startup.mode'='earliest-offset'," +
            "'properties.bootstrap.servers'='localhost:9092'," +
            "'format'='json'" +
            ")";

    // Originally wrote to ES; changed to print
    /*String sinkDDL = "CREATE TABLE buy_cnt_per_hour (" +
            "hour_of_day BIGINT," +
            "buy_cnt BIGINT" +
            ") WITH (" +
            "'connector'='elasticsearch-7'," +
            "'hosts'='http://localhost:9200'," +
            "'index'='buy_cnt_per_hour')";*/
    String sinkDDL = "CREATE TABLE buy_cnt_per_hour (\n" +
            "hour_of_day BIGINT," +
            "buy_cnt BIGINT" +
            ") WITH (\n" +
            " 'connector' = 'print'\n" +
            ")";

    String transformationDDL = "INSERT INTO buy_cnt_per_hour\n" +
            "SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)) as hour_of_day , COUNT(*) as buy_cnt\n" +
            "FROM user_behavior\n" +
            "WHERE behavior = 'buy'\n" +
            "GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)";

    // Register source and sink
    tEnv.executeSql(sourceDDL);
    tEnv.executeSql(sinkDDL);
    //tableResult.print();
    tEnv.executeSql(transformationDDL);

pom

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

Run output

    01:15:12,358 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka version: unknown
    01:15:12,358 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: unknown
    01:15:12,358 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1597338912355
    01:15:12,361 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-20, groupId=null] Subscribed to partition(s): user_behavior-0
    01:15:12,365 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-20, groupId=null] Seeking to EARLIEST offset of partition user_behavior-0
    01:15:12,377 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-20, groupId=null] Cluster ID: txkqox8yRL6aWBNsOcS67g
    01:15:12,387 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-20, groupId=null]