[jira] [Commented] (FLINK-7334) Replace Flink's futures by CompletableFuture in RpcGateway
[ https://issues.apache.org/jira/browse/FLINK-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112611#comment-16112611 ] ASF GitHub Bot commented on FLINK-7334: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4462 > Replace Flink's futures by CompletableFuture in RpcGateway > -- > > Key: FLINK-7334 > URL: https://issues.apache.org/jira/browse/FLINK-7334 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7334) Replace Flink's futures by CompletableFuture in RpcGateway
[ https://issues.apache.org/jira/browse/FLINK-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112604#comment-16112604 ] ASF GitHub Bot commented on FLINK-7334: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4462 Thanks for reviewing @zentol. I'll address your comments and then merge this PR. > Replace Flink's futures by CompletableFuture in RpcGateway > -- > > Key: FLINK-7334 > URL: https://issues.apache.org/jira/browse/FLINK-7334 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7334) Replace Flink's futures by CompletableFuture in RpcGateway
[ https://issues.apache.org/jira/browse/FLINK-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112353#comment-16112353 ] ASF GitHub Bot commented on FLINK-7334: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4462#discussion_r131072988 --- Diff: flink-runtime/src/test/resources/log4j-test.properties --- @@ -16,7 +16,7 @@ # limitations under the License. -log4j.rootLogger=OFF, console --- End diff -- Thanks for catching this @zentol. > Replace Flink's futures by CompletableFuture in RpcGateway > -- > > Key: FLINK-7334 > URL: https://issues.apache.org/jira/browse/FLINK-7334 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7334) Replace Flink's futures by CompletableFuture in RpcGateway
[ https://issues.apache.org/jira/browse/FLINK-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16111425#comment-16111425 ] ASF GitHub Bot commented on FLINK-7334: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4462#discussion_r130949194 --- Diff: flink-runtime/src/test/resources/log4j-test.properties --- @@ -16,7 +16,7 @@ # limitations under the License. -log4j.rootLogger=OFF, console --- End diff -- needs to be reverted > Replace Flink's futures by CompletableFuture in RpcGateway > -- > > Key: FLINK-7334 > URL: https://issues.apache.org/jira/browse/FLINK-7334 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7334) Replace Flink's futures by CompletableFuture in RpcGateway
[ https://issues.apache.org/jira/browse/FLINK-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16111258#comment-16111258 ] ASF GitHub Bot commented on FLINK-7334: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4462 Thanks for your review @zentol. I've addressed your comments. > Replace Flink's futures by CompletableFuture in RpcGateway > -- > > Key: FLINK-7334 > URL: https://issues.apache.org/jira/browse/FLINK-7334 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7334) Replace Flink's futures by CompletableFuture in RpcGateway
[ https://issues.apache.org/jira/browse/FLINK-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16111256#comment-16111256 ] ASF GitHub Bot commented on FLINK-7334: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4462#discussion_r130930816 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java --- @@ -168,7 +165,7 @@ public void testHeartbeatTimeoutWithJobManager() throws Exception { eq(taskManagerLocation), eq(jmLeaderId), any(Time.class) - )).thenReturn(FlinkCompletableFuture.completed(new JMTMRegistrationSuccess(jmResourceId, blobPort))); + )).thenReturn(java.util.concurrent.CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId, blobPort))); --- End diff -- True. Will change it. > Replace Flink's futures by CompletableFuture in RpcGateway > -- > > Key: FLINK-7334 > URL: https://issues.apache.org/jira/browse/FLINK-7334 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7334) Replace Flink's futures by CompletableFuture in RpcGateway
[ https://issues.apache.org/jira/browse/FLINK-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16111255#comment-16111255 ] ASF GitHub Bot commented on FLINK-7334: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4462#discussion_r130930727 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java --- @@ -178,20 +178,19 @@ public int getPort() { } @Override - public Future connect(String address, Class clazz) { + public CompletableFuture connect(String address, Class clazz) { RpcGateway gateway = registeredConnections.get(address); if (gateway != null) { if (clazz.isAssignableFrom(gateway.getClass())) { @SuppressWarnings("unchecked") C typedGateway = (C) gateway; - return FlinkCompletableFuture.completed(typedGateway); + return CompletableFuture.completedFuture(typedGateway); } else { - return FlinkCompletableFuture.completedExceptionally( - new Exception("Gateway registered under " + address + " is not of type " + clazz)); + return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz)); } } else { - return FlinkCompletableFuture.completedExceptionally(new Exception("No gateway registered under that name")); + return FutureUtils.completedExceptionally(new Exception("No gateway registered under that name")); --- End diff -- will add it. > Replace Flink's futures by CompletableFuture in RpcGateway > -- > > Key: FLINK-7334 > URL: https://issues.apache.org/jira/browse/FLINK-7334 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7334) Replace Flink's futures by CompletableFuture in RpcGateway
[ https://issues.apache.org/jira/browse/FLINK-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16111253#comment-16111253 ] ASF GitHub Bot commented on FLINK-7334: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4462#discussion_r130930461 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java --- @@ -92,20 +92,19 @@ public void registerGateway(String address, RpcGateway gateway) { } @Override - public Future connect(String address, Class clazz) { + public CompletableFuture connect(String address, Class clazz) { RpcGateway gateway = registeredConnections.get(address); if (gateway != null) { if (clazz.isAssignableFrom(gateway.getClass())) { @SuppressWarnings("unchecked") C typedGateway = (C) gateway; - return FlinkCompletableFuture.completed(typedGateway); + return CompletableFuture.completedFuture(typedGateway); } else { - return FlinkCompletableFuture.completedExceptionally( - new Exception("Gateway registered under " + address + " is not of type " + clazz)); + return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz)); } } else { - return FlinkCompletableFuture.completedExceptionally(new Exception("No gateway registered under that name")); + return FutureUtils.completedExceptionally(new Exception("No gateway registered under that name")); --- End diff -- True. Will add it. > Replace Flink's futures by CompletableFuture in RpcGateway > -- > > Key: FLINK-7334 > URL: https://issues.apache.org/jira/browse/FLINK-7334 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7334) Replace Flink's futures by CompletableFuture in RpcGateway
[ https://issues.apache.org/jira/browse/FLINK-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16111251#comment-16111251 ] ASF GitHub Bot commented on FLINK-7334: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4462#discussion_r130929937 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -755,48 +751,42 @@ public void failSlot(final ResourceID taskManagerId, if (registeredTaskManagers.containsKey(taskManagerId)) { final RegistrationResponse response = new JMTMRegistrationSuccess( resourceId, libraryCacheManager.getBlobServerPort()); - return FlinkCompletableFuture.completed(response); + return CompletableFuture.completedFuture(response); } else { - return getRpcService().execute(new Callable() { - @Override - public TaskExecutorGateway call() throws Exception { - return getRpcService().connect(taskManagerRpcAddress, TaskExecutorGateway.class) - .get(rpcTimeout.getSize(), rpcTimeout.getUnit()); - } - }).handleAsync(new BiFunction() { - @Override - public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway, Throwable throwable) { - if (throwable != null) { - return new RegistrationResponse.Decline(throwable.getMessage()); - } - - if (!JobMaster.this.leaderSessionID.equals(leaderId)) { - log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " + - "leader session ID {} did not equal the received leader session ID {}.", - taskManagerId, taskManagerRpcAddress, - JobMaster.this.leaderSessionID, leaderId); - return new RegistrationResponse.Decline("Invalid leader session id"); - } - - slotPoolGateway.registerTaskManager(taskManagerId); - registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway)); - - // monitor the task manager as heartbeat target - taskManagerHeartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget() { - @Override - public void receiveHeartbeat(ResourceID resourceID, Void payload) { - // the task manager will not request heartbeat, so this method will never be called currently + return getRpcService() + .connect(taskManagerRpcAddress, TaskExecutorGateway.class) --- End diff -- Because we were blocking a thread from the `RpcService's` `Executor` without a reason by calling `get` on the returned future by `RpcService#connect`. > Replace Flink's futures by CompletableFuture in RpcGateway > -- > > Key: FLINK-7334 > URL: https://issues.apache.org/jira/browse/FLINK-7334 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7334) Replace Flink's futures by CompletableFuture in RpcGateway
[ https://issues.apache.org/jira/browse/FLINK-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16110892#comment-16110892 ] ASF GitHub Bot commented on FLINK-7334: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4462#discussion_r130874525 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java --- @@ -168,7 +165,7 @@ public void testHeartbeatTimeoutWithJobManager() throws Exception { eq(taskManagerLocation), eq(jmLeaderId), any(Time.class) - )).thenReturn(FlinkCompletableFuture.completed(new JMTMRegistrationSuccess(jmResourceId, blobPort))); + )).thenReturn(java.util.concurrent.CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId, blobPort))); --- End diff -- This can be shortened. (multiple times in this file) > Replace Flink's futures by CompletableFuture in RpcGateway > -- > > Key: FLINK-7334 > URL: https://issues.apache.org/jira/browse/FLINK-7334 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7334) Replace Flink's futures by CompletableFuture in RpcGateway
[ https://issues.apache.org/jira/browse/FLINK-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16110891#comment-16110891 ] ASF GitHub Bot commented on FLINK-7334: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4462#discussion_r130873991 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java --- @@ -178,20 +178,19 @@ public int getPort() { } @Override - public Future connect(String address, Class clazz) { + public CompletableFuture connect(String address, Class clazz) { RpcGateway gateway = registeredConnections.get(address); if (gateway != null) { if (clazz.isAssignableFrom(gateway.getClass())) { @SuppressWarnings("unchecked") C typedGateway = (C) gateway; - return FlinkCompletableFuture.completed(typedGateway); + return CompletableFuture.completedFuture(typedGateway); } else { - return FlinkCompletableFuture.completedExceptionally( - new Exception("Gateway registered under " + address + " is not of type " + clazz)); + return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz)); } } else { - return FlinkCompletableFuture.completedExceptionally(new Exception("No gateway registered under that name")); + return FutureUtils.completedExceptionally(new Exception("No gateway registered under that name")); --- End diff -- same as above > Replace Flink's futures by CompletableFuture in RpcGateway > -- > > Key: FLINK-7334 > URL: https://issues.apache.org/jira/browse/FLINK-7334 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7334) Replace Flink's futures by CompletableFuture in RpcGateway
[ https://issues.apache.org/jira/browse/FLINK-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16110889#comment-16110889 ] ASF GitHub Bot commented on FLINK-7334: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4462#discussion_r130873887 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java --- @@ -92,20 +92,19 @@ public void registerGateway(String address, RpcGateway gateway) { } @Override - public Future connect(String address, Class clazz) { + public CompletableFuture connect(String address, Class clazz) { RpcGateway gateway = registeredConnections.get(address); if (gateway != null) { if (clazz.isAssignableFrom(gateway.getClass())) { @SuppressWarnings("unchecked") C typedGateway = (C) gateway; - return FlinkCompletableFuture.completed(typedGateway); + return CompletableFuture.completedFuture(typedGateway); } else { - return FlinkCompletableFuture.completedExceptionally( - new Exception("Gateway registered under " + address + " is not of type " + clazz)); + return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz)); } } else { - return FlinkCompletableFuture.completedExceptionally(new Exception("No gateway registered under that name")); + return FutureUtils.completedExceptionally(new Exception("No gateway registered under that name")); --- End diff -- We could include the name in the exception. > Replace Flink's futures by CompletableFuture in RpcGateway > -- > > Key: FLINK-7334 > URL: https://issues.apache.org/jira/browse/FLINK-7334 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7334) Replace Flink's futures by CompletableFuture in RpcGateway
[ https://issues.apache.org/jira/browse/FLINK-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16110890#comment-16110890 ] ASF GitHub Bot commented on FLINK-7334: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4462#discussion_r130868023 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -755,48 +751,42 @@ public void failSlot(final ResourceID taskManagerId, if (registeredTaskManagers.containsKey(taskManagerId)) { final RegistrationResponse response = new JMTMRegistrationSuccess( resourceId, libraryCacheManager.getBlobServerPort()); - return FlinkCompletableFuture.completed(response); + return CompletableFuture.completedFuture(response); } else { - return getRpcService().execute(new Callable() { - @Override - public TaskExecutorGateway call() throws Exception { - return getRpcService().connect(taskManagerRpcAddress, TaskExecutorGateway.class) - .get(rpcTimeout.getSize(), rpcTimeout.getUnit()); - } - }).handleAsync(new BiFunction() { - @Override - public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway, Throwable throwable) { - if (throwable != null) { - return new RegistrationResponse.Decline(throwable.getMessage()); - } - - if (!JobMaster.this.leaderSessionID.equals(leaderId)) { - log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " + - "leader session ID {} did not equal the received leader session ID {}.", - taskManagerId, taskManagerRpcAddress, - JobMaster.this.leaderSessionID, leaderId); - return new RegistrationResponse.Decline("Invalid leader session id"); - } - - slotPoolGateway.registerTaskManager(taskManagerId); - registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway)); - - // monitor the task manager as heartbeat target - taskManagerHeartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget() { - @Override - public void receiveHeartbeat(ResourceID resourceID, Void payload) { - // the task manager will not request heartbeat, so this method will never be called currently + return getRpcService() + .connect(taskManagerRpcAddress, TaskExecutorGateway.class) --- End diff -- why is this no longer executed by the RpcService? > Replace Flink's futures by CompletableFuture in RpcGateway > -- > > Key: FLINK-7334 > URL: https://issues.apache.org/jira/browse/FLINK-7334 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7334) Replace Flink's futures by CompletableFuture in RpcGateway
[ https://issues.apache.org/jira/browse/FLINK-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16110814#comment-16110814 ] ASF GitHub Bot commented on FLINK-7334: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4462 [FLINK-7334] [futures] Replace Flink's futures with Java 8's CompletableFuture in RpcEndpoint, RpcGateways and RpcService ## What is the purpose of the change Replace Flink's futures with Java 8's CompletableFuture in RpcEndpoint, RpcGateways and RpcService. Moreover, it removes the remaining usage of Flink futures in tests. ## Brief change log - Change RpcCompletenessTest to only accept `CompletableFutures` as return type - Change signature of `RpcGateways` - Adapt all `RpcEndpoint` implementations - Remove Flink Future usage from remaining tests This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) ## Documentation - Does this pull request introduce a new feature? (no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink rfRpcGateway Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4462.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4462 commit 0e497367d8666a580d033848977b0639b037e554 Author: Till Rohrmann Date: 2017-08-01T09:33:48Z [FLINK-7334] [futures] Replace Flink's futures with Java 8's CompletableFuture in RpcEndpoint, RpcGateways and RpcService Remove Futures from RpcGateways Remove Future usage > Replace Flink's futures by CompletableFuture in RpcGateway > -- > > Key: FLINK-7334 > URL: https://issues.apache.org/jira/browse/FLINK-7334 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)