From the error, a TaskManager has gone down. The exact cause needs to be determined from the TM logs; it may be the same problem as in the reply referenced above, or it may be something else entirely.
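Since the TaskManager container is already gone, its logs have to be pulled via YARN log aggregation. A minimal sketch: the application ID below is inferred from the container name in the error (container_e12_1562833757356_1953_01_000002), so substitute your own, and the grep patterns are just common causes of a lost TaskExecutor, not an exhaustive list.

```shell
# Hypothetical application ID, derived from the container name in the error log;
# replace it with your actual YARN application ID.
APP_ID="application_1562833757356_1953"

# Fetch the aggregated container logs (only if the yarn CLI is available here).
if command -v yarn >/dev/null 2>&1; then
    yarn logs -applicationId "$APP_ID" > all-containers.log
fi

# Scan the TaskManager log for common reasons a TaskExecutor disappears:
# heap OOM, container killed by YARN for exceeding memory limits, or a fatal error.
PATTERN="OutOfMemoryError|Killing container|SIGTERM|Fatal error"
if [ -f all-containers.log ]; then
    grep -nE "$PATTERN" all-containers.log || echo "no matching lines found"
fi
```

If the container was killed for exceeding its physical memory limit, increasing `taskmanager.heap.size` (or reducing per-TM slots) is the usual next step.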


Thank you~

Xintong Song



On Mon, Oct 21, 2019 at 11:36 AM [email protected] <[email protected]> wrote:

> Reference:
> http://mail-archives.apache.org/mod_mbox/flink-user-zh/201905.mbox/%[email protected]%3E
>
>
>
> [email protected]
>
> From: [email protected]
> Sent: 2019-10-21 11:05
> To: user-zh
> Cc: zhangjunjie1130
> Subject: Error when submitting a Flink jar to YARN
> Hello:
>         My program reads from Kafka, processes the data in Flink, and writes the results back to Kafka. It runs fine locally, but fails after being submitted to the YARN cluster.
>         Flink version: 1.7.2
>
> The error is:
> 2019-10-21 09:52:30,054 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint
> triggering task Source: Custom Source -> Flat Map -> Map -> Map -> Sink:
> flink-conncetors-kafka (1/1) of job 7d5dfa42776d679eb240fa833444bc22 is not
> in state RUNNING but DEPLOYING instead. Aborting checkpoint.
> 2019-10-21 09:52:30,389 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
> Custom Source -> Flat Map -> Map -> Map -> Sink: flink-conncetors-kafka
> (1/1) (05b67b88bf084a9c9884201d224768b4) switched from DEPLOYING to RUNNING.
> 2019-10-21 09:52:31,054 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering
> checkpoint 1 @ 1571622751054 for job 7d5dfa42776d679eb240fa833444bc22.
> 2019-10-21 09:52:31,488 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler
> - Implementation error: Unhandled exception.
> org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException:
> No TaskExecutor registered under container_e12_1562833757356_1953_01_000002.
>         at
> org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:563)
>         at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>         at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>         at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>         at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>         at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>         at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>         at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2019-10-21 09:52:34,467 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler
> - Implementation error: Unhandled exception.
> org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException:
> No TaskExecutor registered under container_e12_1562833757356_1953_01_000002.
>         (stack trace identical to the first occurrence above)
>
> ========================================
> Relevant code:
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.enableCheckpointing(1000);
>         Properties properties1 = new Properties();
>         properties1.setProperty("bootstrap.servers", "xxx:9092");
>         properties1.setProperty("group.id", "test");
>
>         System.out.println("333333333333333333333333");
>
>         FlinkKafkaConsumer010<String> myConsumer =
>                 new FlinkKafkaConsumer010<String>("bigeyeservertopictest", new SimpleStringSchema(), properties1);
>
>         DataStream<String> stream = env.addSource(myConsumer);
> //        stream.print();
>         DataStream<String> student = stream
>                 .flatMap(new StringToJsonObject())
>                 .map(value -> value.toJSONArraytoString())
>                 .map(new MapFunction<JSONArray, String>() {
>                     public String map(JSONArray jsonArray) throws Exception {
>                         String source = "bigeye";
>                         JSONObject jsonObject1 = new JSONObject();
>                         String result = null;
> //                        readRedis redis = new readRedis();
>                         for (int i = 0; i < jsonArray.size(); i++) {
> //                            log.info("1111111111111111");
>                             String jsonObjectStr = jsonArray.getString(i);
>                             String timeValue = JSONObject.parseObject(jsonObjectStr).getString("time");
>                             String valueValue = JSONObject.parseObject(jsonObjectStr).getString("value");
>                             String agentHostnameValue = JSONObject.parseObject(jsonObjectStr).getString("agentHostname");
>                             String streamIDValue = JSONObject.parseObject(jsonObjectStr).getString("streamID");
> //                            Map<String, String> redisMap = redis.getInfo(agentHostnameValue, streamIDValue, source);
>                             // Build the OpenTSDB data object
>                             Map<String, Object> opentsdbValue = new HashMap<>();
>                             opentsdbValue.put("metric", streamIDValue);
>                             opentsdbValue.put("timestamp", timeValue);
>                             opentsdbValue.put("value", valueValue);
>                             Gson gson = new Gson();
> //                            opentsdbValue.put("tags", gson.fromJson((String) redisMap.get("tags"), Map.class));
>
> //                            jsonObject1.put(flag, timeValue + "+" + streamIDValue);
>                             jsonObject1.put("BigeyeValue", opentsdbValue);
> //                            redis.close();
>                         }
>
>                         result = jsonObject1.toString();
>                         return result;
>                     }
>                 });
>         student.addSink(new FlinkKafkaProducer010<String>("xxxx:9092", "test", new SimpleStringSchema())).name("flink-kafka");
>
> Many thanks!
>
>
>
> [email protected]
>
