Reference: http://mail-archives.apache.org/mod_mbox/flink-user-zh/201905.mbox/%[email protected]%3E



[email protected]
 
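From: [email protected]
Sent: 2019-10-21 11:05
To: user-zh
Cc: zhangjunjie1130
Subject: Error when deploying a Flink jar to Yarn
Hello,
        My program reads data from Kafka, processes it with Flink, and writes the results back to Kafka. It runs fine locally, but reports an error once it is submitted to the Yarn cluster.
        Flink version: 1.7.2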
 
The error is:
2019-10-21 09:52:30,054 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom Source -> Flat Map -> Map -> Map -> Sink: flink-conncetors-kafka (1/1) of job 7d5dfa42776d679eb240fa833444bc22 is not in state RUNNING but DEPLOYING instead. Aborting checkpoint.
2019-10-21 09:52:30,389 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Flat Map -> Map -> Map -> Sink: flink-conncetors-kafka (1/1) (05b67b88bf084a9c9884201d224768b4) switched from DEPLOYING to RUNNING.
2019-10-21 09:52:31,054 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1 @ 1571622751054 for job 7d5dfa42776d679eb240fa833444bc22.
2019-10-21 09:52:31,488 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler  - Implementation error: Unhandled exception.
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException: No TaskExecutor registered under container_e12_1562833757356_1953_01_000002.
        at org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:563)
        at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2019-10-21 09:52:34,467 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler  - Implementation error: Unhandled exception.
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException: No TaskExecutor registered under container_e12_1562833757356_1953_01_000002.
        at org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:563)
        at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 
========================================
Relevant code:
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        Properties properties1 = new Properties();
        properties1.setProperty("bootstrap.servers", "xxx:9092");
        properties1.setProperty("group.id", "test");

        System.out.println("333333333333333333333333");

        FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<String>(
                "bigeyeservertopictest", new SimpleStringSchema(), properties1);

        DataStream<String> stream = env.addSource(myConsumer);
//        stream.print();
        DataStream<String> student = stream.flatMap(new StringToJsonObject())
                .map(value -> value.toJSONArraytoString())
                .map(new MapFunction<JSONArray, String>() {
                    public String map(JSONArray jsonArray) throws Exception {
                        String source = "bigeye";
                        JSONObject jsonObject1 = new JSONObject();
                        String result = null;
//                        readRedis redis = new readRedis();
                        for (int i = 0; i < jsonArray.size(); i++) {
//                            log.info("1111111111111111");
                            String jsonObjectStr = jsonArray.getString(i);
                            String timeValue = JSONObject.parseObject(jsonObjectStr).getString("time");
                            String valueValue = JSONObject.parseObject(jsonObjectStr).getString("value");
                            String agentHostnameValue = JSONObject.parseObject(jsonObjectStr).getString("agentHostname");
                            String streamIDValue = JSONObject.parseObject(jsonObjectStr).getString("streamID");
//                            Map<String, String> redisMap = redis.getInfo(agentHostnameValue,streamIDValue,source);
                            // Create the OpenTSDB data object
                            Map<String, Object> opentsdbValue = new HashMap<>();
                            opentsdbValue.put("metric", streamIDValue);
                            opentsdbValue.put("timestamp", timeValue);
                            opentsdbValue.put("value", valueValue);
                            Gson gson = new Gson();
//                            opentsdbValue.put("tags", gson.fromJson((String) redisMap.get("tags"), Map.class));

//                            jsonObject1.put(flag,timeValue+"+"+streamIDValue);
                            jsonObject1.put("BigeyeValue", opentsdbValue);
//                            redis.close();
                        }

                        result = jsonObject1.toString();
                        return result;
                    }
                });
        student.addSink(new FlinkKafkaProducer010<String>("xxxx:9092", "test", new SimpleStringSchema())).name("flink-kafka");
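
A side note on the map function above, separate from the Yarn error: jsonObject1.put("BigeyeValue", ...) is called with the same key on every loop iteration, so only the last element of the array ends up in the result. If the intent is to keep every element, the loop could collect the data points into a list instead. Below is a minimal sketch of that idea using only the JDK. The class name OpenTsdbSketch and the method toDataPoints are hypothetical, and the input is assumed to be records that have already been parsed into string maps, rather than the fastjson JSONArray used in the original code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the original job.
final class OpenTsdbSketch {

    // Builds one OpenTSDB-style data point per input record and keeps all of them,
    // instead of overwriting a single "BigeyeValue" entry on each iteration.
    static List<Map<String, Object>> toDataPoints(List<Map<String, String>> records) {
        List<Map<String, Object>> points = new ArrayList<>();
        for (Map<String, String> record : records) {
            // Each record is read once; the field names match the original snippet.
            Map<String, Object> point = new LinkedHashMap<>();
            point.put("metric", record.get("streamID"));
            point.put("timestamp", record.get("time"));
            point.put("value", record.get("value"));
            points.add(point);
        }
        return points;
    }

    public static void main(String[] args) {
        // Made-up sample records, for illustration only.
        Map<String, String> first = new HashMap<>();
        first.put("streamID", "cpu");
        first.put("time", "1571622751");
        first.put("value", "0.5");

        Map<String, String> second = new HashMap<>();
        second.put("streamID", "mem");
        second.put("time", "1571622752");
        second.put("value", "0.7");

        System.out.println(toDataPoints(Arrays.asList(first, second)));
    }
}
```

In the real job the resulting list would still have to be serialized to a JSON string before it is handed to the Kafka sink.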
 
Many thanks!
 
 
 
[email protected]
