看报错是TM挂了,具体原因需要分析TM日志,有可能是上面答复中相同的问题,也有可能是其他原因造成的。
Thank you~ Xintong Song On Mon, Oct 21, 2019 at 11:36 AM [email protected] <[email protected]> wrote: > 参考: > http://mail-archives.apache.org/mod_mbox/flink-user-zh/201905.mbox/%[email protected]%3E > > > > [email protected] > > 发件人: [email protected] > 发送时间: 2019-10-21 11:05 > 收件人: user-zh > 抄送: zhangjunjie1130 > 主题: Flink提jar包部署到Yarn上报错 > 您好: > 我的程序是从kafka取数,然后Flink处理后有写入kafka;本地运行正常,上传到Yarn集群上报错, > Flink版本是:1.7.2 > > 错误是: > 2019-10-21 09:52:30,054 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint > triggering task Source: Custom Source -> Flat Map -> Map -> Map -> Sink: > flink-conncetors-kafka (1/1) of job 7d5dfa42776d679eb240fa833444bc22 is not > in state RUNNING but DEPLOYING instead. Aborting checkpoint. > 2019-10-21 09:52:30,389 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: > Custom Source -> Flat Map -> Map -> Map -> Sink: flink-conncetors-kafka > (1/1) (05b67b88bf084a9c9884201d224768b4) switched from DEPLOYING to RUNNING. > 2019-10-21 09:52:31,054 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 1 @ 1571622751054 for job 7d5dfa42776d679eb240fa833444bc22. > 2019-10-21 09:52:31,488 ERROR > org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler > - Implementation error: Unhandled exception. > org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException: > No TaskExecutor registered under container_e12_1562833757356_1953_01_000002. > at > org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:563) > at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) > at > akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > 2019-10-21 09:52:34,467 ERROR > org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler > - Implementation error: Unhandled exception. > org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException: > No TaskExecutor registered under container_e12_1562833757356_1953_01_000002. > at > org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:563) > at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) > at > akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > ======================================== > 相关代码: > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.enableCheckpointing(1000); > Properties properties1 = new Properties(); > properties1.setProperty("bootstrap.servers", "xxx:9092"); > properties1.setProperty("group.id", "test"); > > System.out.println("333333333333333333333333"); > > FlinkKafkaConsumer010<String> myConsumer = new > FlinkKafkaConsumer010<String>("bigeyeservertopictest", new > SimpleStringSchema(), properties1); > > DataStream<String> stream = env.addSource(myConsumer); > // stream.print(); > DataStream<String> student = stream.flatMap(new > StringToJsonObject()).map(value -> value.toJSONArraytoString()) > .map(new MapFunction<JSONArray, String>() { > public String map(JSONArray jsonArray) throws > Exception{ > String source= "bigeye"; > JSONObject jsonObject1 = new JSONObject(); > String result = null; > // readRedis redis = new readRedis(); > for(int i=0;i<jsonArray.size();i++){ > // log.info("1111111111111111"); > String jsonObjectStr = jsonArray.getString(i); > String timeValue = > JSONObject.parseObject(jsonObjectStr).getString("time"); > String valueValue = > JSONObject.parseObject(jsonObjectStr).getString("value"); > String agentHostnameValue = > JSONObject.parseObject(jsonObjectStr).getString("agentHostname"); > String streamIDValue = > JSONObject.parseObject(jsonObjectStr).getString("streamID"); > // Map<String, String> redisMap = > redis.getInfo(agentHostnameValue,streamIDValue,source); > // 创建opentsdb数据对象 > Map<String, Object> opentsdbValue = new > HashMap<>(); > opentsdbValue.put("metric", streamIDValue); > opentsdbValue.put("timestamp", timeValue); > opentsdbValue.put("value", valueValue); > Gson gson = new Gson(); > // opentsdbValue.put("tags", > gson.fromJson((String) redisMap.get("tags"), Map.class)); > > // > jsonObject1.put(flag,timeValue+"+"+streamIDValue); > jsonObject1.put("BigeyeValue",opentsdbValue); > // redis.close(); > > } > > result = jsonObject1.toString(); > return result; > } > }) > ; > student.addSink(new > FlinkKafkaProducer010<String>("xxxx:9092","test", new > SimpleStringSchema())).name("flink-kafka"); > > 万分感谢! > > > > [email protected] >
