[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-12-06 Thread kl0u
Github user kl0u closed the pull request at:

https://github.com/apache/flink/pull/4993


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-14 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150890435
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -208,9 +239,15 @@ public void shutdown() {
/** The established connection after the connect succeeds. */
private EstablishedConnection established;
 
+   /** Atomic shut down future. */
+   private final AtomicReference connectionShutdownFuture = new AtomicReference<>(null);
+
/** Closed flag. */
private boolean closed;
 
+// /** Shut down future. */
--- End diff --

?


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-14 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150899000
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -419,20 +472,31 @@ void close() {
 * @param cause The cause to close the channel with.
 * @return Channel close future
 */
-   private boolean close(Throwable cause) {
-   if (failureCause.compareAndSet(null, cause)) {
-   channel.close();
-   stats.reportInactiveConnection();
+   private CompletableFuture close(Throwable cause) {
+   final CompletableFuture shutdownFuture = new CompletableFuture<>();
 
-   for (long requestId : pendingRequests.keySet()) {
-   TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-   if (pending != null && pending.completeExceptionally(cause)) {
-   stats.reportFailedRequest();
+   if (connectionShutdownFuture.compareAndSet(null, shutdownFuture) &&
+   failureCause.compareAndSet(null, cause)) {
+
+   channel.close().addListener(finished -> {
+   stats.reportInactiveConnection();
+   for (long requestId : pendingRequests.keySet()) {
+   TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
+   if (pending != null && pending.completeExceptionally(cause)) {
+   stats.reportFailedRequest();
+   }
}
-   }
-   return true;
+
+   if (finished.isSuccess()) {
--- End diff --

This seems weird at first sight but I'm guessing it's correct. I.e. we 
never finish the returned Future with the `cause` that was handed in. We only 
fail it exceptionally if anything in closing the channel went wrong, right? 


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-14 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150893389
  
--- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java ---
@@ -106,6 +108,12 @@ public InetSocketAddress getServerAddress() {
 
@Override
public void shutdown() {
-   super.shutdown();
+   try {
--- End diff --

Why is this not also returning a future?


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-14 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150878782
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
--- End diff --

I agree, but we need the `timeout` in order to shut down the `executors`.


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-14 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150856825
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -483,27 +511,31 @@ private boolean close(Throwable cause) {
@Override
public void onRequestResult(long requestId, RESP response) {
TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-   if (pending != null && pending.complete(response)) {
+   if (pending != null && !pending.isDone()) {
long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L;
stats.reportSuccessfulRequest(durationMillis);
--- End diff --

This was creating test instabilities in 
`AbstractServerTest.testPortRangeSuccess()`, which is why I decided to complete 
the future after incrementing the counter. Given that we check 
`pending.isDone()` first, we do not leave much room for false increments. 
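The ordering described above can be sketched in plain Java. `RequestTrackerSketch`, its `String` responses and the `AtomicLong` counter are hypothetical stand-ins for the client's pending-request map and stats object. The counter is bumped before `complete(...)`, so any thread that observes the completed future also observes the incremented count. The `isDone()` check and the `complete(...)` call are not one atomic step. A concurrent completion in between could therefore still cause an over-count, which is the "not much room" caveat.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the client's pending-request bookkeeping.
class RequestTrackerSketch {
    private final ConcurrentHashMap<Long, CompletableFuture<String>> pendingRequests = new ConcurrentHashMap<>();
    final AtomicLong successfulRequests = new AtomicLong();

    CompletableFuture<String> register(long requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingRequests.put(requestId, future);
        return future;
    }

    void onRequestResult(long requestId, String response) {
        // remove() hands the future to at most one caller per request id
        CompletableFuture<String> pending = pendingRequests.remove(requestId);
        if (pending != null && !pending.isDone()) {
            // Count first, then complete: a thread that sees the completed
            // future is guaranteed to also see the incremented counter.
            successfulRequests.incrementAndGet();
            pending.complete(response);
        }
    }
}
```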


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150243346
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+   return true;
+   }
+   } catch (InterruptedException e) {
+   log.warn("Failed to shutdown {}: ", serverName, e);
+   return false;
+   }
+   return false;
--- End diff --

We want to avoid pulling in a dependency on `runtime` here. Does it make 
sense to move `Executors` to another module, e.g. `core`?


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150232000
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
--- End diff --

Ideally we would get rid of the `timeout` parameter because that is 
something the user specifies when calling `get()` on the returned termination 
future.


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150231729
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
--- End diff --

What if `awaitTermination` returns `false`? Then we should call 
`queryExecutor.shutdownNow()`.
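The suggested fallback can be sketched using only `java.util.concurrent`. `ExecutorShutdownSketch.gracefulShutdown` is a hypothetical helper. It is not the `Executors.gracefulShutdown` utility mentioned elsewhere in this thread, whose exact signature is not reproduced here. The helper first shuts down gracefully and escalates to `shutdownNow()` if the timeout elapses. If the waiting thread is interrupted, it restores the interrupt flag.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class ExecutorShutdownSketch {
    /**
     * Hypothetical helper: returns true if the executor terminated, either
     * gracefully or after its tasks were interrupted via shutdownNow().
     */
    static boolean gracefulShutdown(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown(); // stop accepting new tasks, let running ones finish
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
            // awaitTermination returned false: interrupt running tasks and wait once more
            executor.shutdownNow();
            return executor.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```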


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150230620
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -166,27 +168,39 @@ public String getClientName() {
 *
 * After a call to this method, all returned futures will be failed.
 */
-   public void shutdown() {
+   public CompletableFuture shutdown() {
--- End diff --

When called twice, this method gives you a wrong result: the 
second call immediately returns a completed future.
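An idempotent variant can be sketched as follows. The first caller creates the shutdown future and publishes it with `compareAndSet`. Every later caller receives that same future, so no caller sees a completed future while the shutdown is still in progress. `IdempotentShutdownSketch` and its `Supplier` of the actual shutdown work are hypothetical stand-ins for the client's internals.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical component whose shutdown() may be called any number of times.
final class IdempotentShutdownSketch {
    private final AtomicReference<CompletableFuture<Void>> shutdownFuture = new AtomicReference<>();
    private final Supplier<CompletableFuture<Void>> actualShutdown;

    IdempotentShutdownSketch(Supplier<CompletableFuture<Void>> actualShutdown) {
        this.actualShutdown = actualShutdown;
    }

    CompletableFuture<Void> shutdown() {
        final CompletableFuture<Void> candidate = new CompletableFuture<>();
        if (!shutdownFuture.compareAndSet(null, candidate)) {
            // Someone else already started the shutdown: share their future.
            return shutdownFuture.get();
        }
        try {
            // Mirror the outcome of the real shutdown into the shared future.
            actualShutdown.get().whenComplete((ignored, error) -> {
                if (error != null) {
                    candidate.completeExceptionally(error);
                } else {
                    candidate.complete(null);
                }
            });
        } catch (RuntimeException e) {
            candidate.completeExceptionally(e);
        }
        return candidate;
    }
}
```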


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150224448
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+   return true;
+   }
+   } catch (InterruptedException e) {
+   log.warn("Failed to shutdown {}: ", serverName, e);
+   return false;
+   }
+   return false;
+   });
+
+   final CompletableFuture groupShutdownFuture = new CompletableFuture<>();
+   final CompletableFuture handlerShutdownFuture = new CompletableFuture<>();
+
+   queryExecShutdownFuture.thenRun(() -> {
+   if (bootstrap != null) {
+   EventLoopGroup group = bootstrap.group();
+   if (group != null) {
+   group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+   .addListener(finished -> groupShutdownFuture.complete(null));
+   } else {
+   groupShutdownFuture.complete(null);
+   }
+   }
 
-   if (bootstrap != null) {
-   EventLoopGroup group = bootstrap.group();
-   if (group != null) {
-   group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
+   if (handler != null) {
+   handler.shutdown().thenRun(() -> {
+   handler = null;
+   handlerShutdownFuture.complete(null);
+   });
+   } else {
+   handlerShutdownFuture.complete(null);
}
--- End diff --

Why do we execute the handler shutdown in the callback of the 
`queryExecShutdownFuture`?
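If the three components do not depend on each other, a simple alternative is to start all shutdowns at once and combine them with `CompletableFuture.allOf`, rather than chaining the Netty group and handler shutdowns onto `queryExecShutdownFuture`. `ParallelShutdownSketch` and the `Supplier` steps are hypothetical. The sketch assumes, without verifying it, that the handler may stop before the query executor has drained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class ParallelShutdownSketch {
    /** Starts every shutdown step immediately and completes once all of them have. */
    @SafeVarargs
    static CompletableFuture<Void> shutdownAll(Supplier<CompletableFuture<Void>>... steps) {
        final List<CompletableFuture<Void>> futures = new ArrayList<>(steps.length);
        for (Supplier<CompletableFuture<Void>> step : steps) {
            CompletableFuture<Void> future;
            try {
                future = step.get();
            } catch (RuntimeException e) {
                // A step that throws synchronously still yields a (failed) future.
                future = new CompletableFuture<>();
                future.completeExceptionally(e);
            }
            futures.add(future);
        }
        // allOf completes when every step is done; it fails if any step failed.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
    }
}
```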


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150225368
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -166,27 +168,39 @@ public String getClientName() {
 *
 * After a call to this method, all returned futures will be failed.
 */
--- End diff --

JavaDocs need to be adapted


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150228196
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -419,20 +440,27 @@ void close() {
 * @param cause The cause to close the channel with.
 * @return Channel close future
 */
-   private boolean close(Throwable cause) {
+   private CompletableFuture close(Throwable cause) {
+   final CompletableFuture shutdownFuture = new CompletableFuture<>();
if (failureCause.compareAndSet(null, cause)) {
-   channel.close();
-   stats.reportInactiveConnection();
+   final CompletableFuture tmp = new CompletableFuture<>();
+   channel.close().addListener(finished -> tmp.complete(null));
--- End diff --

result of channel future is ignored.
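Here is a sketch of forwarding the close result instead of ignoring it. Netty's listener receives a future that exposes `isSuccess()` and `cause()`. To keep the example self-contained, `OperationResult` is a hypothetical stand-in interface with just those two methods, and `CloseForwardingSketch` is also hypothetical. In real code the helper would take Netty's future type directly and be called from the listener passed to `channel.close().addListener(...)`.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the result a Netty listener receives.
interface OperationResult {
    boolean isSuccess();

    Throwable cause();
}

final class CloseForwardingSketch {
    static OperationResult result(boolean success, Throwable cause) {
        return new OperationResult() {
            @Override
            public boolean isSuccess() {
                return success;
            }

            @Override
            public Throwable cause() {
                return cause;
            }
        };
    }

    /** Completes the target with the outcome of the operation rather than always with null. */
    static void forward(OperationResult finished, CompletableFuture<Void> target) {
        if (finished.isSuccess()) {
            target.complete(null);
        } else if (finished.cause() != null) {
            target.completeExceptionally(finished.cause());
        } else {
            // Defensive: guard against a failed result that reports no cause.
            target.completeExceptionally(new IllegalStateException("operation failed without a cause"));
        }
    }
}
```

The same forwarding applies to the `EventLoopGroup` shutdown listener.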


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150221573
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java ---
@@ -108,9 +110,35 @@ public QueryableStateClient(final InetAddress remoteAddress, final int remotePor
new DisabledKvStateRequestStats());
}
 
-   /** Shuts down the client. */
+   /**
+* Shuts down the client and waits for 10 seconds
+* for the shutdown to be completed.
+*
+* If this expires, or an exception is thrown for
+* any reason, then a warning is printed containing the
+* exception.
+*/
public void shutdown() {
-   client.shutdown();
+   try {
+   client.shutdown().get(10L, TimeUnit.SECONDS);
+   LOG.info("The Queryable State Client was shutdown successfully.");
+   } catch (Exception e) {
+   LOG.warn("The Queryable State Client shutdown failed: ", e);
+   }
+   }
+
+   /**
+* Shuts down the client and waits until shutdown is completed.
+*
+* If an exception is thrown for any reason, then this exception
+* is further propagated upwards.
+*/
+   public void shutdownAndWait() throws Throwable {
+   try {
+   client.shutdown().join();
+   } catch (CompletionException e) {
+   throw e.getCause();
--- End diff --

What if `getCause` is `null`? There is 
`ExceptionUtils.stripFromCompletionException`.


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150229482
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -483,27 +511,31 @@ private boolean close(Throwable cause) {
@Override
public void onRequestResult(long requestId, RESP response) {
TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-   if (pending != null && pending.complete(response)) {
+   if (pending != null && !pending.isDone()) {
long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L;
stats.reportSuccessfulRequest(durationMillis);
+   pending.complete(response);
}
}
 
@Override
public void onRequestFailure(long requestId, Throwable cause) {
TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-   if (pending != null && pending.completeExceptionally(cause)) {
+   if (pending != null && !pending.isDone()) {
stats.reportFailedRequest();
+   pending.completeExceptionally(cause);
}
}
 
@Override
public void onFailure(Throwable cause) {
-   if (close(cause)) {
-   // Remove from established channels, otherwise future
-   // requests will be handled by this failed channel.
-   establishedConnections.remove(serverAddress, this);
-   }
+   close(cause).thenAccept(cancelled -> {
+   if (cancelled) {
--- End diff --

Why do we only remove `this` from `establishedConnections` if `cancelled` 
is true?
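Here is a sketch of deregistering unconditionally. The connection removes itself from the map synchronously when the failure is reported, so new requests stop being routed to the failing channel however the asynchronous close turns out. `ConnectionRegistrySketch` and its `String` keys are hypothetical; the client keys by server address. `ConcurrentHashMap.remove(key, value)` removes the entry only if it still points at this connection, so a replacement connection to the same server is left alone.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

final class ConnectionRegistrySketch {
    final ConcurrentHashMap<String, Object> establishedConnections = new ConcurrentHashMap<>();

    /** Hypothetical onFailure: deregister first, then start the close and return its future. */
    CompletableFuture<Void> onFailure(String serverAddress, Object connection, Supplier<CompletableFuture<Void>> close) {
        // Conditional removal: no-op if the map already holds a newer connection.
        establishedConnections.remove(serverAddress, connection);
        return close.get();
    }
}
```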


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150225343
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java ---
@@ -185,7 +185,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
/**
 * Shuts down any handler specific resources, e.g. thread pools etc.
 */
--- End diff --

JavaDocs missing


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150224857
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+   return true;
+   }
+   } catch (InterruptedException e) {
+   log.warn("Failed to shutdown {}: ", serverName, e);
+   return false;
+   }
+   return false;
+   });
+
+   final CompletableFuture groupShutdownFuture = new CompletableFuture<>();
+   final CompletableFuture handlerShutdownFuture = new CompletableFuture<>();
+
+   queryExecShutdownFuture.thenRun(() -> {
+   if (bootstrap != null) {
+   EventLoopGroup group = bootstrap.group();
+   if (group != null) {
+   group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+   .addListener(finished -> groupShutdownFuture.complete(null));
--- End diff --

Shouldn't we use the `finished` value to complete the 
`groupShutdownFuture`? An exception might have been thrown, in which case we 
should forward it as well.


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150228244
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -419,20 +440,27 @@ void close() {
 * @param cause The cause to close the channel with.
 * @return Channel close future
 */
-   private boolean close(Throwable cause) {
+   private CompletableFuture close(Throwable cause) {
+   final CompletableFuture shutdownFuture = new CompletableFuture<>();
if (failureCause.compareAndSet(null, cause)) {
-   channel.close();
-   stats.reportInactiveConnection();
+   final CompletableFuture tmp = new CompletableFuture<>();
+   channel.close().addListener(finished -> tmp.complete(null));
 
-   for (long requestId : pendingRequests.keySet()) {
-   TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-   if (pending != null && pending.completeExceptionally(cause)) {
-   stats.reportFailedRequest();
+   tmp.thenRun(() -> {
--- End diff --

here as well.


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150225411
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -166,27 +168,39 @@ public String getClientName() {
 *
 * After a call to this method, all returned futures will be failed.
 */
-   public void shutdown() {
+   public CompletableFuture shutdown() {
--- End diff --

What does the boolean future value tell us about the `shutdown`?


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150228810
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -483,27 +511,31 @@ private boolean close(Throwable cause) {
@Override
public void onRequestResult(long requestId, RESP response) {
TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-   if (pending != null && pending.complete(response)) {
+   if (pending != null && !pending.isDone()) {
long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L;
stats.reportSuccessfulRequest(durationMillis);
+   pending.complete(response);
}
}
 
@Override
public void onRequestFailure(long requestId, Throwable cause) {
TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-   if (pending != null && pending.completeExceptionally(cause)) {
+   if (pending != null && !pending.isDone()) {
stats.reportFailedRequest();
+   pending.completeExceptionally(cause);
--- End diff --

Same here?


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150223999
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+   return true;
+   }
+   } catch (InterruptedException e) {
+   log.warn("Failed to shutdown {}: ", serverName, e);
+   return false;
+   }
+   return false;
+   });
+
+   final CompletableFuture groupShutdownFuture = new CompletableFuture<>();
+   final CompletableFuture handlerShutdownFuture = new CompletableFuture<>();
+
+   queryExecShutdownFuture.thenRun(() -> {
+   if (bootstrap != null) {
+   EventLoopGroup group = bootstrap.group();
+   if (group != null) {
+   group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
--- End diff --

Why not set the timeout here to `0`?


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150221248
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java ---
@@ -108,9 +110,35 @@ public QueryableStateClient(final InetAddress remoteAddress, final int remotePor
new DisabledKvStateRequestStats());
}
 
-   /** Shuts down the client. */
+   /**
+* Shuts down the client and waits for 10 seconds
+* for the shutdown to be completed.
+*
+* If this expires, or an exception is thrown for
+* any reason, then a warning is printed containing the
+* exception.
+*/
public void shutdown() {
-   client.shutdown();
+   try {
+   client.shutdown().get(10L, TimeUnit.SECONDS);
--- End diff --

Not sure whether we should hard code the timeout here.
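Here is a sketch of letting the caller choose the timeout instead of hard-coding 10 seconds. `ClientShutdownSketch`, its `shutdown(Duration)` overload and the boolean return value are hypothetical. The `CompletableFuture<Void>` field stands in for the future returned by the network client's `shutdown()`.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ClientShutdownSketch {
    private final CompletableFuture<Void> networkShutdown;

    ClientShutdownSketch(CompletableFuture<Void> networkShutdown) {
        this.networkShutdown = networkShutdown;
    }

    /** Waits at most {@code timeout}; returns true only if shutdown completed successfully in time. */
    boolean shutdown(Duration timeout) {
        try {
            networkShutdown.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException | ExecutionException e) {
            return false; // the code under review logs a warning at this point
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```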


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150225215
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+   return true;
+   }
+   } catch (InterruptedException e) {
+   log.warn("Failed to shutdown {}: ", serverName, e);
+   return false;
+   }
+   return false;
+   });
+
+   final CompletableFuture groupShutdownFuture = new CompletableFuture<>();
+   final CompletableFuture handlerShutdownFuture = new CompletableFuture<>();
+
+   queryExecShutdownFuture.thenRun(() -> {
+   if (bootstrap != null) {
+   EventLoopGroup group = bootstrap.group();
+   if (group != null) {
+   group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+   .addListener(finished -> groupShutdownFuture.complete(null));
+   } else {
+   groupShutdownFuture.complete(null);
+   }
+   }
 
-   if (bootstrap != null) {
-   EventLoopGroup group = bootstrap.group();
-   if (group != null) {
-   group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
+   if (handler != null) {
+   handler.shutdown().thenRun(() -> {
+   handler = null;
--- End diff --

I would actually refrain from changing state from an asynchronous callback 
thread.
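Here is a sketch of keeping the state change on the calling thread. The method reads the field into a local, clears it synchronously, and only then starts the asynchronous shutdown, so no callback thread ever writes the field. `HandlerOwnerSketch` and its nested `Handler` interface are hypothetical stand-ins for the server base class and its handler. Guarding against repeated calls is left to the caller.

```java
import java.util.concurrent.CompletableFuture;

final class HandlerOwnerSketch {
    /** Hypothetical handler whose shutdown completes asynchronously. */
    interface Handler {
        CompletableFuture<Void> shutdown();
    }

    private Handler handler;

    HandlerOwnerSketch(Handler handler) {
        this.handler = handler;
    }

    boolean hasHandler() {
        return handler != null;
    }

    CompletableFuture<Void> shutdownHandler() {
        final Handler toShutDown = handler;
        handler = null; // mutated here, on the caller's thread, not in a callback
        return toShutDown == null
                ? CompletableFuture.<Void>completedFuture(null)
                : toShutDown.shutdown();
    }
}
```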


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150223049
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+   return true;
+   }
+   } catch (InterruptedException e) {
+   log.warn("Failed to shutdown {}: ", serverName, e);
+   return false;
+   }
+   return false;
--- End diff --

Better to use `Executors.gracefulShutdown` for this.


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150221458
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java ---
@@ -108,9 +110,35 @@ public QueryableStateClient(final InetAddress remoteAddress, final int remotePor
new DisabledKvStateRequestStats());
}
 
-   /** Shuts down the client. */
+   /**
+* Shuts down the client and waits for 10 seconds
+* for the shutdown to be completed.
+*
+* If this expires, or an exception is thrown for
+* any reason, then a warning is printed containing the
+* exception.
+*/
public void shutdown() {
-   client.shutdown();
+   try {
+   client.shutdown().get(10L, TimeUnit.SECONDS);
+   LOG.info("The Queryable State Client was shutdown successfully.");
+   } catch (Exception e) {
+   LOG.warn("The Queryable State Client shutdown failed: ", e);
+   }
+   }
+
+   /**
+* Shuts down the client and waits until shutdown is completed.
+*
+* If an exception is thrown for any reason, then this exception
+* is further propagated upwards.
+*/
+   public void shutdownAndWait() throws Throwable {
--- End diff --

We should not throw `Throwable`.
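Here is a sketch of a signature that declares `Exception` instead of `Throwable`. It unwraps the `CompletionException` thrown by `join()`, falling back to the wrapper itself when `getCause()` is `null`. It rethrows an `Exception` cause as-is and an `Error` as unchecked, and wraps anything else. `ShutdownAndWaitSketch` is hypothetical and takes the shutdown future as a parameter. It does not use Flink's own `ExceptionUtils` helper mentioned in this thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ShutdownAndWaitSketch {
    /** Blocks until shutdown completes, rethrowing the original failure where possible. */
    static void shutdownAndWait(CompletableFuture<Void> shutdownFuture) throws Exception {
        try {
            shutdownFuture.join();
        } catch (CompletionException e) {
            // getCause() may be null, so fall back to the wrapper itself.
            final Throwable cause = e.getCause() != null ? e.getCause() : e;
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw new Exception(cause); // any other Throwable subtype
        }
    }
}
```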


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150230125
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -419,20 +440,27 @@ void close() {
 * @param cause The cause to close the channel with.
 * @return Channel close future
 */
-   private boolean close(Throwable cause) {
+   private CompletableFuture close(Throwable cause) {
--- End diff --

I think this method can give you wrong results when being called twice. The 
second call will give you a completed future even though the first call could 
still be running.


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150226027
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -166,27 +168,39 @@ public String getClientName() {
 *
 * After a call to this method, all returned futures will be failed.
 */
-   public void shutdown() {
+   public CompletableFuture shutdown() {
+   final CompletableFuture shutdownFuture = new CompletableFuture<>();
+
if (shutDown.compareAndSet(false, true)) {
+   final List connectionFutures = new ArrayList<>();
--- End diff --

This could be initialized with the number of established + pending connections.


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150224310
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
--- End diff --

JavaDocs should be updated.


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150224362
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+   return true;
+   }
+   } catch (InterruptedException e) {
+   log.warn("Failed to shutdown {}: ", serverName, e);
+   return false;
+   }
+   return false;
+   });
+
+   final CompletableFuture groupShutdownFuture = new CompletableFuture<>();
+   final CompletableFuture handlerShutdownFuture = new CompletableFuture<>();
+
+   queryExecShutdownFuture.thenRun(() -> {
+   if (bootstrap != null) {
+   EventLoopGroup group = bootstrap.group();
+   if (group != null) {
+   group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+   .addListener(finished -> groupShutdownFuture.complete(null));
--- End diff --

Why do we complete a `boolean` future with `null`?
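For illustration: if the future only has to signal that the event-loop group has terminated, typing it as `CompletableFuture<Void>` and completing it with `null` states that intent directly, whereas a `Boolean` future should be completed with an actual outcome. The class and method names below are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;

// Hypothetical helpers contrasting the two ways of typing a shutdown future.
final class GroupShutdownSignals {
    // Completion-only signal: nothing meaningful to return, so Void (completed with null) says so.
    static CompletableFuture<Void> completionOnly(Runnable shutdownAction) {
        return CompletableFuture.runAsync(shutdownAction);
    }

    // Outcome-carrying signal: if the future stays Boolean, complete it with the real outcome.
    static CompletableFuture<Boolean> withOutcome(BooleanSupplier shutdownAction) {
        return CompletableFuture.supplyAsync(shutdownAction::getAsBoolean);
    }
}
```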


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150228785
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -483,27 +511,31 @@ private boolean close(Throwable cause) {
@Override
public void onRequestResult(long requestId, RESP response) {
TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-   if (pending != null && pending.complete(response)) {
+   if (pending != null && !pending.isDone()) {
long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L;
stats.reportSuccessfulRequest(durationMillis);
--- End diff --

Shouldn't we only call `stats.reportSuccessfulRequest` if we could actually
complete `pending`?
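`CompletableFuture.complete` returns `true` only if that invocation moved the future to a completed state, so its return value is a natural guard: a request that was already failed or timed out concurrently is not counted as successful. A sketch in which a hypothetical `AtomicLong` counter stands in for the stats object:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical tracker; the counter stands in for stats.reportSuccessfulRequest(...).
final class RequestTracker<RESP> {
    final Map<Long, CompletableFuture<RESP>> pendingRequests = new ConcurrentHashMap<>();
    final AtomicLong successfulRequests = new AtomicLong();

    void onRequestResult(long requestId, RESP response) {
        CompletableFuture<RESP> pending = pendingRequests.remove(requestId);
        // Count the request only if *this* call completed the future.
        if (pending != null && pending.complete(response)) {
            successfulRequests.incrementAndGet();
        }
    }
}
```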


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150226886
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -166,27 +168,39 @@ public String getClientName() {
 *
 * After a call to this method, all returned futures will be failed.
 */
-   public void shutdown() {
+   public CompletableFuture shutdown() {
+   final CompletableFuture shutdownFuture = new CompletableFuture<>();
+
if (shutDown.compareAndSet(false, true)) {
+   final List connectionFutures = new ArrayList<>();
for (Map.Entry conn : establishedConnections.entrySet()) {
if (establishedConnections.remove(conn.getKey(), conn.getValue())) {
-   conn.getValue().close();
+   connectionFutures.add(conn.getValue().close());
}
}
 
for (Map.Entry conn : pendingConnections.entrySet()) {
if (pendingConnections.remove(conn.getKey()) != null) {
-   conn.getValue().close();
+   connectionFutures.add(conn.getValue().close());
}
}
 
-   if (bootstrap != null) {
-   EventLoopGroup group = bootstrap.group();
-   if (group != null) {
-   group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
-   }
-   }
+   CompletableFuture.allOf(connectionFutures.toArray(new CompletableFuture[connectionFutures.size()])).thenRun(
--- End diff --

Also here we shouldn't ignore the result of the `allOf` future because it 
might contain an exception.
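As far as the quoted excerpt shows, if any connection future fails, `allOf(...)` completes exceptionally, the `thenRun` callback is skipped, and `shutdownFuture` is never completed. A sketch that forwards the failure with `whenComplete` instead; `shutdownAll` and the `shutdownGroup` callback are hypothetical placeholders for the method and the event-loop-group shutdown.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ConnectionShutdown {
    /**
     * Hypothetical helper: waits for all connection-close futures, then shuts down the group and
     * completes the returned future, exceptionally if any connection failed to close.
     */
    static CompletableFuture<Void> shutdownAll(List<CompletableFuture<Void>> connectionFutures,
                                               Runnable shutdownGroup) {
        CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
        CompletableFuture.allOf(connectionFutures.toArray(new CompletableFuture<?>[0]))
            .whenComplete((ignored, throwable) -> {
                // Release the event-loop group in both cases; shutdownGroup is assumed not to throw.
                shutdownGroup.run();
                if (throwable != null) {
                    shutdownFuture.completeExceptionally(throwable);
                } else {
                    shutdownFuture.complete(null);
                }
            });
        return shutdownFuture;
    }
}
```

Whether the group should still be shut down after a failed connection close is a design choice; the sketch does so to avoid leaking threads.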


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150227018
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -309,31 +323,38 @@ private void handInChannel(Channel channel) {
/**
 * Close the connecting channel with a ClosedChannelException.
 */
-   private void close() {
-   close(new ClosedChannelException());
+   private CompletableFuture close() {
+   return close(new ClosedChannelException());
}
 
/**
 * Close the connecting channel with an Exception (can be {@code null})
 * or forward to the established channel.
 */
-   private void close(Throwable cause) {
+   private CompletableFuture close(Throwable cause) {
synchronized (connectLock) {
-   if (!closed) {
+   final CompletableFuture shutdownFuture;
+   if (closed) {
+   shutdownFuture = new CompletableFuture<>();
--- End diff --

Shorter: `CompletableFuture.completedFuture(null)`


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150230379
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -309,31 +323,38 @@ private void handInChannel(Channel channel) {
/**
 * Close the connecting channel with a ClosedChannelException.
 */
-   private void close() {
-   close(new ClosedChannelException());
+   private CompletableFuture close() {
+   return close(new ClosedChannelException());
}
 
/**
 * Close the connecting channel with an Exception (can be {@code null})
 * or forward to the established channel.
 */
-   private void close(Throwable cause) {
+   private CompletableFuture close(Throwable cause) {
--- End diff --

This method is blocking even though it shouldn't be. Imagine that two close 
calls happen concurrently. One of them should trigger the closing operation and 
the other should immediately return with the termination future.
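One way to get this behaviour without holding `connectLock` is to publish the termination future through an `AtomicReference`: the caller that wins `compareAndSet` starts the close, and every other caller, concurrent or later, immediately gets the same future back. This also covers the earlier concern that a second `close` call could return a completed future while the first is still running. `NonBlockingCloser` and the `doClose` hook are hypothetical names for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

final class NonBlockingCloser {
    private final AtomicReference<CompletableFuture<Void>> terminationFuture = new AtomicReference<>();
    // Hypothetical hook performing the actual channel teardown; assumed not to throw synchronously.
    private final Supplier<CompletableFuture<Void>> doClose;

    NonBlockingCloser(Supplier<CompletableFuture<Void>> doClose) {
        this.doClose = doClose;
    }

    CompletableFuture<Void> close() {
        CompletableFuture<Void> newFuture = new CompletableFuture<>();
        if (terminationFuture.compareAndSet(null, newFuture)) {
            // This caller won the race: trigger the close and forward its outcome.
            doClose.get().whenComplete((ignored, t) -> {
                if (t != null) {
                    newFuture.completeExceptionally(t);
                } else {
                    newFuture.complete(null);
                }
            });
            return newFuture;
        }
        // Someone else already started closing: return their (possibly still running) future.
        return terminationFuture.get();
    }
}
```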


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150226661
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -166,27 +168,39 @@ public String getClientName() {
 *
 * After a call to this method, all returned futures will be failed.
 */
-   public void shutdown() {
+   public CompletableFuture shutdown() {
+   final CompletableFuture shutdownFuture = new CompletableFuture<>();
+
if (shutDown.compareAndSet(false, true)) {
+   final List connectionFutures = new ArrayList<>();
for (Map.Entry conn : establishedConnections.entrySet()) {
if (establishedConnections.remove(conn.getKey(), conn.getValue())) {
-   conn.getValue().close();
+   connectionFutures.add(conn.getValue().close());
}
}
 
for (Map.Entry conn : pendingConnections.entrySet()) {
if (pendingConnections.remove(conn.getKey()) != null) {
-   conn.getValue().close();
+   connectionFutures.add(conn.getValue().close());
}
}
 
-   if (bootstrap != null) {
-   EventLoopGroup group = bootstrap.group();
-   if (group != null) {
-   group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
-   }
-   }
+   CompletableFuture.allOf(connectionFutures.toArray(new CompletableFuture[connectionFutures.size()])).thenRun(
+   () -> {
+   if (bootstrap != null) {
+   EventLoopGroup group = bootstrap.group();
+   if (group != null) {
+   group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS)
--- End diff --

Why not set the timeout to `0`?


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150225136
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+   return true;
+   }
+   } catch (InterruptedException e) {
+   log.warn("Failed to shutdown {}: ", serverName, e);
+   return false;
+   }
+   return false;
+   });
+
+   final CompletableFuture groupShutdownFuture = new CompletableFuture<>();
+   final CompletableFuture handlerShutdownFuture = new CompletableFuture<>();
+
+   queryExecShutdownFuture.thenRun(() -> {
+   if (bootstrap != null) {
+   EventLoopGroup group = bootstrap.group();
+   if (group != null) {
+   group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+   .addListener(finished -> groupShutdownFuture.complete(null));
+   } else {
+   groupShutdownFuture.complete(null);
+   }
+   }
 
-   if (bootstrap != null) {
-   EventLoopGroup group = bootstrap.group();
-   if (group != null) {
-   group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
+   if (handler != null) {
+   handler.shutdown().thenRun(() -> {
--- End diff --

Same here: what if an exception happens while shutting `handler` down?
Shouldn't we complete the `handlerShutdownFuture` exceptionally with it?


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150228589
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -419,20 +440,27 @@ void close() {
 * @param cause The cause to close the channel with.
 * @return Channel close future
 */
-   private boolean close(Throwable cause) {
+   private CompletableFuture close(Throwable cause) {
+   final CompletableFuture shutdownFuture = new CompletableFuture<>();
if (failureCause.compareAndSet(null, cause)) {
-   channel.close();
-   stats.reportInactiveConnection();
+   final CompletableFuture tmp = new CompletableFuture<>();
+   channel.close().addListener(finished -> tmp.complete(null));
 
-   for (long requestId : pendingRequests.keySet()) {
-   TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-   if (pending != null && pending.completeExceptionally(cause)) {
-   stats.reportFailedRequest();
+   tmp.thenRun(() -> {
--- End diff --

Instead of creating the additional indirection with `shutdownFuture`,
couldn't we simply return the result here as the shutdown future?
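A sketch of that simplification: `thenRun` already returns a new `CompletableFuture<Void>` that completes after its callback has run (and completes exceptionally if the source stage fails or the callback throws), so it can serve as the shutdown future itself. `ChannelCloser`, `closeChannel`, and `failPendingRequests` are hypothetical placeholders for the method, `channel.close()`, and the loop over `pendingRequests`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class ChannelCloser {
    static CompletableFuture<Void> close(Supplier<CompletableFuture<Void>> closeChannel,
                                         Runnable failPendingRequests) {
        // No separate shutdownFuture: the stage returned by thenRun is the shutdown future.
        return closeChannel.get().thenRun(failPendingRequests);
    }
}
```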


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150228000
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -309,31 +323,38 @@ private void handInChannel(Channel channel) {
/**
 * Close the connecting channel with a ClosedChannelException.
 */
-   private void close() {
-   close(new ClosedChannelException());
+   private CompletableFuture close() {
+   return close(new ClosedChannelException());
}
 
/**
 * Close the connecting channel with an Exception (can be {@code null})
 * or forward to the established channel.
 */
-   private void close(Throwable cause) {
+   private CompletableFuture close(Throwable cause) {
synchronized (connectLock) {
-   if (!closed) {
+   final CompletableFuture shutdownFuture;
+   if (closed) {
+   shutdownFuture = new CompletableFuture<>();
+   shutdownFuture.complete(null);
+   } else {
if (failureCause == null) {
failureCause = cause;
}
 
if (established != null) {
-   established.close();
+   shutdownFuture = established.close();
} else {
PendingRequest pending;
while ((pending = queuedRequests.poll()) != null) {
pending.completeExceptionally(cause);
}
+   shutdownFuture = new CompletableFuture<>();
+   shutdownFuture.complete(null);
}
-   closed = true;
+   shutdownFuture.thenRun(() -> closed = true);
--- End diff --

Why do we set `closed` to `true` only after the shutdown has completed?
Shouldn't `close` set it to `true` directly, so that no subsequent operations
are executed anymore?
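A sketch of the suggested ordering: flip the flag synchronously, under the same lock that guards request submission, so anything racing with `close` is rejected straight away, and keep only the resource release asynchronous. The lock is held just long enough to flip the flag and start the release. `SketchConnection`, `send`, and `releaseResources` are hypothetical names, not the Flink classes.

```java
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical connection; only the ordering of the closed flag matters here.
final class SketchConnection {
    private final Object connectLock = new Object();
    private final Supplier<CompletableFuture<Void>> releaseResources;
    private boolean closed;
    private CompletableFuture<Void> terminationFuture;

    SketchConnection(Supplier<CompletableFuture<Void>> releaseResources) {
        this.releaseResources = releaseResources;
    }

    CompletableFuture<String> send(String request) {
        synchronized (connectLock) {
            if (closed) {
                CompletableFuture<String> rejected = new CompletableFuture<>();
                rejected.completeExceptionally(new ClosedChannelException());
                return rejected;
            }
            // Placeholder for actually queueing or writing the request.
            return CompletableFuture.completedFuture("sent: " + request);
        }
    }

    CompletableFuture<Void> close() {
        synchronized (connectLock) {
            if (!closed) {
                // Set the flag right away, not in a thenRun after the release has finished.
                closed = true;
                terminationFuture = releaseResources.get();
            }
            // Repeated calls get the same, possibly still running, termination future.
            return terminationFuture;
        }
    }
}
```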


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150221485
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
 ---
@@ -108,9 +110,35 @@ public QueryableStateClient(final InetAddress remoteAddress, final int remotePor
new DisabledKvStateRequestStats());
}
 
-   /** Shuts down the client. */
+   /**
+* Shuts down the client and waits for 10 seconds
+* for the shutdown to be completed.
+*
+* If this expires, or an exception is thrown for
+* any reason, then a warning is printed containing the
+* exception.
+*/
public void shutdown() {
-   client.shutdown();
+   try {
+   client.shutdown().get(10L, TimeUnit.SECONDS);
+   LOG.info("The Queryable State Client was shutdown successfully.");
+   } catch (Exception e) {
+   LOG.warn("The Queryable State Client shutdown failed: ", e);
+   }
+   }
+
+   /**
+* Shuts down the client and waits until shutdown is completed.
+*
+* If an exception is thrown for any reason, then this exception
+* is further propagated upwards.
+*/
+   public void shutdownAndWait() throws Throwable {
+   try {
+   client.shutdown().join();
--- End diff --

`get()` is better here


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150222505
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 ---
@@ -250,8 +252,8 @@ private boolean attemptToBind(final int port) throws Throwable {
 
throw future.cause();
} catch (BindException e) {
-   LOG.debug("Failed to start server {} on port {}: {}.", serverName, port, e.getMessage());
-   shutdown();
+   log.debug("Failed to start {} on port {}: {}.", serverName, port, e.getMessage());
+   shutdownServer(Time.seconds(10L)).join();
--- End diff --

I would use `get()` because it makes the thrown exceptions explicit.
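The difference in a nutshell: `join()` wraps a failure in the unchecked `CompletionException`, so the compiler gives callers no hint, while `get()` throws the checked `ExecutionException` (and `InterruptedException`), which must be declared or handled. A small self-contained comparison; `JoinVsGet` and its methods are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

final class JoinVsGet {
    static CompletableFuture<Void> failedShutdown() {
        CompletableFuture<Void> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("shutdown failed"));
        return f;
    }

    // join(): compiles without a throws clause; the failure surfaces as an unchecked CompletionException.
    static String viaJoin() {
        try {
            failedShutdown().join();
            return "no exception";
        } catch (CompletionException e) {
            return "CompletionException caused by " + e.getCause().getClass().getSimpleName();
        }
    }

    // get(): the compiler forces callers to deal with ExecutionException and InterruptedException.
    static String viaGet() throws InterruptedException {
        try {
            failedShutdown().get();
            return "no exception";
        } catch (ExecutionException e) {
            return "ExecutionException caused by " + e.getCause().getClass().getSimpleName();
        }
    }
}
```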


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150223433
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+   return true;
+   }
+   } catch (InterruptedException e) {
+   log.warn("Failed to shutdown {}: ", serverName, e);
+   return false;
+   }
+   return false;
+   });
+
+   final CompletableFuture groupShutdownFuture = new CompletableFuture<>();
+   final CompletableFuture handlerShutdownFuture = new CompletableFuture<>();
+
+   queryExecShutdownFuture.thenRun(() -> {
--- End diff --

Is it strictly necessary to shut down the query executor first, before
shutting down the Netty server?
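If there is no ordering constraint, which is exactly what the question above asks, the query executor and the Netty event-loop group could be shut down concurrently and joined with `allOf`, so a slow executor does not delay releasing the network resources. A sketch under that assumption, using a plain `ExecutorService`; `ParallelServerShutdown` and the `shutdownNetwork` supplier are hypothetical placeholders. Like the `supplyAsync` in the quoted diff, it blocks a common-pool thread while awaiting termination.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class ParallelServerShutdown {
    static CompletableFuture<Void> shutdown(ExecutorService queryExecutor, long timeoutMillis,
                                            Supplier<CompletableFuture<Void>> shutdownNetwork) {
        CompletableFuture<Void> executorDone = CompletableFuture.runAsync(() -> {
            queryExecutor.shutdown();
            try {
                if (!queryExecutor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    throw new IllegalStateException("query executor did not terminate in time");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while awaiting termination", e);
            }
        });
        // Started concurrently with the executor shutdown instead of in a thenRun after it.
        CompletableFuture<Void> networkDone = shutdownNetwork.get();
        return CompletableFuture.allOf(executorDone, networkDone);
    }
}
```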


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150226636
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -166,27 +168,39 @@ public String getClientName() {
 *
 * After a call to this method, all returned futures will be failed.
 */
-   public void shutdown() {
+   public CompletableFuture shutdown() {
+   final CompletableFuture shutdownFuture = new CompletableFuture<>();
+
if (shutDown.compareAndSet(false, true)) {
+   final List connectionFutures = new ArrayList<>();
for (Map.Entry conn : establishedConnections.entrySet()) {
if (establishedConnections.remove(conn.getKey(), conn.getValue())) {
-   conn.getValue().close();
+   connectionFutures.add(conn.getValue().close());
}
}
 
for (Map.Entry conn : pendingConnections.entrySet()) {
if (pendingConnections.remove(conn.getKey()) != null) {
-   conn.getValue().close();
+   connectionFutures.add(conn.getValue().close());
}
}
 
-   if (bootstrap != null) {
-   EventLoopGroup group = bootstrap.group();
-   if (group != null) {
-   group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
-   }
-   }
+   CompletableFuture.allOf(connectionFutures.toArray(new CompletableFuture[connectionFutures.size()])).thenRun(
+   () -> {
+   if (bootstrap != null) {
+   EventLoopGroup group = bootstrap.group();
+   if (group != null) {
+   group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS)
+   .addListener(finished -> shutdownFuture.complete(true));
--- End diff --

The result should be considered.
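Netty passes the completed Netty future to the `addListener` callback, and that future exposes `isSuccess()` and `cause()`; completing with `true` unconditionally discards both. A Netty-free sketch of an adapter that maps such an outcome onto a `CompletableFuture`; `ShutdownOutcome` is a hypothetical stand-in for that `isSuccess()`/`cause()` pair, and `OutcomeAdapter.forward` is likewise illustrative.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the (isSuccess(), cause()) pair of a completed Netty future.
final class ShutdownOutcome {
    final boolean success;
    final Throwable cause;

    ShutdownOutcome(boolean success, Throwable cause) {
        this.success = success;
        this.cause = cause;
    }
}

final class OutcomeAdapter {
    // Propagate the real outcome instead of always completing successfully.
    static void forward(ShutdownOutcome outcome, CompletableFuture<Void> target) {
        if (outcome.success) {
            target.complete(null);
        } else {
            Throwable cause = outcome.cause != null
                ? outcome.cause
                : new IllegalStateException("shutdown failed without a cause");
            target.completeExceptionally(cause);
        }
    }
}
```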


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/4993

[FLINK-7974][FLINK-7975][QS] Wait for shutdown in QS client and servers.

## What is the purpose of the change

Previously, we freed the resources held by the QS client and servers on
`shutdown()`, but did not wait for this to complete. Now we wait for
everything to be freed.

## Brief change log

The `Client` and the `AbstractServerBase` contain the biggest changes. The 
actual implementations (`KvStateClientProxyImpl`, `KvStateServerImpl`, and 
`QueryableStateClient`) build on that.

## Verifying this change

Some tests were also adjusted, but the changes are mainly tested in
`AbstractServerTest`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): NO
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: NO
  - The serializers: NO
  - The runtime per-record code paths (performance sensitive): NO
  - Anything that affects deployment or recovery: NO
  - The S3 file system connector: NO

## Documentation

  - Does this pull request introduce a new feature? NO
  - If yes, how is the feature documented? not applicable

R @aljoscha or @tillrohrmann 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink qs-shutdown-fin

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4993.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4993


commit 46bced58030a76d65163369b58f9c02c814a4e08
Author: kkloudas 
Date:   2017-11-09T18:21:43Z

[FLINK-7975][QS] Wait for QS client to shutdown.

commit 62f80529f5657223d792576686ddad6832d13506
Author: kkloudas 
Date:   2017-11-09T18:30:29Z

[FLINK-7974][QS] Wait for QS abstract server to shutdown.




---