Hi 大家好,请教一个问题:
    Flink版本1.9.1,使用MiniCluster在本地提交任务, 
不能正常执行任务,下面的报错信息是在执行miniCluster.start()时报的,并且程序一直卡在这一步 
miniCluster.executeJobBlocking(getSimpleJob(numOfTMs * slotsPerTM))
    Java Code:

public static void main(String[] args) throws Exception {
final int numOfTMs = 3;
final int slotsPerTM = 7;

final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
            .setNumTaskManagers(numOfTMs)
            .setNumSlotsPerTaskManager(slotsPerTM)
            .setRpcServiceSharing(RpcServiceSharing.SHARED)
            .setConfiguration(getDefaultConfiguration())
            .build();
final MiniCluster miniCluster = new MiniCluster(cfg);
try {
        miniCluster.start();
        JobExecutionResult result = 
miniCluster.executeJobBlocking(getSimpleJob(numOfTMs * slotsPerTM));
        System.out.println(result.toString());
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        miniCluster.close();
    }
}

private static Configuration getDefaultConfiguration() {
final Configuration configuration = new Configuration();
    configuration.setString(RestOptions.BIND_PORT, "0");

return configuration;
}

private static JobGraph getSimpleJob(int parallelism) throws IOException {
final JobVertex task = new JobVertex("Test task");
    task.setParallelism(parallelism);
    task.setMaxParallelism(parallelism);
    task.setInvokableClass(NoOpInvokable.class);

final JobGraph jg = new JobGraph(new JobID(), "Test Job", task);
    jg.setScheduleMode(ScheduleMode.EAGER);

final ExecutionConfig executionConfig = new ExecutionConfig();
    
executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE,
 1000));
    jg.setExecutionConfig(executionConfig);

return jg;
}
      报错信息:
        02/12/2019 20:49:21.212  INFO 
[org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils] Actor system started at 
akka.tcp://flink-metrics@10.242.32.235:65105
02/12/2019 20:49:21.219  INFO 
[org.apache.flink.runtime.rpc.akka.AkkaRpcService] Starting RPC endpoint for 
org.apache.flink.runtime.metrics.dump.MetricQueryService at 
akka://flink-metrics/user/MetricQueryService .
02/12/2019 20:49:21.227 ERROR 
[org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy]
 Actor failed with exception. Stopping it now.
akka.actor.ActorInitializationException: 
akka://flink-metrics/user/MetricQueryService: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:175)
at akka.actor.ActorCell.create(ActorCell.scala:607)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: akka.actor.IllegalActorStateException: Actor behavior has not been 
set with receive(...)
at akka.actor.IllegalActorStateException$.apply(Actor.scala:140)
at akka.actor.AbstractActor.receive(AbstractActor.scala:74)
at akka.actor.ActorCell.newActor(ActorCell.scala:568)
at akka.actor.ActorCell.create(ActorCell.scala:588)
... 7 more
02/12/2019 20:49:21.229  INFO 
[org.apache.flink.runtime.minicluster.MiniCluster] Starting high-availability 
services
02/12/2019 20:49:21.245  INFO [org.apache.flink.runtime.blob.BlobServer] 
Created BLOB server storage directory 
/var/folders/7b/x2zfnt157ls4f27syvlyjjsc0000gn/T/blobStore-575c98fc-54f2-45f8-a183-96d3880e940b
……
02/12/2019 20:49:21.671  INFO 
[org.apache.flink.runtime.taskexecutor.TaskManagerServices] Limiting managed 
memory to 524287 MB, memory will be allocated lazily.
02/12/2019 20:49:21.674 DEBUG [org.apache.flink.runtime.memory.MemoryManager] 
Initialized MemoryManager with total memory size 549754765312, number of slots 
1, page size 32768, memory type HEAP, pre allocate memory false and number of 
non allocated pages 16777184.
02/12/2019 20:49:21.685  INFO 
[org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration] Messages have 
a max timeout of 10000 ms
02/12/2019 20:49:21.696  INFO 
[org.apache.flink.runtime.rpc.akka.AkkaRpcService] Starting RPC endpoint for 
org.apache.flink.runtime.taskexecutor.TaskExecutor at 
akka://flink/user/taskmanager_0 .
02/12/2019 20:49:21.697 ERROR 
[org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy]
 Actor failed with exception. Stopping it now.
