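Re: Strange behavior when Blink consumes from Kafka; it has puzzled me for a long time. Does anyone know what is going on?

2019-03-28 Posted by 邓成刚【qq】
Testing shows that the problem is not the SQL script but the parallelism: the job fails with a parallelism of 30 and works once the parallelism is reduced to 5.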
env.setParallelism(5);
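
To compare values such as 30 and 5 without editing the source each time, the parallelism could be read from the command line. The following is only a minimal sketch in plain Java: `ParallelismArg` and `parse` are hypothetical names that do not appear in the original job, and the fallback of 5 is simply the value reported to work above.

```java
final class ParallelismArg {

    /** Returns args[0] parsed as a positive int, or the fallback if it is absent or invalid. */
    static int parse(String[] args, int fallback) {
        if (args.length == 0) {
            return fallback;
        }
        try {
            int value = Integer.parseInt(args[0].trim());
            // Zero or negative values are rejected in favor of the fallback.
            return value > 0 ? value : fallback;
        } catch (NumberFormatException e) {
            return fallback;
        }
    }

    public static void main(String[] args) {
        int parallelism = parse(args, 5);
        // In the job, this value would be passed on: env.setParallelism(parallelism);
        System.out.println("parallelism = " + parallelism);
    }
}
```

Run without arguments, this falls back to 5; run with `30`, it reproduces the failing setting.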
 
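From: 邓成刚【qq】
Sent: 2019-03-26 18:17
To: user-zh
Subject: Strange behavior when Blink consumes from Kafka; it has puzzled me for a long time. Does anyone know what is going on?
Hi all,
I have run into a strange problem: when I use the SQL API with a GROUP BY on a window, the job times out after 5 minutes, but if I change the query to select *, it consumes from Kafka normally.
Note: the exception occurs both in local mode and when the job is submitted.
Environment: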
blink 1.5.1
kafka 1.1.1
flink-connector-kafka-0.11_2.11-1.5.1-sql-jar.jar
 
Code that consumes normally:

String sql = "select * from table1";
 
Table sip_distinct_event_id = tableEnv.sqlQuery( sql );           
 
tableEnv.toRetractStream(sip_distinct_event_id, Row.class).print();
env.execute("myjob2");
 
 
 
If the SQL is replaced with the following, the job times out:

String sql = "select TUMBLE_START(EVENTTIME, INTERVAL '1' MINUTE) AS EVENTTIME, NEW_EVENT_ID, MSISDN from ("
        + "select EVENTTIME, EVENT_ID as NEW_EVENT_ID, MSISDN from table1"
        + ") group by TUMBLE(EVENTTIME, INTERVAL '1' MINUTE), NEW_EVENT_ID, MSISDN";
 
 
 
Table sip_distinct_event_id = tableEnv.sqlQuery( sql );           
 
tableEnv.toRetractStream(sip_distinct_event_id, Row.class).print();
env.execute("myjob2");
 
 
Exception:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.util.concurrent.TimeoutException
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJob(MiniCluster.java:637)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.executeInternal(LocalStreamEnvironment.java:98)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1893)
    at com.nsn.flink.service.DealRegisterFile13.main(DealRegisterFile13.java:98)
Caused by: java.util.concurrent.TimeoutException
    at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:834)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
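
The cause in this trace is a bare java.util.concurrent.TimeoutException raised from a timer task (FutureUtils$Timeout.run on a ScheduledThreadPoolExecutor thread). That suggests some pending future was not completed in time, rather than user code throwing. The general pattern can be sketched in plain Java as below. This is an illustration under that assumption, not Flink's actual implementation; `TimeoutSketch`, `withTimeout`, and `outcome` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutSketch {

    /** Schedules a timer task that fails the future with TimeoutException if it is still pending. */
    static <T> CompletableFuture<T> withTimeout(
            CompletableFuture<T> future, long delay, TimeUnit unit, ScheduledExecutorService timer) {
        // completeExceptionally has no effect if the future was already completed.
        timer.schedule(() -> { future.completeExceptionally(new TimeoutException()); }, delay, unit);
        return future;
    }

    /** Waits for the future; returns "ok" on success or the simple class name of the failure cause. */
    static String outcome(CompletableFuture<?> future) throws InterruptedException {
        try {
            future.get();
            return "ok";
        } catch (ExecutionException e) {
            return e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            // Nobody completes this future, so the timer task fails it.
            CompletableFuture<String> never =
                    withTimeout(new CompletableFuture<String>(), 100, TimeUnit.MILLISECONDS, timer);
            System.out.println(outcome(never)); // prints "TimeoutException"

            // This future is completed before the timer fires.
            CompletableFuture<String> done =
                    withTimeout(new CompletableFuture<String>(), 100, TimeUnit.MILLISECONDS, timer);
            done.complete("result");
            System.out.println(outcome(done)); // prints "ok"
        } finally {
            timer.shutdownNow();
        }
    }
}
```

As in the trace above, the exception carries no message of its own, so the frames around it are the only hint as to which operation was being waited on.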
 
 
 
 
