[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user kl0u closed the pull request at: https://github.com/apache/flink/pull/4993 ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150890435 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -208,9 +239,15 @@ public void shutdown() { /** The established connection after the connect succeeds. */ private EstablishedConnection established; + /** Atomic shut down future. */ + private final AtomicReferenceconnectionShutdownFuture = new AtomicReference<>(null); + /** Closed flag. */ private boolean closed; +// /** Shut down future. */ --- End diff -- ? ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150899000 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -419,20 +472,31 @@ void close() { * @param cause The cause to close the channel with. * @return Channel close future */ - private boolean close(Throwable cause) { - if (failureCause.compareAndSet(null, cause)) { - channel.close(); - stats.reportInactiveConnection(); + private CompletableFuture close(Throwable cause) { + final CompletableFuture shutdownFuture = new CompletableFuture<>(); - for (long requestId : pendingRequests.keySet()) { - TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.completeExceptionally(cause)) { - stats.reportFailedRequest(); + if (connectionShutdownFuture.compareAndSet(null, shutdownFuture) && + failureCause.compareAndSet(null, cause)) { + + channel.close().addListener(finished -> { + stats.reportInactiveConnection(); + for (long requestId : pendingRequests.keySet()) { + TimestampedCompletableFuture pending = pendingRequests.remove(requestId); + if (pending != null && pending.completeExceptionally(cause)) { + stats.reportFailedRequest(); + } } - } - return true; + + if (finished.isSuccess()) { --- End diff -- This seems weird at first sight but I'm guessing it's correct. I.e. we never finish the returned Future with the `cause` that was handed in. We only fail it exceptionally if anything in closing the channel went wrong, right? ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150893389 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java --- @@ -106,6 +108,12 @@ public InetSocketAddress getServerAddress() { @Override public void shutdown() { - super.shutdown(); + try { --- End diff -- Why is this not also returning a future? ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150878782 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java --- @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable { /** * Shuts down the server and all related thread pools. */ - public void shutdown() { - LOG.info("Shutting down server {} @ {}", serverName, serverAddress); - - if (handler != null) { - handler.shutdown(); - handler = null; - } - - if (queryExecutor != null) { - queryExecutor.shutdown(); - } + public CompletableFuture shutdownServer(Time timeout) throws InterruptedException { --- End diff -- I agree, but we need the `timeout` in order to shut down the `executors`. ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150856825 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -483,27 +511,31 @@ private boolean close(Throwable cause) { @Override public void onRequestResult(long requestId, RESP response) { TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.complete(response)) { + if (pending != null && !pending.isDone()) { long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L; stats.reportSuccessfulRequest(durationMillis); --- End diff -- This was causing test instabilities in `AbstractServerTest.testPortRangeSuccess()`, which is why I decided to complete the future after increasing the counter. Given that we check `pending.isDone()` first, we do not leave much room for false increases. ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150243346 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java --- @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable { /** * Shuts down the server and all related thread pools. */ - public void shutdown() { - LOG.info("Shutting down server {} @ {}", serverName, serverAddress); - - if (handler != null) { - handler.shutdown(); - handler = null; - } - - if (queryExecutor != null) { - queryExecutor.shutdown(); - } + public CompletableFuture shutdownServer(Time timeout) throws InterruptedException { + log.info("Shutting down {} @ {}", serverName, serverAddress); + + final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> { + try { + if (queryExecutor != null && !queryExecutor.isShutdown()) { + queryExecutor.shutdown(); + queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + return true; + } + } catch (InterruptedException e) { + log.warn("Failed to shutdown {}: ", serverName, e); + return false; + } + return false; --- End diff -- We want to avoid pulling in a dependency on `runtime` here. Would it make sense to move `Executors` to another module, e.g. `core`? ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150232000 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java --- @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable { /** * Shuts down the server and all related thread pools. */ - public void shutdown() { - LOG.info("Shutting down server {} @ {}", serverName, serverAddress); - - if (handler != null) { - handler.shutdown(); - handler = null; - } - - if (queryExecutor != null) { - queryExecutor.shutdown(); - } + public CompletableFuture shutdownServer(Time timeout) throws InterruptedException { --- End diff -- Ideally we would get rid of the `timeout` parameter because that is something the user specifies when calling `get()` on the returned termination future. ---
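A minimal sketch of the idea in the comment above, in which the caller bounds the wait on the returned termination future instead of passing a `timeout` into the shutdown method. `CallerTimeoutSketch` and `awaitShutdown` are hypothetical names used only for illustration; they are not part of Flink.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class CallerTimeoutSketch {

    /**
     * Waits for the termination future, with the timeout chosen by the caller.
     * Returns true only if the shutdown completed normally within the timeout.
     */
    static boolean awaitShutdown(CompletableFuture<Void> terminationFuture, long timeout, TimeUnit unit) {
        try {
            terminationFuture.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            return false; // shutdown is still in progress
        } catch (ExecutionException e) {
            return false; // shutdown failed; e.getCause() holds the reason
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
        System.out.println(awaitShutdown(done, 1, TimeUnit.SECONDS));
    }
}
```

As kl0u notes elsewhere in this thread, this does not remove the need for some bound when shutting down the executors themselves.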
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150231729 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java --- @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable { /** * Shuts down the server and all related thread pools. */ - public void shutdown() { - LOG.info("Shutting down server {} @ {}", serverName, serverAddress); - - if (handler != null) { - handler.shutdown(); - handler = null; - } - - if (queryExecutor != null) { - queryExecutor.shutdown(); - } + public CompletableFuture shutdownServer(Time timeout) throws InterruptedException { + log.info("Shutting down {} @ {}", serverName, serverAddress); + + final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> { + try { + if (queryExecutor != null && !queryExecutor.isShutdown()) { + queryExecutor.shutdown(); + queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); --- End diff -- What if `awaitTermination` returns `false`? Then we should call `queryExecutor.shutdownNow` ---
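The pattern requested above (fall back to `shutdownNow` when `awaitTermination` returns `false`) can be sketched with plain JDK classes. `ExecutorShutdownSketch` is a hypothetical helper written for this illustration and is not the Flink utility mentioned elsewhere in the thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ExecutorShutdownSketch {

    /** Returns true if the executor terminated within the timeout without being forced. */
    static boolean shutdownGracefully(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown(); // stop accepting new tasks, let running ones finish
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
            // Timed out: interrupt running tasks and discard queued ones.
            executor.shutdownNow();
            return false;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService idle = Executors.newSingleThreadExecutor();
        System.out.println(shutdownGracefully(idle, 1, TimeUnit.SECONDS));
    }
}
```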
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150230620 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -166,27 +168,39 @@ public String getClientName() { * * After a call to this method, all returned futures will be failed. */ - public void shutdown() { + public CompletableFuture shutdown() { --- End diff -- Calling this method twice gives a wrong result: the second call immediately returns a completed future. ---
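One way to avoid the double-call problem described above is to store the shutdown future in an `AtomicReference` and hand the same instance to every caller. The sketch below assumes this approach; `IdempotentShutdownSketch`, `resourcesReleased`, and `finishRelease` are hypothetical stand-ins for the real connection teardown.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class IdempotentShutdownSketch {

    private final AtomicReference<CompletableFuture<Void>> shutdownFuture = new AtomicReference<>(null);

    // Hypothetical stand-in for the asynchronous release of connections.
    private final CompletableFuture<Void> resourcesReleased = new CompletableFuture<>();

    /** Every call returns the same future, which completes when the first shutdown finishes. */
    CompletableFuture<Void> shutdown() {
        CompletableFuture<Void> newFuture = new CompletableFuture<>();
        if (shutdownFuture.compareAndSet(null, newFuture)) {
            // Only the first caller starts the teardown and wires up its result.
            resourcesReleased.whenComplete((ignored, error) -> {
                if (error != null) {
                    newFuture.completeExceptionally(error);
                } else {
                    newFuture.complete(null);
                }
            });
            return newFuture;
        }
        // Later callers observe the in-flight (or finished) shutdown.
        return shutdownFuture.get();
    }

    /** Simulates the teardown finishing. */
    void finishRelease() {
        resourcesReleased.complete(null);
    }

    public static void main(String[] args) {
        IdempotentShutdownSketch client = new IdempotentShutdownSketch();
        CompletableFuture<Void> first = client.shutdown();
        CompletableFuture<Void> second = client.shutdown();
        System.out.println(first == second);
    }
}
```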
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150224448 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java --- @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable { /** * Shuts down the server and all related thread pools. */ - public void shutdown() { - LOG.info("Shutting down server {} @ {}", serverName, serverAddress); - - if (handler != null) { - handler.shutdown(); - handler = null; - } - - if (queryExecutor != null) { - queryExecutor.shutdown(); - } + public CompletableFuture shutdownServer(Time timeout) throws InterruptedException { + log.info("Shutting down {} @ {}", serverName, serverAddress); + + final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> { + try { + if (queryExecutor != null && !queryExecutor.isShutdown()) { + queryExecutor.shutdown(); + queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + return true; + } + } catch (InterruptedException e) { + log.warn("Failed to shutdown {}: ", serverName, e); + return false; + } + return false; + }); + + final CompletableFuture groupShutdownFuture = new CompletableFuture<>(); + final CompletableFuture handlerShutdownFuture = new CompletableFuture<>(); + + queryExecShutdownFuture.thenRun(() -> { + if (bootstrap != null) { + EventLoopGroup group = bootstrap.group(); + if (group != null) { + group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS) + .addListener(finished -> groupShutdownFuture.complete(null)); + } else { + groupShutdownFuture.complete(null); + } + } - if (bootstrap != null) { - EventLoopGroup group = bootstrap.group(); - if (group != null) { - group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS); + if (handler != null) { + handler.shutdown().thenRun(() -> { + handler = null; + handlerShutdownFuture.complete(null); + }); + } else { + handlerShutdownFuture.complete(null); } --- End diff -- Why do we execute the handler shutdown in the callback of the `queryExecShutdownFuture`? ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150225368 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -166,27 +168,39 @@ public String getClientName() { * * After a call to this method, all returned futures will be failed. */ --- End diff -- JavaDocs need to be adapted ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150228196 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -419,20 +440,27 @@ void close() { * @param cause The cause to close the channel with. * @return Channel close future */ - private boolean close(Throwable cause) { + private CompletableFuture close(Throwable cause) { + final CompletableFuture shutdownFuture = new CompletableFuture<>(); if (failureCause.compareAndSet(null, cause)) { - channel.close(); - stats.reportInactiveConnection(); + final CompletableFuture tmp = new CompletableFuture<>(); + channel.close().addListener(finished -> tmp.complete(null)); --- End diff -- result of channel future is ignored. ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150221573 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java --- @@ -108,9 +110,35 @@ public QueryableStateClient(final InetAddress remoteAddress, final int remotePor new DisabledKvStateRequestStats()); } - /** Shuts down the client. */ + /** +* Shuts down the client and waits for 10 seconds +* for the shutdown to be completed. +* +* If this expires, or an exception is thrown for +* any reason, then a warning is printed containing the +* exception. +*/ public void shutdown() { - client.shutdown(); + try { + client.shutdown().get(10L, TimeUnit.SECONDS); + LOG.info("The Queryable State Client was shutdown successfully."); + } catch (Exception e) { + LOG.warn("The Queryable State Client shutdown failed: ", e); + } + } + + /** +* Shuts down the client and waits until shutdown is completed. +* +* If an exception is thrown for any reason, then this exception +* is further propagated upwards. +*/ + public void shutdownAndWait() throws Throwable { + try { + client.shutdown().join(); + } catch (CompletionException e) { + throw e.getCause(); --- End diff -- What if `getCause` is `null`? There is `ExceptionUtils.stripFromCompletionException`. ---
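The null-cause concern above can be illustrated with a small hand-written unwrapping helper. This is only a sketch of the idea; `UnwrapSketch.stripCompletionException` is a hypothetical method and not the Flink utility the comment refers to.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class UnwrapSketch {

    /** Returns the wrapped cause if there is one, otherwise the throwable itself. */
    static Throwable stripCompletionException(Throwable t) {
        if (t instanceof CompletionException && t.getCause() != null) {
            return t.getCause();
        }
        return t; // never hand back null to a caller that will rethrow it
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("shutdown failed"));
        try {
            failed.join();
        } catch (CompletionException e) {
            System.out.println(stripCompletionException(e).getMessage());
        }
    }
}
```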
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150229482 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -483,27 +511,31 @@ private boolean close(Throwable cause) { @Override public void onRequestResult(long requestId, RESP response) { TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.complete(response)) { + if (pending != null && !pending.isDone()) { long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L; stats.reportSuccessfulRequest(durationMillis); + pending.complete(response); } } @Override public void onRequestFailure(long requestId, Throwable cause) { TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.completeExceptionally(cause)) { + if (pending != null && !pending.isDone()) { stats.reportFailedRequest(); + pending.completeExceptionally(cause); } } @Override public void onFailure(Throwable cause) { - if (close(cause)) { - // Remove from established channels, otherwise future - // requests will be handled by this failed channel. - establishedConnections.remove(serverAddress, this); - } + close(cause).thenAccept(cancelled -> { + if (cancelled) { --- End diff -- Why do we only remove `this` from `establishedConnections` if `cancelled` is true? ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150225343 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java --- @@ -185,7 +185,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E /** * Shuts down any handler specific resources, e.g. thread pools etc. */ --- End diff -- JavaDocs missing ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150224857 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java --- @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable { /** * Shuts down the server and all related thread pools. */ - public void shutdown() { - LOG.info("Shutting down server {} @ {}", serverName, serverAddress); - - if (handler != null) { - handler.shutdown(); - handler = null; - } - - if (queryExecutor != null) { - queryExecutor.shutdown(); - } + public CompletableFuture shutdownServer(Time timeout) throws InterruptedException { + log.info("Shutting down {} @ {}", serverName, serverAddress); + + final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> { + try { + if (queryExecutor != null && !queryExecutor.isShutdown()) { + queryExecutor.shutdown(); + queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + return true; + } + } catch (InterruptedException e) { + log.warn("Failed to shutdown {}: ", serverName, e); + return false; + } + return false; + }); + + final CompletableFuture groupShutdownFuture = new CompletableFuture<>(); + final CompletableFuture handlerShutdownFuture = new CompletableFuture<>(); + + queryExecShutdownFuture.thenRun(() -> { + if (bootstrap != null) { + EventLoopGroup group = bootstrap.group(); + if (group != null) { + group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS) + .addListener(finished -> groupShutdownFuture.complete(null)); --- End diff -- Shouldn't we use the `finished` value to complete the `groupShutdownFuture`? An exception might have been thrown, and we should forward it as well. ---
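Forwarding the listener result, as suggested above, could look roughly like the following. To stay self-contained the sketch does not depend on Netty: `OperationResult` is a hypothetical stand-in whose two accessors are modeled on Netty's `Future#isSuccess()` and `Future#cause()`, and `forward` and `result` are illustration-only helpers.

```java
import java.util.concurrent.CompletableFuture;

final class ForwardResultSketch {

    /** Hypothetical stand-in for the finished operation handed to a listener. */
    interface OperationResult {
        boolean isSuccess();
        Throwable cause();
    }

    /** Completes the target normally on success, exceptionally with the cause otherwise. */
    static void forward(OperationResult finished, CompletableFuture<Void> target) {
        if (finished.isSuccess()) {
            target.complete(null);
        } else {
            target.completeExceptionally(finished.cause());
        }
    }

    /** Builds a result for the sketch: null means success, non-null means failure. */
    static OperationResult result(Throwable failureOrNull) {
        return new OperationResult() {
            @Override
            public boolean isSuccess() {
                return failureOrNull == null;
            }

            @Override
            public Throwable cause() {
                return failureOrNull;
            }
        };
    }

    public static void main(String[] args) {
        CompletableFuture<Void> groupShutdownFuture = new CompletableFuture<>();
        forward(result(new IllegalStateException("event loop failed")), groupShutdownFuture);
        System.out.println(groupShutdownFuture.isCompletedExceptionally());
    }
}
```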
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150228244 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -419,20 +440,27 @@ void close() { * @param cause The cause to close the channel with. * @return Channel close future */ - private boolean close(Throwable cause) { + private CompletableFuture close(Throwable cause) { + final CompletableFuture shutdownFuture = new CompletableFuture<>(); if (failureCause.compareAndSet(null, cause)) { - channel.close(); - stats.reportInactiveConnection(); + final CompletableFuture tmp = new CompletableFuture<>(); + channel.close().addListener(finished -> tmp.complete(null)); - for (long requestId : pendingRequests.keySet()) { - TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.completeExceptionally(cause)) { - stats.reportFailedRequest(); + tmp.thenRun(() -> { --- End diff -- here as well. ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150225411 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -166,27 +168,39 @@ public String getClientName() { * * After a call to this method, all returned futures will be failed. */ - public void shutdown() { + public CompletableFuture shutdown() { --- End diff -- What does the boolean future value tell us about the `shutdown`? ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150228810 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -483,27 +511,31 @@ private boolean close(Throwable cause) { @Override public void onRequestResult(long requestId, RESP response) { TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.complete(response)) { + if (pending != null && !pending.isDone()) { long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L; stats.reportSuccessfulRequest(durationMillis); + pending.complete(response); } } @Override public void onRequestFailure(long requestId, Throwable cause) { TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.completeExceptionally(cause)) { + if (pending != null && !pending.isDone()) { stats.reportFailedRequest(); + pending.completeExceptionally(cause); --- End diff -- Same here? ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150223999 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java --- @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable { /** * Shuts down the server and all related thread pools. */ - public void shutdown() { - LOG.info("Shutting down server {} @ {}", serverName, serverAddress); - - if (handler != null) { - handler.shutdown(); - handler = null; - } - - if (queryExecutor != null) { - queryExecutor.shutdown(); - } + public CompletableFuture shutdownServer(Time timeout) throws InterruptedException { + log.info("Shutting down {} @ {}", serverName, serverAddress); + + final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> { + try { + if (queryExecutor != null && !queryExecutor.isShutdown()) { + queryExecutor.shutdown(); + queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + return true; + } + } catch (InterruptedException e) { + log.warn("Failed to shutdown {}: ", serverName, e); + return false; + } + return false; + }); + + final CompletableFuture groupShutdownFuture = new CompletableFuture<>(); + final CompletableFuture handlerShutdownFuture = new CompletableFuture<>(); + + queryExecShutdownFuture.thenRun(() -> { + if (bootstrap != null) { + EventLoopGroup group = bootstrap.group(); + if (group != null) { + group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS) --- End diff -- Why not set the timeout here to `0`? ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150221248 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java --- @@ -108,9 +110,35 @@ public QueryableStateClient(final InetAddress remoteAddress, final int remotePor new DisabledKvStateRequestStats()); } - /** Shuts down the client. */ + /** +* Shuts down the client and waits for 10 seconds +* for the shutdown to be completed. +* +* If this expires, or an exception is thrown for +* any reason, then a warning is printed containing the +* exception. +*/ public void shutdown() { - client.shutdown(); + try { + client.shutdown().get(10L, TimeUnit.SECONDS); --- End diff -- Not sure whether we should hard code the timeout here. ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150225215 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java --- @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable { /** * Shuts down the server and all related thread pools. */ - public void shutdown() { - LOG.info("Shutting down server {} @ {}", serverName, serverAddress); - - if (handler != null) { - handler.shutdown(); - handler = null; - } - - if (queryExecutor != null) { - queryExecutor.shutdown(); - } + public CompletableFuture shutdownServer(Time timeout) throws InterruptedException { + log.info("Shutting down {} @ {}", serverName, serverAddress); + + final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> { + try { + if (queryExecutor != null && !queryExecutor.isShutdown()) { + queryExecutor.shutdown(); + queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + return true; + } + } catch (InterruptedException e) { + log.warn("Failed to shutdown {}: ", serverName, e); + return false; + } + return false; + }); + + final CompletableFuture groupShutdownFuture = new CompletableFuture<>(); + final CompletableFuture handlerShutdownFuture = new CompletableFuture<>(); + + queryExecShutdownFuture.thenRun(() -> { + if (bootstrap != null) { + EventLoopGroup group = bootstrap.group(); + if (group != null) { + group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS) + .addListener(finished -> groupShutdownFuture.complete(null)); + } else { + groupShutdownFuture.complete(null); + } + } - if (bootstrap != null) { - EventLoopGroup group = bootstrap.group(); - if (group != null) { - group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS); + if (handler != null) { + handler.shutdown().thenRun(() -> { + handler = null; --- End diff -- I would actually refrain from making state changes from an asynchronous callback thread. ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150223049 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java --- @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable { /** * Shuts down the server and all related thread pools. */ - public void shutdown() { - LOG.info("Shutting down server {} @ {}", serverName, serverAddress); - - if (handler != null) { - handler.shutdown(); - handler = null; - } - - if (queryExecutor != null) { - queryExecutor.shutdown(); - } + public CompletableFuture shutdownServer(Time timeout) throws InterruptedException { + log.info("Shutting down {} @ {}", serverName, serverAddress); + + final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> { + try { + if (queryExecutor != null && !queryExecutor.isShutdown()) { + queryExecutor.shutdown(); + queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + return true; + } + } catch (InterruptedException e) { + log.warn("Failed to shutdown {}: ", serverName, e); + return false; + } + return false; --- End diff -- Better to use `Executors.gracefulShutdown` for this. ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150221458 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java --- @@ -108,9 +110,35 @@ public QueryableStateClient(final InetAddress remoteAddress, final int remotePor new DisabledKvStateRequestStats()); } - /** Shuts down the client. */ + /** +* Shuts down the client and waits for 10 seconds +* for the shutdown to be completed. +* +* If this expires, or an exception is thrown for +* any reason, then a warning is printed containing the +* exception. +*/ public void shutdown() { - client.shutdown(); + try { + client.shutdown().get(10L, TimeUnit.SECONDS); + LOG.info("The Queryable State Client was shutdown successfully."); + } catch (Exception e) { + LOG.warn("The Queryable State Client shutdown failed: ", e); + } + } + + /** +* Shuts down the client and waits until shutdown is completed. +* +* If an exception is thrown for any reason, then this exception +* is further propagated upwards. +*/ + public void shutdownAndWait() throws Throwable { --- End diff -- We should not throw `Throwable`. ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150230125 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -419,20 +440,27 @@ void close() { * @param cause The cause to close the channel with. * @return Channel close future */ - private boolean close(Throwable cause) { + private CompletableFuture close(Throwable cause) { --- End diff -- I think this method can give you wrong results when being called twice. The second call will give you a completed future even though the first call could still be running. ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150226027 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -166,27 +168,39 @@ public String getClientName() { * * After a call to this method, all returned futures will be failed. */ - public void shutdown() { + public CompletableFuture shutdown() { + final CompletableFuture shutdownFuture = new CompletableFuture<>(); + if (shutDown.compareAndSet(false, true)) { + final ListconnectionFutures = new ArrayList<>(); --- End diff -- could be initialized with the number of established + pending connections. ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150224310 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java --- @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable { /** * Shuts down the server and all related thread pools. */ --- End diff -- JavaDocs should be updated. ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150224362 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java --- @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable { /** * Shuts down the server and all related thread pools. */ - public void shutdown() { - LOG.info("Shutting down server {} @ {}", serverName, serverAddress); - - if (handler != null) { - handler.shutdown(); - handler = null; - } - - if (queryExecutor != null) { - queryExecutor.shutdown(); - } + public CompletableFuture shutdownServer(Time timeout) throws InterruptedException { + log.info("Shutting down {} @ {}", serverName, serverAddress); + + final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> { + try { + if (queryExecutor != null && !queryExecutor.isShutdown()) { + queryExecutor.shutdown(); + queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + return true; + } + } catch (InterruptedException e) { + log.warn("Failed to shutdown {}: ", serverName, e); + return false; + } + return false; + }); + + final CompletableFuture groupShutdownFuture = new CompletableFuture<>(); + final CompletableFuture handlerShutdownFuture = new CompletableFuture<>(); + + queryExecShutdownFuture.thenRun(() -> { + if (bootstrap != null) { + EventLoopGroup group = bootstrap.group(); + if (group != null) { + group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS) + .addListener(finished -> groupShutdownFuture.complete(null)); --- End diff -- why do we complete a `boolean` future with `null`? ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150228785 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -483,27 +511,31 @@ private boolean close(Throwable cause) { @Override public void onRequestResult(long requestId, RESP response) { TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.complete(response)) { + if (pending != null && !pending.isDone()) { long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L; stats.reportSuccessfulRequest(durationMillis); --- End diff -- Shouldn't we only increase the `stats.reportSuccessfulRequest` if we could complete `pending`? ---
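A minimal sketch of the point raised above, assuming a plain `AtomicLong` as a hypothetical stand-in for `stats.reportSuccessfulRequest(...)` (the class and method names below are illustrative, not Flink's). `CompletableFuture.complete` returns `true` only for the call that actually completes the future, so guarding the counter with it keeps the statistics in step with what the caller really observes:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

class ReportOnlyOnCompletion {

    // successCounter is a hypothetical stand-in for the stats object in the diff.
    static void onRequestResult(
            CompletableFuture<String> pending, String response, AtomicLong successCounter) {
        // complete() returns true only for the call that moves the future from
        // "not done" to "done", so a request that was already failed or
        // completed elsewhere is not counted as successful.
        if (pending != null && pending.complete(response)) {
            successCounter.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        AtomicLong successCounter = new AtomicLong();

        // A request that was failed before the response arrived.
        CompletableFuture<String> alreadyFailed = new CompletableFuture<>();
        alreadyFailed.completeExceptionally(new IllegalStateException("connection closed"));
        onRequestResult(alreadyFailed, "late response", successCounter);

        // A request that is still open when the response arrives.
        CompletableFuture<String> open = new CompletableFuture<>();
        onRequestResult(open, "response", successCounter);

        System.out.println("successful requests: " + successCounter.get());
    }
}
```

Checking `!pending.isDone()` and then reporting leaves a window in which another thread can fail the future between the check and the report; using the return value of `complete` closes that window.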
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150226886 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -166,27 +168,39 @@ public String getClientName() { * * After a call to this method, all returned futures will be failed. */ - public void shutdown() { + public CompletableFuture shutdown() { + final CompletableFuture shutdownFuture = new CompletableFuture<>(); + if (shutDown.compareAndSet(false, true)) { + final ListconnectionFutures = new ArrayList<>(); for (Map.Entry conn : establishedConnections.entrySet()) { if (establishedConnections.remove(conn.getKey(), conn.getValue())) { - conn.getValue().close(); + connectionFutures.add(conn.getValue().close()); } } for (Map.Entry conn : pendingConnections.entrySet()) { if (pendingConnections.remove(conn.getKey()) != null) { - conn.getValue().close(); + connectionFutures.add(conn.getValue().close()); } } - if (bootstrap != null) { - EventLoopGroup group = bootstrap.group(); - if (group != null) { - group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS); - } - } + CompletableFuture.allOf(connectionFutures.toArray(new CompletableFuture[connectionFutures.size()])).thenRun( --- End diff -- Also here we shouldn't ignore the result of the `allOf` future because it might contain an exception. ---
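One way to avoid dropping a failure from the `allOf` future is sketched below, using only `java.util.concurrent`. The class and method names are hypothetical, and the element type `Void` is an assumption, since the generics are not visible in the quoted diff. `whenComplete` is invoked on both normal and exceptional completion, whereas `thenRun` is skipped when any input future failed:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AllOfPropagation {

    // Completes the returned future normally only if every connection future
    // completed normally; otherwise forwards the failure.
    static CompletableFuture<Void> shutdownAll(List<CompletableFuture<Void>> connectionFutures) {
        final CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();

        CompletableFuture
            .allOf(connectionFutures.toArray(new CompletableFuture<?>[0]))
            .whenComplete((ignored, throwable) -> {
                if (throwable != null) {
                    // At least one connection failed to close: surface it.
                    shutdownFuture.completeExceptionally(throwable);
                } else {
                    shutdownFuture.complete(null);
                }
            });

        return shutdownFuture;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> ok = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> broken = new CompletableFuture<>();
        broken.completeExceptionally(new IllegalStateException("close failed"));

        System.out.println(shutdownAll(Arrays.asList(ok, broken)).isCompletedExceptionally());
    }
}
```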
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150227018 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -309,31 +323,38 @@ private void handInChannel(Channel channel) { /** * Close the connecting channel with a ClosedChannelException. */ - private void close() { - close(new ClosedChannelException()); + private CompletableFuture close() { + return close(new ClosedChannelException()); } /** * Close the connecting channel with an Exception (can be {@code null}) * or forward to the established channel. */ - private void close(Throwable cause) { + private CompletableFuture close(Throwable cause) { synchronized (connectLock) { - if (!closed) { + final CompletableFuture shutdownFuture; + if (closed) { + shutdownFuture = new CompletableFuture<>(); --- End diff -- shorter: `CompletableFuture.completedFuture(null)` ---
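For reference, a small sketch of the two equivalent forms (the class name is illustrative, and `Void` is assumed as the type parameter because the generics are not visible in the quoted diff):

```java
import java.util.concurrent.CompletableFuture;

class CompletedFutureForms {

    // Two-step form, as in the diff: create, then complete.
    static CompletableFuture<Void> verbose() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.complete(null);
        return future;
    }

    // Single-call form suggested in the review.
    static CompletableFuture<Void> concise() {
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        System.out.println(verbose().isDone() && concise().isDone());
    }
}
```

Both futures are already done when returned, so a caller cannot tell them apart; the second form also allows `shutdownFuture` to be assigned in a single statement.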
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150230379 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -309,31 +323,38 @@ private void handInChannel(Channel channel) { /** * Close the connecting channel with a ClosedChannelException. */ - private void close() { - close(new ClosedChannelException()); + private CompletableFuture close() { + return close(new ClosedChannelException()); } /** * Close the connecting channel with an Exception (can be {@code null}) * or forward to the established channel. */ - private void close(Throwable cause) { + private CompletableFuture close(Throwable cause) { --- End diff -- This method is blocking even though it shouldn't be. Imagine that two close calls happen concurrently. One of them should trigger the closing operation and the other should immediately return with the termination future. ---
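The behaviour asked for here can be sketched without a lock, assuming an `AtomicReference` that holds the termination future. All names below are hypothetical and this is not the code that was eventually merged. The first caller wins the `compareAndSet` and triggers the close; every other caller returns the same future immediately:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class IdempotentClose {

    private final AtomicReference<CompletableFuture<Void>> terminationFuture =
        new AtomicReference<>(null);

    // Counts how often the actual close logic ran (for illustration only).
    final AtomicInteger closeOperations = new AtomicInteger();

    CompletableFuture<Void> close() {
        final CompletableFuture<Void> candidate = new CompletableFuture<>();

        if (terminationFuture.compareAndSet(null, candidate)) {
            // This caller won the race and performs the close exactly once.
            closeOperations.incrementAndGet();
            // Placeholder for the asynchronous resource release; a real
            // implementation would complete the future from a callback.
            candidate.complete(null);
            return candidate;
        }

        // Another caller already triggered the close: return its future
        // without blocking.
        return terminationFuture.get();
    }

    public static void main(String[] args) {
        IdempotentClose connection = new IdempotentClose();
        CompletableFuture<Void> first = connection.close();
        CompletableFuture<Void> second = connection.close();
        System.out.println((first == second) + " " + connection.closeOperations.get());
    }
}
```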
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150226661 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -166,27 +168,39 @@ public String getClientName() { * * After a call to this method, all returned futures will be failed. */ - public void shutdown() { + public CompletableFuture shutdown() { + final CompletableFuture shutdownFuture = new CompletableFuture<>(); + if (shutDown.compareAndSet(false, true)) { + final ListconnectionFutures = new ArrayList<>(); for (Map.Entry conn : establishedConnections.entrySet()) { if (establishedConnections.remove(conn.getKey(), conn.getValue())) { - conn.getValue().close(); + connectionFutures.add(conn.getValue().close()); } } for (Map.Entry conn : pendingConnections.entrySet()) { if (pendingConnections.remove(conn.getKey()) != null) { - conn.getValue().close(); + connectionFutures.add(conn.getValue().close()); } } - if (bootstrap != null) { - EventLoopGroup group = bootstrap.group(); - if (group != null) { - group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS); - } - } + CompletableFuture.allOf(connectionFutures.toArray(new CompletableFuture[connectionFutures.size()])).thenRun( + () -> { + if (bootstrap != null) { + EventLoopGroup group = bootstrap.group(); + if (group != null) { + group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS) --- End diff -- why not setting the timeout to `0`? ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150225136 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java --- @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable { /** * Shuts down the server and all related thread pools. */ - public void shutdown() { - LOG.info("Shutting down server {} @ {}", serverName, serverAddress); - - if (handler != null) { - handler.shutdown(); - handler = null; - } - - if (queryExecutor != null) { - queryExecutor.shutdown(); - } + public CompletableFuture shutdownServer(Time timeout) throws InterruptedException { + log.info("Shutting down {} @ {}", serverName, serverAddress); + + final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> { + try { + if (queryExecutor != null && !queryExecutor.isShutdown()) { + queryExecutor.shutdown(); + queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + return true; + } + } catch (InterruptedException e) { + log.warn("Failed to shutdown {}: ", serverName, e); + return false; + } + return false; + }); + + final CompletableFuture groupShutdownFuture = new CompletableFuture<>(); + final CompletableFuture handlerShutdownFuture = new CompletableFuture<>(); + + queryExecShutdownFuture.thenRun(() -> { + if (bootstrap != null) { + EventLoopGroup group = bootstrap.group(); + if (group != null) { + group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS) + .addListener(finished -> groupShutdownFuture.complete(null)); + } else { + groupShutdownFuture.complete(null); + } + } - if (bootstrap != null) { - EventLoopGroup group = bootstrap.group(); - if (group != null) { - group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS); + if (handler != null) { + handler.shutdown().thenRun(() -> { --- End diff -- same here, what if an exception happened while shutting `handler` down? Shouldn't we complete the `handlerShutdownFuture` with it? ---
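The concern can be illustrated with a small sketch that contrasts the two wiring styles (helper names are hypothetical; `source` plays the role of the future returned by `handler.shutdown()`). With `thenRun`, the callback is skipped when the source fails, so a manually created target future is never completed; with `whenComplete`, the failure can be forwarded:

```java
import java.util.concurrent.CompletableFuture;

class ForwardFailure {

    // Wiring as in the diff: the target is completed only on success.
    static CompletableFuture<Void> viaThenRun(CompletableFuture<Void> source) {
        final CompletableFuture<Void> target = new CompletableFuture<>();
        source.thenRun(() -> target.complete(null));
        return target;
    }

    // Wiring that also forwards a failure to the target.
    static CompletableFuture<Void> viaWhenComplete(CompletableFuture<Void> source) {
        final CompletableFuture<Void> target = new CompletableFuture<>();
        source.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                target.completeExceptionally(throwable);
            } else {
                target.complete(null);
            }
        });
        return target;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failedShutdown = new CompletableFuture<>();
        failedShutdown.completeExceptionally(new IllegalStateException("handler shutdown failed"));

        // The first target stays incomplete, so anyone waiting on it hangs.
        System.out.println("thenRun target done: " + viaThenRun(failedShutdown).isDone());
        System.out.println("whenComplete target failed: "
            + viaWhenComplete(failedShutdown).isCompletedExceptionally());
    }
}
```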
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150228589 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -419,20 +440,27 @@ void close() { * @param cause The cause to close the channel with. * @return Channel close future */ - private boolean close(Throwable cause) { + private CompletableFuture close(Throwable cause) { + final CompletableFuture shutdownFuture = new CompletableFuture<>(); if (failureCause.compareAndSet(null, cause)) { - channel.close(); - stats.reportInactiveConnection(); + final CompletableFuture tmp = new CompletableFuture<>(); + channel.close().addListener(finished -> tmp.complete(null)); - for (long requestId : pendingRequests.keySet()) { - TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.completeExceptionally(cause)) { - stats.reportFailedRequest(); + tmp.thenRun(() -> { --- End diff -- Instead of creating the additional indirection with `shutdownFuture`, couldn't we simply return the result here as the shutdown future? ---
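A sketch of what returning the dependent stage directly might look like, under the assumption that `channelClosed` stands in for the `tmp` future in the diff and that an `AtomicBoolean` represents the cleanup work (both hypothetical). `thenRun` already returns a new `CompletableFuture<Void>` that completes after the action has run, so no second, manually completed future is needed:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class ReturnDependentStage {

    // The stage returned by thenRun serves as the shutdown future: it
    // completes once the cleanup ran, and it fails if channelClosed failed.
    static CompletableFuture<Void> close(
            CompletableFuture<Void> channelClosed, AtomicBoolean cleanedUp) {
        return channelClosed.thenRun(() -> cleanedUp.set(true));
    }

    public static void main(String[] args) {
        CompletableFuture<Void> channelClosed = new CompletableFuture<>();
        AtomicBoolean cleanedUp = new AtomicBoolean(false);

        CompletableFuture<Void> shutdownFuture = close(channelClosed, cleanedUp);
        System.out.println("done before channel close: " + shutdownFuture.isDone());

        channelClosed.complete(null);
        System.out.println("done after channel close: " + shutdownFuture.isDone());
    }
}
```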
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150228000 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -309,31 +323,38 @@ private void handInChannel(Channel channel) { /** * Close the connecting channel with a ClosedChannelException. */ - private void close() { - close(new ClosedChannelException()); + private CompletableFuture close() { + return close(new ClosedChannelException()); } /** * Close the connecting channel with an Exception (can be {@code null}) * or forward to the established channel. */ - private void close(Throwable cause) { + private CompletableFuture close(Throwable cause) { synchronized (connectLock) { - if (!closed) { + final CompletableFuture shutdownFuture; + if (closed) { + shutdownFuture = new CompletableFuture<>(); + shutdownFuture.complete(null); + } else { if (failureCause == null) { failureCause = cause; } if (established != null) { - established.close(); + shutdownFuture = established.close(); } else { PendingRequest pending; while ((pending = queuedRequests.poll()) != null) { pending.completeExceptionally(cause); } + shutdownFuture = new CompletableFuture<>(); + shutdownFuture.complete(null); } - closed = true; + shutdownFuture.thenRun(() -> closed = true); --- End diff -- Why do we set `closed` to `true` only after the shutdown completed? Shouldn't `close` directly set it to `true` such that all subsequent operations won't be executed anymore? ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150221485 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java --- @@ -108,9 +110,35 @@ public QueryableStateClient(final InetAddress remoteAddress, final int remotePor new DisabledKvStateRequestStats()); } - /** Shuts down the client. */ + /** +* Shuts down the client and waits for 10 seconds +* for the shutdown to be completed. +* +* If this expires, or an exception is thrown for +* any reason, then a warning is printed containing the +* exception. +*/ public void shutdown() { - client.shutdown(); + try { + client.shutdown().get(10L, TimeUnit.SECONDS); + LOG.info("The Queryable State Client was shutdown successfully."); + } catch (Exception e) { + LOG.warn("The Queryable State Client shutdown failed: ", e); + } + } + + /** +* Shuts down the client and waits until shutdown is completed. +* +* If an exception is thrown for any reason, then this exception +* is further propagated upwards. +*/ + public void shutdownAndWait() throws Throwable { + try { + client.shutdown().join(); --- End diff -- `get()` is better here ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150222505 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java --- @@ -250,8 +252,8 @@ private boolean attemptToBind(final int port) throws Throwable { throw future.cause(); } catch (BindException e) { - LOG.debug("Failed to start server {} on port {}: {}.", serverName, port, e.getMessage()); - shutdown(); + log.debug("Failed to start {} on port {}: {}.", serverName, port, e.getMessage()); + shutdownServer(Time.seconds(10L)).join(); --- End diff -- I would use `get()` because it makes the thrown exceptions explicit. ---
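The difference between the two calls can be sketched as follows (class and method names are illustrative). `get()` declares the checked `InterruptedException` and `ExecutionException`, so the compiler forces the caller to handle them, while `join()` throws the unchecked `CompletionException`, which is easy to overlook:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class GetVersusJoin {

    // get(): both failure modes appear in the method's checked exceptions.
    static String viaGet(CompletableFuture<Void> future) {
        try {
            future.get();
            return "ok";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "ExecutionException caused by " + e.getCause().getClass().getSimpleName();
        }
    }

    // join(): compiles without any try/catch; the failure arrives unchecked.
    static String viaJoin(CompletableFuture<Void> future) {
        try {
            future.join();
            return "ok";
        } catch (CompletionException e) {
            return "CompletionException caused by " + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("shutdown failed"));

        System.out.println(viaGet(failed));
        System.out.println(viaJoin(failed));
    }
}
```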
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150223433 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java --- @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable { /** * Shuts down the server and all related thread pools. */ - public void shutdown() { - LOG.info("Shutting down server {} @ {}", serverName, serverAddress); - - if (handler != null) { - handler.shutdown(); - handler = null; - } - - if (queryExecutor != null) { - queryExecutor.shutdown(); - } + public CompletableFuture shutdownServer(Time timeout) throws InterruptedException { + log.info("Shutting down {} @ {}", serverName, serverAddress); + + final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> { + try { + if (queryExecutor != null && !queryExecutor.isShutdown()) { + queryExecutor.shutdown(); + queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + return true; + } + } catch (InterruptedException e) { + log.warn("Failed to shutdown {}: ", serverName, e); + return false; + } + return false; + }); + + final CompletableFuture groupShutdownFuture = new CompletableFuture<>(); + final CompletableFuture handlerShutdownFuture = new CompletableFuture<>(); + + queryExecShutdownFuture.thenRun(() -> { --- End diff -- Is it strictly necessary to first shut down the query executor before shutting down the netty server? ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150226636 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -166,27 +168,39 @@ public String getClientName() { * * After a call to this method, all returned futures will be failed. */ - public void shutdown() { + public CompletableFuture shutdown() { + final CompletableFuture shutdownFuture = new CompletableFuture<>(); + if (shutDown.compareAndSet(false, true)) { + final ListconnectionFutures = new ArrayList<>(); for (Map.Entry conn : establishedConnections.entrySet()) { if (establishedConnections.remove(conn.getKey(), conn.getValue())) { - conn.getValue().close(); + connectionFutures.add(conn.getValue().close()); } } for (Map.Entry conn : pendingConnections.entrySet()) { if (pendingConnections.remove(conn.getKey()) != null) { - conn.getValue().close(); + connectionFutures.add(conn.getValue().close()); } } - if (bootstrap != null) { - EventLoopGroup group = bootstrap.group(); - if (group != null) { - group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS); - } - } + CompletableFuture.allOf(connectionFutures.toArray(new CompletableFuture[connectionFutures.size()])).thenRun( + () -> { + if (bootstrap != null) { + EventLoopGroup group = bootstrap.group(); + if (group != null) { + group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS) + .addListener(finished -> shutdownFuture.complete(true)); --- End diff -- result should be considered. ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/4993 [FLINK-7974][FLINK-7975][QS] Wait for shutdown in QS client and servers. ## What is the purpose of the change Previously we were freeing the resources held by the QS client and servers on `shutdown()`, but we were not waiting for this to complete. Now we are waiting for everything to be freed. ## Brief change log The `Client` and the `AbstractServerBase` contain the biggest changes. The actual implementations (`KvStateClientProxyImpl`, `KvStateServerImpl`, and `QueryableStateClient`) build on that. ## Verifying this change Some tests were also adjusted but mainly the changes are tested in `AbstractServerTest`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): NO - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: NO - The serializers: NO - The runtime per-record code paths (performance sensitive): NO - Anything that affects deployment or recovery: NO - The S3 file system connector: NO ## Documentation - Does this pull request introduce a new feature? NO - If yes, how is the feature documented? not applicable R @aljoscha or @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink qs-shutdown-fin Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4993.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4993 commit 46bced58030a76d65163369b58f9c02c814a4e08 Author: kkloudas Date: 2017-11-09T18:21:43Z [FLINK-7975][QS] Wait for QS client to shutdown. commit 62f80529f5657223d792576686ddad6832d13506 Author: kkloudas Date: 2017-11-09T18:30:29Z [FLINK-7974][QS] Wait for QS abstract server to shutdown. ---