akka.actor.ActorInitializationException: akka://flink/user/taskmanager_0: 
exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:175)
at akka.actor.ActorCell.create(ActorCell.scala:607)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.actor.IllegalActorStateException: Actor behavior has not been 
set with receive(...)
at akka.actor.IllegalActorStateException$.apply(Actor.scala:140)
at akka.actor.AbstractActor.receive(AbstractActor.scala:74)
at akka.actor.ActorCell.newActor(ActorCell.scala:568)
at akka.actor.ActorCell.create(ActorCell.scala:588)
... 9 more
02/12/2019 20:49:21.735  INFO [org.apache.flink.configuration.Configuration] 
Config uses fallback configuration key 'rest.port' instead of key 
'rest.bind-port'
02/12/2019 20:49:21.774 DEBUG 
[org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory]
 Starting Dispatcher REST endpoint.
02/12/2019 20:49:21.774  INFO 
[org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint] Starting rest 
endpoint.
……
02/12/2019 20:49:22.245  INFO 
[org.apache.flink.runtime.rpc.akka.AkkaRpcService] Starting RPC endpoint for 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at 
akka://flink/user/resourcemanager .
02/12/2019 20:49:22.246 ERROR 
[org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy]
 Actor failed with exception. Stopping it now.
akka.actor.ActorInitializationException: akka://flink/user/resourcemanager: 
exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:175)
at akka.actor.ActorCell.create(ActorCell.scala:607)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.actor.IllegalActorStateException: Actor behavior has not been 
set with receive(...)
at akka.actor.IllegalActorStateException$.apply(Actor.scala:140)
at akka.actor.AbstractActor.receive(AbstractActor.scala:74)
at akka.actor.ActorCell.newActor(ActorCell.scala:568)
at akka.actor.ActorCell.create(ActorCell.scala:588)
... 9 more
02/12/2019 20:49:22.257  INFO 
[org.apache.flink.runtime.rpc.akka.AkkaRpcService] Starting RPC endpoint for 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at 
akka://flink/user/dispatcher .
02/12/2019 20:49:22.257 ERROR 
[org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy]
 Actor failed with exception. Stopping it now.
akka.actor.ActorInitializationException: akka://flink/user/dispatcher: 
exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:175)
at akka.actor.ActorCell.create(ActorCell.scala:607)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.actor.IllegalActorStateException: Actor behavior has not been 
set with receive(...)
at akka.actor.IllegalActorStateException$.apply(Actor.scala:140)
at akka.actor.AbstractActor.receive(AbstractActor.scala:74)
at akka.actor.ActorCell.newActor(ActorCell.scala:568)
at akka.actor.ActorCell.create(ActorCell.scala:588)
... 9 more
02/12/2019 20:49:22.267 DEBUG 
[org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory]
 Starting ResourceManager.
02/12/2019 20:49:22.267 DEBUG 
[org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory]
 Starting Dispatcher.
02/12/2019 20:49:22.268  INFO 
[org.apache.flink.runtime.minicluster.MiniCluster] Flink Mini Cluster started 
successfully
| |
叶贤勋
|
|
yxx_c...@163.com
|
签名由网易邮箱大师定制

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.mini.cluster.local;

import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A simple task that does nothing and finishes immediately.
 */
public class NoOpInvokable extends AbstractInvokable {
        private static final Logger LOG = 
LoggerFactory.getLogger(NoOpInvokable.class);

        public NoOpInvokable(Environment environment) {
                super(environment);
        }

        @Override
        public void invoke() {
                LOG.info("I am invoked..");
        }
}

Reply via email to