[FLINK-8055][QS] Deduplicate logging messages about QS start.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5e059e96 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5e059e96 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5e059e96 Branch: refs/heads/master Commit: 5e059e968633c4292734ebed209fa1b3c30529a1 Parents: 75c1454 Author: kkloudas <kklou...@gmail.com> Authored: Thu Nov 16 17:02:16 2017 +0100 Committer: kkloudas <kklou...@gmail.com> Committed: Fri Nov 17 10:46:09 2017 +0100 ---------------------------------------------------------------------- .../network/AbstractServerBase.java | 20 ++++++++++---------- .../flink/queryablestate/network/Client.java | 20 ++++++++++++++------ .../server/KvStateServerImpl.java | 5 ----- .../HAAbstractQueryableStateTestBase.java | 2 +- .../network/AbstractServerTest.java | 2 +- .../network/KvStateServerHandlerTest.java | 2 +- .../runtime/io/network/NetworkEnvironment.java | 2 -- 7 files changed, 27 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java index 07ca26d..82a05f2 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java @@ -60,7 +60,7 @@ import java.util.concurrent.TimeUnit; @Internal public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends MessageBody> { - private static final Logger LOG = LoggerFactory.getLogger(AbstractServerBase.class); + protected final Logger log = LoggerFactory.getLogger(getClass()); /** AbstractServerBase config: low water mark. */ private static final int LOW_WATER_MARK = 8 * 1024; @@ -180,16 +180,16 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M */ public void start() throws Throwable { Preconditions.checkState(serverAddress == null, - "The " + serverName + " already running @ " + serverAddress + '.'); + serverName + " is already running @ " + serverAddress + '.'); Iterator<Integer> portIterator = bindPortRange.iterator(); while (portIterator.hasNext() && !attemptToBind(portIterator.next())) {} if (serverAddress != null) { - LOG.info("Started the {} @ {}.", serverName, serverAddress); + log.info("Started {} @ {}.", serverName, serverAddress); } else { - LOG.info("Unable to start the {}. All ports in provided range are occupied.", serverName); - throw new FlinkRuntimeException("Unable to start the " + serverName + ". All ports in provided range are occupied."); + log.info("Unable to start {}. All ports in provided range are occupied.", serverName); + throw new FlinkRuntimeException("Unable to start " + serverName + ". All ports in provided range are occupied."); } } @@ -203,7 +203,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M * @throws Exception If something goes wrong during the bind operation. */ private boolean attemptToBind(final int port) throws Throwable { - LOG.debug("Attempting to start server {} on port {}.", serverName, port); + log.debug("Attempting to start {} on port {}.", serverName, port); this.queryExecutor = createQueryExecutor(); this.handler = initializeHandler(); @@ -250,7 +250,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M throw future.cause(); } catch (BindException e) { - LOG.debug("Failed to start server {} on port {}: {}.", serverName, port, e.getMessage()); + log.debug("Failed to start {} on port {}: {}.", serverName, port, e.getMessage()); shutdown(); } // any other type of exception we let it bubble up. @@ -261,7 +261,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M * Shuts down the server and all related thread pools. */ public void shutdown() { - LOG.info("Shutting down server {} @ {}", serverName, serverAddress); + log.info("Shutting down {} @ {}", serverName, serverAddress); if (handler != null) { handler.shutdown(); @@ -311,7 +311,7 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M } @VisibleForTesting - public boolean isExecutorShutdown() { - return queryExecutor.isShutdown(); + public boolean isEventGroupShutdown() { + return bootstrap == null || bootstrap.group().isTerminated(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java index e21145b..12286fa 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java @@ -19,6 +19,7 @@ package org.apache.flink.queryablestate.network; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.queryablestate.FutureUtils; import org.apache.flink.queryablestate.network.messages.MessageBody; import org.apache.flink.queryablestate.network.messages.MessageSerializer; @@ -282,12 +283,14 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> { while (!queuedRequests.isEmpty()) { final PendingRequest pending = queuedRequests.poll(); - established.sendRequest(pending.request) - .thenAccept(resp -> pending.complete(resp)) - .exceptionally(throwable -> { - pending.completeExceptionally(throwable); - return null; - }); + established.sendRequest(pending.request).whenComplete( + (response, throwable) -> { + if (throwable != null) { + pending.completeExceptionally(throwable); + } else { + pending.complete(response); + } + }); } // Publish the channel for the general public @@ -533,4 +536,9 @@ public class Client<REQ extends MessageBody, RESP extends MessageBody> { } } } + + @VisibleForTesting + public boolean isEventGroupShutdown() { + return bootstrap == null || bootstrap.group().isTerminated(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java index fe07687..3a37a3a 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java @@ -29,9 +29,6 @@ import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.KvStateServer; import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Iterator; @@ -42,8 +39,6 @@ import java.util.Iterator; @Internal public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest, KvStateResponse> implements KvStateServer { - private static final Logger LOG = LoggerFactory.getLogger(KvStateServerImpl.class); - /** The {@link KvStateRegistry} to query for state instances. */ private final KvStateRegistry kvStateRegistry; http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java index fc4b2bc..79809b3 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java @@ -65,7 +65,7 @@ public abstract class HAAbstractQueryableStateTestBase extends AbstractQueryable config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); cluster = new TestingCluster(config, false); - cluster.start(); + cluster.start(true); client = new QueryableStateClient("localhost", proxyPortRangeStart); http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java index 2775cd4..3d2ed40 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java @@ -58,7 +58,7 @@ public class AbstractServerTest { // the expected exception along with the adequate message expectedEx.expect(FlinkRuntimeException.class); - expectedEx.expectMessage("Unable to start the Test Server 2. All ports in provided range are occupied."); + expectedEx.expectMessage("Unable to start Test Server 2. All ports in provided range are occupied."); TestServer server1 = null; TestServer server2 = null; http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java index 041544d..7b301ed 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java @@ -391,7 +391,7 @@ public class KvStateServerHandlerTest extends TestLogger { localTestServer.start(); localTestServer.shutdown(); - assertTrue(localTestServer.isExecutorShutdown()); + assertTrue(localTestServer.getQueryExecutor().isTerminated()); MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index 4fffacd..71d0386 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -310,7 +310,6 @@ public class NetworkEnvironment { if (kvStateServer != null) { try { kvStateServer.start(); - LOG.info("Started the Queryable State Data Server @ {}", kvStateServer.getServerAddress()); } catch (Throwable ie) { kvStateServer.shutdown(); kvStateServer = null; @@ -321,7 +320,6 @@ public class NetworkEnvironment { if (kvStateProxy != null) { try { kvStateProxy.start(); - LOG.info("Started the Queryable State Client Proxy @ {}", kvStateProxy.getServerAddress()); } catch (Throwable ie) { kvStateProxy.shutdown(); kvStateProxy = null;