Re: flink 1.11 SQL debugging in IDEA: no data and no errors

2020-08-18 post by godfrey he
> GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)
Results are only emitted once the 1-hour window closes; for example, the window covering [00:00, 01:00) only fires after the watermark passes 01:00. You can see data earlier by shrinking the window or by enabling early-fire:
table.exec.emit.early-fire.enabled=true
table.exec.emit.early-fire.delay=xx
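For example, a minimal sketch of setting these options (assuming tEnv is the StreamTableEnvironment from the code below; the emit options are experimental in 1.11, and the 10-second delay is only an illustrative value):

    import org.apache.flink.configuration.Configuration;

    Configuration conf = tEnv.getConfig().getConfiguration();
    // let group windows emit partial results before the window closes
    conf.setString("table.exec.emit.early-fire.enabled", "true");
    // re-emit the updated result for open windows every 10 seconds
    conf.setString("table.exec.emit.early-fire.delay", "10 s");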

>  manually grab the TableResult returned by executeSql and then wait for the job to finish
This is to prevent the process from exiting as soon as executeSql returns when running in a local IDE, which would force-kill the job as well.

DanielGu <610493...@qq.com> wrote on Mon, Aug 17, 2020 at 4:04 PM:

> hi,
> Has anyone else worked on debugging flink 1.11 SQL in IDEA? Any advice would be appreciated.
> I've been stuck on this for a while and can't get past it.
> Many thanks


Re: flink 1.11 SQL debugging in IDEA: no data and no errors

2020-08-17 post by DanielGu
hi,
Has anyone else worked on debugging flink 1.11 SQL in IDEA? Any advice would be appreciated.
I've been stuck on this for a while and can't get past it.
Many thanks





Re: flink 1.11 SQL debugging in IDEA: no data and no errors

2020-08-14 post by DanielGu
hi,

I did see executeSql described as an asynchronous API in several places on the mailing list, but I still have a couple of questions about it.


1. When the INSERT INTO logic is simple I can see output from the code, but once I replace it with the aggregating INSERT INTO SQL from my original post, nothing is printed. Why?
Code:
...
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);

tEnv.executeSql("INSERT INTO print_sink SELECT  user_id
,item_id,category_id ,behavior ,ts,proctime FROM user_behavior");
...
Console output:
3> +I(1014646,2869046,4022701,pv,2017-11-27T00:38:15,2020-08-14T08:20:23.847)
3> +I(105950,191177,3975787,pv,2017-11-27T00:38:15,2020-08-14T08:20:23.847)
3> +I(128322,5013356,4066962,buy,2017-11-27T00:38:15,2020-08-14T08:20:23.847)
3> +I(225652,3487948,2462567,pv,2017-11-27T00:38:15,2020-08-14T08:20:23.847)

Aggregation code (source unchanged; the sink columns changed to match):
> String transformationDDL = "INSERT INTO buy_cnt_per_hour\n" +
>         "SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)) as hour_of_day, COUNT(*) as buy_cnt\n" +
>         "FROM user_behavior\n" +
>         "WHERE behavior = 'buy'\n" +
>         "GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)";
>
> // register source and sink
> tEnv.executeSql(sourceDDL);
> tEnv.executeSql(sinkDDL);
> //tableResult.print();
>
> tEnv.executeSql(transformationDDL);

2. I didn't quite understand your suggestion to "manually grab the TableResult returned by executeSql and then wait for the job to finish".
I changed the code as follows, but the console still prints no results:
// register source and sink
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);

TableResult tableResult = tEnv.executeSql(transformationDDL);

tableResult.getJobClient()
        .get()
        .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
        .get().wait();
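A note on the snippet above: the trailing wait() is java.lang.Object#wait() and is not needed here. getJobExecutionResult(...).get() already blocks until the job finishes, and if it ever did return, calling wait() without holding the monitor would throw IllegalMonitorStateException. A minimal blocking version (same tEnv and transformationDDL as above) would be:

    TableResult tableResult = tEnv.executeSql(transformationDDL);
    // get() on the returned future blocks until the job terminates; for this
    // unbounded streaming job it simply keeps the local process alive.
    tableResult.getJobClient()
            .get()
            .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
            .get();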

Best,
DanielGu





Re: flink 1.11 SQL debugging in IDEA: no data and no errors

2020-08-13 post by Xingbo Huang
Hi,
This is because executeSql in flink 1.11 is an asynchronous API; when you run in IDEA, the process simply exits as soon as it returns. You need to manually grab the TableResult returned by executeSql and then use

tableResult.getJobClient().get()
    .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
    .get()

to wait for the job to finish.

Best,
Xingbo

DanielGu <610493...@qq.com> wrote on Fri, Aug 14, 2020 at 11:45 AM:

> I've run into a problem and could use some advice:
> Environment: Flink 1.11, IDEA
> Following wuchong's demo, I'm trying to turn the client into Java. In the first example (counting transactions per hour),
> the data can be read but no results are output: writing to es does nothing, and switching to a print sink still does nothing.
> https://github.com/wuchong/flink-sql-demo/tree/v1.11-CN
> Please help, everyone.

flink 1.11 SQL debugging in IDEA: no data and no errors

2020-08-13 post by DanielGu
I've run into a problem and could use some advice:
Environment: Flink 1.11, IDEA
Following wuchong's demo, I'm trying to turn the client into Java. In the first example (counting transactions per hour), the data can be read but no results are output: writing to es does nothing, and switching to a print sink still does nothing.
https://github.com/wuchong/flink-sql-demo/tree/v1.11-CN
Please help, everyone.



Below are the pom, the code, and the run output.

// create the execution environment
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// set the StateBackend
bsEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/chkdir"));
EnvironmentSettings bsSettings = EnvironmentSettings
        .newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

// Kafka
String sourceDDL = "CREATE TABLE user_behavior (" +
        "user_id BIGINT," +
        "item_id BIGINT," +
        "category_id BIGINT," +
        "behavior STRING," +
        "ts TIMESTAMP(3)," +
        "proctime AS PROCTIME()," +
        "WATERMARK FOR ts AS ts - INTERVAL '5' SECOND) " +
        "WITH (" +
        "'connector'='kafka'," +
        "'topic'='user_behavior'," +
        "'scan.startup.mode'='earliest-offset'," +
        "'properties.bootstrap.servers'='localhost:9092'," +
        "'format'='json'" +
        ")";

// was writing to es; changed to a print sink
/*String sinkDDL = "CREATE TABLE buy_cnt_per_hour (" +
        "hour_of_day BIGINT," +
        "buy_cnt BIGINT" +
        ") WITH (" +
        "'connector'='elasticsearch-7'," +
        "'hosts'='http://localhost:9200'," +
        "'index'='buy_cnt_per_hour')";*/
String sinkDDL = "CREATE TABLE buy_cnt_per_hour (\n" +
        "hour_of_day BIGINT," +
        "buy_cnt BIGINT" +
        ") WITH (\n" +
        " 'connector' = 'print'\n" +
        ")";

String transformationDDL = "INSERT INTO buy_cnt_per_hour\n" +
        "SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)) as hour_of_day, COUNT(*) as buy_cnt\n" +
        "FROM user_behavior\n" +
        "WHERE behavior = 'buy'\n" +
        "GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)";

// register source and sink
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
//tableResult.print();

tEnv.executeSql(transformationDDL);

pom

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

Run output
01:15:12,358 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka version: unknown
01:15:12,358 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: unknown
01:15:12,358 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1597338912355
01:15:12,361 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-20, groupId=null] Subscribed to partition(s): user_behavior-0
01:15:12,365 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-20, groupId=null] Seeking to EARLIEST offset of partition user_behavior-0
01:15:12,377 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-20, groupId=null] Cluster ID: txkqox8yRL6aWBNsOcS67g
01:15:12,387 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-20, groupId=null]