Hi again, jfyi - not sure if this is correct, but it seems that differing flink jars seemed to be the problem. Here is how I get the RemoteEnvironment running on my setup:
* create a maven project with the flink-quickstart-java archetype * no changes to pom.xml * add my custom job to the project, see [1] * build with `mvn clean install -Pbuild-jar` * run with `java -cp /tmp/flinkremote2.jar:/usr/local/Cellar/apache-flink/1.3.2/libexec/lib/* flinkremote2.RemoteEnvironmentTest` Including the jars of the deployed Flink installation avoids dependency mismatches. In the example code I point the remote environment to the local host. To point it to a remote machine I just make sure I have an identical Flink installation on the machine that calls `java -cp ...` (i.e. include the exact same jars in the classpath). [1] package flinkremote2; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.core.fs.FileSystem; public class RemoteEnvironmentTest { public static void main(final String[] args) throws Exception { final String host = "127.0.0.1"; final String jar = "/tmp/flinkremote2.jar"; System.out.println(String.format("host: %s", host)); final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(host, 6123, jar); env .fromElements(1, 2, 3, 4, 5, 6) .filter(new FilterFunction<Integer>() { @Override public boolean filter(Integer value) throws Exception { return value == 4; } }) .print(); } } > On 16. Nov 2017, at 11:28, Lukas Kircher <lukaskirch...@gmail.com> wrote: > > Good morning, > > I have some problems using a Flink RemoteEnvironment in my Java application. > If I run the following code [1] directly from the IDE it runs as expected. > However, if I package a jar and run this from a CLI via `java -cp ...` I get > the following error [2]. I want to use the RemoteEnvironment to run a > sequence of Flink jobs that depend on each other in my application. I am > using Flink 1.3.2. > > Is there a difference between running this example from IDE vs CLI? Am I > missing something? Did I forget to specify some configuration? > > Thanks for your help, > Lukas > > > > [1] > package flinkremote; > > import org.apache.flink.api.common.functions.FilterFunction; > import org.apache.flink.api.java.ExecutionEnvironment; > import org.apache.flink.core.fs.FileSystem; > > public class RemoteEnvironmentTest { > > public static void main(final String[] args) throws Exception { > final ExecutionEnvironment env = > ExecutionEnvironment.createRemoteEnvironment("127.0.0.1", 6123, > "/tmp/flinkremote.jar"); > env > .fromElements(1, 2, 3, 4, 5, 6) > .filter(new FilterFunction<Integer>() { > @Override > public boolean filter(Integer value) throws Exception { > return value == 4; > } > }) > .writeAsText("/tmp/flinkremote.csv", FileSystem.WriteMode.OVERWRITE); > env.execute("remote environment test"); > } > } > > [2] > Exception in thread "main" > org.apache.flink.client.program.ProgramInvocationException: Could not start > the ActorSystem needed to talk to the JobManager. > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:461) > at > org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:429) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404) > at > org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:211) > at > org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:188) > at > org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172) > at flinkremote.RemoteEnvironmentTest.main(RemoteEnvironmentTest.java:23) > Caused by: org.apache.flink.util.FlinkException: Could not start the > ActorSystem lazily. > at > org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:230) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:459) > ... 8 more > Caused by: com.typesafe.config.ConfigException$Missing: No configuration > setting found for key 'akka.remote.log-received-messages' > at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124) > at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:145) > at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:151) > at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:151) > at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159) > at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164) > at > com.typesafe.config.impl.SimpleConfig.getBoolean(SimpleConfig.java:174) > at akka.remote.RemoteSettings.<init>(RemoteSettings.scala:24) > at > akka.remote.RemoteActorRefProvider.<init>(RemoteActorRefProvider.scala:114) > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78) > at scala.util.Try$.apply(Try.scala:192) > at > akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73) > at > akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84) > at > akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84) > at scala.util.Success.flatMap(Try.scala:231) > at > akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84) > at akka.actor.ActorSystemImpl.liftedTree1$1(ActorSystem.scala:585) > at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:578) > at akka.actor.ActorSystem$.apply(ActorSystem.scala:142) > at akka.actor.ActorSystem$.apply(ActorSystem.scala:119) > at akka.actor.ActorSystem$.create(ActorSystem.scala:67) > at > org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:104) > at > org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:92) > at > org.apache.flink.runtime.akka.AkkaUtils.createActorSystem(AkkaUtils.scala) > at > org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:226) > ... 9 more > > > >