Hi 大家好,请教一个问题:
Flink版本1.9.1,使用MiniCluster在本地提交任务,
不能正常执行任务,下面的报错信息是在执行miniCluster.start()时报的,并且程序一直卡在这一步
miniCluster.executeJobBlocking(getSimpleJob(numOfTMs * slotsPerTM))
Java Code:
public static void main(String[] args) throws Exception {
final int numOfTMs = 3;
final int slotsPerTM = 7;
final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
.setNumTaskManagers(numOfTMs)
.setNumSlotsPerTaskManager(slotsPerTM)
.setRpcServiceSharing(RpcServiceSharing.SHARED)
.setConfiguration(getDefaultConfiguration())
.build();
final MiniCluster miniCluster = new MiniCluster(cfg);
try {
miniCluster.start();
JobExecutionResult result =
miniCluster.executeJobBlocking(getSimpleJob(numOfTMs * slotsPerTM));
System.out.println(result.toString());
} catch (Exception e) {
e.printStackTrace();
} finally {
miniCluster.close();
}
}
private static Configuration getDefaultConfiguration() {
final Configuration configuration = new Configuration();
configuration.setString(RestOptions.BIND_PORT, "0");
return configuration;
}
private static JobGraph getSimpleJob(int parallelism) throws IOException {
final JobVertex task = new JobVertex("Test task");
task.setParallelism(parallelism);
task.setMaxParallelism(parallelism);
task.setInvokableClass(NoOpInvokable.class);
final JobGraph jg = new JobGraph(new JobID(), "Test Job", task);
jg.setScheduleMode(ScheduleMode.EAGER);
final ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE,
1000));
jg.setExecutionConfig(executionConfig);
return jg;
}
报错信息:
02/12/2019 20:49:21.212 INFO
[org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils] Actor system started at
akka.tcp://[email protected]:65105
02/12/2019 20:49:21.219 INFO
[org.apache.flink.runtime.rpc.akka.AkkaRpcService] Starting RPC endpoint for
org.apache.flink.runtime.metrics.dump.MetricQueryService at
akka://flink-metrics/user/MetricQueryService .
02/12/2019 20:49:21.227 ERROR
[org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy]
Actor failed with exception. Stopping it now.
akka.actor.ActorInitializationException:
akka://flink-metrics/user/MetricQueryService: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:175)
at akka.actor.ActorCell.create(ActorCell.scala:607)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: akka.actor.IllegalActorStateException: Actor behavior has not been
set with receive(...)
at akka.actor.IllegalActorStateException$.apply(Actor.scala:140)
at akka.actor.AbstractActor.receive(AbstractActor.scala:74)
at akka.actor.ActorCell.newActor(ActorCell.scala:568)
at akka.actor.ActorCell.create(ActorCell.scala:588)
... 7 more
02/12/2019 20:49:21.229 INFO
[org.apache.flink.runtime.minicluster.MiniCluster] Starting high-availability
services
02/12/2019 20:49:21.245 INFO [org.apache.flink.runtime.blob.BlobServer]
Created BLOB server storage directory
/var/folders/7b/x2zfnt157ls4f27syvlyjjsc0000gn/T/blobStore-575c98fc-54f2-45f8-a183-96d3880e940b
……
02/12/2019 20:49:21.671 INFO
[org.apache.flink.runtime.taskexecutor.TaskManagerServices] Limiting managed
memory to 524287 MB, memory will be allocated lazily.
02/12/2019 20:49:21.674 DEBUG [org.apache.flink.runtime.memory.MemoryManager]
Initialized MemoryManager with total memory size 549754765312, number of slots
1, page size 32768, memory type HEAP, pre allocate memory false and number of
non allocated pages 16777184.
02/12/2019 20:49:21.685 INFO
[org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration] Messages have
a max timeout of 10000 ms
02/12/2019 20:49:21.696 INFO
[org.apache.flink.runtime.rpc.akka.AkkaRpcService] Starting RPC endpoint for
org.apache.flink.runtime.taskexecutor.TaskExecutor at
akka://flink/user/taskmanager_0 .
02/12/2019 20:49:21.697 ERROR
[org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy]
Actor failed with exception. Stopping it now.
akka.actor.ActorInitializationException: akka://flink/user/taskmanager_0:
exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:175)
at akka.actor.ActorCell.create(ActorCell.scala:607)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.actor.IllegalActorStateException: Actor behavior has not been
set with receive(...)
at akka.actor.IllegalActorStateException$.apply(Actor.scala:140)
at akka.actor.AbstractActor.receive(AbstractActor.scala:74)
at akka.actor.ActorCell.newActor(ActorCell.scala:568)
at akka.actor.ActorCell.create(ActorCell.scala:588)
... 9 more
02/12/2019 20:49:21.735 INFO [org.apache.flink.configuration.Configuration]
Config uses fallback configuration key 'rest.port' instead of key
'rest.bind-port'
02/12/2019 20:49:21.774 DEBUG
[org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory]
Starting Dispatcher REST endpoint.
02/12/2019 20:49:21.774 INFO
[org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint] Starting rest
endpoint.
……
02/12/2019 20:49:22.245 INFO
[org.apache.flink.runtime.rpc.akka.AkkaRpcService] Starting RPC endpoint for
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at
akka://flink/user/resourcemanager .
02/12/2019 20:49:22.246 ERROR
[org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy]
Actor failed with exception. Stopping it now.
akka.actor.ActorInitializationException: akka://flink/user/resourcemanager:
exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:175)
at akka.actor.ActorCell.create(ActorCell.scala:607)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.actor.IllegalActorStateException: Actor behavior has not been
set with receive(...)
at akka.actor.IllegalActorStateException$.apply(Actor.scala:140)
at akka.actor.AbstractActor.receive(AbstractActor.scala:74)
at akka.actor.ActorCell.newActor(ActorCell.scala:568)
at akka.actor.ActorCell.create(ActorCell.scala:588)
... 9 more
02/12/2019 20:49:22.257 INFO
[org.apache.flink.runtime.rpc.akka.AkkaRpcService] Starting RPC endpoint for
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
akka://flink/user/dispatcher .
02/12/2019 20:49:22.257 ERROR
[org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy]
Actor failed with exception. Stopping it now.
akka.actor.ActorInitializationException: akka://flink/user/dispatcher:
exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:175)
at akka.actor.ActorCell.create(ActorCell.scala:607)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.actor.IllegalActorStateException: Actor behavior has not been
set with receive(...)
at akka.actor.IllegalActorStateException$.apply(Actor.scala:140)
at akka.actor.AbstractActor.receive(AbstractActor.scala:74)
at akka.actor.ActorCell.newActor(ActorCell.scala:568)
at akka.actor.ActorCell.create(ActorCell.scala:588)
... 9 more
02/12/2019 20:49:22.267 DEBUG
[org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory]
Starting ResourceManager.
02/12/2019 20:49:22.267 DEBUG
[org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory]
Starting Dispatcher.
02/12/2019 20:49:22.268 INFO
[org.apache.flink.runtime.minicluster.MiniCluster] Flink Mini Cluster started
successfully
| |
叶贤勋
|
|
[email protected]
|
签名由网易邮箱大师定制
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.mini.cluster.local;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A simple task that does nothing and finishes immediately.
*/
public class NoOpInvokable extends AbstractInvokable {
private static final Logger LOG =
LoggerFactory.getLogger(NoOpInvokable.class);
public NoOpInvokable(Environment environment) {
super(environment);
}
@Override
public void invoke() {
LOG.info("I am invoked..");
}
}