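After further testing, it turned out the problem was not the SQL script but the parallelism. The job fails with a parallelism of 30 and works once it is changed to 5: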
env.setParallelism(5);
From: 邓成刚【qq】
Sent: 2019-03-26 18:17
To: user-zh
Subject: Strange behavior when Blink consumes from Kafka; it has been bothering me for a long time. Does anyone know what is going on?
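Hi all,
I have run into a very strange problem: when I use the SQL API with a GROUP BY over a window, the job times out after 5 minutes, but if I change the query to select *, it consumes from Kafka normally.
Note: the exception occurs both in local mode and when the job is submitted to a cluster.
Environment: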
blink 1.5.1
kafka 1.1.1
flink-connector-kafka-0.11_2.11-1.5.1-sql-jar.jar
Code that consumes normally:
String sql = "select * from table1";
Table sip_distinct_event_id = tableEnv.sqlQuery( sql );
tableEnv.toRetractStream(sip_distinct_event_id, Row.class).print();
env.execute("myjob2");
If I change the SQL to the following, the job times out:
String sql = "select TUMBLE_START(EVENTTIME, INTERVAL '1' MINUTE) AS EVENTTIME, NEW_EVENT_ID, MSISDN from ("
        + "select EVENTTIME, EVENT_ID as NEW_EVENT_ID, MSISDN from table1"
        + ") group by TUMBLE(EVENTTIME, INTERVAL '1' MINUTE), NEW_EVENT_ID, MSISDN";
Table sip_distinct_event_id = tableEnv.sqlQuery( sql );
tableEnv.toRetractStream(sip_distinct_event_id, Row.class).print();
env.execute("myjob2");
Exception:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.util.concurrent.TimeoutException
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJob(MiniCluster.java:637)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.executeInternal(LocalStreamEnvironment.java:98)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1893)
    at com.nsn.flink.service.DealRegisterFile13.main(DealRegisterFile13.java:98)
Caused by: java.util.concurrent.TimeoutException
    at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:834)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
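For reference, the windowed query in the original message groups rows by the start of their 1-minute tumbling window plus NEW_EVENT_ID and MSISDN, with no aggregate function. Each distinct combination is therefore emitted once per window. Below is a minimal plain-Java sketch of just that grouping logic. It is not Flink code, and SipEvent, TumbleSketch, distinctPerWindow and the sample values are hypothetical. It assumes EVENTTIME is available as non-negative epoch milliseconds and that the window offset is zero. It also ignores streaming concerns such as when a running job actually emits each window's rows.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for one row of table1; not a Flink class.
final class SipEvent {
    final long eventTimeMillis; // EVENTTIME, assumed to be epoch milliseconds
    final String eventId;       // EVENT_ID (aliased as NEW_EVENT_ID in the query)
    final String msisdn;        // MSISDN

    SipEvent(long eventTimeMillis, String eventId, String msisdn) {
        this.eventTimeMillis = eventTimeMillis;
        this.eventId = eventId;
        this.msisdn = msisdn;
    }
}

// Hypothetical helper illustrating the grouping semantics only.
final class TumbleSketch {
    // INTERVAL '1' MINUTE expressed in milliseconds
    static final long WINDOW_SIZE_MILLIS = 60_000L;

    // Start of the tumbling window that contains ts, assuming a zero offset
    // and a non-negative timestamp; this is the value TUMBLE_START reports.
    static long tumbleStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Mimics GROUP BY TUMBLE(EVENTTIME, ...), NEW_EVENT_ID, MSISDN with no
    // aggregate: each distinct (windowStart, eventId, msisdn) key appears once.
    static List<String> distinctPerWindow(List<SipEvent> events) {
        Set<String> keys = new LinkedHashSet<>(); // preserves first-seen order
        for (SipEvent e : events) {
            long start = tumbleStart(e.eventTimeMillis, WINDOW_SIZE_MILLIS);
            keys.add(start + "," + e.eventId + "," + e.msisdn);
        }
        return new ArrayList<>(keys);
    }

    public static void main(String[] args) {
        List<SipEvent> events = List.of(
                new SipEvent(5_000L, "E1", "1380000"),
                new SipEvent(59_999L, "E1", "1380000"), // same window, duplicate key
                new SipEvent(60_000L, "E1", "1380000"), // falls into the next window
                new SipEvent(61_000L, "E2", "1390000"));
        distinctPerWindow(events).forEach(System.out::println);
    }
}
```

Running main prints three keys: 0,E1,1380000, then 60000,E1,1380000, then 60000,E2,1390000. The event at 59999 ms collapses into the first window's row.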