[jira] [Updated] (FLINK-6689) Remote StreamExecutionEnvironment fails to submit jobs against LocalFlinkMiniCluster

2017-05-24 Aljoscha Krettek (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-6689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek updated FLINK-6689:

Component/s: Client

> Remote StreamExecutionEnvironment fails to submit jobs against LocalFlinkMiniCluster
>
>              Key: FLINK-6689
>              URL: https://issues.apache.org/jira/browse/FLINK-6689
>          Project: Flink
>       Issue Type: Bug
>       Components: Client, Job-Submission
> Affects Versions: 1.3.0
>         Reporter: Nico Kruber
>          Fix For: 1.3.0

[jira] [Updated] (FLINK-6689) Remote StreamExecutionEnvironment fails to submit jobs against LocalFlinkMiniCluster

2017-05-24 Nico Kruber (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-6689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nico Kruber updated FLINK-6689:
---
Description: 
The following Flink program fails to execute on the current 1.3 branch (it works on 1.2) because the job submission is sent with the wrong leader session ID:

{code:java}
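import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

// The wrapper class name is arbitrary; it only makes the snippet runnable.
public class RemoteEnvAgainstMiniCluster {

    public static void main(String[] args) throws Exception {
        final String jobManagerAddress = "localhost";
        final int jobManagerPort = ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT;

        // Start a local mini cluster whose JobManager listens on the default IPC port.
        final Configuration config = new Configuration();
        config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerAddress);
        config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
        config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);

        final LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(config, false);
        cluster.start(true);

        try {
            // Connect to that JobManager through a remote environment, using only host and port.
            final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createRemoteEnvironment(jobManagerAddress, jobManagerPort);

            env.fromElements(1L).addSink(new DiscardingSink<Long>());

            // On 1.3 this fails because the submission carries the wrong leader session ID:
            env.execute("test");
        } finally {
            cluster.stop();
        }
    }
}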
{code}

The log output contains:
{code}
...
16:24:57,551 INFO  org.apache.flink.runtime.webmonitor.JobManagerRetriever - New leader reachable under akka.tcp://flink@localhost:6123/user/jobmanager:ff0d56cf-6205-4dd4-a266-03847f4d6944.
16:24:57,894 INFO  org.apache.flink.streaming.api.environment.RemoteStreamEnvironment - Running remotely at localhost:6123
16:24:58,121 INFO  org.apache.flink.client.program.StandaloneClusterClient - Starting client actor system.
16:24:58,123 INFO  org.apache.flink.runtime.util.LeaderRetrievalUtils - Trying to select the network interface and address to use by connecting to the leading JobManager.
16:24:58,128 INFO  org.apache.flink.runtime.util.LeaderRetrievalUtils - TaskManager will try to connect for 1 milliseconds before falling back to heuristics
16:24:58,132 INFO  org.apache.flink.runtime.net.ConnectionUtils - Retrieved new target address localhost/127.0.0.1:6123.
16:24:58,258 INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started
16:24:58,262 INFO  Remoting - Starting remoting
16:24:58,375 INFO  Remoting - Remoting started; listening on addresses :[akka.tcp://fl...@nico-work.fritz.box:43413]
16:24:58,376 INFO  org.apache.flink.client.program.StandaloneClusterClient - Submitting job with JobID: 9bef4793a4b7f4caaad96bd28211cbb9. Waiting for job completion.
Submitting job with JobID: 9bef4793a4b7f4caaad96bd28211cbb9. Waiting for job completion.
16:24:58,382 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor - Disconnect from JobManager null.
16:24:58,398 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor - Received SubmitJobAndWait(JobGraph(jobId: 9bef4793a4b7f4caaad96bd28211cbb9)) but there is no connection to a JobManager yet.
16:24:58,398 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor - Received job test (9bef4793a4b7f4caaad96bd28211cbb9).
16:24:58,429 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor - Connect to JobManager Actor[akka.tcp://flink@localhost:6123/user/jobmanager#1881889998].
16:24:58,432 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor - Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#1881889998] with leader session id ----.
Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#1881889998] with leader session id ----.
16:24:58,432 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor - Sending message to JobManager akka.tcp://flink@localhost:6123/user/jobmanager to submit job test (9bef4793a4b7f4caaad96bd28211cbb9) and wait for progress
16:24:58,433 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor - Upload jar files to job manager akka.tcp://flink@localhost:6123/user/jobmanager.
16:24:58,440 INFO  org.apache.flink.runtime.client.JobSubmissionClientActor - Submit job to the job manager akka.tcp://flink@localhost:6123/user/jobmanager.
16:24:58,522 WARN  org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(----,SubmitJob(JobGraph(jobId: 9bef4793a4b7f4caaad96bd28211cbb9),EXECUTION_RESULT_AND_STATE_CHANGES)) because the expected leader session ID ff0d56cf-6205-4dd4-a266-03847f4d6944 did not equal the received leader session ID ----.

{code}
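The final WARN entry shows why the submission goes nowhere: the client connects to the JobManager actor successfully, but the JobManager discards the `SubmitJob` message because the leader session ID attached to it does not equal the ID it was granted as leader (`ff0d56cf-...`). The sketch below models only that equality check in plain Java. It is not Flink's JobManager code; the class `LeaderSessionFilterSketch`, the nested `TaggedMessage` type (standing in for the `LeaderSessionMessage` seen in the log), and the use of an all-zero UUID as the client's default ID are assumptions made for illustration, since the archived log only shows that ID as `----`.

```java
import java.util.UUID;

// Hypothetical model of the leader-session check implied by the WARN log line.
// This is NOT Flink's implementation; names and types are illustrative only.
public class LeaderSessionFilterSketch {

    // Hypothetical envelope pairing a payload with the sender's leader session ID,
    // standing in for the LeaderSessionMessage shown in the log.
    public static final class TaggedMessage {
        final UUID leaderSessionId;
        final String payload;

        public TaggedMessage(UUID leaderSessionId, String payload) {
            this.leaderSessionId = leaderSessionId;
            this.payload = payload;
        }
    }

    // The session ID this modelled JobManager was granted when it became leader.
    private final UUID expectedLeaderSessionId;

    public LeaderSessionFilterSketch(UUID expectedLeaderSessionId) {
        this.expectedLeaderSessionId = expectedLeaderSessionId;
    }

    // Returns true if the message would be processed, false if it would be discarded.
    public boolean accept(TaggedMessage msg) {
        if (expectedLeaderSessionId.equals(msg.leaderSessionId)) {
            return true;
        }
        System.out.println("Discard message " + msg.payload
                + " because the expected leader session ID " + expectedLeaderSessionId
                + " did not equal the received leader session ID " + msg.leaderSessionId + ".");
        return false;
    }

    public static void main(String[] args) {
        // ID taken from the "New leader reachable under ..." log line.
        UUID jobManagerId = UUID.fromString("ff0d56cf-6205-4dd4-a266-03847f4d6944");
        // Assumption: the remote client falls back to an all-zero default session ID.
        UUID clientDefaultId = new UUID(0L, 0L);

        LeaderSessionFilterSketch jobManager = new LeaderSessionFilterSketch(jobManagerId);

        boolean defaultIdAccepted = jobManager.accept(new TaggedMessage(clientDefaultId, "SubmitJob"));
        boolean leaderIdAccepted = jobManager.accept(new TaggedMessage(jobManagerId, "SubmitJob"));

        System.out.println("default ID accepted: " + defaultIdAccepted); // false
        System.out.println("leader ID accepted: " + leaderIdAccepted);   // true
    }
}
```

Because the check is a plain equality test, a client that never learns the leader's current session ID cannot get any message through, even though the actor connection itself succeeds, which matches the "Connected to JobManager ... with leader session id ----" line followed by the discard.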

  was:
The following Flink program fails to execute with the current 1.3 branch (1.2 works):

{code:java}
final String jobManagerAddress = "localhost";
final int jobManagerPort =