[jira] [Issue Comment Deleted] (FLINK-4233) Simplify leader election / leader session ID assignment

2019-09-19 Thread TisonKun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-4233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun updated FLINK-4233:

Comment: was deleted

(was: Further, if we assume that there are never too many contenders, then in 
FLINK-10333 we can adopt the unoptimized version of leader election, i.e., each 
contender tries to create the single leader znode and, if that fails, waits for 
its deletion before retrying. We then trade performance (with no practical 
impact) for simplicity.)

> Simplify leader election / leader session ID assignment
> ---
>
> Key: FLINK-4233
> URL: https://issues.apache.org/jira/browse/FLINK-4233
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.0.3
>Reporter: Stephan Ewen
>Priority: Major
>
> Currently, there are two separate actions and znodes involved in leader 
> election and communication of the leader session ID and leader URL.
> This leads to some quite elaborate code that tries to make sure that the 
> leader session ID and leader URL always eventually converge to those of the 
> leader.
> It is simpler to just encode both the ID and the URL into an id-string that 
> is attached to the leader latch znode. One would have to create a new leader 
> latch each time a contender re-applies for leadership.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-4233) Simplify leader election / leader session ID assignment

2019-09-19 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-4233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16933899#comment-16933899
 ] 

TisonKun commented on FLINK-4233:
-

Further, if we assume that there are never too many contenders, then in 
FLINK-10333 we can adopt the unoptimized version of leader election, i.e., each 
contender tries to create the single leader znode and, if that fails, waits for 
its deletion before retrying. We then trade performance (with no practical 
impact) for simplicity.
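
To make the idea concrete, here is a minimal sketch of that unoptimized election 
loop, assuming only Curator and ZooKeeper; {{LEADER_PATH}}, {{leaderInfoBytes}}, 
{{grantLeadership()}} and {{retryContend()}} are hypothetical placeholders, not 
existing Flink code:

{code:java}
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;

// Every contender races to create one ephemeral leader znode; a loser watches
// the node and contends again once it is deleted.
void contend(CuratorFramework client) throws Exception {
    try {
        client.create()
            .withMode(CreateMode.EPHEMERAL)
            .forPath(LEADER_PATH, leaderInfoBytes);
        grantLeadership(); // we won the race
    } catch (KeeperException.NodeExistsException e) {
        // Someone else is the leader: watch the node and retry after it disappears.
        if (client.checkExists()
                .usingWatcher((Watcher) event -> {
                    if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                        retryContend();
                    }
                })
                .forPath(LEADER_PATH) == null) {
            retryContend(); // node vanished between create() and checkExists()
        }
    }
}
{code}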

> Simplify leader election / leader session ID assignment
> ---
>
> Key: FLINK-4233
> URL: https://issues.apache.org/jira/browse/FLINK-4233
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.0.3
>Reporter: Stephan Ewen
>Priority: Major
>
> Currently, there are two separate actions and znodes involved in leader 
> election and communication of the leader session ID and leader URL.
> This leads to some quite elaborate code that tries to make sure that the 
> leader session ID and leader URL always eventually converge to those of the 
> leader.
> It is simpler to just encode both the ID and the URL into an id-string that 
> is attached to the leader latch znode. One would have to create a new leader 
> latch each time a contender re-applies for leadership.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-4233) Simplify leader election / leader session ID assignment

2019-09-19 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-4233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16933879#comment-16933879
 ] 

TisonKun commented on FLINK-4233:
-

Thanks for picking up this topic! I have been thinking about something similar 
recently.

Basically, it requires a new ZooKeeper leader retrieval service that works like 
Curator's {{PathChildrenCache}} (or simply reuses it) to monitor the leader 
latch registry. The retriever would then automatically treat the latch with the 
smallest sequence number as the leader info node and retrieve the information 
from it. As a side effect, the leader would no longer need to "publish" its 
leader information.

The downside is the overhead that the {{PathChildrenCache}} costs. However, 
given that there are typically only one or two latches at a time and leader 
changes (after we tolerate SUSPENDED, a.k.a. CONNECTIONLOSS) are rare events, 
hopefully the impact is small.
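
A minimal sketch of that retrieval side, assuming Curator's 
{{PathChildrenCache}}; {{LATCH_PATH}}, {{sequenceOf()}} (a hypothetical helper 
that extracts the trailing sequence number from a latch znode name) and 
{{notifyListener()}} are placeholders, not existing Flink code:

{code:java}
import java.util.Comparator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;

void startRetrieval(CuratorFramework client) throws Exception {
    PathChildrenCache latchCache = new PathChildrenCache(client, LATCH_PATH, true);
    latchCache.getListenable().addListener((c, event) ->
        // On any child change, the latch child with the smallest sequence number
        // is the current leader; read the leader info directly from its payload.
        latchCache.getCurrentData().stream()
            .min(Comparator.comparingLong((ChildData child) -> sequenceOf(child.getPath())))
            .ifPresent(leader -> notifyListener(leader.getData())));
    latchCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
}
{code}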

> Simplify leader election / leader session ID assignment
> ---
>
> Key: FLINK-4233
> URL: https://issues.apache.org/jira/browse/FLINK-4233
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.0.3
>Reporter: Stephan Ewen
>Priority: Major
>
> Currently, there are two separate actions and znodes involved in leader 
> election and communication of the leader session ID and leader URL.
> This leads to some quite elaborate code that tries to make sure that the 
> leader session ID and leader URL always eventually converge to those of the 
> leader.
> It is simpler to just encode both the ID and the URL into an id-string that 
> is attached to the leader latch znode. One would have to create a new leader 
> latch each time a contender re-applies for leadership.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-14096) Merge NewClusterClient into ClusterClient

2019-09-19 Thread TisonKun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun reassigned FLINK-14096:


Assignee: TisonKun

> Merge NewClusterClient into ClusterClient
> -
>
> Key: FLINK-14096
> URL: https://issues.apache.org/jira/browse/FLINK-14096
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> With the effort under FLINK-10392 we no longer need the bridge class 
> {{NewClusterClient}}. We can simply merge {{NewClusterClient}} into 
> {{ClusterClient}} as a step towards an interface-ized {{ClusterClient}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-14113) Remove class JobWithJars

2019-09-19 Thread TisonKun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun reassigned FLINK-14113:


Assignee: TisonKun

> Remove class JobWithJars
> 
>
> Key: FLINK-14113
> URL: https://issues.apache.org/jira/browse/FLINK-14113
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Fix For: 1.10.0
>
>
> {{JobWithJars}} is a batch-only concept that acts as a POJO consisting of a 
> {{Plan}} and the {{URL}}s of its libraries. We can
> 1. inline the usage of {{Plan}} and {{URL}}s as we do in the streaming case, and
> 2. extract the static methods into a utility class such as {{ClientUtils}}.
> The main purpose is to remove a batch-specific concept that does not bring 
> much benefit.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14130) Remove ClusterClient.run() methods

2019-09-19 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16933494#comment-16933494
 ] 

TisonKun commented on FLINK-14130:
--

Great to have! I once thought this was not easy to achieve, but if you have a 
plan I'm glad to help on the review side :-)

> Remove ClusterClient.run() methods
> --
>
> Key: FLINK-14130
> URL: https://issues.apache.org/jira/browse/FLINK-14130
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission
>Reporter: Aljoscha Krettek
>Priority: Major
>
> {{ClusterClient}} is an internal interface of the {{flink-clients}} package. 
> It should only be concerned with submitting {{JobGraphs}} to a cluster, which 
> is what {{submitJob()}} does. 
> The {{run()}} methods are concerned with unpacking programs or job-with-jars 
> and at the end use {{submitJob()}} in some way, they should reside in some 
> other component. The only valid remaining run method is {{run(PackagedProgram 
> prog, int parallelism)}}, this could be in {{PackagedProgramUtils}}. The 
> other {{run()}} methods are actually only used in one test: 
> {{ClientTest.shouldSubmitToJobClient()}}. I don't think that test is valid 
> anymore, it evolved for a very long time and now doesn't test what it was 
> supposed to test once.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14112) Removing zookeeper state should cause the task manager and job managers to restart

2019-09-19 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16933184#comment-16933184
 ] 

TisonKun commented on FLINK-14112:
--

Thanks for your insights [~trohrmann]. Sounds reasonable to me. For now I'm fine 
with handling {{null}} values in the leader listener.

One thing in addition: there is an edge case where leader election is affected 
if znodes are deleted out of band. If there is only one contender (as in the 
YARN scenario) and the leader latch is deleted, nobody is notified of this event 
and the contender still considers itself the leader. Due to implementation 
details, {{ZooKeeperLeaderElectionService}} is a {{NodeCacheListener}}, so if 
the leader info node gets deleted it tries to re-create the znode. It is 
therefore strange that the TM cannot recover the connection to the RM. 
[~aaronlevin], did you see a successful reconnect in the logs?
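
For illustration only, a rough sketch of that re-creation behavior; field names 
such as {{leaderLatch}}, {{cache}}, {{leaderInfoPath}} and 
{{serializedLeaderInfo}} are placeholders and this is not the actual Flink 
source:

{code:java}
// The election service is registered as a NodeCacheListener on the leader info
// node; if the node disappears while this contender still holds leadership,
// it simply writes the node again.
@Override
public void nodeChanged() throws Exception {
    if (leaderLatch.hasLeadership() && cache.getCurrentData() == null) {
        client.create()
            .creatingParentsIfNeeded()
            .withMode(CreateMode.EPHEMERAL)
            .forPath(leaderInfoPath, serializedLeaderInfo);
    }
}
{code}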

> Removing zookeeper state should cause the task manager and job managers to 
> restart
> --
>
> Key: FLINK-14112
> URL: https://issues.apache.org/jira/browse/FLINK-14112
> Project: Flink
>  Issue Type: Wish
>  Components: Runtime / Coordination
>Affects Versions: 1.8.1, 1.9.0
>Reporter: Aaron Levin
>Priority: Minor
>
> Suppose you have a flink application running on a cluster with the following 
> configuration:
> {noformat}
> high-availability.zookeeper.path.root: /flink
> {noformat}
> Now suppose you delete all the znodes within {{/flink}}. I experienced the 
> following:
>  * massive amount of logging
>  * application did not restart
>  * task manager did not crash or restart
>  * job manager did not crash or restart
> From this state I had to restart all the task managers and all the job 
> managers in order for the flink application to recover.
> It would be desirable for the Task Managers and Job Managers to crash if the 
> znode is not available (though perhaps you all have thought about this more 
> deeply than I!)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14010) Dispatcher & JobManagers don't give up leadership when AM is shut down

2019-09-19 Thread TisonKun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun updated FLINK-14010:
-
Affects Version/s: (was: 1.8.1)
   1.8.2

> Dispatcher & JobManagers don't give up leadership when AM is shut down
> --
>
> Key: FLINK-14010
> URL: https://issues.apache.org/jira/browse/FLINK-14010
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN, Runtime / Coordination
>Affects Versions: 1.7.2, 1.8.2, 1.9.0, 1.10.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Critical
>
> In the YARN deployment scenario, the YARN RM may launch a new AM for the job 
> even if the previous AM has not terminated, for example, when the AM-RM 
> heartbeat times out. In this common case the RM sends a shutdown request to 
> the previous AM and expects the AM to shut down properly.
> However, currently in {{YARNResourceManager}} we handle this request in 
> {{onShutdownRequest}}, which simply closes the {{YARNResourceManager}} *but 
> not the Dispatcher and JobManagers*. Thus, the Dispatcher and JobManagers 
> launched in the new AM cannot be granted leadership properly. Visually:
> on previous AM: Dispatcher leader, JM leaders
> on new AM: ResourceManager leader
> Since on the client side, or in per-job mode, the JobManager address and port 
> are configured to point to the new AM, the whole cluster goes into an 
> unrecoverable, inconsistent status: clients all query the Dispatcher on the 
> new AM, which is not the leader. Briefly, the Dispatcher and JobManagers on 
> the previous AM do not give up their leadership properly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-14010) Dispatcher & JobManagers don't give up leadership when AM is shut down

2019-09-19 Thread TisonKun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun reassigned FLINK-14010:


Assignee: TisonKun

> Dispatcher & JobManagers don't give up leadership when AM is shut down
> --
>
> Key: FLINK-14010
> URL: https://issues.apache.org/jira/browse/FLINK-14010
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN, Runtime / Coordination
>Affects Versions: 1.7.2, 1.8.1, 1.9.0, 1.10.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Critical
>
> In the YARN deployment scenario, the YARN RM may launch a new AM for the job 
> even if the previous AM has not terminated, for example, when the AM-RM 
> heartbeat times out. In this common case the RM sends a shutdown request to 
> the previous AM and expects the AM to shut down properly.
> However, currently in {{YARNResourceManager}} we handle this request in 
> {{onShutdownRequest}}, which simply closes the {{YARNResourceManager}} *but 
> not the Dispatcher and JobManagers*. Thus, the Dispatcher and JobManagers 
> launched in the new AM cannot be granted leadership properly. Visually:
> on previous AM: Dispatcher leader, JM leaders
> on new AM: ResourceManager leader
> Since on the client side, or in per-job mode, the JobManager address and port 
> are configured to point to the new AM, the whole cluster goes into an 
> unrecoverable, inconsistent status: clients all query the Dispatcher on the 
> new AM, which is not the leader. Briefly, the Dispatcher and JobManagers on 
> the previous AM do not give up their leadership properly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14010) Dispatcher & JobManagers don't give up leadership when AM is shut down

2019-09-19 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16933140#comment-16933140
 ] 

TisonKun commented on FLINK-14010:
--

Will send a pull request in a few hours :-)

> Dispatcher & JobManagers don't give up leadership when AM is shut down
> --
>
> Key: FLINK-14010
> URL: https://issues.apache.org/jira/browse/FLINK-14010
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN, Runtime / Coordination
>Affects Versions: 1.7.2, 1.8.1, 1.9.0, 1.10.0
>Reporter: TisonKun
>Priority: Critical
>
> In the YARN deployment scenario, the YARN RM may launch a new AM for the job 
> even if the previous AM has not terminated, for example, when the AM-RM 
> heartbeat times out. In this common case the RM sends a shutdown request to 
> the previous AM and expects the AM to shut down properly.
> However, currently in {{YARNResourceManager}} we handle this request in 
> {{onShutdownRequest}}, which simply closes the {{YARNResourceManager}} *but 
> not the Dispatcher and JobManagers*. Thus, the Dispatcher and JobManagers 
> launched in the new AM cannot be granted leadership properly. Visually:
> on previous AM: Dispatcher leader, JM leaders
> on new AM: ResourceManager leader
> Since on the client side, or in per-job mode, the JobManager address and port 
> are configured to point to the new AM, the whole cluster goes into an 
> unrecoverable, inconsistent status: clients all query the Dispatcher on the 
> new AM, which is not the leader. Briefly, the Dispatcher and JobManagers on 
> the previous AM do not give up their leadership properly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-14093) Java8 lambdas and exceptions lead to compile error

2019-09-19 Thread TisonKun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun reassigned FLINK-14093:


Assignee: zzsmdfj

> Java8 lambdas and exceptions lead to compile error
> --
>
> Key: FLINK-14093
> URL: https://issues.apache.org/jira/browse/FLINK-14093
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.9.0
>Reporter: zzsmdfj
>Assignee: zzsmdfj
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When compiling Flink master with Java 1.8.0_77, I got the following errors:
> {code:java}
> // code placeholder
> org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute 
> goal org.apache.maven.plugins:maven-compiler-plugin:3.8.0:
> compile (default-compile) on project flink-table-api-java: Compilation failure
> /home/*/zzsmdfj/sflink/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/Cal
> culatedTableFactory.java:[90,53] unreported exception X; must be caught or 
> declared to be thrown
> at org.apache.maven.lifecycle.internal.MojoExecutor.execute 
> (MojoExecutor.java:213)
> at org.apache.maven.lifecycle.internal.MojoExecutor.execute 
> (MojoExecutor.java:154)
> at org.apache.maven.lifecycle.internal.MojoExecutor.execute 
> (MojoExecutor.java:146)
> at 
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject 
> (LifecycleModuleBuilder.java:117)
> at 
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject 
> (LifecycleModuleBuilder.java:81)
> at 
> org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build
>  (SingleThreadedBuilder.java:51)
> at org.apache.maven.lifecycle.internal.LifecycleStarter.execute 
> (LifecycleStarter.java:128)
> at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:309)
> at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:194)
> at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:107)
> at org.apache.maven.cli.MavenCli.execute (MavenCli.java:955)
> at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:290)
> at org.apache.maven.cli.MavenCli.main (MavenCli.java:194)
> at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke 
> (NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke 
> (DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke (Method.java:498)
> at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced 
> (Launcher.java:289)
> at org.codehaus.plexus.classworlds.launcher.Launcher.launch 
> (Launcher.java:229)
> at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode 
> (Launcher.java:415)
> at org.codehaus.plexus.classworlds.launcher.Launcher.main 
> (Launcher.java:356)
> Caused by: org.apache.maven.plugin.compiler.CompilationFailureException: 
> Compilation failure
> {code}
> When compiling with Java 1.8.0_102, the build succeeds. This may be a case of 
> bug [JDK-8054569|https://bugs.openjdk.java.net/browse/JDK-8054569]. Although 
> we can fix this by upgrading the JDK, I think it is better to stay compatible 
> with different versions of JDK 8.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14112) Removing zookeeper state should cause the task manager and job managers to restart

2019-09-19 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16933131#comment-16933131
 ] 

TisonKun commented on FLINK-14112:
--

[~trohrmann] is its function duplicated by the heartbeat mechanism? If the JM is 
no longer the leader, it will be notified and close itself, and the heartbeat 
between JM and TM will eventually time out.

My concern here is that we could get rid of the {{null}} handling in the leader 
listener without breaking any functionality.

> Removing zookeeper state should cause the task manager and job managers to 
> restart
> --
>
> Key: FLINK-14112
> URL: https://issues.apache.org/jira/browse/FLINK-14112
> Project: Flink
>  Issue Type: Wish
>  Components: Runtime / Coordination
>Affects Versions: 1.8.1, 1.9.0
>Reporter: Aaron Levin
>Priority: Minor
>
> Suppose you have a flink application running on a cluster with the following 
> configuration:
> {noformat}
> high-availability.zookeeper.path.root: /flink
> {noformat}
> Now suppose you delete all the znodes within {{/flink}}. I experienced the 
> following:
>  * massive amount of logging
>  * application did not restart
>  * task manager did not crash or restart
>  * job manager did not crash or restart
> From this state I had to restart all the task managers and all the job 
> managers in order for the flink application to recover.
> It would be desirable for the Task Managers and Job Managers to crash if the 
> znode is not available (though perhaps you all have thought about this more 
> deeply than I!)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14010) Dispatcher & JobManagers don't give up leadership when AM is shut down

2019-09-18 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16933030#comment-16933030
 ] 

TisonKun commented on FLINK-14010:
--

[~trohrmann] Technically I agree that it is a valid solution. Giving it another 
look, I think we can complete the shutdown future exceptionally with 
"ResourceManager got closed while DispatcherResourceManagerComponent is 
running". It implies that the application goes into an UNKNOWN state, so the 
semantics are also correct.
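
For illustration, a one-line sketch of what that could look like; 
{{shutDownFuture}} refers to the component's shut-down future, and this is an 
assumption rather than a patch:

{code:java}
// Completing the component's shut-down future exceptionally marks the
// application state as unknown rather than as a successful finish.
shutDownFuture.completeExceptionally(
    new FlinkException(
        "ResourceManager got closed while DispatcherResourceManagerComponent is running."));
{code}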

> Dispatcher & JobManagers don't give up leadership when AM is shut down
> --
>
> Key: FLINK-14010
> URL: https://issues.apache.org/jira/browse/FLINK-14010
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN, Runtime / Coordination
>Affects Versions: 1.7.2, 1.8.1, 1.9.0, 1.10.0
>Reporter: TisonKun
>Priority: Critical
>
> In the YARN deployment scenario, the YARN RM may launch a new AM for the job 
> even if the previous AM has not terminated, for example, when the AM-RM 
> heartbeat times out. In this common case the RM sends a shutdown request to 
> the previous AM and expects the AM to shut down properly.
> However, currently in {{YARNResourceManager}} we handle this request in 
> {{onShutdownRequest}}, which simply closes the {{YARNResourceManager}} *but 
> not the Dispatcher and JobManagers*. Thus, the Dispatcher and JobManagers 
> launched in the new AM cannot be granted leadership properly. Visually:
> on previous AM: Dispatcher leader, JM leaders
> on new AM: ResourceManager leader
> Since on the client side, or in per-job mode, the JobManager address and port 
> are configured to point to the new AM, the whole cluster goes into an 
> unrecoverable, inconsistent status: clients all query the Dispatcher on the 
> new AM, which is not the leader. Briefly, the Dispatcher and JobManagers on 
> the previous AM do not give up their leadership properly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14112) Removing zookeeper state should cause the task manager and job managers to restart

2019-09-18 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16933022#comment-16933022
 ] 

TisonKun commented on FLINK-14112:
--

I agree with [~trohrmann]'s comments.

Another question I have is: for what reason do we notify a "null" 
address/session id? I think the timeout logic can be handled by heartbeats, and 
if we enforce that a notification always contains valid leader info, we can 
reduce noisy and meaningless logs and also simplify the logic in 
{{LeaderRetrievalListener}}.

> Removing zookeeper state should cause the task manager and job managers to 
> restart
> --
>
> Key: FLINK-14112
> URL: https://issues.apache.org/jira/browse/FLINK-14112
> Project: Flink
>  Issue Type: Wish
>  Components: Runtime / Coordination
>Affects Versions: 1.8.1, 1.9.0
>Reporter: Aaron Levin
>Priority: Minor
>
> Suppose you have a flink application running on a cluster with the following 
> configuration:
> {noformat}
> high-availability.zookeeper.path.root: /flink
> {noformat}
> Now suppose you delete all the znodes within {{/flink}}. I experienced the 
> following:
>  * massive amount of logging
>  * application did not restart
>  * task manager did not crash or restart
>  * job manager did not crash or restart
> From this state I had to restart all the task managers and all the job 
> managers in order for the flink application to recover.
> It would be desirable for the Task Managers and Job Managers to crash if the 
> znode is not available (though perhaps you all have thought about this more 
> deeply than I!)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14109) Improve javadocs and tests for high-availability backend

2019-09-18 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932959#comment-16932959
 ] 

TisonKun commented on FLINK-14109:
--

[~bremac] aha, nice to hear :-)

> Improve javadocs and tests for high-availability backend
> 
>
> Key: FLINK-14109
> URL: https://issues.apache.org/jira/browse/FLINK-14109
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Brendan MacDonell
>Assignee: Brendan MacDonell
>Priority: Minor
>
> My team at Sight Machine just finished building a custom HA backend for our 
> infrastructure. The process was mostly painless, but we ran into a few pain 
> points on the way:
>  * {{CompletedCheckpointStore#getLatestCheckpoint}} is not marked as 
> {{@Nullable}}, so there was some confusion about whether the Javadocs or 
> (lack of) annotation is correct. The interface would be clearer if the 
> annotation was present.
>  * The javadocs for {{CompletedCheckpointStore#recover}} disagree with the 
> documentation for {{ZooKeeperCompletedCheckpointStore#recover}}. It's not 
> immediately clear to someone working on the code that the ZK javadoc is 
> outdated and the interface documentation is correct.
>  * -The base {{CompletedCheckpointStore}} tests only work with 
> high-availability backends that keep a list of checkpoints in memory. If the 
> backend persists and retrieves data from another source the tests will fail. 
> It's fairly simple to change the tests to lift this requirement though. See 
> [this gist|https://gist.github.com/bremac/1b3365bc0257dfbd33bcd0b7a7627c00] 
> for an example.-
> We've got patches for the points above that we'd be happy to contribute. :)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14113) Remove class JobWithJars

2019-09-18 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932491#comment-16932491
 ] 

TisonKun commented on FLINK-14113:
--

Thanks for your insights! I will create a pull request tomorrow.

> Remove class JobWithJars
> 
>
> Key: FLINK-14113
> URL: https://issues.apache.org/jira/browse/FLINK-14113
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission
>Reporter: TisonKun
>Priority: Major
> Fix For: 1.10.0
>
>
> {{JobWithJars}} is a batch-only concept, acts as a POJO consists of {{Plan}} 
> and {{URL}}s of libs. We can
> 1. inline the usage of {{Plan}} and {{URL}}s as we do in streaming case.
> 2. extract static methods into a utility class said {{ClientUtils}}.
> The main purpose here is towards no batch specific concept that doesn't bring 
> too much good.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14010) Dispatcher & JobManagers don't give up leadership when AM is shut down

2019-09-18 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932423#comment-16932423
 ] 

TisonKun commented on FLINK-14010:
--

I have thought of this. The problem is that when the situation described here 
happens, we actually complete {{ResourceManager#getTerminationFuture}} normally, 
so we cannot tell that the termination originated from 
{{YarnResourceManager#onShutdownRequest}}.

If we implement the behavior by using {{ResourceManager#getTerminationFuture}} 
to trigger the shutdown of the {{DispatcherResourceManagerComponent}}, the 
assumption is:

If the ResourceManager is closed first (since the termination future completes 
normally in both cases, we cannot distinguish them in {{whenComplete}}), it 
implies an exceptional status, so we should complete 
{{DispatcherResourceManagerComponent#getShutDownFuture}} exceptionally. 
Otherwise, the ResourceManager is closed normally by other triggers, and then 
either {{DispatcherResourceManagerComponent#getShutDownFuture}} is already 
completed or {{ClusterEntrypoint#shutdownAsync}} is guarded to execute only 
once.

I find this assumption counter-intuitive: the ResourceManager terminates 
"normally", yet we complete the shutdown future exceptionally.

> Dispatcher & JobManagers don't give up leadership when AM is shut down
> --
>
> Key: FLINK-14010
> URL: https://issues.apache.org/jira/browse/FLINK-14010
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN, Runtime / Coordination
>Affects Versions: 1.7.2, 1.8.1, 1.9.0, 1.10.0
>Reporter: TisonKun
>Priority: Critical
>
> In the YARN deployment scenario, the YARN RM may launch a new AM for the job 
> even if the previous AM has not terminated, for example, when the AM-RM 
> heartbeat times out. In this common case the RM sends a shutdown request to 
> the previous AM and expects the AM to shut down properly.
> However, currently in {{YARNResourceManager}} we handle this request in 
> {{onShutdownRequest}}, which simply closes the {{YARNResourceManager}} *but 
> not the Dispatcher and JobManagers*. Thus, the Dispatcher and JobManagers 
> launched in the new AM cannot be granted leadership properly. Visually:
> on previous AM: Dispatcher leader, JM leaders
> on new AM: ResourceManager leader
> Since on the client side, or in per-job mode, the JobManager address and port 
> are configured to point to the new AM, the whole cluster goes into an 
> unrecoverable, inconsistent status: clients all query the Dispatcher on the 
> new AM, which is not the leader. Briefly, the Dispatcher and JobManagers on 
> the previous AM do not give up their leadership properly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-14010) Dispatcher & JobManagers don't give up leadership when AM is shut down

2019-09-18 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932423#comment-16932423
 ] 

TisonKun edited comment on FLINK-14010 at 9/18/19 1:06 PM:
---

I have thought of this. The problem is that when the situation described here 
happens, we actually complete {{ResourceManager#getTerminationFuture}} normally, 
so we cannot tell that the termination originated from 
{{YarnResourceManager#onShutdownRequest}}.

If we implement the behavior by using {{ResourceManager#getTerminationFuture}} 
to trigger the shutdown of the {{DispatcherResourceManagerComponent}}, the 
assumption is:

If the ResourceManager is closed first (since the termination future completes 
normally in both cases, we cannot distinguish them in {{whenComplete}}), it 
implies an exceptional status, so we should complete 
{{DispatcherResourceManagerComponent#getShutDownFuture}} exceptionally. 
Otherwise, the ResourceManager is closed normally by other triggers, and then 
either {{DispatcherResourceManagerComponent#getShutDownFuture}} is already 
completed or {{ClusterEntrypoint#shutdownAsync}} is guarded to execute only 
once.

I find this assumption counter-intuitive: the ResourceManager terminates 
"normally", yet we complete the shutdown future exceptionally.


was (Author: tison):
I have thought of this. The problem is that when the situation described here 
happens, we actually complete {{ResourceManager#getTerminationFuture}} 
normally, which cannot be sourced that it comes from 
{{YarnResourceManager#onShutdownRequest}}.

If we achieve the function by using {{ResourceManager#getTerminationFuture}} to 
trigger the shut down of the {{DispatcherResourceManagerComponent}}, the 
assumption is:

If ResourceManager is closed first(since termination future completes normally 
in both cases, we cannot distinguish by {{whenComplete}}), it infers an 
exceptionally status so that we should complete 
{{DispatcherResourceManagerComponent#getShutDownFuture}} exceptionally. 
Otherwise ResourceManager closes normally by other triggers, and the either 
{{DispatcherResourceManagerComponent#getShutDownFuture}} is already completed 
or {{ClusterEntrypoint#shutdownAsync}} is guarded to be executed once.

I think this assumption is counter-intuitive that ResourceManager terminates 
"normally" but we complete shutdownFuture exceptionally.

> Dispatcher & JobManagers don't give up leadership when AM is shut down
> --
>
> Key: FLINK-14010
> URL: https://issues.apache.org/jira/browse/FLINK-14010
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN, Runtime / Coordination
>Affects Versions: 1.7.2, 1.8.1, 1.9.0, 1.10.0
>Reporter: TisonKun
>Priority: Critical
>
> In the YARN deployment scenario, the YARN RM may launch a new AM for the job 
> even if the previous AM has not terminated, for example, when the AM-RM 
> heartbeat times out. In this common case the RM sends a shutdown request to 
> the previous AM and expects the AM to shut down properly.
> However, currently in {{YARNResourceManager}} we handle this request in 
> {{onShutdownRequest}}, which simply closes the {{YARNResourceManager}} *but 
> not the Dispatcher and JobManagers*. Thus, the Dispatcher and JobManagers 
> launched in the new AM cannot be granted leadership properly. Visually:
> on previous AM: Dispatcher leader, JM leaders
> on new AM: ResourceManager leader
> Since on the client side, or in per-job mode, the JobManager address and port 
> are configured to point to the new AM, the whole cluster goes into an 
> unrecoverable, inconsistent status: clients all query the Dispatcher on the 
> new AM, which is not the leader. Briefly, the Dispatcher and JobManagers on 
> the previous AM do not give up their leadership properly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14114) Shift down ClusterClient#timeout to RestClusterClient

2019-09-18 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932275#comment-16932275
 ] 

TisonKun commented on FLINK-14114:
--

I've assigned the issue to you [~zhuzh] :-)

> Shift down ClusterClient#timeout to RestClusterClient
> -
>
> Key: FLINK-14114
> URL: https://issues.apache.org/jira/browse/FLINK-14114
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission
>Reporter: TisonKun
>Assignee: Zhu Zhu
>Priority: Major
> Fix For: 1.10.0
>
>
> {{ClusterClient#timeout}} is only used in {{RestClusterClient}}; even without 
> this prerequisite we can always shift the {{timeout}} field down to subclasses 
> of {{ClusterClient}}. This is a step towards an interface-ized 
> {{ClusterClient}}. As a side effect, we could reduce the dependency on parsing 
> durations with Scala {{Duration}} on the fly.
> CC [~till.rohrmann] [~zhuzh]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-14114) Shift down ClusterClient#timeout to RestClusterClient

2019-09-18 Thread TisonKun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun reassigned FLINK-14114:


Assignee: Zhu Zhu

> Shift down ClusterClient#timeout to RestClusterClient
> -
>
> Key: FLINK-14114
> URL: https://issues.apache.org/jira/browse/FLINK-14114
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission
>Reporter: TisonKun
>Assignee: Zhu Zhu
>Priority: Major
> Fix For: 1.10.0
>
>
> {{ClusterClient#timeout}} is only used in {{RestClusterClient}}; even without 
> this prerequisite we can always shift the {{timeout}} field down to subclasses 
> of {{ClusterClient}}. This is a step towards an interface-ized 
> {{ClusterClient}}. As a side effect, we could reduce the dependency on parsing 
> durations with Scala {{Duration}} on the fly.
> CC [~till.rohrmann] [~zhuzh]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14114) Shift down ClusterClient#timeout to RestClusterClient

2019-09-18 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932256#comment-16932256
 ] 

TisonKun commented on FLINK-14114:
--

Hi [~zhuzh], if you'd like to work on this in coordination with FLINK-14070, I'm 
glad to assign it to you.

[~Zentol] [~kkl0u] I'm not sure whether this breaks user code that programs 
directly against {{ClusterClient}}... Anyway, if we move towards an 
interface-ized {{ClusterClient}} there should be no fields, but maybe we can do 
that all at once to avoid multiple bumps for those users.

> Shift down ClusterClient#timeout to RestClusterClient
> -
>
> Key: FLINK-14114
> URL: https://issues.apache.org/jira/browse/FLINK-14114
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission
>Reporter: TisonKun
>Priority: Major
> Fix For: 1.10.0
>
>
> {{ClusterClient#timeout}} is only used in {{RestClusterClient}}; even without 
> this prerequisite we can always shift the {{timeout}} field down to subclasses 
> of {{ClusterClient}}. This is a step towards an interface-ized 
> {{ClusterClient}}. As a side effect, we could reduce the dependency on parsing 
> durations with Scala {{Duration}} on the fly.
> CC [~till.rohrmann] [~zhuzh]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14114) Shift down ClusterClient#timeout to RestClusterClient

2019-09-18 Thread TisonKun (Jira)
TisonKun created FLINK-14114:


 Summary: Shift down ClusterClient#timeout to RestClusterClient
 Key: FLINK-14114
 URL: https://issues.apache.org/jira/browse/FLINK-14114
 Project: Flink
  Issue Type: Sub-task
  Components: Client / Job Submission
Reporter: TisonKun
 Fix For: 1.10.0


{{ClusterClient#timeout}} is only used in {{RestClusterClient}}; even without 
this prerequisite we can always shift the {{timeout}} field down to subclasses 
of {{ClusterClient}}. This is a step towards an interface-ized 
{{ClusterClient}}. As a side effect, we could reduce the dependency on parsing 
durations with Scala {{Duration}} on the fly.

CC [~till.rohrmann] [~zhuzh]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14070) Use TimeUtils to parse duration configs

2019-09-18 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932133#comment-16932133
 ] 

TisonKun commented on FLINK-14070:
--

Hi [~zhuzh], have you started working on this thread? I'd like to create a JIRA 
about shifting {{ClusterClient#timeout}} down to {{RestClusterClient}}, which 
might reduce the usage of Scala {{Duration}} for parsing duration configs. 
Generally it is a separate task, but it likely has some conflicts with this one, 
so I think it is better to reach out to you first to see whether you prefer to

1. start the two threads concurrently and resolve possible conflicts (if any 
occur, they should be trivial to resolve), or
2. work on the two threads sequentially.

Alternatively, after creating the task described above, and if you are also 
interested in working on it, I can assign that ticket to you and let you 
coordinate.
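
For illustration, the kind of call site that change would simplify might look 
roughly like the following; this is a sketch under assumptions, and the exact 
config option and call site are not specified in this thread:

{code:java}
// Sketch: parse a duration-typed config value via Flink's TimeUtils instead of
// scala.concurrent.duration.Duration (the option used here is illustrative).
java.time.Duration timeout =
    org.apache.flink.util.TimeUtils.parseDuration(
        configuration.getString(org.apache.flink.configuration.AkkaOptions.ASK_TIMEOUT));
{code}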

> Use TimeUtils to parse duration configs
> ---
>
> Key: FLINK-14070
> URL: https://issues.apache.org/jira/browse/FLINK-14070
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Configuration
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
> Fix For: 1.10.0
>
>
> FLINK-14069 makes TimeUtils able to parse all time unit labels supported by 
> Scala's Duration.
> We can now use TimeUtils to parse duration configs instead of using Scala's 
> Duration.
> Some config descriptors referring to Scala's FiniteDuration should be updated 
> as well.
> This is one step towards Flink core getting rid of Scala dependencies.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14093) Java8 lambdas and exceptions lead to compile error

2019-09-17 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932061#comment-16932061
 ] 

TisonKun commented on FLINK-14093:
--

[~zhaoshijie] We cannot just introduce the workaround without any context. A 
future contributor is likely to revert your commit, regarding the revert as an 
improvement. On whether or not to tolerate JDK bugs, I think you'd better start 
a discussion thread on the dev mailing list. We need to reach a consensus before 
starting any code work.

> Java8 lambdas and exceptions lead to compile error
> --
>
> Key: FLINK-14093
> URL: https://issues.apache.org/jira/browse/FLINK-14093
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.9.0
>Reporter: zzsmdfj
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When compiling Flink master with Java 1.8.0_77, I got the following errors:
> {code:java}
> // code placeholder
> org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute 
> goal org.apache.maven.plugins:maven-compiler-plugin:3.8.0:
> compile (default-compile) on project flink-table-api-java: Compilation failure
> /home/*/zzsmdfj/sflink/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/Cal
> culatedTableFactory.java:[90,53] unreported exception X; must be caught or 
> declared to be thrown
> at org.apache.maven.lifecycle.internal.MojoExecutor.execute 
> (MojoExecutor.java:213)
> at org.apache.maven.lifecycle.internal.MojoExecutor.execute 
> (MojoExecutor.java:154)
> at org.apache.maven.lifecycle.internal.MojoExecutor.execute 
> (MojoExecutor.java:146)
> at 
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject 
> (LifecycleModuleBuilder.java:117)
> at 
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject 
> (LifecycleModuleBuilder.java:81)
> at 
> org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build
>  (SingleThreadedBuilder.java:51)
> at org.apache.maven.lifecycle.internal.LifecycleStarter.execute 
> (LifecycleStarter.java:128)
> at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:309)
> at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:194)
> at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:107)
> at org.apache.maven.cli.MavenCli.execute (MavenCli.java:955)
> at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:290)
> at org.apache.maven.cli.MavenCli.main (MavenCli.java:194)
> at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke 
> (NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke 
> (DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke (Method.java:498)
> at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced 
> (Launcher.java:289)
> at org.codehaus.plexus.classworlds.launcher.Launcher.launch 
> (Launcher.java:229)
> at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode 
> (Launcher.java:415)
> at org.codehaus.plexus.classworlds.launcher.Launcher.main 
> (Launcher.java:356)
> Caused by: org.apache.maven.plugin.compiler.CompilationFailureException: 
> Compilation failure
> {code}
> When compiling with Java 1.8.0_102, the build succeeds. This may be a case of 
> bug [JDK-8054569|https://bugs.openjdk.java.net/browse/JDK-8054569]. Although 
> we can fix this by upgrading the JDK, I think it is better to stay compatible 
> with different versions of JDK 8.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14111) Flink should be robust to a non-leader Zookeeper host going down

2019-09-17 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932059#comment-16932059
 ] 

TisonKun commented on FLINK-14111:
--

Yes, I think it is a duplicate of FLINK-10052. Could you check it, 
[~aaronlevin]?

> Flink should be robust to a non-leader Zookeeper host going down
> 
>
> Key: FLINK-14111
> URL: https://issues.apache.org/jira/browse/FLINK-14111
> Project: Flink
>  Issue Type: Wish
>  Components: Runtime / Coordination
>Affects Versions: 1.7.2, 1.8.0, 1.8.1, 1.9.0
> Environment: Linux
> JVM 8
> Flink {{1.7.2}}, {{1.8.1}}, {{1.9.0}}
> {{Zookeeper version 3.4.5}}
>Reporter: Aaron Levin
>Priority: Major
>
> I noticed that if a non-leader ZooKeeper node goes down and there is still 
> quorum in the ZooKeeper cluster, my Flink application will restart anyway. I 
> believe it should be possible for Flink applications not to require a restart 
> in this scenario.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-14112) Removing zookeeper state should cause the task manager and job managers to restart

2019-09-17 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932058#comment-16932058
 ] 

TisonKun edited comment on FLINK-14112 at 9/18/19 4:56 AM:
---

Hi [~aaronlevin], thanks for creating this JIRA. Generally, I think Flink owns 
its znodes, and the precondition here, "delete all the znodes within 
{{/flink}}", should not happen.

However, I can see your concern, and I'd like to ask you for the "massive amount 
of logging" to see what we can improve on the logging side.


was (Author: tison):
Hi [~aaronlevin] thanks for creating this JIRA. Generally I think Flink owns 
its znodes and the prerequisite here "delete all the znodes within {{/flink}}" 
should not happen.

However, I can see your concern and ask you for the "massive amount of logging" 
to see what we can improve in log scope. Besides, I agree that JM and TM are 
nice to crash if ZK is under an uncertain state.

> Removing zookeeper state should cause the task manager and job managers to 
> restart
> --
>
> Key: FLINK-14112
> URL: https://issues.apache.org/jira/browse/FLINK-14112
> Project: Flink
>  Issue Type: Wish
>  Components: Runtime / Coordination
>Affects Versions: 1.8.1, 1.9.0
>Reporter: Aaron Levin
>Priority: Minor
>
> Suppose you have a flink application running on a cluster with the following 
> configuration:
> {noformat}
> high-availability.zookeeper.path.root: /flink
> {noformat}
> Now suppose you delete all the znodes within {{/flink}}. I experienced the 
> following:
>  * massive amount of logging
>  * application did not restart
>  * task manager did not crash or restart
>  * job manager did not crash or restart
> From this state I had to restart all the task managers and all the job 
> managers in order for the flink application to recover.
> It would be desirable for the Task Managers and Job Managers to crash if the 
> znode is not available (though perhaps you all have thought about this more 
> deeply than I!)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14112) Removing zookeeper state should cause the task manager and job managers to restart

2019-09-17 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932058#comment-16932058
 ] 

TisonKun commented on FLINK-14112:
--

Hi [~aaronlevin], thanks for creating this JIRA. Generally, I think Flink owns 
its znodes, and the precondition here, "delete all the znodes within 
{{/flink}}", should not happen.

However, I can see your concern, and I'd like to ask you for the "massive amount 
of logging" to see what we can improve on the logging side. Besides, I agree 
that it would be nice for the JM and TM to crash if ZK is in an uncertain state.

> Removing zookeeper state should cause the task manager and job managers to 
> restart
> --
>
> Key: FLINK-14112
> URL: https://issues.apache.org/jira/browse/FLINK-14112
> Project: Flink
>  Issue Type: Wish
>  Components: Runtime / Coordination
>Affects Versions: 1.8.1, 1.9.0
>Reporter: Aaron Levin
>Priority: Minor
>
> Suppose you have a flink application running on a cluster with the following 
> configuration:
> {noformat}
> high-availability.zookeeper.path.root: /flink
> {noformat}
> Now suppose you delete all the znodes within {{/flink}}. I experienced the 
> following:
>  * massive amount of logging
>  * application did not restart
>  * task manager did not crash or restart
>  * job manager did not crash or restart
> From this state I had to restart all the task managers and all the job 
> managers in order for the flink application to recover.
> It would be desirable for the Task Managers and Job Managers to crash if the 
> znode is not available (though perhaps you all have thought about this more 
> deeply than I!)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14109) Improve javadocs and tests for high-availability backend

2019-09-17 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932053#comment-16932053
 ] 

TisonKun commented on FLINK-14109:
--

Hi [~bremac], thanks for reporting this!

There are two topics under this issue from my perspective:

1. Correct & enrich the documentation.
2. Modify the tests.

For 1, it looks nice to have to me. For 2, maybe it's worth a separate thread. 
So my suggestion is to narrow this issue to focus on 1 and start a separate 
issue focused on 2.

Besides, the community started a survey a few weeks ago on the topic "How do you 
use high-availability services in Flink?" [1]. I recommend sharing your 
customization experience there so that the ongoing refactoring of the 
high-availability services [2] takes your use case into consideration :-)

[1] 
https://lists.apache.org/x/thread.html/c0cc07197e6ba30b45d7709cc9e17d8497e5e3f33de504d58dfcafad@%3Cuser.flink.apache.org%3E
[2] https://issues.apache.org/jira/browse/FLINK-10333

> Improve javadocs and tests for high-availability backend
> 
>
> Key: FLINK-14109
> URL: https://issues.apache.org/jira/browse/FLINK-14109
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Brendan MacDonell
>Priority: Minor
>
> My team at Sight Machine just finished building a custom HA backend for our 
> infrastructure. The process was mostly painless, but we ran into a few pain 
> points on the way:
>  * {{CompletedCheckpointStore#getLatestCheckpoint}} is not marked as 
> {{@Nullable}}, so there was some confusion about whether the Javadocs or 
> (lack of) annotation is correct. The interface would be clearer if the 
> annotation was present.
>  * The javadocs for {{CompletedCheckpointStore#recover}} disagree with the 
> documentation for {{ZooKeeperCompletedCheckpointStore#recover}}. It's not 
> immediately clear to someone working on the code that the ZK javadoc is 
> outdated and the interface documentation is correct.
>  * The base {{CompletedCheckpointStore}} tests only work with 
> high-availability backends that keep a list of checkpoints in memory. If the 
> backend persists and retrieves data from another source the tests will fail. 
> It's fairly simple to change the tests to lift this requirement though. See 
> [this gist|https://gist.github.com/bremac/1b3365bc0257dfbd33bcd0b7a7627c00] 
> for an example.
> We've got patches for the points above that we'd be happy to contribute. :)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14096) Merge NewClusterClient into ClusterClient

2019-09-17 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931962#comment-16931962
 ] 

TisonKun commented on FLINK-14096:
--

FYI 
https://lists.apache.org/x/thread.html/e6536ad385ae1fe41006309dfb3e808c12195aeb8d425c52c86d4ce6@%3Cdev.flink.apache.org%3E

> Merge NewClusterClient into ClusterClient
> -
>
> Key: FLINK-14096
> URL: https://issues.apache.org/jira/browse/FLINK-14096
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission
>Reporter: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> With the effort under FLINK-10392 we no longer need the bridge class 
> {{NewClusterClient}}. We can simply merge {{NewClusterClient}} into 
> {{ClusterClient}} as a step towards an interface-ized {{ClusterClient}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14113) Remove class JobWithJars

2019-09-17 Thread TisonKun (Jira)
TisonKun created FLINK-14113:


 Summary: Remove class JobWithJars
 Key: FLINK-14113
 URL: https://issues.apache.org/jira/browse/FLINK-14113
 Project: Flink
  Issue Type: Sub-task
  Components: Client / Job Submission
Reporter: TisonKun
 Fix For: 1.10.0


{{JobWithJars}} is a batch-only concept that acts as a POJO consisting of a 
{{Plan}} and the {{URL}}s of its libraries. We can

1. inline the usage of {{Plan}} and {{URL}}s as we do in the streaming case, and
2. extract the static methods into a utility class such as {{ClientUtils}} (see 
the sketch below).

The main purpose is to remove a batch-specific concept that does not bring much 
benefit.
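
A rough sketch of point 2, moving the class-loader helper that currently lives 
on {{JobWithJars}} into such a utility class; the target class name 
{{ClientUtils}} is the proposal here, not existing code:

{code:java}
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;

public final class ClientUtils {

    private ClientUtils() {}

    /** Builds the user-code class loader from the job's jar and classpath URLs. */
    public static URLClassLoader buildUserCodeClassLoader(
            List<URL> jars, List<URL> classpaths, ClassLoader parent) {
        URL[] urls = new URL[jars.size() + classpaths.size()];
        for (int i = 0; i < jars.size(); i++) {
            urls[i] = jars.get(i);
        }
        for (int i = 0; i < classpaths.size(); i++) {
            urls[i + jars.size()] = classpaths.get(i);
        }
        return new URLClassLoader(urls, parent);
    }
}
{code}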



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14010) Dispatcher & JobManagers don't give up leadership when AM is shut down

2019-09-17 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931490#comment-16931490
 ] 

TisonKun commented on FLINK-14010:
--

Well, it's reasonable that we try to shut down gracefully. I have started 
working on it, but I'm not sure what the future should look like.

There are two options in my mind, both of which introduce a {{shutdownFuture}} 
in {{ResourceManager}}.

1. {{ResourceManager#shutdownFuture}} is completed when 
{{YarnResourceManager#onShutdownRequest}} gets called. We register a callback in 
{{DispatcherResourceManagerComponent#registerShutDownFuture}}: when 
{{ResourceManager#shutdownFuture}} completes, we complete 
{{DispatcherResourceManagerComponent#shutDownFuture}} exceptionally. The concern 
here is that {{ResourceManager#shutdownFuture}} is never completed if 
{{YarnResourceManager#onShutdownRequest}} is never called. I'm not sure whether 
that is acceptable.

2. {{ResourceManager#shutdownFuture}} is completed normally when 
{{ResourceManager#stopResourceManagerServices}} gets called, and exceptionally 
when {{YarnResourceManager#onShutdownRequest}} gets called. We also register a 
callback in {{DispatcherResourceManagerComponent#registerShutDownFuture}}: when 
{{ResourceManager#shutdownFuture}} completes exceptionally, we complete 
{{DispatcherResourceManagerComponent#shutDownFuture}} exceptionally; when it 
completes normally, we do nothing. It might be a bit more complex than option 1, 
and we should ensure that all code paths through which {{ResourceManager}} exits 
are covered.
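
A self-contained sketch of the wiring in option 2, using plain 
{{CompletableFuture}}; the class and method names only mirror the discussion 
above and are not existing Flink code:

{code:java}
import java.util.concurrent.CompletableFuture;

class ShutdownWiringSketch {

    // Mirrors the proposed ResourceManager#shutdownFuture.
    private final CompletableFuture<Void> rmShutdownFuture = new CompletableFuture<>();

    // Mirrors DispatcherResourceManagerComponent#shutDownFuture.
    private final CompletableFuture<Void> componentShutDownFuture = new CompletableFuture<>();

    // Cf. DispatcherResourceManagerComponent#registerShutDownFuture.
    void registerShutDownFuture() {
        rmShutdownFuture.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                // Abnormal RM shutdown: propagate it so Dispatcher & JobManagers
                // give up leadership as well.
                componentShutDownFuture.completeExceptionally(throwable);
            }
            // Normal completion: another code path already drives the shutdown.
        });
    }

    // Cf. YarnResourceManager#onShutdownRequest.
    void onShutdownRequest() {
        rmShutdownFuture.completeExceptionally(
            new IllegalStateException("ResourceManager was asked to shut down by YARN."));
    }

    // Cf. ResourceManager#stopResourceManagerServices.
    void onRegularStop() {
        rmShutdownFuture.complete(null);
    }
}
{code}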

WDYT [~till.rohrmann]?

> Dispatcher & JobManagers don't give up leadership when AM is shut down
> --
>
> Key: FLINK-14010
> URL: https://issues.apache.org/jira/browse/FLINK-14010
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN, Runtime / Coordination
>Affects Versions: 1.7.2, 1.8.1, 1.9.0, 1.10.0
>Reporter: TisonKun
>Priority: Critical
>
> In YARN deployment scenario, YARN RM possibly launches a new AM for the job 
> even if the previous AM does not terminated, for example, when AMRM heartbeat 
> timeout. This is a common case that RM will send a shutdown request to the 
> previous AM and expect the AM shutdown properly.
> However, currently in {{YARNResourceManager}}, we handle this request in 
> {{onShutdownRequest}} which simply close the {{YARNResourceManager}} *but not 
> Dispatcher and JobManagers*. Thus, Dispatcher and JobManager launched in new 
> AM cannot be granted leadership properly. Visually,
> on previous AM: Dispatcher leader, JM leaders
> on new AM: ResourceManager leader
> since on client side or in per-job mode, JobManager address and port are 
> configured as the new AM, the whole cluster goes into an unrecoverable 
> inconsistent status: client all queries the dispatcher on new AM who is now 
> the leader. Briefly, Dispatcher and JobManagers on previous AM do not give up 
> their leadership properly.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Closed] (FLINK-14051) Deploy job cluster in attached mode

2019-09-17 Thread TisonKun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun closed FLINK-14051.

Fix Version/s: (was: 1.10.0)
   Resolution: Later

Following the discussion, it seems that we cannot resolve this issue before a 
refactoring of {{ExecutionEnvironment}}. Let's revisit this topic when there is 
further progress on the client API enhancement thread.

> Deploy job cluster in attached mode
> ---
>
> Key: FLINK-14051
> URL: https://issues.apache.org/jira/browse/FLINK-14051
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission, Command Line Client
>Affects Versions: 1.10.0
>Reporter: TisonKun
>Priority: Major
>
> While working on FLINK-14048 I revisit the problem we handle deploy logic in 
> a complicated if-else branches in {{CliFrontend#runProgram}}. Previously we 
> said even in per-job mode and attached we deploy a session cluster for 
> historical reasons.
> However, I notice that {{#deployJobCluster}} has a parameter {{boolean 
> detached}}. Also it is used in sql-client package. So it looks like we can 
> deploy job cluster in attached mode as we do in sql-client package.
> However, as [~xccui] answered on mailing list 
> [here|https://lists.apache.org/x/thread.html/5464459db08f2a756af0c61eb02d34a26f04c27c62140886cad52731@%3Cuser.flink.apache.org%3E],
>  we support only standalone session cluster for sql-client. So maybe it is 
> not our case. Anyway, if we cannot deploy job cluster in attached mode, I'd 
> like to know the concrete reason.
> CC [~till.rohrmann] [~twalthr]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-14051) Deploy job cluster in attached mode

2019-09-17 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931426#comment-16931426
 ] 

TisonKun commented on FLINK-14051:
--

> If the execution environment could also deploy a cluster (in case of the 
>per-job mode) when {{ExecutionEnvironment#execute}} is being called, then the 
>per-job mode could work well with multi parts jobs. It would simply deploy for 
>every part a dedicated per-job mode cluster.

Yes this is the solution in my mind.

> Deploy job cluster in attached mode
> ---
>
> Key: FLINK-14051
> URL: https://issues.apache.org/jira/browse/FLINK-14051
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission, Command Line Client
>Affects Versions: 1.10.0
>Reporter: TisonKun
>Priority: Major
> Fix For: 1.10.0
>
>
> While working on FLINK-14048 I revisit the problem we handle deploy logic in 
> a complicated if-else branches in {{CliFrontend#runProgram}}. Previously we 
> said even in per-job mode and attached we deploy a session cluster for 
> historical reasons.
> However, I notice that {{#deployJobCluster}} has a parameter {{boolean 
> detached}}. Also it is used in sql-client package. So it looks like we can 
> deploy job cluster in attached mode as we do in sql-client package.
> However, as [~xccui] answered on mailing list 
> [here|https://lists.apache.org/x/thread.html/5464459db08f2a756af0c61eb02d34a26f04c27c62140886cad52731@%3Cuser.flink.apache.org%3E],
>  we support only standalone session cluster for sql-client. So maybe it is 
> not our case. Anyway, if we cannot deploy job cluster in attached mode, I'd 
> like to know the concrete reason.
> CC [~till.rohrmann] [~twalthr]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13417) Bump Zookeeper to 3.5.5

2019-09-17 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931424#comment-16931424
 ] 

TisonKun commented on FLINK-13417:
--

[~till.rohrmann] thanks for your advice. For the tooling issue, I believe I fully 
built Flink before running the tool, but it still generated an incorrect 
NOTICE-binary. Anyway, I will create a separate issue with details next time I hit it.

For the step "replace all Flink dependencies on ZooKeeper with this one", I'd 
like to know whether there is a good way to list all of Flink's dependencies on 
ZooKeeper.

> Bump Zookeeper to 3.5.5
> ---
>
> Key: FLINK-13417
> URL: https://issues.apache.org/jira/browse/FLINK-13417
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Konstantin Knauf
>Priority: Major
> Fix For: 1.10.0
>
>
> User might want to secure their Zookeeper connection via SSL.
> This requires a Zookeeper version >= 3.5.1. We might as well try to bump it 
> to 3.5.5, which is the latest version. 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-14070) Use TimeUtils to parse duration configs

2019-09-17 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931413#comment-16931413
 ] 

TisonKun commented on FLINK-14070:
--

Thanks for your confirmation [~till.rohrmann]. FYI: FLINK-14105.

[~zhuzh] I'd like to leave it up to you whether to turn this issue and 
FLINK-14069 into subtasks of FLINK-14105 or not.

> Use TimeUtils to parse duration configs
> ---
>
> Key: FLINK-14070
> URL: https://issues.apache.org/jira/browse/FLINK-14070
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
> Fix For: 1.10.0
>
>
> FLINK-14069 makes TimeUtils able to parse all time unit labels supported by 
> scala Duration.
> We can now use TimeUtils to parse duration configs instead of using scala 
> Duration.
> Some config descriptors referring scala FiniteDuration should be updated as 
> well.
> This is one step for Flink core to get rid of scala dependencies.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14105) Make flink-runtime scala-free

2019-09-17 Thread TisonKun (Jira)
TisonKun created FLINK-14105:


 Summary: Make flink-runtime scala-free
 Key: FLINK-14105
 URL: https://issues.apache.org/jira/browse/FLINK-14105
 Project: Flink
  Issue Type: Task
  Components: Runtime / Configuration, Runtime / Coordination
Reporter: TisonKun


As is the consensus in our community (please link the dedicated thread if there is 
one), we keep in mind that {{flink-runtime}} will eventually be scala-free, mainly 
because of maintenance and release concerns.

This is an umbrella issue to track all efforts towards a scala-free 
{{flink-runtime}}.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Assigned] (FLINK-14096) Merge NewClusterClient into ClusterClient

2019-09-17 Thread TisonKun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun reassigned FLINK-14096:


Assignee: (was: TisonKun)

> Merge NewClusterClient into ClusterClient
> -
>
> Key: FLINK-14096
> URL: https://issues.apache.org/jira/browse/FLINK-14096
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission
>Reporter: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> With the effort under FLINK-10392 we don't need the bridge class 
> {{NewClusterClient}} any more. We can just merge {{NewClusterClient}} into 
> {{ClusterClient}} towards an interface-ized {{ClusterClient}}.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-14096) Merge NewClusterClient into ClusterClient

2019-09-17 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931400#comment-16931400
 ] 

TisonKun commented on FLINK-14096:
--

Thanks for your attention [~kkl0u]! I will start a discussion thread tomorrow.

> Merge NewClusterClient into ClusterClient
> -
>
> Key: FLINK-14096
> URL: https://issues.apache.org/jira/browse/FLINK-14096
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> With the effort under FLINK-10392 we don't need the bridge class 
> {{NewClusterClient}} any more. We can just merge {{NewClusterClient}} into 
> {{ClusterClient}} towards an interface-ized {{ClusterClient}}.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14096) Merge NewClusterClient into ClusterClient

2019-09-17 Thread TisonKun (Jira)
TisonKun created FLINK-14096:


 Summary: Merge NewClusterClient into ClusterClient
 Key: FLINK-14096
 URL: https://issues.apache.org/jira/browse/FLINK-14096
 Project: Flink
  Issue Type: Sub-task
  Components: Client / Job Submission
Reporter: TisonKun
 Fix For: 1.10.0


With the effort under FLINK-10392 we don't need the bridge class 
{{NewClusterClient}} any more. We can just merge {{NewClusterClient}} into 
{{ClusterClient}} towards an interface-ized {{ClusterClient}}.
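
A rough sketch of that direction (the method set and signatures below are illustrative only, not the final API):

{code:java}
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;

// Illustrative only: an interface-ized ClusterClient that absorbs the asynchronous
// methods currently declared on NewClusterClient.
public interface ClusterClientSketch<T> extends AutoCloseable {

    T getClusterId();

    CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph);

    CompletableFuture<JobResult> requestJobResult(JobID jobId);
}
{code}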



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Assigned] (FLINK-14096) Merge NewClusterClient into ClusterClient

2019-09-17 Thread TisonKun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun reassigned FLINK-14096:


Assignee: TisonKun

> Merge NewClusterClient into ClusterClient
> -
>
> Key: FLINK-14096
> URL: https://issues.apache.org/jira/browse/FLINK-14096
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Fix For: 1.10.0
>
>
> With the effort under FLINK-10392 we don't need the bridge class 
> {{NewClusterClient}} any more. We can just merge {{NewClusterClient}} into 
> {{ClusterClient}} towards an interface-ized {{ClusterClient}}.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Assigned] (FLINK-14050) Refactor YarnClusterDescriptor inheritance

2019-09-17 Thread TisonKun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun reassigned FLINK-14050:


Assignee: TisonKun

> Refactor YarnClusterDescriptor inheritance
> --
>
> Key: FLINK-14050
> URL: https://issues.apache.org/jira/browse/FLINK-14050
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission, Command Line Client
>Affects Versions: 1.10.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently, the inheritance looks like
> {{AbstractYarnClusterDescriptor}}
> -> {{YarnClusterDescriptor}}
> -> {{TestingYarnClusterDescriptor}}
> -> {{NonDeployingYarnClusterDescriptor}}
> ->-> {{NonDeployingDetachedYarnClusterDescriptor}}
> With an investigation, I find
> 1. {{AbstractYarnClusterDescriptor}} is introduced for migration purpose and 
> no need any more.
> 2. {{TestingYarnClusterDescriptor}} is redundant and can be replaced directly 
> with {{YarnClusterDescriptor}}.
> 3. Some methods like {{#createYarnClusterClient}} have parameters that never 
> used, which are for historical reasons.
> Thus, I propose we refactor {{YarnClusterDescriptor}} inheritance 
> {{YarnClusterDescriptor}}
> -> {{NonDeployingYarnClusterDescriptor}}
> ->-> {{NonDeployingDetachedYarnClusterDescriptor}}
> and also methods remove unused parameters.
> CC [~kkl0u] [~aljoscha] [~till.rohrmann]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-14010) Dispatcher & JobManagers don't give up leadership when AM is shut down

2019-09-17 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931287#comment-16931287
 ] 

TisonKun commented on FLINK-14010:
--

[~till.rohrmann] Thanks for your explanation, now I understand where the component 
layout comes from.

Back to this issue: what if we call {{#onFatalError}} in 
{{YarnResourceManager#onShutdownRequest}}? {{#onShutdownRequest}} is only 
called when the AM is exceptionally replaced, so we can regard it as a fatal error. 
Implementation-wise, it calls {{System.exit}}, which correctly shuts down 
the AM and releases leadership.
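
A toy illustration of that handling (class and method names mirror {{YarnResourceManager}} but this is not the actual Flink code):

{code:java}
import java.util.concurrent.CompletableFuture;

class YarnResourceManagerSketch {
    // completes once a fatal error has been reported
    final CompletableFuture<Throwable> fatalErrorFuture = new CompletableFuture<>();

    void onShutdownRequest() {
        // treat the shutdown request from the YARN RM as a fatal error so that the
        // whole process terminates and all leaderships are released together
        onFatalError(new IllegalStateException("YARN asked this ApplicationMaster to shut down"));
    }

    void onFatalError(Throwable t) {
        fatalErrorFuture.complete(t);
        // in the real runtime this path eventually terminates the JVM (System.exit)
    }
}
{code}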

> Dispatcher & JobManagers don't give up leadership when AM is shut down
> --
>
> Key: FLINK-14010
> URL: https://issues.apache.org/jira/browse/FLINK-14010
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN, Runtime / Coordination
>Affects Versions: 1.7.2, 1.8.1, 1.9.0, 1.10.0
>Reporter: TisonKun
>Priority: Critical
>
> In YARN deployment scenario, YARN RM possibly launches a new AM for the job 
> even if the previous AM does not terminated, for example, when AMRM heartbeat 
> timeout. This is a common case that RM will send a shutdown request to the 
> previous AM and expect the AM shutdown properly.
> However, currently in {{YARNResourceManager}}, we handle this request in 
> {{onShutdownRequest}} which simply close the {{YARNResourceManager}} *but not 
> Dispatcher and JobManagers*. Thus, Dispatcher and JobManager launched in new 
> AM cannot be granted leadership properly. Visually,
> on previous AM: Dispatcher leader, JM leaders
> on new AM: ResourceManager leader
> since on client side or in per-job mode, JobManager address and port are 
> configured as the new AM, the whole cluster goes into an unrecoverable 
> inconsistent status: client all queries the dispatcher on new AM who is now 
> the leader. Briefly, Dispatcher and JobManagers on previous AM do not give up 
> their leadership properly.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-14051) Deploy job cluster in attached mode

2019-09-17 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931275#comment-16931275
 ] 

TisonKun commented on FLINK-14051:
--

Thanks for your explanation [~till.rohrmann].

I think the problem is the way we compile the Flink job. In per-job (detached) 
mode we use {{OptimizerPlanEnvironment}}, which aborts execution and captures the 
FlinkPlan to be executed. Thus, when the job consists of multiple parts, it 
always surprises users that the subsequent parts are never executed. We deploy a 
session cluster in attached mode to work around this a bit.

Maybe standardizing what per-job stands for could help. In my opinion, per-job is 
like the Driver mode in Spark: we just submit the jars representing the user job to 
the resource management cluster (YARN, Mesos, K8s), and a ClusterEntrypoint is 
launched, compiles the JobGraph and starts to execute it. With this definition we 
are no longer exposed to the implementation details of compiling the JobGraph on 
the client side.

> Deploy job cluster in attached mode
> ---
>
> Key: FLINK-14051
> URL: https://issues.apache.org/jira/browse/FLINK-14051
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission, Command Line Client
>Affects Versions: 1.10.0
>Reporter: TisonKun
>Priority: Major
> Fix For: 1.10.0
>
>
> While working on FLINK-14048 I revisit the problem we handle deploy logic in 
> a complicated if-else branches in {{CliFrontend#runProgram}}. Previously we 
> said even in per-job mode and attached we deploy a session cluster for 
> historical reasons.
> However, I notice that {{#deployJobCluster}} has a parameter {{boolean 
> detached}}. Also it is used in sql-client package. So it looks like we can 
> deploy job cluster in attached mode as we do in sql-client package.
> However, as [~xccui] answered on mailing list 
> [here|https://lists.apache.org/x/thread.html/5464459db08f2a756af0c61eb02d34a26f04c27c62140886cad52731@%3Cuser.flink.apache.org%3E],
>  we support only standalone session cluster for sql-client. So maybe it is 
> not our case. Anyway, if we cannot deploy job cluster in attached mode, I'd 
> like to know the concrete reason.
> CC [~till.rohrmann] [~twalthr]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-14070) Use TimeUtils to parse duration configs

2019-09-17 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931169#comment-16931169
 ] 

TisonKun commented on FLINK-14070:
--

One instance is {{AkkaUtils.getClientTimeout}} and its call sites, such as the one 
in {{ClusterClient}}. Please track it under this effort.

[~till.rohrmann] BTW, would it help to create an umbrella issue aimed at a 
scala-free {{flink-runtime}}?
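
For reference, a minimal sketch of the intended replacement, assuming {{TimeUtils#parseDuration}} as introduced by FLINK-14069 (the literal value is only an example):

{code:java}
import java.time.Duration;

import org.apache.flink.util.TimeUtils;

public class ParseClientTimeoutExample {
    public static void main(String[] args) {
        // instead of parsing the config value with scala.concurrent.duration.Duration,
        // parse it with the scala-free TimeUtils into a java.time.Duration
        Duration clientTimeout = TimeUtils.parseDuration("60 s");
        System.out.println(clientTimeout.toMillis() + " ms");
    }
}
{code}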

> Use TimeUtils to parse duration configs
> ---
>
> Key: FLINK-14070
> URL: https://issues.apache.org/jira/browse/FLINK-14070
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
> Fix For: 1.10.0
>
>
> FLINK-14069 makes TimeUtils able to parse all time unit labels supported by 
> scala Duration.
> We can now use TimeUtils to parse duration configs instead of using scala 
> Duration.
> Some config descriptors referring scala FiniteDuration should be updated as 
> well.
> This is one step for Flink core to get rid of scala dependencies.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-14050) Refactor YarnClusterDescriptor inheritance

2019-09-17 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931142#comment-16931142
 ] 

TisonKun commented on FLINK-14050:
--

I will start working on this in a few hours if there are no other concerns.

> Refactor YarnClusterDescriptor inheritance
> --
>
> Key: FLINK-14050
> URL: https://issues.apache.org/jira/browse/FLINK-14050
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission, Command Line Client
>Affects Versions: 1.10.0
>Reporter: TisonKun
>Priority: Major
> Fix For: 1.10.0
>
>
> Currently, the inheritance looks like
> {{AbstractYarnClusterDescriptor}}
> -> {{YarnClusterDescriptor}}
> -> {{TestingYarnClusterDescriptor}}
> -> {{NonDeployingYarnClusterDescriptor}}
> ->-> {{NonDeployingDetachedYarnClusterDescriptor}}
> With an investigation, I find
> 1. {{AbstractYarnClusterDescriptor}} is introduced for migration purpose and 
> no need any more.
> 2. {{TestingYarnClusterDescriptor}} is redundant and can be replaced directly 
> with {{YarnClusterDescriptor}}.
> 3. Some methods like {{#createYarnClusterClient}} have parameters that never 
> used, which are for historical reasons.
> Thus, I propose we refactor {{YarnClusterDescriptor}} inheritance 
> {{YarnClusterDescriptor}}
> -> {{NonDeployingYarnClusterDescriptor}}
> ->-> {{NonDeployingDetachedYarnClusterDescriptor}}
> and also methods remove unused parameters.
> CC [~kkl0u] [~aljoscha] [~till.rohrmann]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Comment Edited] (FLINK-13417) Bump Zookeeper to 3.5.5

2019-09-17 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931131#comment-16931131
 ] 

TisonKun edited comment on FLINK-13417 at 9/17/19 6:47 AM:
---

Update: I manually applied the diff and it went through. Maybe there is something 
non-deterministic in {{tools/releasing/collect_license_files.sh}}.

Travis deadlocks on {{KafkaTestBase}} because of a ZK AuthFailed error; the full 
log is here https://api.travis-ci.org/v3/job/585870154/log.txt

[~StephanEwen] yes, I think it is reasonable to shade Flink's ZK as you 
proposed. Otherwise we have to twist our tests quite a bit to make them work, 
and since Kafka and HBase are not really based on ZK 3.5 that twist is brittle.

I'd like to give it a try to see the effect. However, I'm not so familiar with 
the procedure, so I'd like to describe what I want to do first.

1. Create a sub-module {{flink-shaded-zookeeper}} as we do for 
{{flink-shaded-curator}}. Here I don't know how to generate the NOTICE file.
2. Replace the {{zookeeper}} dependencies in Flink's scope with 
{{flink-shaded-zookeeper}}. Here I don't know whether there is an automatic way 
to list which dependencies should be replaced.


was (Author: tison):
Updates: Manually apply the diff and go through. Maybe there is something 
non-deterministic in {{tools/releasing/collect_license_files.sh}}.

Travis deadlock on {{KafkaTestBase}} because ZK AuthFailed, full log is here 
https://api.travis-ci.org/v3/job/585870154/log.txt

[~StephanEwen] yes I think it is reasonable to shade Flink's ZK as you 
proposed. Otherwise we have to twist our tests quite a bit to let them work. 
I'd like to give it a try to see the effect. However, I'm not so familiar with 
the procedure so here I'd like to describe what I want to do first.

1. create a sub-module {{flink-shaded-zookeeper}} as what we do for 
{{flink-shaded-curator}}. Here I don't know how to generate the NOTICE file.
2. replace {{zookeeper}} dependencies in Flink scope with 
{{flink-shaded-zookeeper}}. Here I don't know whether there is an automatic way 
to list which dependencies should be replaced.

> Bump Zookeeper to 3.5.5
> ---
>
> Key: FLINK-13417
> URL: https://issues.apache.org/jira/browse/FLINK-13417
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Konstantin Knauf
>Priority: Major
> Fix For: 1.10.0
>
>
> User might want to secure their Zookeeper connection via SSL.
> This requires a Zookeeper version >= 3.5.1. We might as well try to bump it 
> to 3.5.5, which is the latest version. 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Comment Edited] (FLINK-13417) Bump Zookeeper to 3.5.5

2019-09-17 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931131#comment-16931131
 ] 

TisonKun edited comment on FLINK-13417 at 9/17/19 6:45 AM:
---

Updates: Manually apply the diff and go through. Maybe there is something 
non-deterministic in {{tools/releasing/collect_license_files.sh}}.

Travis deadlock on {{KafkaTestBase}} because ZK AuthFailed, full log is here 
https://api.travis-ci.org/v3/job/585870154/log.txt

[~StephanEwen] yes I think it is reasonable to shade Flink's ZK as you 
proposed. Otherwise we have to twist our tests quite a bit to let them work. 
I'd like to give it a try to see the effect. However, I'm not so familiar with 
the procedure so here I'd like to describe what I want to do first.

1. create a sub-module {{flink-shaded-zookeeper}} as what we do for 
{{flink-shaded-curator}}. Here I don't know how to generate the NOTICE file.
2. replace {{zookeeper}} dependencies in Flink scope with 
{{flink-shaded-zookeeper}}. Here I don't know whether there is an automatic way 
to list which dependencies should be replaced.


was (Author: tison):
Updates: Manually apply the diff and go through. Maybe there is something 
non-deterministic in {{tools.releasing/collect_license_files.sh}}.

Travis deadlock on {{KafkaTestBase}} because ZK AuthFailed, full log is here 
https://api.travis-ci.org/v3/job/585870154/log.txt

[~StephanEwen] yes I think it is reasonable that shade Flink's ZK as you 
proposed. Otherwise we have to twist our tests quite a bit to let them work. 
I'd like to give it a try to see the effect. However, I'm not so familiar with 
the procedure so here I'd like to describe what I want to do first.

1. create a sub-module {{flink-shaded-zookeeper}} as what we do for 
{{flink-shaded-curator}}. Here I don't know how to generate the NOTICE file.
2. replace {{zookeeper}} dependencies in Flink scope with 
{{flink-shaded-zookeeper}}. Here I don't know whether there is an automatic way 
to list which dependencies should be replaced.

> Bump Zookeeper to 3.5.5
> ---
>
> Key: FLINK-13417
> URL: https://issues.apache.org/jira/browse/FLINK-13417
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Konstantin Knauf
>Priority: Major
> Fix For: 1.10.0
>
>
> User might want to secure their Zookeeper connection via SSL.
> This requires a Zookeeper version >= 3.5.1. We might as well try to bump it 
> to 3.5.5, which is the latest version. 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13417) Bump Zookeeper to 3.5.5

2019-09-17 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931131#comment-16931131
 ] 

TisonKun commented on FLINK-13417:
--

Updates: Manually apply the diff and go through. Maybe there is something 
non-deterministic in {{tools.releasing/collect_license_files.sh}}.

Travis deadlock on {{KafkaTestBase}} because ZK AuthFailed, full log is here 
https://api.travis-ci.org/v3/job/585870154/log.txt

[~StephanEwen] yes I think it is reasonable that shade Flink's ZK as you 
proposed. Otherwise we have to twist our tests quite a bit to let them work. 
I'd like to give it a try to see the effect. However, I'm not so familiar with 
the procedure so here I'd like to describe what I want to do first.

1. create a sub-module {{flink-shaded-zookeeper}} as what we do for 
{{flink-shaded-curator}}. Here I don't know how to generate the NOTICE file.
2. replace {{zookeeper}} dependencies in Flink scope with 
{{flink-shaded-zookeeper}}. Here I don't know whether there is an automatic way 
to list which dependencies should be replaced.

> Bump Zookeeper to 3.5.5
> ---
>
> Key: FLINK-13417
> URL: https://issues.apache.org/jira/browse/FLINK-13417
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Konstantin Knauf
>Priority: Major
> Fix For: 1.10.0
>
>
> User might want to secure their Zookeeper connection via SSL.
> This requires a Zookeeper version >= 3.5.1. We might as well try to bump it 
> to 3.5.5, which is the latest version. 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13992) Refactor Optional parameter in InputGateWithMetrics#updateMetrics

2019-09-16 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16930355#comment-16930355
 ] 

TisonKun commented on FLINK-13992:
--

Thanks for your clarification [~azagrebin]! Now I understand the trade-off here 
and agree to only mark {{@Nonnull}} in a context where one would otherwise expect 
a nullable value.

> Refactor Optional parameter in InputGateWithMetrics#updateMetrics
> -
>
> Key: FLINK-13992
> URL: https://issues.apache.org/jira/browse/FLINK-13992
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Fix For: 1.10.0
>
>
> As consensus from community code style discussion, in 
> {{InputGateWithMetrics#updateMetrics}} we can refactor to reduce the usage of 
> Optional parameter.
> cc [~azagrebin]
> {code:java}
> diff --git 
> a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
>  
> b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
> index 5d2cfd95c4..e548fbf02b 100644
> --- 
> a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
> +++ 
> b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
> @@ -24,6 +24,8 @@ import 
> org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
>  import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
>  import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
>  
> +import javax.annotation.Nonnull;
> +
>  import java.io.IOException;
>  import java.util.Optional;
>  import java.util.concurrent.CompletableFuture;
> @@ -67,12 +69,12 @@ public class InputGateWithMetrics extends InputGate {
>  
>   @Override
>   public Optional<BufferOrEvent> getNext() throws IOException, 
> InterruptedException {
> - return updateMetrics(inputGate.getNext());
> + return inputGate.getNext().map(this::updateMetrics);
>   }
>  
>   @Override
>   public Optional<BufferOrEvent> pollNext() throws IOException, 
> InterruptedException {
> - return updateMetrics(inputGate.pollNext());
> + return inputGate.pollNext().map(this::updateMetrics);
>   }
>  
>   @Override
> @@ -85,8 +87,8 @@ public class InputGateWithMetrics extends InputGate {
>   inputGate.close();
>   }
>  
> - private Optional<BufferOrEvent> updateMetrics(Optional<BufferOrEvent> 
> bufferOrEvent) {
> - bufferOrEvent.ifPresent(b -> numBytesIn.inc(b.getSize()));
> + private BufferOrEvent updateMetrics(@Nonnull BufferOrEvent 
> bufferOrEvent) {
> + numBytesIn.inc(bufferOrEvent.getSize());
>   return bufferOrEvent;
>   }
>  }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Assigned] (FLINK-14041) Refactor LeaderRetrievalServiceHostnameResolutionTest and remove StandaloneUtils

2019-09-16 Thread TisonKun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun reassigned FLINK-14041:


Assignee: TisonKun

> Refactor LeaderRetrievalServiceHostnameResolutionTest and remove 
> StandaloneUtils
> 
>
> Key: FLINK-14041
> URL: https://issues.apache.org/jira/browse/FLINK-14041
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.10.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Minor
> Fix For: 1.10.0
>
>
> {{StandaloneUtils}} is a poor utility that can be just replace with 
> {{HighAvailabilityServicesUtils}} on its only usages.
> Propose to refactor {{LeaderRetrievalServiceHostnameResolutionTest}} and 
> remove {{StandaloneUtils}}.
> cc [~Zentol]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13992) Refactor Optional parameter in InputGateWithMetrics#updateMetrics

2019-09-16 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16930336#comment-16930336
 ] 

TisonKun commented on FLINK-13992:
--

Thanks [~azagrebin] for your updates. Now it works as expected.

For {{@Nonnull}}, I agree that with our code style guide it is unnecessary. My 
concern is that without the {{@Nonnull}} annotation an IDE such as IDEA cannot 
automatically lint function calls that pass a nullable value as a non-null 
argument. IMO automatic detection beats mere convention. What do you think?

> Refactor Optional parameter in InputGateWithMetrics#updateMetrics
> -
>
> Key: FLINK-13992
> URL: https://issues.apache.org/jira/browse/FLINK-13992
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Fix For: 1.10.0
>
>
> As consensus from community code style discussion, in 
> {{InputGateWithMetrics#updateMetrics}} we can refactor to reduce the usage of 
> Optional parameter.
> cc [~azagrebin]
> {code:java}
> diff --git 
> a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
>  
> b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
> index 5d2cfd95c4..e548fbf02b 100644
> --- 
> a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
> +++ 
> b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
> @@ -24,6 +24,8 @@ import 
> org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
>  import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
>  import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
>  
> +import javax.annotation.Nonnull;
> +
>  import java.io.IOException;
>  import java.util.Optional;
>  import java.util.concurrent.CompletableFuture;
> @@ -67,12 +69,12 @@ public class InputGateWithMetrics extends InputGate {
>  
>   @Override
>   public Optional<BufferOrEvent> getNext() throws IOException, 
> InterruptedException {
> - return updateMetrics(inputGate.getNext());
> + return inputGate.getNext().map(this::updateMetrics);
>   }
>  
>   @Override
>   public Optional<BufferOrEvent> pollNext() throws IOException, 
> InterruptedException {
> - return updateMetrics(inputGate.pollNext());
> + return inputGate.pollNext().map(this::updateMetrics);
>   }
>  
>   @Override
> @@ -85,8 +87,8 @@ public class InputGateWithMetrics extends InputGate {
>   inputGate.close();
>   }
>  
> - private Optional<BufferOrEvent> updateMetrics(Optional<BufferOrEvent> 
> bufferOrEvent) {
> - bufferOrEvent.ifPresent(b -> numBytesIn.inc(b.getSize()));
> + private BufferOrEvent updateMetrics(@Nonnull BufferOrEvent 
> bufferOrEvent) {
> + numBytesIn.inc(bufferOrEvent.getSize());
>   return bufferOrEvent;
>   }
>  }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13992) Refactor Optional parameter in InputGateWithMetrics#updateMetrics

2019-09-15 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16930183#comment-16930183
 ] 

TisonKun commented on FLINK-13992:
--

Thanks for your information [~azagrebin]. I think we can also mark {{@Nonnull}} 
to help the compiler/IDE find any function calls that violate the non-null 
requirement.

Besides, I cannot assign the issue to myself atm. Is there any extra step that 
should be done on my side?

> Refactor Optional parameter in InputGateWithMetrics#updateMetrics
> -
>
> Key: FLINK-13992
> URL: https://issues.apache.org/jira/browse/FLINK-13992
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Fix For: 1.10.0
>
>
> As consensus from community code style discussion, in 
> {{InputGateWithMetrics#updateMetrics}} we can refactor to reduce the usage of 
> Optional parameter.
> cc [~azagrebin]
> {code:java}
> diff --git 
> a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
>  
> b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
> index 5d2cfd95c4..e548fbf02b 100644
> --- 
> a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
> +++ 
> b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
> @@ -24,6 +24,8 @@ import 
> org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
>  import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
>  import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
>  
> +import javax.annotation.Nonnull;
> +
>  import java.io.IOException;
>  import java.util.Optional;
>  import java.util.concurrent.CompletableFuture;
> @@ -67,12 +69,12 @@ public class InputGateWithMetrics extends InputGate {
>  
>   @Override
>   public Optional<BufferOrEvent> getNext() throws IOException, 
> InterruptedException {
> - return updateMetrics(inputGate.getNext());
> + return inputGate.getNext().map(this::updateMetrics);
>   }
>  
>   @Override
>   public Optional<BufferOrEvent> pollNext() throws IOException, 
> InterruptedException {
> - return updateMetrics(inputGate.pollNext());
> + return inputGate.pollNext().map(this::updateMetrics);
>   }
>  
>   @Override
> @@ -85,8 +87,8 @@ public class InputGateWithMetrics extends InputGate {
>   inputGate.close();
>   }
>  
> - private Optional<BufferOrEvent> updateMetrics(Optional<BufferOrEvent> 
> bufferOrEvent) {
> - bufferOrEvent.ifPresent(b -> numBytesIn.inc(b.getSize()));
> + private BufferOrEvent updateMetrics(@Nonnull BufferOrEvent 
> bufferOrEvent) {
> + numBytesIn.inc(bufferOrEvent.getSize());
>   return bufferOrEvent;
>   }
>  }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13417) Bump Zookeeper to 3.5.5

2019-09-12 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16928459#comment-16928459
 ] 

TisonKun commented on FLINK-13417:
--

Hi [~till.rohrmann] I met a NOTICE-binary issue and cannot resolve it by following 
the guide in the output, so I'd like to reach out to you for help.

With the fix mentioned above we no longer fail for the same cause. But after 
rebasing on master I encounter the failure message below:

{noformat}
> Apache ZooKeeper - Server
> Copyright 2008-2019 The Apache Software Foundation
> 
> Apache ZooKeeper - Jute
> Copyright 2008-2019 The Apache Software Foundation
> 
7922a7929,7937
> 
> Apache ZooKeeper - Server
> Copyright 2008-2019 The Apache Software Foundation
> 
> Apache ZooKeeper - Jute
> Copyright 2008-2019 The Apache Software Foundation
> 
> Apache Yetus - Audience Annotations
> Copyright 2015-2017 The Apache Software Foundation
==
ERROR: binary licensing is out-of-date.
Please update NOTICE-binary and licenses-binary:
Step 1: Rebuild flink
Step 2: Run 'tools/releasing/collect_license_files.sh build-target'
  This extracts all the licensing files from the distribution, and puts them in 
'licenses-output'.
  If the build-target symlink does not exist after building flink, point the 
tool to 'flink-dist/target/flink--bin/flink-' instead.
Step 3: Replace existing licensing
  Delete NOTICE-binary and the entire licenses-binary directory.
  Copy the contents in 'licenses-output' into the root directory of the Flink 
project.
Step 4: Remember to commit the changes!
==
{noformat}

The full log is here: https://api.travis-ci.org/v3/job/584054593/log.txt

So I followed the guide, regenerated the NOTICE file and did the replacement. 
However, I then met another NOTICE failure.

The full log is here: https://api.travis-ci.org/v3/job/584027857/log.txt

(I do not attach the second output since it is quite long.)

> Bump Zookeeper to 3.5.5
> ---
>
> Key: FLINK-13417
> URL: https://issues.apache.org/jira/browse/FLINK-13417
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Konstantin Knauf
>Priority: Blocker
> Fix For: 1.10.0
>
>
> User might want to secure their Zookeeper connection via SSL.
> This requires a Zookeeper version >= 3.5.1. We might as well try to bump it 
> to 3.5.5, which is the latest version. 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14068) Use Java's Duration instead of Flink's Time

2019-09-12 Thread TisonKun (Jira)
TisonKun created FLINK-14068:


 Summary: Use Java's Duration instead of Flink's Time
 Key: FLINK-14068
 URL: https://issues.apache.org/jira/browse/FLINK-14068
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream, Runtime / Configuration, Runtime / 
Coordination
Reporter: TisonKun
 Fix For: 2.0.0


As discussed on the mailing list 
[here|https://lists.apache.org/x/thread.html/90ad2f1d7856cfe5bdc8f7dd678c626be96eeaeeb736e98f31660039@%3Cdev.flink.apache.org%3E],
 the community has reached a consensus that we will use Java's Duration for 
representing "time intervals" instead of Flink's Time.

Specifically, Flink has two {{Time}} classes:

{{org.apache.flink.api.common.time.Time}}
{{org.apache.flink.streaming.api.windowing.time.Time}}

The latter has already been deprecated and superseded by the former. Now we 
want to also deprecate the former and drop it in 2.0.0 (we don't drop it right 
now because it is part of {{@Public}} interfaces).
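
A minimal illustration of the intended migration (the values are arbitrary):

{code:java}
import java.time.Duration;

import org.apache.flink.api.common.time.Time;

public class TimeToDurationExample {
    public static void main(String[] args) {
        // today: Flink's own Time for a "time interval"
        Time legacy = Time.seconds(30);

        // target: Java's Duration carrying the same meaning
        Duration replacement = Duration.ofSeconds(30);

        System.out.println(legacy.toMilliseconds() == replacement.toMillis());
    }
}
{code}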



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-10333) Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)

2019-09-12 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16928384#comment-16928384
 ] 

TisonKun commented on FLINK-10333:
--

Yes, that's it.

Regarding implementation details, one alternative is to add two new methods

- {{void removeLeaderInfo()}}
- {{LeaderStore getLeaderStore()}}

to the {{LeaderElectionService}} interface and simply adjust the existing 
implementations to implement them; but since the leader store hasn't been put 
into use yet, we can even defer the interface-level changes to the next step 
(see the sketch below).

Briefly, we *can* do this without touching the existing implementations. Let's 
move the details into the subtask :- )
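
A sketch of the two proposed additions ({{LeaderStore}} is the transactional store from the design document and does not exist in Flink yet, so everything below is illustrative):

{code:java}
// Illustrative only: the store that commits data only while the owning contender is leader.
interface LeaderStore {
    void put(String path, byte[] data) throws Exception;
}

// Illustrative only: the existing contract (start/stop/confirm leadership/...) is elided.
interface LeaderElectionServiceSketch {

    void removeLeaderInfo();

    LeaderStore getLeaderStore();
}
{code}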

> Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, 
> CompletedCheckpoints)
> -
>
> Key: FLINK-10333
> URL: https://issues.apache.org/jira/browse/FLINK-10333
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Priority: Major
> Attachments: screenshot-1.png
>
>
> While going over the ZooKeeper based stores 
> ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, 
> {{ZooKeeperCompletedCheckpointStore}}) and the underlying 
> {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were 
> introduced with past incremental changes.
> * Depending whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} 
> or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization 
> problems will either lead to removing the Znode or not
> * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of 
> exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case 
> of a failure)
> * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be 
> better to move {{RetrievableStateStorageHelper}} out of it for a better 
> separation of concerns
> * {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even 
> if it is locked. This should not happen since it could leave another system 
> in an inconsistent state (imagine a changed {{JobGraph}} which restores from 
> an old checkpoint)
> * Redundant but also somewhat inconsistent put logic in the different stores
> * Shadowing of ZooKeeper specific exceptions in {{ZooKeeperStateHandleStore}} 
> which were expected to be caught in {{ZooKeeperSubmittedJobGraphStore}}
> * Getting rid of the {{SubmittedJobGraphListener}} would be helpful
> These problems made me think how reliable these components actually work. 
> Since these components are very important, I propose to refactor them.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-10333) Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)

2019-09-12 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16928342#comment-16928342
 ] 

TisonKun commented on FLINK-10333:
--

I revoke the statement {{LeaderServer}} is a prerequisite for new 
high-availability services. As we discussed in the mailing list, we should 
narrow the intention per step.

Let's recur the big picture under this thread. We'd like to introduce a 
mechanism to ensure that

  - commit new state in ZooKeeper only if the contender is leader

and we choose a transaction store implementation for ZooKeeper scenario.

I will break down the implementation steps as below

First, re-implement {{ZooKeeperLeaderElectionService}} as described in the 
design document. All interfaces are compatible except we possibly change the 
layout of znodes(let's defer this discussion until a dedicated subtask created).

Second and further, we separately replace access points to 
ZooKeeper(abstractly, high-availability storage) such as JobGraphStore, 
CheckpointStore and so on with new leader election services which can return a 
transactional store.

If you agree this approach, I will create the first subtask and describe 
detailedly what we do and what we gain.
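
For illustration, a rough sketch of the "commit only if still leader" mechanism using Curator's transaction API (Curator 2.x style; the paths and the concrete guard are made up for the example and are not the final design):

{code:java}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ConditionalCommitSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // hypothetical paths
        String myLatchNode = "/flink/leaderlatch/contender-0000000001";
        String jobGraphNode = "/flink/jobgraphs/job-1";

        // check that this contender's latch znode still exists (i.e. we still hold leadership)
        // and only then write the new state -- both as one atomic ZooKeeper transaction
        client.inTransaction()
            .check().forPath(myLatchNode)
            .and()
            .setData().forPath(jobGraphNode, new byte[] {1})
            .and()
            .commit();

        client.close();
    }
}
{code}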

> Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, 
> CompletedCheckpoints)
> -
>
> Key: FLINK-10333
> URL: https://issues.apache.org/jira/browse/FLINK-10333
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Priority: Major
> Attachments: screenshot-1.png
>
>
> While going over the ZooKeeper based stores 
> ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, 
> {{ZooKeeperCompletedCheckpointStore}}) and the underlying 
> {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were 
> introduced with past incremental changes.
> * Depending whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} 
> or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization 
> problems will either lead to removing the Znode or not
> * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of 
> exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case 
> of a failure)
> * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be 
> better to move {{RetrievableStateStorageHelper}} out of it for a better 
> separation of concerns
> * {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even 
> if it is locked. This should not happen since it could leave another system 
> in an inconsistent state (imagine a changed {{JobGraph}} which restores from 
> an old checkpoint)
> * Redundant but also somewhat inconsistent put logic in the different stores
> * Shadowing of ZooKeeper specific exceptions in {{ZooKeeperStateHandleStore}} 
> which were expected to be caught in {{ZooKeeperSubmittedJobGraphStore}}
> * Getting rid of the {{SubmittedJobGraphListener}} would be helpful
> These problems made me think how reliable these components actually work. 
> Since these components are very important, I propose to refactor them.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Comment Edited] (FLINK-13417) Bump Zookeeper to 3.5.5

2019-09-12 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16928316#comment-16928316
 ] 

TisonKun edited comment on FLINK-13417 at 9/12/19 7:43 AM:
---

[~StephanEwen] I figured out that it is because, since 3.5.3, ZooKeeper disables 
the four letter words by default, which HBase uses to wait for the server to come 
up. We can set the property in Flink's test scope to enable the four letter words 
and work around this issue (see the sketch below). I have patched the fix in a 
private branch and am still digging into whether there are other issues.

references:

[1] https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_4lw
[2] 
https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_clusterOptions 
(look up {{zookeeper.4lw.commands.whitelist}})
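
A minimal sketch of the test-scope workaround (whitelisting everything here just for brevity; tests could whitelist only the commands they actually need):

{code:java}
public class EnableFourLetterWords {
    public static void main(String[] args) {
        // must be set before the (test) ZooKeeper server is started
        System.setProperty("zookeeper.4lw.commands.whitelist", "*");
        // ... start the embedded/test ZooKeeper server here
    }
}
{code}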


was (Author: tison):
[~StephanEwen] I figured out that it is because since 3.5.3 ZooKeeper default 
disable four letter words which HBase uses for waiting for server up. We can 
set the property in Flink test scope to enable four letter words and workaround 
this issue. Have sent the fix and still digging whether there are other issues.

references:

[1] https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_4lw
[2] 
https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_clusterOptions 
(look up {{zookeeper.4lw.commands.whitelist}})

> Bump Zookeeper to 3.5.5
> ---
>
> Key: FLINK-13417
> URL: https://issues.apache.org/jira/browse/FLINK-13417
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Konstantin Knauf
>Priority: Blocker
> Fix For: 1.10.0
>
>
> User might want to secure their Zookeeper connection via SSL.
> This requires a Zookeeper version >= 3.5.1. We might as well try to bump it 
> to 3.5.5, which is the latest version. 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13417) Bump Zookeeper to 3.5.5

2019-09-12 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16928316#comment-16928316
 ] 

TisonKun commented on FLINK-13417:
--

[~StephanEwen] I figured out that it is because since 3.5.3 ZooKeeper default 
disable four letter words which HBase uses for waiting for server up. We can 
set the property in Flink test scope to enable four letter words and workaround 
this issue. Have sent the fix and still digging whether there are other issues.

references:

[1] https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_4lw
[2] 
https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_clusterOptions 
(look up {{zookeeper.4lw.commands.whitelist}})

> Bump Zookeeper to 3.5.5
> ---
>
> Key: FLINK-13417
> URL: https://issues.apache.org/jira/browse/FLINK-13417
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Konstantin Knauf
>Priority: Blocker
> Fix For: 1.10.0
>
>
> User might want to secure their Zookeeper connection via SSL.
> This requires a Zookeeper version >= 3.5.1. We might as well try to bump it 
> to 3.5.5, which is the latest version. 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-14054) Enable checkpointing via job configuration

2019-09-12 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16928259#comment-16928259
 ] 

TisonKun commented on FLINK-14054:
--

I might be wrong, but aren't we already able to configure parallelism and other 
options at the job level? [~qinjunjerry] do you have an idea what the config key 
for the configuration you propose should be?

> Enable checkpointing via job configuration
> --
>
> Key: FLINK-14054
> URL: https://issues.apache.org/jira/browse/FLINK-14054
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Jun Qin
>Priority: Major
>
> Currently enabling checkpointing can only be done via the job code, see the 
> following quote from this Flink 
> [checkpointing|https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing]
>  doc:
> {quote}By default, checkpointing is disabled. To enable checkpointing, call 
> {{enableCheckpointing(n)}} on the {{StreamExecutionEnvironment}}, where _n_ 
> is the checkpoint interval in milliseconds.
> {quote}
> This makes enabling checkingpointing after the job code has been released 
> difficult: one has to change and rebuild the job code.
> In addition, not only for developer, making checkpointing enabling 
> configurable is also of interest for operation teams:
>  * They may want to enable checkpointing for production but disable in test 
> (e.g., to save storage space)
>  * They may want to try out with and without checkpointing to evaluate the 
> impact to the job behaviour and performance.  
> Therefore, this request.  Thanks.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-14054) Enable checkpointing via job configuration

2019-09-12 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16928235#comment-16928235
 ] 

TisonKun commented on FLINK-14054:
--

Hi [~qinjunjerry]! Thanks for reporting your requirement. I agree that in your 
specific case enabling checkpointing via a configuration file, instead of 
configuring it inline, provides more flexibility.

However, it seems we don't have a mechanism for setting job-level configuration 
in a file yet. Extra effort may be required to introduce such a mechanism 
properly first.

CC [~twalthr] & [~till.rohrmann] as you have been working on configuration recently.
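
For reference, what is already possible today as a stop-gap: drive {{enableCheckpointing(n)}} from a job parameter instead of a hard-coded value (the parameter name below is made up for the example; this is not the requested configuration-file based switch):

{code:java}
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConfigurableCheckpointingJob {
    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpointing only if an interval was passed on the command line
        long interval = params.getLong("checkpoint.interval", -1L);
        if (interval > 0) {
            env.enableCheckpointing(interval);
        }

        env.fromElements(1, 2, 3).print();
        env.execute("checkpointing-configurable-job");
    }
}
{code}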

> Enable checkpointing via job configuration
> --
>
> Key: FLINK-14054
> URL: https://issues.apache.org/jira/browse/FLINK-14054
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Jun Qin
>Priority: Major
>
> Currently enabling checkpointing can only be done via the job code, see the 
> following quote from this Flink 
> [checkpointing|https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing]
>  doc:
> {quote}By default, checkpointing is disabled. To enable checkpointing, call 
> {{enableCheckpointing(n)}} on the {{StreamExecutionEnvironment}}, where _n_ 
> is the checkpoint interval in milliseconds.
> {quote}
> This makes enabling checkingpointing after the job code has been released 
> difficult: one has to change and rebuild the job code.
> In addition, not only for developer, making checkpointing enabling 
> configurable is also of interest for operation teams:
>  * They may want to enable checkpointing for production but disable in test 
> (e.g., to save storage space)
>  * They may want to try out with and without checkpointing to evaluate the 
> impact to the job behaviour and performance.  
> Therefore, this request.  Thanks.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (FLINK-14054) Enable checkpointing via job configuration

2019-09-11 Thread TisonKun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun updated FLINK-14054:
-
Description: 
Currently enabling checkpointing can only be done via the job code, see the 
following quote from this Flink 
[checkpointing|https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing]
 doc:
{quote}By default, checkpointing is disabled. To enable checkpointing, call 
{{enableCheckpointing(n)}} on the {{StreamExecutionEnvironment}}, where _n_ is 
the checkpoint interval in milliseconds.
{quote}
This makes enabling checkingpointing after the job code has been released 
difficult: one has to change and rebuild the job code.

In addition, not only for developer, making checkpointing enabling configurable 
is also of interest for operation teams:
 * They may want to enable checkpointing for production but disable in test 
(e.g., to save storage space)
 * They may want to try out with and without checkpointing to evaluate the 
impact to the job behaviour and performance.  

Therefore, this request.  Thanks.

  was:
Currently enabling checkpointing can only be done via the job code, see the 
following quote from this Flink 
[checkpointing|https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing]
 doc:
{quote}By default, checkpointing is disabled. To enable checkpointing, call 
{{enableCheckpointing(n)}} on the {{StreamExecutionEnvironment}}, where _n_ is 
the checkpoint interval in milliseconds.
{quote}
This makes enabling checkingpointing after the job code has been released 
difficult: one has to change and rebuild the job code.

In addition, not only for developer, making checkpointing enabling configurable 
is also of interest for operation teams:
 * They may want to enable checkpointing for production but disable in test 
(e.g., to save storage space)
 * They may want to try out with and without checkpointing to evaluate the 
impact to the job behaviour and performance.  

Therefore, this request.  Thanks.


> Enable checkpointing via job configuration
> --
>
> Key: FLINK-14054
> URL: https://issues.apache.org/jira/browse/FLINK-14054
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Jun Qin
>Priority: Major
>
> Currently enabling checkpointing can only be done via the job code, see the 
> following quote from this Flink 
> [checkpointing|https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing]
>  doc:
> {quote}By default, checkpointing is disabled. To enable checkpointing, call 
> {{enableCheckpointing(n)}} on the {{StreamExecutionEnvironment}}, where _n_ 
> is the checkpoint interval in milliseconds.
> {quote}
> This makes enabling checkpointing after the job code has been released 
> difficult: one has to change and rebuild the job code.
> In addition, making checkpointing configurable is of interest not only to 
> developers but also to operations teams:
>  * They may want to enable checkpointing in production but disable it in test 
> (e.g., to save storage space)
>  * They may want to try out the job with and without checkpointing to evaluate the 
> impact on job behaviour and performance.  
> Therefore, this request.  Thanks.
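
Until such an option exists, one workaround is to let the job itself decide based on a parameter it receives at submission time, so enabling or disabling checkpointing does not require a rebuild. A minimal sketch (the parameter name {{checkpoint.interval}} and its defaulting behaviour are illustrative, not an existing Flink option):

{code:java}
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConfigurableCheckpointingJob {

    public static void main(String[] args) throws Exception {
        // e.g. --checkpoint.interval 60000; a non-positive value leaves checkpointing disabled
        final ParameterTool params = ParameterTool.fromArgs(args);
        final long checkpointInterval = params.getLong("checkpoint.interval", -1L);

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        if (checkpointInterval > 0) {
            // The same call the documentation refers to, just driven by a parameter.
            env.enableCheckpointing(checkpointInterval);
        }

        env.fromElements(1, 2, 3).print();
        env.execute("configurable-checkpointing");
    }
}
{code}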



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-14051) Deploy job cluster in attached mode

2019-09-11 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16927442#comment-16927442
 ] 

TisonKun commented on FLINK-14051:
--

Thanks for your information [~till.rohrmann].

I have two questions here:

1. I don't fully understand what "the job might consists of multiple parts" 
means. AFAIK a Flink job can be described by a JobGraph and we don't support 
attaching multiple JobGraphs to a single job.

2. Why does {{ProgramDeployer#deployJobOnNewCluster}} in the sql-client module 
call {{#deployJobCluster(..., detached=false)}}? If it is the case you described 
above, then an attached job cluster is error-prone.

> Deploy job cluster in attached mode
> ---
>
> Key: FLINK-14051
> URL: https://issues.apache.org/jira/browse/FLINK-14051
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission, Command Line Client
>Affects Versions: 1.10.0
>Reporter: TisonKun
>Priority: Major
> Fix For: 1.10.0
>
>
> While working on FLINK-14048 I revisit the problem we handle deploy logic in 
> a complicated if-else branches in {{CliFrontend#runProgram}}. Previously we 
> said even in per-job mode and attached we deploy a session cluster for 
> historical reasons.
> However, I notice that {{#deployJobCluster}} has a parameter {{boolean 
> detached}}. Also it is used in sql-client package. So it looks like we can 
> deploy job cluster in attached mode as we do in sql-client package.
> However, as [~xccui] answered on mailing list 
> [here|https://lists.apache.org/x/thread.html/5464459db08f2a756af0c61eb02d34a26f04c27c62140886cad52731@%3Cuser.flink.apache.org%3E],
>  we support only standalone session cluster for sql-client. So maybe it is 
> not our case. Anyway, if we cannot deploy job cluster in attached mode, I'd 
> like to know the concrete reason.
> CC [~till.rohrmann] [~twalthr]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-14048) Flink client hangs after trying to kill Yarn Job during deployment

2019-09-11 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16927372#comment-16927372
 ] 

TisonKun commented on FLINK-14048:
--

[~gyfora] also, this looks like a duplicate of FLINK-13895. Could you please 
check whether the two issues share the same root cause?

> Flink client hangs after trying to kill Yarn Job during deployment
> --
>
> Key: FLINK-14048
> URL: https://issues.apache.org/jira/browse/FLINK-14048
> Project: Flink
>  Issue Type: Improvement
>  Components: Client / Job Submission, Deployment / YARN
>Reporter: Gyula Fora
>Priority: Major
> Attachments: patch.diff
>
>
> If we kill the flink client run command from the terminal while deploying to 
> YARN (let's say we realize we used the wrong parameters), the YARN 
> application will be killed immediately but the client won't shut down.
> We get the following messages over and over:
> 19/09/10 23:35:55 INFO retry.RetryInvocationHandler: java.io.IOException: The 
> client is stopped, while invoking 
> ApplicationClientProtocolPBClientImpl.forceKillApplication over null after 14 
> failover attempts. Trying to failover after sleeping for 16296ms.
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (FLINK-14051) Deploy job cluster in attached mode

2019-09-11 Thread TisonKun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun updated FLINK-14051:
-
Description: 
While working on FLINK-14048 I revisited the problem that we handle deploy logic 
in complicated if-else branches in {{CliFrontend#runProgram}}. Previously we said 
that even in per-job attached mode we deploy a session cluster, for historical 
reasons.

However, I notice that {{#deployJobCluster}} has a {{boolean detached}} parameter 
and is also used in the sql-client package. So it looks like we can deploy a job 
cluster in attached mode, as the sql-client package does.

However, as [~xccui] answered on the mailing list 
[here|https://lists.apache.org/x/thread.html/5464459db08f2a756af0c61eb02d34a26f04c27c62140886cad52731@%3Cuser.flink.apache.org%3E],
 we support only a standalone session cluster for sql-client. So maybe it is not 
our case. Anyway, if we cannot deploy a job cluster in attached mode, I'd like to 
know the concrete reason.

CC [~till.rohrmann] [~twalthr]

  was:
While working on FLINK-14048 I revisit the problem we handle deploy logic in a 
complicated if-else branched in {{CliFrontend#runProgram}}. Previously we said 
even in per-job mode and attached we deploy a session cluster for historical 
reasons.

However, I notice that {{#deployJobCluster}} has a parameter {{boolean 
detached}}. Also it is used in sql-client package. So it looks like we can 
deploy job cluster in attached mode as we do in sql-client package.

However, as [~xccui] answered on mailing list 
[here|https://lists.apache.org/x/thread.html/5464459db08f2a756af0c61eb02d34a26f04c27c62140886cad52731@%3Cuser.flink.apache.org%3E],
 we support only standalone session cluster for sql-client. So maybe it is not 
our case. Anyway, if we cannot deploy job cluster in attached mode, I'd like to 
know the concrete reason.

CC [~till.rohrmann] [~twalthr]


> Deploy job cluster in attached mode
> ---
>
> Key: FLINK-14051
> URL: https://issues.apache.org/jira/browse/FLINK-14051
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission, Command Line Client
>Affects Versions: 1.10.0
>Reporter: TisonKun
>Priority: Major
> Fix For: 1.10.0
>
>
> While working on FLINK-14048 I revisited the problem that we handle deploy logic 
> in complicated if-else branches in {{CliFrontend#runProgram}}. Previously we 
> said that even in per-job attached mode we deploy a session cluster, for 
> historical reasons.
> However, I notice that {{#deployJobCluster}} has a {{boolean detached}} parameter 
> and is also used in the sql-client package. So it looks like we can 
> deploy a job cluster in attached mode, as the sql-client package does.
> However, as [~xccui] answered on the mailing list 
> [here|https://lists.apache.org/x/thread.html/5464459db08f2a756af0c61eb02d34a26f04c27c62140886cad52731@%3Cuser.flink.apache.org%3E],
>  we support only a standalone session cluster for sql-client. So maybe it is 
> not our case. Anyway, if we cannot deploy a job cluster in attached mode, I'd 
> like to know the concrete reason.
> CC [~till.rohrmann] [~twalthr]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14051) Deploy job cluster in attached mode

2019-09-11 Thread TisonKun (Jira)
TisonKun created FLINK-14051:


 Summary: Deploy job cluster in attached mode
 Key: FLINK-14051
 URL: https://issues.apache.org/jira/browse/FLINK-14051
 Project: Flink
  Issue Type: Sub-task
  Components: Client / Job Submission, Command Line Client
Affects Versions: 1.10.0
Reporter: TisonKun
 Fix For: 1.10.0


While working on FLINK-14048 I revisited the problem that we handle deploy logic 
in complicated if-else branches in {{CliFrontend#runProgram}}. Previously we said 
that even in per-job attached mode we deploy a session cluster, for historical 
reasons.

However, I notice that {{#deployJobCluster}} has a {{boolean detached}} parameter 
and is also used in the sql-client package. So it looks like we can deploy a job 
cluster in attached mode, as the sql-client package does.

However, as [~xccui] answered on the mailing list 
[here|https://lists.apache.org/x/thread.html/5464459db08f2a756af0c61eb02d34a26f04c27c62140886cad52731@%3Cuser.flink.apache.org%3E],
 we support only a standalone session cluster for sql-client. So maybe it is not 
our case. Anyway, if we cannot deploy a job cluster in attached mode, I'd like to 
know the concrete reason.

CC [~till.rohrmann] [~twalthr]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14050) Refactor YarnClusterDescriptor inheritance

2019-09-11 Thread TisonKun (Jira)
TisonKun created FLINK-14050:


 Summary: Refactor YarnClusterDescriptor inheritance
 Key: FLINK-14050
 URL: https://issues.apache.org/jira/browse/FLINK-14050
 Project: Flink
  Issue Type: Sub-task
  Components: Client / Job Submission, Command Line Client
Affects Versions: 1.10.0
Reporter: TisonKun
 Fix For: 1.10.0


Currently, the inheritance looks like

{{AbstractYarnClusterDescriptor}}
-> {{YarnClusterDescriptor}}
-> {{TestingYarnClusterDescriptor}}
-> {{NonDeployingYarnClusterDescriptor}}
->-> {{NonDeployingDetachedYarnClusterDescriptor}}

With an investigation, I find

1. {{AbstractYarnClusterDescriptor}} was introduced for migration purposes and is 
not needed any more.
2. {{TestingYarnClusterDescriptor}} is redundant and can be replaced directly 
with {{YarnClusterDescriptor}}.
3. Some methods like {{#createYarnClusterClient}} have parameters that are never 
used and exist only for historical reasons.

Thus, I propose to refactor the {{YarnClusterDescriptor}} inheritance to
{{YarnClusterDescriptor}}
-> {{NonDeployingYarnClusterDescriptor}}
->-> {{NonDeployingDetachedYarnClusterDescriptor}}

and also remove the unused parameters from those methods.

CC [~kkl0u] [~aljoscha] [~till.rohrmann]
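
For illustration, a minimal sketch of the proposed flattened hierarchy (bodies elided; only the shape is shown, and the {{ClusterDescriptor}} supertype is mentioned merely for orientation, so treat the details as assumptions rather than the final design):

{code:java}
// Proposed shape after the refactoring (sketch only, bodies elided).
public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
    // production deployment logic lives here and is reused directly by tests,
    // which makes TestingYarnClusterDescriptor unnecessary
}

public class NonDeployingYarnClusterDescriptor extends YarnClusterDescriptor {
    // returns a client for an already running cluster instead of deploying one
}

public class NonDeployingDetachedYarnClusterDescriptor extends NonDeployingYarnClusterDescriptor {
    // same as above, for detached execution
}
{code}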



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-14048) Flink client hangs after trying to kill Yarn Job during deployment

2019-09-11 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16927350#comment-16927350
 ] 

TisonKun commented on FLINK-14048:
--

I tried to refactor the code for proper exception handling. Could you apply the 
attached patch to see if the issue is addressed?

> Flink client hangs after trying to kill Yarn Job during deployment
> --
>
> Key: FLINK-14048
> URL: https://issues.apache.org/jira/browse/FLINK-14048
> Project: Flink
>  Issue Type: Improvement
>  Components: Client / Job Submission, Deployment / YARN
>Reporter: Gyula Fora
>Priority: Major
> Attachments: patch.diff
>
>
> If we kill the flink client run command from the terminal while deploying to 
> YARN (let's say we realize we used the wrong parameters), the YARN 
> application will be killed immediately but the client won't shut down.
> We get the following messages over and over:
> 19/09/10 23:35:55 INFO retry.RetryInvocationHandler: java.io.IOException: The 
> client is stopped, while invoking 
> ApplicationClientProtocolPBClientImpl.forceKillApplication over null after 14 
> failover attempts. Trying to failover after sleeping for 16296ms.
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (FLINK-14048) Flink client hangs after trying to kill Yarn Job during deployment

2019-09-11 Thread TisonKun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun updated FLINK-14048:
-
Attachment: patch.diff

> Flink client hangs after trying to kill Yarn Job during deployment
> --
>
> Key: FLINK-14048
> URL: https://issues.apache.org/jira/browse/FLINK-14048
> Project: Flink
>  Issue Type: Improvement
>  Components: Client / Job Submission, Deployment / YARN
>Reporter: Gyula Fora
>Priority: Major
> Attachments: patch.diff
>
>
> If we kill the flink client run command from the terminal while deploying to 
> YARN (let's say we realize we used the wrong parameters), the YARN 
> application will be killed immediately but the client won't shut down.
> We get the following messages over and over:
> 19/09/10 23:35:55 INFO retry.RetryInvocationHandler: java.io.IOException: The 
> client is stopped, while invoking 
> ApplicationClientProtocolPBClientImpl.forceKillApplication over null after 14 
> failover attempts. Trying to failover after sleeping for 16296ms.
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-14048) Flink client hangs after trying to kill Yarn Job during deployment

2019-09-11 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16927308#comment-16927308
 ] 

TisonKun commented on FLINK-14048:
--

[~gyfora] did you notice this problem when deploying a per-job cluster? I found the 
relevant code snippet in {{CliFrontend#runProgram}} and it seems that when an 
exception is thrown (in this case, caused by the kill signal) we don't close the 
{{ClusterClient}} properly. But it should only happen in per-job mode.
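
For reference, a hypothetical sketch of the direction such a fix could take: make sure the deployed cluster client is released even when the deployment is interrupted. The helper and the shutdown method name are assumptions (depending on the Flink version the client exposes {{shutdown()}} or {{close()}}); this is not the attached patch itself:

{code:java}
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;

final class SafeJobClusterDeployment {

    // Hypothetical helper: deploy a job cluster and always release the client,
    // even if an exception (e.g. triggered by a kill signal) is thrown.
    static <T> void deployAttached(
            ClusterDescriptor<T> descriptor,
            ClusterSpecification spec,
            JobGraph jobGraph) throws Exception {

        ClusterClient<T> client = null;
        try {
            client = descriptor.deployJobCluster(spec, jobGraph, false);
            // ... wait for the job result in attached mode ...
        } finally {
            if (client != null) {
                client.shutdown(); // assumed method name; newer versions use close()
            }
        }
    }
}
{code}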

> Flink client hangs after trying to kill Yarn Job during deployment
> --
>
> Key: FLINK-14048
> URL: https://issues.apache.org/jira/browse/FLINK-14048
> Project: Flink
>  Issue Type: Improvement
>  Components: Client / Job Submission, Deployment / YARN
>Reporter: Gyula Fora
>Priority: Major
>
> If we kill the flink client run command from the terminal while deploying to 
> YARN (let's say we realize we used the wrong parameters), the YARN 
> application will be killed immediately but the client won't shut down.
> We get the following messages over and over:
> 19/09/10 23:35:55 INFO retry.RetryInvocationHandler: java.io.IOException: The 
> client is stopped, while invoking 
> ApplicationClientProtocolPBClientImpl.forceKillApplication over null after 14 
> failover attempts. Trying to failover after sleeping for 16296ms.
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14041) Refactor LeaderRetrievalServiceHostnameResolutionTest and remove StandaloneUtils

2019-09-10 Thread TisonKun (Jira)
TisonKun created FLINK-14041:


 Summary: Refactor LeaderRetrievalServiceHostnameResolutionTest and 
remove StandaloneUtils
 Key: FLINK-14041
 URL: https://issues.apache.org/jira/browse/FLINK-14041
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Affects Versions: 1.10.0
Reporter: TisonKun
 Fix For: 1.10.0


{{StandaloneUtils}} is a poor utility that can simply be replaced with 
{{HighAvailabilityServicesUtils}} at its only usages.

Propose to refactor {{LeaderRetrievalServiceHostnameResolutionTest}} and remove 
{{StandaloneUtils}}.

cc [~Zentol]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Closed] (FLINK-11664) Pass leadership information by LeaderAddressAndId

2019-09-10 Thread TisonKun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun closed FLINK-11664.

Resolution: Won't Do

> Pass leadership information by LeaderAddressAndId
> -
>
> Key: FLINK-11664
> URL: https://issues.apache.org/jira/browse/FLINK-11664
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.8.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Minor
>
> We already have {{LeaderAddressAndId}} class, and the existing 
> {{LeaderRetrievalListener#notifyLeaderAddress}} and 
> {{LeaderElectionService#confirmLeaderSessionID}} can benefit from using such 
> an encapsulation to pass leadership information.
> Specifically,
> {{LeaderRetrievalListener#notifyLeaderAddress(String leaderAddress, UUID 
> leaderSessionID)}} → 
> {{LeaderRetrievalListener#notifyLeadership(LeaderAddressAndId)}}
> {{LeaderElectionService#confirmLeaderSessionID(UUID leaderSessionID)}} → 
> {{LeaderElectionService#publishLeadership(LeaderAddressAndId)}}
> An option is rename {{LeaderAddressAndId}} to {{LeadershipInfo}}.
> cc [~till.rohrmann]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (FLINK-13961) Remove obsolete abstraction JobExecutor(Service)

2019-09-10 Thread TisonKun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun updated FLINK-13961:
-
Affects Version/s: (was: 1.10.0)

> Remove obsolete abstraction JobExecutor(Service) 
> -
>
> Key: FLINK-13961
> URL: https://issues.apache.org/jira/browse/FLINK-13961
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Refer to Till's comment
> The JobExecutor and the JobExecutorService have been introduced to bridge 
> between the Flip-6 MiniCluster and the legacy FlinkMiniCluster. They should 
> be obsolete now and could be removed if needed.
> Actually we should ideally make use of {{MiniClusterClient}} for submission, 
> but we have some tests based on MiniCluster in flink-runtime or somewhere 
> that doesn't have a dependency on flink-client, while it is unclear whether 
> moving {{MiniClusterClient}} to flink-runtime is reasonable or not. 
> Thus I'd prefer to keep {{executeJobBlocking}} for now and defer the possible 
> refactor.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Closed] (FLINK-13961) Remove obsolete abstraction JobExecutor(Service)

2019-09-10 Thread TisonKun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun closed FLINK-13961.

Fix Version/s: (was: 1.10.0)
   Resolution: Not A Problem

There are still users of this abstraction, although a proper way to do the 
submission might be through the cluster client.

Revisit when the prerequisite is ready.

> Remove obsolete abstraction JobExecutor(Service) 
> -
>
> Key: FLINK-13961
> URL: https://issues.apache.org/jira/browse/FLINK-13961
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission
>Affects Versions: 1.10.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Refer to Till's comment
> The JobExecutor and the JobExecutorService have been introduced to bridge 
> between the Flip-6 MiniCluster and the legacy FlinkMiniCluster. They should 
> be obsolete now and could be removed if needed.
> Actually we should ideally make use of {{MiniClusterClient}} for submission, 
> but we have some tests based on MiniCluster in flink-runtime or somewhere 
> that doesn't have a dependency on flink-client, while it is unclear whether 
> moving {{MiniClusterClient}} to flink-runtime is reasonable or not. 
> Thus I'd prefer to keep {{executeJobBlocking}} for now and defer the possible 
> refactor.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-14010) Dispatcher & JobManagers don't give up leadership when AM is shut down

2019-09-09 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16925460#comment-16925460
 ] 

TisonKun commented on FLINK-14010:
--

CC [~StephanEwen] [~till.rohrmann] [~xiaogang.shi]

Here comes a high-level question: do we explicitly constrain Dispatcher, 
ResourceManager and JobManagers to run in one process?

1. the usage of a reference to {{JobManagerGateway}} in Dispatcher already implies 
that we require this.
2. going back to the design of FLIP-6, we have a global singleton Dispatcher, and 
for each job we launch a JobManager and ResourceManager. The implementation 
diverges quite a lot from that. Could you please provide any background?
3. if we explicitly constrain as above, we actually do not need to start leader 
election services per component; instead, we can use the abstraction and 
layout below:

- start one leader election service per dispatcher-resource-manager component, at 
cluster entrypoint level. It participates in the election and all metadata 
commits are delegated to this service.
- all cluster-level components that need to publish their address, such as 
Dispatcher, ResourceManager and WebMonitor, publish their address via this 
leader election service.
- Actors can be started as {{PermanentlyFencedRpcEndpoint}} and thus we avoid 
handling a lot of mutable shared state across leadership epochs. 
Specifically, the cluster entrypoint acts as the DispatcherRunner and so on, like 
JobManagerRunner does for JobMaster. See also [this 
branch|https://github.com/tillrohrmann/flink/commits/removeSuspendFromJobMaster].

- back to this issue, the cluster entrypoint ({{YARNClusterEntrypoint}} maybe) 
reacts to the AM-RM request and thus all components can be required to shut down 
properly.
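
To make the proposed layout more concrete, a purely illustrative sketch of the abstraction (all names below are made up for this comment and are not existing Flink classes): one election per cluster entrypoint, with components only publishing their addresses through it.

{code:java}
import java.util.UUID;

// One instance per dispatcher-resource-manager component / cluster entrypoint.
interface ClusterLeaderElectionService {

    // Participate in the election; metadata commits are delegated to this service.
    void start(LeadershipListener listener) throws Exception;

    // Cluster-level components (Dispatcher, ResourceManager, WebMonitor) publish
    // their addresses here instead of running their own elections.
    void publishAddress(String componentName, String address) throws Exception;

    void stop() throws Exception;
}

interface LeadershipListener {
    void grantLeadership(UUID leaderSessionId);
    void revokeLeadership();
}
{code}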

> Dispatcher & JobManagers don't give up leadership when AM is shut down
> --
>
> Key: FLINK-14010
> URL: https://issues.apache.org/jira/browse/FLINK-14010
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN, Runtime / Coordination
>Affects Versions: 1.7.2, 1.8.1, 1.9.0, 1.10.0
>Reporter: TisonKun
>Priority: Critical
>
> In the YARN deployment scenario, the YARN RM may launch a new AM for the job 
> even if the previous AM has not terminated, for example, when the AM-RM heartbeat 
> times out. In this common case the RM will send a shutdown request to the 
> previous AM and expect the AM to shut down properly.
> However, currently in {{YARNResourceManager}}, we handle this request in 
> {{onShutdownRequest}}, which simply closes the {{YARNResourceManager}} *but not 
> the Dispatcher and JobManagers*. Thus, the Dispatcher and JobManagers launched in the new 
> AM cannot be granted leadership properly. Visually,
> on previous AM: Dispatcher leader, JM leaders
> on new AM: ResourceManager leader
> Since on the client side, or in per-job mode, the JobManager address and port are 
> configured as the new AM, the whole cluster goes into an unrecoverable 
> inconsistent state: the client always queries the dispatcher on the new AM as if it 
> were the leader. Briefly, the Dispatcher and JobManagers on the previous AM do not give up 
> their leadership properly.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14010) Dispatcher & JobManagers don't give up leadership when AM is shut down

2019-09-09 Thread TisonKun (Jira)
TisonKun created FLINK-14010:


 Summary: Dispatcher & JobManagers don't give up leadership when AM 
is shut down
 Key: FLINK-14010
 URL: https://issues.apache.org/jira/browse/FLINK-14010
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN, Runtime / Coordination
Affects Versions: 1.9.0, 1.8.1, 1.7.2, 1.10.0
Reporter: TisonKun


In the YARN deployment scenario, the YARN RM may launch a new AM for the job 
even if the previous AM has not terminated, for example, when the AM-RM heartbeat 
times out. In this common case the RM will send a shutdown request to the 
previous AM and expect the AM to shut down properly.

However, currently in {{YARNResourceManager}}, we handle this request in 
{{onShutdownRequest}}, which simply closes the {{YARNResourceManager}} *but not 
the Dispatcher and JobManagers*. Thus, the Dispatcher and JobManagers launched in the new AM 
cannot be granted leadership properly. Visually,

on previous AM: Dispatcher leader, JM leaders
on new AM: ResourceManager leader

Since on the client side, or in per-job mode, the JobManager address and port are 
configured as the new AM, the whole cluster goes into an unrecoverable 
inconsistent state: the client always queries the dispatcher on the new AM as if it were the 
leader. Briefly, the Dispatcher and JobManagers on the previous AM do not give up their 
leadership properly.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13997) Remove legacy LeaderAddressAndId

2019-09-06 Thread TisonKun (Jira)
TisonKun created FLINK-13997:


 Summary: Remove legacy LeaderAddressAndId
 Key: FLINK-13997
 URL: https://issues.apache.org/jira/browse/FLINK-13997
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.10.0
Reporter: TisonKun
 Fix For: 1.10.0


Also remove {{OneTimeLeaderListenerFuture}}, which uses {{LeaderAddressAndId}} but is 
dead code, too.

I'd like to supersede FLINK-11664 with this one because I can see the 
requirement to tie the {{leader address}} to the {{leader session id}}, but it is not 
{{LeaderAddressAndId}}. It would be more natural to introduce such a class when 
addressing FLINK-10333, instead of a dedicated JIRA that changes things here and there.

WDYT? cc [~till.rohrmann]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (FLINK-13992) Refactor Optional parameter in InputGateWithMetrics#updateMetrics

2019-09-06 Thread TisonKun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun updated FLINK-13992:
-
Issue Type: Improvement  (was: Bug)

> Refactor Optional parameter in InputGateWithMetrics#updateMetrics
> -
>
> Key: FLINK-13992
> URL: https://issues.apache.org/jira/browse/FLINK-13992
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: TisonKun
>Priority: Major
> Fix For: 1.10.0
>
>
> As a consensus from the community code style discussion, we can refactor 
> {{InputGateWithMetrics#updateMetrics}} to reduce the usage of the 
> Optional parameter.
> cc [~azagrebin]
> {code:java}
> diff --git 
> a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
>  
> b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
> index 5d2cfd95c4..e548fbf02b 100644
> --- 
> a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
> +++ 
> b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
> @@ -24,6 +24,8 @@ import 
> org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
>  import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
>  import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
>  
> +import javax.annotation.Nonnull;
> +
>  import java.io.IOException;
>  import java.util.Optional;
>  import java.util.concurrent.CompletableFuture;
> @@ -67,12 +69,12 @@ public class InputGateWithMetrics extends InputGate {
>  
>   @Override
>   public Optional<BufferOrEvent> getNext() throws IOException, 
> InterruptedException {
> - return updateMetrics(inputGate.getNext());
> + return inputGate.getNext().map(this::updateMetrics);
>   }
>  
>   @Override
>   public Optional<BufferOrEvent> pollNext() throws IOException, 
> InterruptedException {
> - return updateMetrics(inputGate.pollNext());
> + return inputGate.pollNext().map(this::updateMetrics);
>   }
>  
>   @Override
> @@ -85,8 +87,8 @@ public class InputGateWithMetrics extends InputGate {
>   inputGate.close();
>   }
>  
> - private Optional<BufferOrEvent> updateMetrics(Optional<BufferOrEvent> 
> bufferOrEvent) {
> - bufferOrEvent.ifPresent(b -> numBytesIn.inc(b.getSize()));
> + private BufferOrEvent updateMetrics(@Nonnull BufferOrEvent 
> bufferOrEvent) {
> + numBytesIn.inc(bufferOrEvent.getSize());
>   return bufferOrEvent;
>   }
>  }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13992) Refactor Optional parameter in InputGateWithMetrics#updateMetrics

2019-09-06 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16924408#comment-16924408
 ] 

TisonKun commented on FLINK-13992:
--

Hi [~zjwang], do you think this issue is valid? If so, could you assign the 
issue to me? I'd like to provide a quick fix as posted above.

> Refactor Optional parameter in InputGateWithMetrics#updateMetrics
> -
>
> Key: FLINK-13992
> URL: https://issues.apache.org/jira/browse/FLINK-13992
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: TisonKun
>Priority: Major
> Fix For: 1.10.0
>
>
> As a consensus from the community code style discussion, we can refactor 
> {{InputGateWithMetrics#updateMetrics}} to reduce the usage of the 
> Optional parameter.
> cc [~azagrebin]
> {code:java}
> diff --git 
> a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
>  
> b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
> index 5d2cfd95c4..e548fbf02b 100644
> --- 
> a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
> +++ 
> b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
> @@ -24,6 +24,8 @@ import 
> org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
>  import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
>  import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
>  
> +import javax.annotation.Nonnull;
> +
>  import java.io.IOException;
>  import java.util.Optional;
>  import java.util.concurrent.CompletableFuture;
> @@ -67,12 +69,12 @@ public class InputGateWithMetrics extends InputGate {
>  
>   @Override
>   public Optional<BufferOrEvent> getNext() throws IOException, 
> InterruptedException {
> - return updateMetrics(inputGate.getNext());
> + return inputGate.getNext().map(this::updateMetrics);
>   }
>  
>   @Override
>   public Optional<BufferOrEvent> pollNext() throws IOException, 
> InterruptedException {
> - return updateMetrics(inputGate.pollNext());
> + return inputGate.pollNext().map(this::updateMetrics);
>   }
>  
>   @Override
> @@ -85,8 +87,8 @@ public class InputGateWithMetrics extends InputGate {
>   inputGate.close();
>   }
>  
> - private Optional<BufferOrEvent> updateMetrics(Optional<BufferOrEvent> 
> bufferOrEvent) {
> - bufferOrEvent.ifPresent(b -> numBytesIn.inc(b.getSize()));
> + private BufferOrEvent updateMetrics(@Nonnull BufferOrEvent 
> bufferOrEvent) {
> + numBytesIn.inc(bufferOrEvent.getSize());
>   return bufferOrEvent;
>   }
>  }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13961) Remove obsolete abstraction JobExecutor(Service)

2019-09-06 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16924273#comment-16924273
 ] 

TisonKun commented on FLINK-13961:
--

I've opened pull request #9643 for this issue. Feel free to give it a review 
:- )

> Remove obsolete abstraction JobExecutor(Service) 
> -
>
> Key: FLINK-13961
> URL: https://issues.apache.org/jira/browse/FLINK-13961
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission
>Affects Versions: 1.10.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Refer to Till's comment
> The JobExecutor and the JobExecutorService have been introduced to bridge 
> between the Flip-6 MiniCluster and the legacy FlinkMiniCluster. They should 
> be obsolete now and could be removed if needed.
> Actually we should ideally make use of {{MiniClusterClient}} for submission, 
> but we have some tests based on MiniCluster in flink-runtime or somewhere 
> that doesn't have a dependency on flink-client, while it is unclear whether 
> moving {{MiniClusterClient}} to flink-runtime is reasonable or not. 
> Thus I'd prefer to keep {{executeJobBlocking}} for now and defer the possible 
> refactor.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13961) Remove obsolete abstraction JobExecutor(Service)

2019-09-06 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16924256#comment-16924256
 ] 

TisonKun commented on FLINK-13961:
--

Thanks for your update [~kkl0u]! I will set the state to "In progress" once I 
start working on this :- )

> Remove obsolete abstraction JobExecutor(Service) 
> -
>
> Key: FLINK-13961
> URL: https://issues.apache.org/jira/browse/FLINK-13961
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission
>Affects Versions: 1.10.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Fix For: 1.10.0
>
>
> Refer to Till's comment
> The JobExecutor and the JobExecutorService have been introduced to bridge 
> between the Flip-6 MiniCluster and the legacy FlinkMiniCluster. They should 
> be obsolete now and could be removed if needed.
> Actually we should ideally make use of {{MiniClusterClient}} for submission, 
> but we have some tests based on MiniCluster in flink-runtime or somewhere 
> that doesn't have a dependency on flink-client, while it is unclear whether 
> moving {{MiniClusterClient}} to flink-runtime is reasonable or not. 
> Thus I'd prefer to keep {{executeJobBlocking}} for now and defer the possible 
> refactor.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13992) Refactor Optional parameter in InputGateWithMetrics#updateMetrics

2019-09-06 Thread TisonKun (Jira)
TisonKun created FLINK-13992:


 Summary: Refactor Optional parameter in 
InputGateWithMetrics#updateMetrics
 Key: FLINK-13992
 URL: https://issues.apache.org/jira/browse/FLINK-13992
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.10.0
Reporter: TisonKun
 Fix For: 1.10.0


As a consensus from the community code style discussion, we can refactor 
{{InputGateWithMetrics#updateMetrics}} to reduce the usage of the 
Optional parameter.

cc [~azagrebin]

{code:java}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
index 5d2cfd95c4..e548fbf02b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
@@ -24,6 +24,8 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -67,12 +69,12 @@ public class InputGateWithMetrics extends InputGate {
 
@Override
	public Optional<BufferOrEvent> getNext() throws IOException, 
InterruptedException {
-   return updateMetrics(inputGate.getNext());
+   return inputGate.getNext().map(this::updateMetrics);
}
 
@Override
	public Optional<BufferOrEvent> pollNext() throws IOException, 
InterruptedException {
-   return updateMetrics(inputGate.pollNext());
+   return inputGate.pollNext().map(this::updateMetrics);
}
 
@Override
@@ -85,8 +87,8 @@ public class InputGateWithMetrics extends InputGate {
inputGate.close();
}
 
-   private Optional<BufferOrEvent> updateMetrics(Optional<BufferOrEvent> 
bufferOrEvent) {
-   bufferOrEvent.ifPresent(b -> numBytesIn.inc(b.getSize()));
+   private BufferOrEvent updateMetrics(@Nonnull BufferOrEvent 
bufferOrEvent) {
+   numBytesIn.inc(bufferOrEvent.getSize());
return bufferOrEvent;
}
 }
{code}
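
The same guideline applies to any caller: instead of threading an {{Optional}} through as a parameter, let the method take the plain value and map it over the {{Optional}}. A small self-contained illustration in plain Java (unrelated to the Flink classes above):

{code:java}
import java.util.Optional;

class OptionalParameterExample {

    // Discouraged: the Optional is passed through as a parameter.
    static Optional<String> normalizeOld(Optional<String> input) {
        return input.map(String::trim);
    }

    // Preferred: the method takes the plain value; callers use Optional#map.
    static String normalize(String input) {
        return input.trim();
    }

    public static void main(String[] args) {
        Optional<String> maybe = Optional.of("  hello  ");
        // Equivalent results, but normalize() keeps the Optional handling at the call site.
        System.out.println(normalizeOld(maybe).orElse("<empty>"));
        System.out.println(maybe.map(OptionalParameterExample::normalize).orElse("<empty>"));
    }
}
{code}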




--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Comment Edited] (FLINK-13990) Remove JobModificationException

2019-09-06 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16924104#comment-16924104
 ] 

TisonKun edited comment on FLINK-13990 at 9/6/19 10:09 AM:
---

In fact, the {{JobMasterException}} referred to above is 
{{o.a.f.runtime.jobmaster.exception.JobMasterException}}.

We have another {{JobMasterException}} in use, 
{{o.a.f.runtime.jobmaster.JobMasterException}}. However, it has only one usage 
point. So a valid follow-up issue is to investigate whether we can handle that usage point 
in a different (maybe more proper) way and adjust our exception inheritance.

I would not include any investigation of the latter in this issue, though.


was (Author: tison):
In fact, {{JobMasterException}} referred above is 
{{o.a.f.runtime.jobmaster.exception.JobMasterException}}.

We have another in used {{JobMasterException}}, 
{{o.a.f.runtime.jobmaster.JobMasterException}}. However, it has only one used 
point. So a valid issue is to investigate whether we can handle that used point 
in different(maybe more proper) way and adjust our exceptions inheritance.

> Remove JobModificationException
> ---
>
> Key: FLINK-13990
> URL: https://issues.apache.org/jira/browse/FLINK-13990
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: TisonKun
>Priority: Major
> Fix For: 1.10.0
>
>
> As for its name {{JobModificationException}}, I'm not sure whether the 
> purpose underneath still valid. But none of our codepaths use this exception. 
>  I think it was mainly used in {{Dispatcher}} but we evolve exception 
> handling there. We can always add back once it is back to valid.
> Propose to remove it.
> cc [~till.rohrmann]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13990) Remove JobModificationException

2019-09-06 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16924104#comment-16924104
 ] 

TisonKun commented on FLINK-13990:
--

In fact, the {{JobMasterException}} referred to above is 
{{o.a.f.runtime.jobmaster.exception.JobMasterException}}.

We have another {{JobMasterException}} in use, 
{{o.a.f.runtime.jobmaster.JobMasterException}}. However, it has only one usage 
point. So a valid follow-up issue is to investigate whether we can handle that usage point 
in a different (maybe more proper) way and adjust our exception inheritance.

> Remove JobModificationException
> ---
>
> Key: FLINK-13990
> URL: https://issues.apache.org/jira/browse/FLINK-13990
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: TisonKun
>Priority: Major
> Fix For: 1.10.0
>
>
> As for its name {{JobModificationException}}, I'm not sure whether the 
> purpose underneath still valid. But none of our codepaths use this exception. 
>  I think it was mainly used in {{Dispatcher}} but we evolve exception 
> handling there. We can always add back once it is back to valid.
> Propose to remove it.
> cc [~till.rohrmann]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13990) Remove JobModificationException

2019-09-06 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16924101#comment-16924101
 ] 

TisonKun commented on FLINK-13990:
--

Also its base class {{JobMasterException}}, which has no outside usages.

> Remove JobModificationException
> ---
>
> Key: FLINK-13990
> URL: https://issues.apache.org/jira/browse/FLINK-13990
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: TisonKun
>Priority: Major
> Fix For: 1.10.0
>
>
> As for its name {{JobModificationException}}, I'm not sure whether the 
> purpose underneath still valid. But none of our codepaths use this exception. 
>  I think it was mainly used in {{Dispatcher}} but we evolve exception 
> handling there. We can always add back once it is back to valid.
> Propose to remove it.
> cc [~till.rohrmann]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13990) Remove JobModificationException

2019-09-06 Thread TisonKun (Jira)
TisonKun created FLINK-13990:


 Summary: Remove JobModificationException
 Key: FLINK-13990
 URL: https://issues.apache.org/jira/browse/FLINK-13990
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.10.0
Reporter: TisonKun
 Fix For: 1.10.0


As for its name {{JobModificationException}}, I'm not sure whether the purpose 
underneath is still valid. But none of our code paths use this exception. I think 
it was mainly used in {{Dispatcher}} but we have since evolved the exception handling there. We 
can always add it back once it becomes valid again.

Propose to remove it.

cc [~till.rohrmann]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13989) Remove legacy ClassloadingProps

2019-09-06 Thread TisonKun (Jira)
TisonKun created FLINK-13989:


 Summary: Remove legacy ClassloadingProps
 Key: FLINK-13989
 URL: https://issues.apache.org/jira/browse/FLINK-13989
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.10.0
Reporter: TisonKun
 Fix For: 1.10.0


{{ClassloadingProps}} was used by the legacy {{JobManager}}; remove it as dead code.

[~till.rohrmann]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13988) Remove legacy JobManagerMode

2019-09-06 Thread TisonKun (Jira)
TisonKun created FLINK-13988:


 Summary: Remove legacy JobManagerMode
 Key: FLINK-13988
 URL: https://issues.apache.org/jira/browse/FLINK-13988
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.10.0
Reporter: TisonKun
 Fix For: 1.10.0


Indeed it belongs to the pre-FLIP-6 framework.

Also remove its usage in {{JobManagerCliOptions}}, and the then unused 
{{JobManagerCliOptions}} itself.

cc [~till.rohrmann]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13970) Remove LifoSetQueue and SetQueue

2019-09-05 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923602#comment-16923602
 ] 

TisonKun commented on FLINK-13970:
--

With another pass I find that {{SetQueue}} is quite similar to {{LifoSetQueue}} 
and I'd like to do the removal in one pass. Also {{SetQueue}} is no longer used.

> Remove LifoSetQueue and SetQueue
> 
>
> Key: FLINK-13970
> URL: https://issues.apache.org/jira/browse/FLINK-13970
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Fix For: 1.10.0
>
>
> Hi [~till.rohrmann] I found a class {{LifoSetQueue}} which is not into used 
> any more. IIRC it was ever used in {{Scheduler}} and {{Instance}}. Shall we 
> remove this class also or put it under some directory collects utils?



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (FLINK-13970) Remove LifoSetQueue and SetQueue

2019-09-05 Thread TisonKun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun updated FLINK-13970:
-
Summary: Remove LifoSetQueue and SetQueue  (was: Remove or move 
LifoSetQueue)

> Remove LifoSetQueue and SetQueue
> 
>
> Key: FLINK-13970
> URL: https://issues.apache.org/jira/browse/FLINK-13970
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Fix For: 1.10.0
>
>
> Hi [~till.rohrmann] I found a class {{LifoSetQueue}} which is not into used 
> any more. IIRC it was ever used in {{Scheduler}} and {{Instance}}. Shall we 
> remove this class also or put it under some directory collects utils?



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13964) Remove usage of deprecated methods from MiniCluster

2019-09-05 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923244#comment-16923244
 ] 

TisonKun commented on FLINK-13964:
--

Technically the proposal is valid and it doesn't take too much effort to implement 
it.

I was just wondering whether we could access {{restAddressURI}} directly in the 
MiniCluster scenario and handle this case specially due to its special nature. 
But given that we are likely to start multiple dispatcher-resource-manager components in the 
MiniCluster scenario (mostly for tests), it is more natural to use leader 
retrieval services.

I don't see any further concern with including the rest endpoint retriever in the 
cluster-side high-availability services. So +1 for the proposal.

For the (conceptual and implementation) separation of leader election 
services and leader retrieval services (a.k.a. name services), I am drafting a 
JIRA based on a failure case I have met. It won't conflict with your proposal 
that the cluster-side high-availability services have full access. So again, let's do as you 
proposed.
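
As a rough sketch of what the retrieval side could look like once the MiniCluster goes through a leader retriever instead of the deprecated getter ({{LeaderRetrievalListener}} and {{LeaderRetrievalService}} are the existing interfaces; the getter on {{HighAvailabilityServices}} shown in the usage comment is an assumption):

{code:java}
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;

// Minimal listener that completes a future with the first retrieved rest address.
class RestAddressListener implements LeaderRetrievalListener {

    final CompletableFuture<String> restAddress = new CompletableFuture<>();

    @Override
    public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
        if (leaderAddress != null) {
            restAddress.complete(leaderAddress);
        }
    }

    @Override
    public void handleError(Exception exception) {
        restAddress.completeExceptionally(exception);
    }
}

// Usage sketch (the retriever getter name is an assumption):
//   LeaderRetrievalService retriever = haServices.getClusterRestEndpointLeaderRetriever();
//   RestAddressListener listener = new RestAddressListener();
//   retriever.start(listener);
//   String address = listener.restAddress.get();
{code}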

> Remove usage of deprecated methods from MiniCluster
> ---
>
> Key: FLINK-13964
> URL: https://issues.apache.org/jira/browse/FLINK-13964
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.10.0
>
>
> With FLINK-13750 we deprecated 
> {{HighAvailabilityServices#getWebMonitorRetrieverService}}. This method is 
> still actively used by the {{MiniCluster}}. We should remove the usage in 
> order to also support custom {{HighAvailabilityService}} implementations 
> which no longer implement 
> {{HighAvailabilityServices#getWebMonitorRetrieverService}}.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13970) Remove or move LifoSetQueue

2019-09-05 Thread TisonKun (Jira)
TisonKun created FLINK-13970:


 Summary: Remove or move LifoSetQueue
 Key: FLINK-13970
 URL: https://issues.apache.org/jira/browse/FLINK-13970
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.10.0
Reporter: TisonKun
 Fix For: 1.10.0


Hi [~till.rohrmann] I found a class {{LifoSetQueue}} which is not in use any 
more. IIRC it was once used in {{Scheduler}} and {{Instance}}. Shall we remove 
this class or put it under some directory that collects utils?



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-10333) Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)

2019-09-05 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923170#comment-16923170
 ] 

TisonKun commented on FLINK-10333:
--

The details and implementation of {{LeaderServer}} are regarded as a prerequisite 
for the new high-availability services; otherwise we have to implement an embedded one, 
which should not be required by design (see also 
[here|https://lists.apache.org/x/thread.html/0da7ff1f985125f5f0f16b15cd1b6617f68d15cf11c421245071a485@%3Cdev.flink.apache.org%3E]),
 and live with inconsistent views/APIs between different 
implementations (see the concerns about retrieving the JobMaster address above).

We can start a separate thread to handle it if we reach a consensus here. It 
could be integrated into the current codebase cleanly and individually.

> Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, 
> CompletedCheckpoints)
> -
>
> Key: FLINK-10333
> URL: https://issues.apache.org/jira/browse/FLINK-10333
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Priority: Major
> Attachments: screenshot-1.png
>
>
> While going over the ZooKeeper based stores 
> ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, 
> {{ZooKeeperCompletedCheckpointStore}}) and the underlying 
> {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were 
> introduced with past incremental changes.
> * Depending whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} 
> or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization 
> problems will either lead to removing the Znode or not
> * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of 
> exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case 
> of a failure)
> * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be 
> better to move {{RetrievableStateStorageHelper}} out of it for a better 
> separation of concerns
> * {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even 
> if it is locked. This should not happen since it could leave another system 
> in an inconsistent state (imagine a changed {{JobGraph}} which restores from 
> an old checkpoint)
> * Redundant but also somewhat inconsistent put logic in the different stores
> * Shadowing of ZooKeeper specific exceptions in {{ZooKeeperStateHandleStore}} 
> which were expected to be caught in {{ZooKeeperSubmittedJobGraphStore}}
> * Getting rid of the {{SubmittedJobGraphListener}} would be helpful
> These problems made me think how reliable these components actually work. 
> Since these components are very important, I propose to refactor them.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Comment Edited] (FLINK-10333) Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)

2019-09-05 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923170#comment-16923170
 ] 

TisonKun edited comment on FLINK-10333 at 9/5/19 8:17 AM:
--

{{LeaderServer}} is regarded as a prerequisite for the new high-availability 
services; otherwise we have to implement an embedded one, which should not be 
required by design (see also 
[here|https://lists.apache.org/x/thread.html/0da7ff1f985125f5f0f16b15cd1b6617f68d15cf11c421245071a485@%3Cdev.flink.apache.org%3E]),
 and live with inconsistent views/APIs between different 
implementations (see the concerns about retrieving the JobMaster address above).

We can start a separate thread to handle its details and implementation if we 
reach a consensus here. It could be integrated into the current codebase cleanly and 
individually.


was (Author: tison):
Details and implementation of {{LeaderServer}} is regarded as a prerequisite 
for new high-availability services, otherwise we have to implement embedded one 
which should not be required as design(see also 
[here|https://lists.apache.org/x/thread.html/0da7ff1f985125f5f0f16b15cd1b6617f68d15cf11c421245071a485@%3Cdev.flink.apache.org%3E])
 and live with the inconsistency views/apis between different 
implementation(see concerns about retrieve JobMaster address above).

We can start a separated thread to handle it if we reach a consensus here. It 
would be cleanly individually integrated in current codebase.

> Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, 
> CompletedCheckpoints)
> -
>
> Key: FLINK-10333
> URL: https://issues.apache.org/jira/browse/FLINK-10333
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Priority: Major
> Attachments: screenshot-1.png
>
>
> While going over the ZooKeeper based stores 
> ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, 
> {{ZooKeeperCompletedCheckpointStore}}) and the underlying 
> {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were 
> introduced with past incremental changes.
> * Depending whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} 
> or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization 
> problems will either lead to removing the Znode or not
> * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of 
> exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case 
> of a failure)
> * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be 
> better to move {{RetrievableStateStorageHelper}} out of it for a better 
> separation of concerns
> * {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even 
> if it is locked. This should not happen since it could leave another system 
> in an inconsistent state (imagine a changed {{JobGraph}} which restores from 
> an old checkpoint)
> * Redundant but also somewhat inconsistent put logic in the different stores
> * Shadowing of ZooKeeper specific exceptions in {{ZooKeeperStateHandleStore}} 
> which were expected to be caught in {{ZooKeeperSubmittedJobGraphStore}}
> * Getting rid of the {{SubmittedJobGraphListener}} would be helpful
> These problems made me think how reliable these components actually work. 
> Since these components are very important, I propose to refactor them.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-10333) Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)

2019-09-05 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923156#comment-16923156
 ] 

TisonKun commented on FLINK-10333:
--

Hi [~till.rohrmann] [~xiaogang.shi] [~StephanEwen], I'd like to share some 
progress here, mainly about our perspective on high-availability services and 
ideas for the non-HA case, which follow the proposed design of leader stores. I'm 
going to update the document with the corresponding details, but I first post it here 
to preview the overall direction.

Following the content layout of the linked document, we regard the high-availability 
services provided by Flink as 3 parts: leader election services, name 
services (known as leader retrieval services) and metadata storage.

In order to contribute the whole rework under the topic of FLINK-10333, it 
makes sense to split the initial transaction store implementation from the 
metadata storage, which includes the job graph and checkpoint stores that make use of it. 
Basically, the first pass includes reimplemented leader election services and 
name services based on the new store layout, and also a leader store implementation 
that is not used outside yet. 

We have internally finished the integration work and it seems that the new 
implementation works well with the current implementations of the job graph store and 
checkpoint store, which will later be based on the leader store. In other words, it is 
possible to apply the transaction store in steps.

However, there are a few concerns about the actual integration, mainly 
around the non-HA case and the high-availability services dependencies.

(1) For the non-HA case, we notice that the current 
{{StandaloneHAServices}} (pre-configured) and {{EmbeddedHAServices}} (in-memory) 
have their respective problems. 

For the pre-configured case, we now have a {{getJobManagerLeaderRetriever(JobID, 
defaultJMAddress)}} method to work around the problem that it is impossible to 
configure the JM address in advance. The parameter is not in use in any other case 
in any other high-availability mode. Also, in the MiniCluster case and anywhere else where 
pre-configuring the leader address is impossible, {{StandaloneHAServices}} 
cannot be used. For the in-memory case, it clearly doesn't fit any 
distributed cases.

Internally, we introduce a {{LeaderServer}}/{{LeaderClient}} pair which acts 
like a simplified standalone ZooKeeper cluster to provide leader election and 
name services. Briefly, we start a {{LeaderServer}} actor in the JM group under a 
fixed name, and {{LeaderClient}} actors in the JM group, the TMs, and the cluster client, 
which know where the {{LeaderServer}} is and register to be notified of new leader 
addresses. In this way, we share a unified view between the non-HA and the ZooKeeper-based 
implementation, where the difference is that the LeaderServer runs at one 
point and doesn't tolerate failures. Also, both {{StandaloneHAServices}} and 
{{EmbeddedHAServices}} can be unified under this abstraction and thus we have 
one implementation for the non-HA case.

(2) For the high-availability dependencies, not only do we find that, as described in 
FLINK-13750, the high-availability services required in the client *and in the TM* are 
different from those in the JM; but also, in the TM, creating the {{RpcServices}} depends on 
working high-availability services that are used to retrieve the RM address, which in turn 
determines the TM bind address. This conflicts with wanting to first start a 
LeaderClient actor in the TM and then construct the high-availability services. We are 
thinking about configuring an address range that the JM group runs on and eliminating 
the dependency from RpcServices on the high-availability services (only for the name 
services here).

(3) As for the rework road, we'd like to directly replace the implementation, backed by 
solid tests and verification. This is because the internal APIs are incompatible, 
and even if we introduce a switch, either placing two implementations in the contenders 
and using them respectively, or implementing a new series of 
contenders (Dispatcher/JM/...) using the new implementation (just like what we did for 
FLIP-6), seems to cost unreasonably much. (Besides, the first approach is error-prone 
because we handle different implementations manually in one place for the 
same purpose.)

> Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, 
> CompletedCheckpoints)
> -
>
> Key: FLINK-10333
> URL: https://issues.apache.org/jira/browse/FLINK-10333
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Priority: Major
> Attachments: screenshot-1.png
>
>
> While going over the ZooKeeper based stores 
> ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, 
> {{ZooKeeperCompletedCheckpointStore}}) and the underlying 
> {{ZooKeeperStateHandleStore}} I noticed 

[jira] [Commented] (FLINK-10333) Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)

2019-09-05 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923109#comment-16923109
 ] 

TisonKun commented on FLINK-10333:
--

Thanks for the information [~till.rohrmann]. I think that is about how to make 
data structures such as job graphs and checkpoints backward compatible. Under 
the topic of FLINK-10333 we mainly deal with how to perform leader election, 
leader retrieval and metadata storage. As for the metadata storage, it can 
handle any kind of data because we always persist bytes and leave the serde 
work to the upper layer.
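
As a minimal sketch of that contract (illustrative names, not existing Flink 
classes, and Java serialization is used here only for brevity — Flink relies on 
its own serializers): the store only ever sees bytes, and the typed wrapper in 
the upper layer decides how to (de)serialize.

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** The store itself is type-agnostic: it only ever persists and returns bytes. */
interface MetadataStore {
    void put(String path, byte[] value) throws Exception;
    byte[] get(String path) throws Exception;
}

/** Upper layer: serializes the typed object (job graph, checkpoint handle, ...) itself. */
final class TypedMetadataStore<T extends Serializable> {

    private final MetadataStore store;

    TypedMetadataStore(MetadataStore store) {
        this.store = store;
    }

    void put(String path, T value) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        store.put(path, bytes.toByteArray());
    }

    @SuppressWarnings("unchecked")
    T get(String path) throws Exception {
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(store.get(path)))) {
            return (T) in.readObject();
        }
    }
}
{code}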

> Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, 
> CompletedCheckpoints)
> -
>
> Key: FLINK-10333
> URL: https://issues.apache.org/jira/browse/FLINK-10333
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Priority: Major
> Attachments: screenshot-1.png
>
>
> While going over the ZooKeeper based stores 
> ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, 
> {{ZooKeeperCompletedCheckpointStore}}) and the underlying 
> {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were 
> introduced with past incremental changes.
> * Depending whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} 
> or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization 
> problems will either lead to removing the Znode or not
> * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of 
> exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case 
> of a failure)
> * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be 
> better to move {{RetrievableStateStorageHelper}} out of it for a better 
> separation of concerns
> * {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even 
> if it is locked. This should not happen since it could leave another system 
> in an inconsistent state (imagine a changed {{JobGraph}} which restores from 
> an old checkpoint)
> * Redundant but also somewhat inconsistent put logic in the different stores
> * Shadowing of ZooKeeper specific exceptions in {{ZooKeeperStateHandleStore}} 
> which were expected to be caught in {{ZooKeeperSubmittedJobGraphStore}}
> * Getting rid of the {{SubmittedJobGraphListener}} would be helpful
> These problems made me think how reliable these components actually work. 
> Since these components are very important, I propose to refactor them.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13500) RestClusterClient requires S3 access when HA is configured

2019-09-04 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923010#comment-16923010
 ] 

TisonKun commented on FLINK-13500:
--

Hi [~till.rohrmann] & [~Zentol], with FLINK-13750 resolved I think we can mark 
this issue as resolved as well.

> RestClusterClient requires S3 access when HA is configured
> --
>
> Key: FLINK-13500
> URL: https://issues.apache.org/jira/browse/FLINK-13500
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Runtime / REST
>Affects Versions: 1.8.1
>Reporter: David Judd
>Priority: Major
>
> RestClusterClient initialization calls ClusterClient initialization, which 
> calls
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices
> In turn, createHighAvailabilityServices calls 
> BlobUtils.createBlobStoreFromConfig, which in our case tries to talk to S3.
> It seems very surprising to me that (a) RestClusterClient needs any form of 
> access other than to the REST API, and (b) that client initialization would 
> attempt a write as a side effect. I do not see either of these surprising 
> facts described in the documentation–are they intentional?



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13961) Remove obsolete abstraction JobExecutor(Service)

2019-09-04 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923003#comment-16923003
 ] 

TisonKun commented on FLINK-13961:
--

Hi [~kkl0u], I'd like to work on this issue. Please assign it to me if there are 
no further concerns. I'm going to start once FLINK-13946 is resolved, since 
there would otherwise be several conflicts and FLINK-13946 is almost done.

> Remove obsolete abstraction JobExecutor(Service) 
> -
>
> Key: FLINK-13961
> URL: https://issues.apache.org/jira/browse/FLINK-13961
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission
>Affects Versions: 1.10.0
>Reporter: TisonKun
>Priority: Major
> Fix For: 1.10.0
>
>
> Refer to Till's comment
> The JobExecutor and the JobExecutorService have been introduced to bridge 
> between the Flip-6 MiniCluster and the legacy FlinkMiniCluster. They should 
> be obsolete now and could be removed if needed.
> Actually we should ideally make use of {{MiniClusterClient}} for submission, 
> but we have some tests based on MiniCluster in flink-runtime or elsewhere that 
> do not have a dependency on flink-client, and it is unclear whether moving 
> {{MiniClusterClient}} to flink-runtime is reasonable. Thus I'd prefer to keep 
> {{executeJobBlocking}} for now and defer the possible refactoring.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13417) Bump Zookeeper to 3.5.5

2019-09-04 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16922970#comment-16922970
 ] 

TisonKun commented on FLINK-13417:
--

Yes [~till.rohrmann]. I built locally with ZK 3.5 and no compile error was 
reported. When I fired CI it passed almost all builds, see also

https://travis-ci.org/TisonKun/flink/builds/580757901

which reported failures on {{HBaseConnectorITCase}} when the HBase MiniCluster 
started its MiniZooKeeperCluster. It looks like a problem in the test class 
implementation.

Maybe [~carp84] can provide some input here.


{code:java}
java.io.IOException: Waiting for startup of standalone server

at 
org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster.startup(MiniZooKeeperCluster.java:261)
at 
org.apache.hadoop.hbase.HBaseTestingUtility.startMiniZKCluster(HBaseTestingUtility.java:814)
at 
org.apache.hadoop.hbase.HBaseTestingUtility.startMiniZKCluster(HBaseTestingUtility.java:784)
at 
org.apache.hadoop.hbase.HBaseTestingUtility.startMiniCluster(HBaseTestingUtility.java:1041)
at 
org.apache.hadoop.hbase.HBaseTestingUtility.startMiniCluster(HBaseTestingUtility.java:917)
at 
org.apache.hadoop.hbase.HBaseTestingUtility.startMiniCluster(HBaseTestingUtility.java:899)
at 
org.apache.hadoop.hbase.HBaseTestingUtility.startMiniCluster(HBaseTestingUtility.java:881)
at 
org.apache.flink.addons.hbase.util.HBaseTestingClusterAutoStarter.setUp(HBaseTestingClusterAutoStarter.java:147)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at 
com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at 
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at 
com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
{code}


> Bump Zookeeper to 3.5.5
> ---
>
> Key: FLINK-13417
> URL: https://issues.apache.org/jira/browse/FLINK-13417
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Konstantin Knauf
>Priority: Blocker
> Fix For: 1.10.0
>
>
> User might want to secure their Zookeeper connection via SSL.
> This requires a Zookeeper version >= 3.5.1. We might as well try to bump it 
> to 3.5.5, which is the latest version. 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13961) Remove obsolete abstraction JobExecutor(Service)

2019-09-04 Thread TisonKun (Jira)
TisonKun created FLINK-13961:


 Summary: Remove obsolete abstraction JobExecutor(Service) 
 Key: FLINK-13961
 URL: https://issues.apache.org/jira/browse/FLINK-13961
 Project: Flink
  Issue Type: Sub-task
  Components: Client / Job Submission
Affects Versions: 1.10.0
Reporter: TisonKun
 Fix For: 1.10.0


Refer to Till's comment

The JobExecutor and the JobExecutorService have been introduced to bridge 
between the Flip-6 MiniCluster and the legacy FlinkMiniCluster. They should be 
obsolete now and could be removed if needed.

Actually we should ideally make use of {{MiniClusterClient}} for submission, 
but we have some tests based on MiniCluster in flink-runtime or elsewhere that 
do not have a dependency on flink-client, and it is unclear whether moving 
{{MiniClusterClient}} to flink-runtime is reasonable. Thus I'd prefer to keep 
{{executeJobBlocking}} for now and defer the possible refactoring.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-13946) Remove deactivated JobSession-related code.

2019-09-03 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16921951#comment-16921951
 ] 

TisonKun commented on FLINK-13946:
--

Good to hear. I volunteer to review your patch :-)

> Remove deactivated JobSession-related code.
> ---
>
> Key: FLINK-13946
> URL: https://issues.apache.org/jira/browse/FLINK-13946
> Project: Flink
>  Issue Type: Improvement
>  Components: Client / Job Submission
>Affects Versions: 1.9.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>
> This issue refers to removing the code related to job session as described in 
> [FLINK-2097|https://issues.apache.org/jira/browse/FLINK-2097].  The feature 
> is deactivated, as pointed by the comment 
> [here|https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L285]
>  and it complicates the code paths related to job submission, namely the 
> lifecycle of the Remote and LocalExecutors.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

