[jira] [Updated] (FLINK-6689) Remote StreamExecutionEnvironment fails to submit jobs against LocalFlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-6689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-6689: Component/s: Client > Remote StreamExecutionEnvironment fails to submit jobs against > LocalFlinkMiniCluster > > > Key: FLINK-6689 > URL: https://issues.apache.org/jira/browse/FLINK-6689 > Project: Flink > Issue Type: Bug > Components: Client, Job-Submission >Affects Versions: 1.3.0 >Reporter: Nico Kruber > Fix For: 1.3.0 > > > The following Flink programs fails to execute with the current 1.3 branch > (1.2 works) because the leader session ID being used is wrong: > {code:java} > final String jobManagerAddress = "localhost"; > final int jobManagerPort = ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT; > final Configuration config = new Configuration(); > config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, > jobManagerAddress); > config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, > jobManagerPort); > config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); > final LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(config, > false); > cluster.start(true); > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.createRemoteEnvironment(jobManagerAddress, > jobManagerPort); > env.fromElements(1l).addSink(new DiscardingSink()); > // fails due to leader session id being wrong: > env.execute("test"); > {code} > Output from logs contais: > {code} > ... > 16:24:57,551 INFO org.apache.flink.runtime.webmonitor.JobManagerRetriever >- New leader reachable under > akka.tcp://flink@localhost:6123/user/jobmanager:ff0d56cf-6205-4dd4-a266-03847f4d6944. > 16:24:57,894 INFO > org.apache.flink.streaming.api.environment.RemoteStreamEnvironment - Running > remotely at localhost:6123 > 16:24:58,121 INFO org.apache.flink.client.program.StandaloneClusterClient >- Starting client actor system. > 16:24:58,123 INFO org.apache.flink.runtime.util.LeaderRetrievalUtils >- Trying to select the network interface and address to use by connecting > to the leading JobManager. > 16:24:58,128 INFO org.apache.flink.runtime.util.LeaderRetrievalUtils >- TaskManager will try to connect for 1 milliseconds before falling > back to heuristics > 16:24:58,132 INFO org.apache.flink.runtime.net.ConnectionUtils >- Retrieved new target address localhost/127.0.0.1:6123. > 16:24:58,258 INFO akka.event.slf4j.Slf4jLogger >- Slf4jLogger started > 16:24:58,262 INFO Remoting >- Starting remoting > 16:24:58,375 INFO Remoting >- Remoting started; listening on addresses > :[akka.tcp://fl...@nico-work.fritz.box:43413] > 16:24:58,376 INFO org.apache.flink.client.program.StandaloneClusterClient >- Submitting job with JobID: 9bef4793a4b7f4caaad96bd28211cbb9. Waiting for > job completion. > Submitting job with JobID: 9bef4793a4b7f4caaad96bd28211cbb9. Waiting for job > completion. > 16:24:58,382 INFO org.apache.flink.runtime.client.JobSubmissionClientActor >- Disconnect from JobManager null. > 16:24:58,398 INFO org.apache.flink.runtime.client.JobSubmissionClientActor >- Received SubmitJobAndWait(JobGraph(jobId: > 9bef4793a4b7f4caaad96bd28211cbb9)) but there is no connection to a JobManager > yet. > 16:24:58,398 INFO org.apache.flink.runtime.client.JobSubmissionClientActor >- Received job test (9bef4793a4b7f4caaad96bd28211cbb9). > 16:24:58,429 INFO org.apache.flink.runtime.client.JobSubmissionClientActor >- Connect to JobManager > Actor[akka.tcp://flink@localhost:6123/user/jobmanager#1881889998]. > 16:24:58,432 INFO org.apache.flink.runtime.client.JobSubmissionClientActor >- Connected to JobManager at > Actor[akka.tcp://flink@localhost:6123/user/jobmanager#1881889998] with leader > session id ----. > Connected to JobManager at > Actor[akka.tcp://flink@localhost:6123/user/jobmanager#1881889998] with leader > session id ----. > 16:24:58,432 INFO org.apache.flink.runtime.client.JobSubmissionClientActor >- Sending message to JobManager > akka.tcp://flink@localhost:6123/user/jobmanager to submit job test > (9bef4793a4b7f4caaad96bd28211cbb9) and wait for progress > 16:24:58,433 INFO org.apache.flink.runtime.client.JobSubmissionClientActor >- Upload jar files to job manager > akka.tcp://flink@localhost:6123/user/jobmanager. > 16:24:58,440 INFO org.apache.flink.runtime.client.JobSubmissionClientActor >- Submit job to the job manager >
[jira] [Updated] (FLINK-6689) Remote StreamExecutionEnvironment fails to submit jobs against LocalFlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-6689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-6689: --- Description: The following Flink programs fails to execute with the current 1.3 branch (1.2 works) because the leader session ID being used is wrong: {code:java} final String jobManagerAddress = "localhost"; final int jobManagerPort = ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT; final Configuration config = new Configuration(); config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerAddress); config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort); config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); final LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(config, false); cluster.start(true); final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(jobManagerAddress, jobManagerPort); env.fromElements(1l).addSink(new DiscardingSink()); // fails due to leader session id being wrong: env.execute("test"); {code} Output from logs contais: {code} ... 16:24:57,551 INFO org.apache.flink.runtime.webmonitor.JobManagerRetriever - New leader reachable under akka.tcp://flink@localhost:6123/user/jobmanager:ff0d56cf-6205-4dd4-a266-03847f4d6944. 16:24:57,894 INFO org.apache.flink.streaming.api.environment.RemoteStreamEnvironment - Running remotely at localhost:6123 16:24:58,121 INFO org.apache.flink.client.program.StandaloneClusterClient - Starting client actor system. 16:24:58,123 INFO org.apache.flink.runtime.util.LeaderRetrievalUtils - Trying to select the network interface and address to use by connecting to the leading JobManager. 16:24:58,128 INFO org.apache.flink.runtime.util.LeaderRetrievalUtils - TaskManager will try to connect for 1 milliseconds before falling back to heuristics 16:24:58,132 INFO org.apache.flink.runtime.net.ConnectionUtils - Retrieved new target address localhost/127.0.0.1:6123. 16:24:58,258 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started 16:24:58,262 INFO Remoting - Starting remoting 16:24:58,375 INFO Remoting - Remoting started; listening on addresses :[akka.tcp://fl...@nico-work.fritz.box:43413] 16:24:58,376 INFO org.apache.flink.client.program.StandaloneClusterClient - Submitting job with JobID: 9bef4793a4b7f4caaad96bd28211cbb9. Waiting for job completion. Submitting job with JobID: 9bef4793a4b7f4caaad96bd28211cbb9. Waiting for job completion. 16:24:58,382 INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Disconnect from JobManager null. 16:24:58,398 INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Received SubmitJobAndWait(JobGraph(jobId: 9bef4793a4b7f4caaad96bd28211cbb9)) but there is no connection to a JobManager yet. 16:24:58,398 INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Received job test (9bef4793a4b7f4caaad96bd28211cbb9). 16:24:58,429 INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Connect to JobManager Actor[akka.tcp://flink@localhost:6123/user/jobmanager#1881889998]. 16:24:58,432 INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#1881889998] with leader session id ----. Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#1881889998] with leader session id ----. 16:24:58,432 INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Sending message to JobManager akka.tcp://flink@localhost:6123/user/jobmanager to submit job test (9bef4793a4b7f4caaad96bd28211cbb9) and wait for progress 16:24:58,433 INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Upload jar files to job manager akka.tcp://flink@localhost:6123/user/jobmanager. 16:24:58,440 INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Submit job to the job manager akka.tcp://flink@localhost:6123/user/jobmanager. 16:24:58,522 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(----,SubmitJob(JobGraph(jobId: 9bef4793a4b7f4caaad96bd28211cbb9),EXECUTION_RESULT_AND_STATE_CHANGES)) because the expected leader session ID ff0d56cf-6205-4dd4-a266-03847f4d6944 did not equal the received leader session ID ----. {code} was: The following Flink programs fails to execute with the current 1.3 branch (1.2 works): {code:java} final String jobManagerAddress = "localhost"; final int jobManagerPort =