flink git commit: [FLINK-8063][QS] QS client does not retry when an UnknownKvStateLocation is thrown.
Repository: flink Updated Branches: refs/heads/master 81dc260dc -> a0838de79 [FLINK-8063][QS] QS client does not retry when an UnknownKvStateLocation is thrown. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a0838de7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a0838de7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a0838de7 Branch: refs/heads/master Commit: a0838de79ff73b0322f3ce255df54f5f33b2bf3b Parents: 81dc260 Author: kkloudas Authored: Tue Nov 14 15:05:45 2017 +0100 Committer: kkloudas Committed: Fri Nov 17 10:29:30 2017 +0100 -- .../network/AbstractServerHandler.java | 2 +- .../client/proxy/KvStateClientProxyHandler.java | 11 +- .../itcases/AbstractQueryableStateTestBase.java | 230 --- 3 files changed, 150 insertions(+), 93 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/a0838de7/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java -- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java index 9e02291..7e71a11 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java @@ -262,7 +262,7 @@ public abstract class AbstractServerHandlerhttp://git-wip-us.apache.org/repos/asf/flink/blob/a0838de7/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java -- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java index 73ef7f3..af33701 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java @@ -37,7 +37,6 @@ import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.query.KvStateClientProxy; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.KvStateMessage; -import org.apache.flink.runtime.query.UnknownKvStateLocation; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.util.Preconditions; @@ -48,7 +47,6 @@ import org.slf4j.LoggerFactory; import java.net.ConnectException; import java.net.InetSocketAddress; -import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -133,12 +131,11 @@ public class KvStateClientProxyHandler extends AbstractServerHandler { if (throwable != null) { - if (throwable instanceof CancellationException) { - result.completeExceptionally(throwable); - } else if (throwable.getCause() instanceof UnknownKvStateIdException || + if ( + throwable.getCause() instanceof UnknownKvStateIdException || throwable.getCause() instanceof UnknownKvStateKeyGroupLocationException || - throwable.getCause() instanceof UnknownKvStateLocation || - throwable.getCause() instanceof ConnectException) { + throwable.getCause() instanceof ConnectException + ) { // These failures are likely
[3/5] flink git commit: [FLINK-8065][QS] Improve error message when client already shut down.
[FLINK-8065][QS] Improve error message when client already shut down. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/75c14541 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/75c14541 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/75c14541 Branch: refs/heads/master Commit: 75c14541fdc52d5446b179e8e660b8a4fd90310c Parents: ff7e3cf Author: kkloudas Authored: Wed Nov 15 15:38:36 2017 +0100 Committer: kkloudas Committed: Fri Nov 17 10:46:09 2017 +0100 -- .../main/java/org/apache/flink/queryablestate/network/Client.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/75c14541/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java -- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java index 13d34fb..e21145b 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java @@ -133,7 +133,7 @@ public class Client { public CompletableFuture sendRequest(final InetSocketAddress serverAddress, final REQ request) { if (shutDown.get()) { - return FutureUtils.getFailedFuture(new IllegalStateException("Shut down")); + return FutureUtils.getFailedFuture(new IllegalStateException(clientName + " is already shut down.")); } EstablishedConnection connection = establishedConnections.get(serverAddress);
[4/5] flink git commit: [FLINK-8059][QS] QS client throws FlinkJobNotFoundException for queries with unknown jobIds
[FLINK-8059][QS] QS client throws FlinkJobNotFoundException for queries with unknown jobIds Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2fe078f3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2fe078f3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2fe078f3 Branch: refs/heads/master Commit: 2fe078f3927595cbc3c5de6635a710494e0f34b4 Parents: 5e059e9 Author: kkloudas Authored: Thu Nov 16 17:45:49 2017 +0100 Committer: kkloudas Committed: Fri Nov 17 10:46:10 2017 +0100 -- .../itcases/AbstractQueryableStateTestBase.java | 32 +++- .../flink/runtime/jobmanager/JobManager.scala | 4 +-- .../runtime/jobmanager/JobManagerTest.java | 5 +-- 3 files changed, 29 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/2fe078f3/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java -- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java index a789dbd..65e9bb5 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java @@ -276,10 +276,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { /** * Tests that duplicate query registrations fail the job at the JobManager. -* -* NOTE: This test is only in the non-HA variant of the tests because -* in the HA mode we use the actual JM code which does not recognize the -* {@code NotifyWhenJobStatus} message. */ @Test public void testDuplicateRegistrationFailsJob() throws Exception { @@ -435,10 +431,10 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { /** * Tests that the correct exception is thrown if the query -* contains a wrong queryable state name. +* contains a wrong jobId or wrong queryable state name. */ @Test - public void testWrongQueryableStateName() throws Exception { + public void testWrongJobIdAndWrongQueryableStateName() throws Exception { // Config final Deadline deadline = TEST_TIMEOUT.fromNow(); @@ -486,7 +482,27 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); assertEquals(JobStatus.RUNNING, jobStatus.state()); - CompletableFuture>> future = client.getKvState( + final JobID wrongJobId = new JobID(); + + CompletableFuture>> unknownJobFuture = client.getKvState( + wrongJobId, // this is the wrong job id + "hankuna", + 0, + BasicTypeInfo.INT_TYPE_INFO, + valueState); + + try { + unknownJobFuture.get(); + fail(); // by now the job must have failed. + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof RuntimeException); + Assert.assertTrue(e.getCause().getMessage().contains( + "FlinkJobNotFoundException: Could not find Flink job (" + wrongJobId + ")")); + } catch (Exception ignored) { + fail("Unexpected type of exception."); + } + + CompletableFuture>> unknownQSName = client.getKvState( jobId, "wrong-hankuna", // this is the wrong name. 0, @@ -494,7 +510,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { valueState); try { - future.get(); + unknownQSName.g
[5/5] flink git commit: [FLINK-8057][QS] Change error message in KvStateRegistry.registerKvState()
[FLINK-8057][QS] Change error message in KvStateRegistry.registerKvState() Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4d86975 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4d86975 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4d86975 Branch: refs/heads/master Commit: a4d86975967942054d1bd466641e9c835fb014ac Parents: 2fe078f Author: kkloudas Authored: Fri Nov 17 09:26:10 2017 +0100 Committer: kkloudas Committed: Fri Nov 17 10:46:11 2017 +0100 -- .../flink/runtime/query/KvStateRegistry.java| 23 ++-- 1 file changed, 7 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/a4d86975/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java index af19d81..ed1f92e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java @@ -45,7 +45,7 @@ public class KvStateRegistry { new ConcurrentHashMap<>(); /** Registry listener to be notified on registration/unregistration. */ - private final AtomicReference listener = new AtomicReference<>(); + private final AtomicReference listenerRef = new AtomicReference<>(); /** * Registers a listener with the registry. @@ -54,7 +54,7 @@ public class KvStateRegistry { * @throws IllegalStateException If there is a registered listener */ public void registerListener(KvStateRegistryListener listener) { - if (!this.listener.compareAndSet(null, listener)) { + if (!listenerRef.compareAndSet(null, listener)) { throw new IllegalStateException("Listener already registered."); } } @@ -63,20 +63,10 @@ public class KvStateRegistry { * Unregisters the listener with the registry. */ public void unregisterListener() { - listener.set(null); + listenerRef.set(null); } /** -* Registers the KvState instance identified by the given 4-tuple of JobID, -* JobVertexID, key group index, and registration name. -* -* @param kvStateId KvStateID to identify the KvState instance -* @param kvState KvState instance to register -* @throws IllegalStateException If there is a KvState instance registered -* with the same ID. -*/ - - /** * Registers the KvState instance and returns the assigned ID. * * @param jobIdJobId the KvState instance belongs to @@ -96,7 +86,7 @@ public class KvStateRegistry { KvStateID kvStateId = new KvStateID(); if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) { - KvStateRegistryListener listener = this.listener.get(); + final KvStateRegistryListener listener = listenerRef.get(); if (listener != null) { listener.notifyKvStateRegistered( jobId, @@ -108,7 +98,8 @@ public class KvStateRegistry { return kvStateId; } else { - throw new IllegalStateException(kvStateId + " is already registered."); + throw new IllegalStateException( + "State \"" + registrationName + " \"(id=" + kvStateId + ") appears registered although it should not."); } } @@ -127,7 +118,7 @@ public class KvStateRegistry { KvStateID kvStateId) { if (registeredKvStates.remove(kvStateId) != null) { - KvStateRegistryListener listener = this.listener.get(); + final KvStateRegistryListener listener = listenerRef.get(); if (listener != null) { listener.notifyKvStateUnregistered( jobId,
[1/5] flink git commit: [FLINK-8062][QS] Make getKvState() with namespace private.
Repository: flink Updated Branches: refs/heads/master a0838de79 -> a4d869759 [FLINK-8062][QS] Make getKvState() with namespace private. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff7e3cf6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff7e3cf6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff7e3cf6 Branch: refs/heads/master Commit: ff7e3cf6749a6b6bc898fde871c36661c8629c23 Parents: a0838de Author: kkloudas Authored: Wed Nov 15 15:32:42 2017 +0100 Committer: kkloudas Committed: Fri Nov 17 10:46:08 2017 +0100 -- .../flink/queryablestate/client/QueryableStateClient.java | 3 +-- .../itcases/AbstractQueryableStateTestBase.java | 7 +-- 2 files changed, 2 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/ff7e3cf6/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java -- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java index 304505a..03e02e1 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java @@ -186,8 +186,7 @@ public class QueryableStateClient { * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. * @return Future holding the immutable {@link State} object containing the result. */ - @PublicEvolving - public CompletableFuture getKvState( + private CompletableFuture getKvState( final JobID jobId, final String queryableStateName, final K key, http://git-wip-us.apache.org/repos/asf/flink/blob/ff7e3cf6/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java -- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java index c1cbb61..a789dbd 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java @@ -47,7 +47,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.queryablestate.client.QueryableStateClient; import org.apache.flink.queryablestate.client.VoidNamespace; import org.apache.flink.queryablestate.client.VoidNamespaceSerializer; -import org.apache.flink.queryablestate.client.VoidNamespaceTypeInfo; import org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ScheduledExecutor; @@ -491,9 +490,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { jobId, "wrong-hankuna", // this is the wrong name. 0, - VoidNamespace.INSTANCE, BasicTypeInfo.INT_TYPE_INFO, - VoidNamespaceTypeInfo.INSTANCE, valueState); try { @@ -572,9 +569,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { jobId, queryableState.getQueryableStateName(), 0, - VoidNamespace.INSTANCE, BasicTypeInfo.INT_TYPE_INFO, - VoidNamespaceTypeInfo.INSTANCE, valueState); cluster.submitJobDetached(jobGraph); @@ -1486,7 +1481,7 @@ public abstract class Abst
[2/5] flink git commit: [FLINK-8055][QS] Deduplicate logging messages about QS start.
[FLINK-8055][QS] Deduplicate logging messages about QS start. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5e059e96 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5e059e96 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5e059e96 Branch: refs/heads/master Commit: 5e059e968633c4292734ebed209fa1b3c30529a1 Parents: 75c1454 Author: kkloudas Authored: Thu Nov 16 17:02:16 2017 +0100 Committer: kkloudas Committed: Fri Nov 17 10:46:09 2017 +0100 -- .../network/AbstractServerBase.java | 20 ++-- .../flink/queryablestate/network/Client.java| 20 ++-- .../server/KvStateServerImpl.java | 5 - .../HAAbstractQueryableStateTestBase.java | 2 +- .../network/AbstractServerTest.java | 2 +- .../network/KvStateServerHandlerTest.java | 2 +- .../runtime/io/network/NetworkEnvironment.java | 2 -- 7 files changed, 27 insertions(+), 26 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java -- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java index 07ca26d..82a05f2 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java @@ -60,7 +60,7 @@ import java.util.concurrent.TimeUnit; @Internal public abstract class AbstractServerBase { - private static final Logger LOG = LoggerFactory.getLogger(AbstractServerBase.class); + protected final Logger log = LoggerFactory.getLogger(getClass()); /** AbstractServerBase config: low water mark. */ private static final int LOW_WATER_MARK = 8 * 1024; @@ -180,16 +180,16 @@ public abstract class AbstractServerBase portIterator = bindPortRange.iterator(); while (portIterator.hasNext() && !attemptToBind(portIterator.next())) {} if (serverAddress != null) { - LOG.info("Started the {} @ {}.", serverName, serverAddress); + log.info("Started {} @ {}.", serverName, serverAddress); } else { - LOG.info("Unable to start the {}. All ports in provided range are occupied.", serverName); - throw new FlinkRuntimeException("Unable to start the " + serverName + ". All ports in provided range are occupied."); + log.info("Unable to start {}. All ports in provided range are occupied.", serverName); + throw new FlinkRuntimeException("Unable to start " + serverName + ". All ports in provided range are occupied."); } } @@ -203,7 +203,7 @@ public abstract class AbstractServerBasehttp://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java -- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java index e21145b..12286fa 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java @@ -19,6 +19,7 @@ package org.apache.flink.queryablestate.network; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.queryablestate.FutureUtils; import org.apache.flink.queryablestate.network.messages.MessageBody; import org.apache.flink.queryablestate.network.messages.MessageSerializer; @@ -282,12 +283,14 @@ public class Client { while (!queuedRequests.isEmpty()) { final PendingRequest pending = queuedRequests.poll(); - established.sendRequest(pending.request) -
[2/6] flink git commit: [FLINK-8062][QS] Make getKvState() with namespace private.
[FLINK-8062][QS] Make getKvState() with namespace private. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/96b350ad Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/96b350ad Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/96b350ad Branch: refs/heads/release-1.4 Commit: 96b350ad91a1f248d0a3c616d1e59638013892be Parents: d0324e3 Author: kkloudas Authored: Wed Nov 15 15:32:42 2017 +0100 Committer: kkloudas Committed: Fri Nov 17 11:18:47 2017 +0100 -- .../flink/queryablestate/client/QueryableStateClient.java | 3 +-- .../itcases/AbstractQueryableStateTestBase.java | 7 +-- 2 files changed, 2 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/96b350ad/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java -- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java index 304505a..03e02e1 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java @@ -186,8 +186,7 @@ public class QueryableStateClient { * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. * @return Future holding the immutable {@link State} object containing the result. */ - @PublicEvolving - public CompletableFuture getKvState( + private CompletableFuture getKvState( final JobID jobId, final String queryableStateName, final K key, http://git-wip-us.apache.org/repos/asf/flink/blob/96b350ad/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java -- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java index c1cbb61..a789dbd 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java @@ -47,7 +47,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.queryablestate.client.QueryableStateClient; import org.apache.flink.queryablestate.client.VoidNamespace; import org.apache.flink.queryablestate.client.VoidNamespaceSerializer; -import org.apache.flink.queryablestate.client.VoidNamespaceTypeInfo; import org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ScheduledExecutor; @@ -491,9 +490,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { jobId, "wrong-hankuna", // this is the wrong name. 0, - VoidNamespace.INSTANCE, BasicTypeInfo.INT_TYPE_INFO, - VoidNamespaceTypeInfo.INSTANCE, valueState); try { @@ -572,9 +569,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { jobId, queryableState.getQueryableStateName(), 0, - VoidNamespace.INSTANCE, BasicTypeInfo.INT_TYPE_INFO, - VoidNamespaceTypeInfo.INSTANCE, valueState); cluster.submitJobDetached(jobGraph); @@ -1486,7 +1481,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { if (!resu
[3/6] flink git commit: [FLINK-8065][QS] Improve error message when client already shut down.
[FLINK-8065][QS] Improve error message when client already shut down. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6314e486 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6314e486 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6314e486 Branch: refs/heads/release-1.4 Commit: 6314e4861df3100e8edd666f00e062c128f6e09f Parents: 96b350a Author: kkloudas Authored: Wed Nov 15 15:38:36 2017 +0100 Committer: kkloudas Committed: Fri Nov 17 11:19:08 2017 +0100 -- .../main/java/org/apache/flink/queryablestate/network/Client.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/6314e486/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java -- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java index 13d34fb..e21145b 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java @@ -133,7 +133,7 @@ public class Client { public CompletableFuture sendRequest(final InetSocketAddress serverAddress, final REQ request) { if (shutDown.get()) { - return FutureUtils.getFailedFuture(new IllegalStateException("Shut down")); + return FutureUtils.getFailedFuture(new IllegalStateException(clientName + " is already shut down.")); } EstablishedConnection connection = establishedConnections.get(serverAddress);
[5/6] flink git commit: [FLINK-8059][QS] QS client throws FlinkJobNotFoundException for queries with unknown jobIds.
[FLINK-8059][QS] QS client throws FlinkJobNotFoundException for queries with unknown jobIds. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a68d752 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a68d752 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a68d752 Branch: refs/heads/release-1.4 Commit: 1a68d7527932b12bd2cb392c7c7781023756bf0c Parents: 12b0c58 Author: kkloudas Authored: Thu Nov 16 17:45:49 2017 +0100 Committer: kkloudas Committed: Fri Nov 17 11:20:55 2017 +0100 -- .../itcases/AbstractQueryableStateTestBase.java | 32 +++- .../flink/runtime/jobmanager/JobManager.scala | 4 +-- .../runtime/jobmanager/JobManagerTest.java | 5 +-- 3 files changed, 29 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/1a68d752/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java -- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java index a789dbd..65e9bb5 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java @@ -276,10 +276,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { /** * Tests that duplicate query registrations fail the job at the JobManager. -* -* NOTE: This test is only in the non-HA variant of the tests because -* in the HA mode we use the actual JM code which does not recognize the -* {@code NotifyWhenJobStatus} message. */ @Test public void testDuplicateRegistrationFailsJob() throws Exception { @@ -435,10 +431,10 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { /** * Tests that the correct exception is thrown if the query -* contains a wrong queryable state name. +* contains a wrong jobId or wrong queryable state name. */ @Test - public void testWrongQueryableStateName() throws Exception { + public void testWrongJobIdAndWrongQueryableStateName() throws Exception { // Config final Deadline deadline = TEST_TIMEOUT.fromNow(); @@ -486,7 +482,27 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); assertEquals(JobStatus.RUNNING, jobStatus.state()); - CompletableFuture>> future = client.getKvState( + final JobID wrongJobId = new JobID(); + + CompletableFuture>> unknownJobFuture = client.getKvState( + wrongJobId, // this is the wrong job id + "hankuna", + 0, + BasicTypeInfo.INT_TYPE_INFO, + valueState); + + try { + unknownJobFuture.get(); + fail(); // by now the job must have failed. + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof RuntimeException); + Assert.assertTrue(e.getCause().getMessage().contains( + "FlinkJobNotFoundException: Could not find Flink job (" + wrongJobId + ")")); + } catch (Exception ignored) { + fail("Unexpected type of exception."); + } + + CompletableFuture>> unknownQSName = client.getKvState( jobId, "wrong-hankuna", // this is the wrong name. 0, @@ -494,7 +510,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { valueState); try { - future.get(); + unknownQS
[1/6] flink git commit: [FLINK-8063][QS] QS client does not retry when an UnknownKvStateLocation is thrown.
Repository: flink Updated Branches: refs/heads/release-1.4 42e24413b -> 3753ae251 [FLINK-8063][QS] QS client does not retry when an UnknownKvStateLocation is thrown. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0324e34 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0324e34 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0324e34 Branch: refs/heads/release-1.4 Commit: d0324e34a06e7374179d1627a4a3653d07f1c614 Parents: 42e2441 Author: kkloudas Authored: Tue Nov 14 15:05:45 2017 +0100 Committer: kkloudas Committed: Fri Nov 17 10:37:18 2017 +0100 -- .../network/AbstractServerHandler.java | 2 +- .../client/proxy/KvStateClientProxyHandler.java | 11 +- .../itcases/AbstractQueryableStateTestBase.java | 230 --- 3 files changed, 150 insertions(+), 93 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/d0324e34/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java -- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java index 9e02291..7e71a11 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java @@ -262,7 +262,7 @@ public abstract class AbstractServerHandlerhttp://git-wip-us.apache.org/repos/asf/flink/blob/d0324e34/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java -- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java index 73ef7f3..af33701 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java @@ -37,7 +37,6 @@ import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.query.KvStateClientProxy; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.KvStateMessage; -import org.apache.flink.runtime.query.UnknownKvStateLocation; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.util.Preconditions; @@ -48,7 +47,6 @@ import org.slf4j.LoggerFactory; import java.net.ConnectException; import java.net.InetSocketAddress; -import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -133,12 +131,11 @@ public class KvStateClientProxyHandler extends AbstractServerHandler { if (throwable != null) { - if (throwable instanceof CancellationException) { - result.completeExceptionally(throwable); - } else if (throwable.getCause() instanceof UnknownKvStateIdException || + if ( + throwable.getCause() instanceof UnknownKvStateIdException || throwable.getCause() instanceof UnknownKvStateKeyGroupLocationException || - throwable.getCause() instanceof UnknownKvStateLocation || - throwable.getCause() instanceof ConnectException) { + throwable.getCause() instanceof ConnectException + ) { // These failures a
[6/6] flink git commit: [FLINK-8057][QS] Change error message in KvStateRegistry.registerKvState().
[FLINK-8057][QS] Change error message in KvStateRegistry.registerKvState(). Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3753ae25 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3753ae25 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3753ae25 Branch: refs/heads/release-1.4 Commit: 3753ae2517fbc940c05ea54e3eb0a960fecdf879 Parents: 1a68d75 Author: kkloudas Authored: Fri Nov 17 09:26:10 2017 +0100 Committer: kkloudas Committed: Fri Nov 17 11:21:18 2017 +0100 -- .../flink/runtime/query/KvStateRegistry.java| 23 ++-- 1 file changed, 7 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/3753ae25/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java index af19d81..ed1f92e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java @@ -45,7 +45,7 @@ public class KvStateRegistry { new ConcurrentHashMap<>(); /** Registry listener to be notified on registration/unregistration. */ - private final AtomicReference listener = new AtomicReference<>(); + private final AtomicReference listenerRef = new AtomicReference<>(); /** * Registers a listener with the registry. @@ -54,7 +54,7 @@ public class KvStateRegistry { * @throws IllegalStateException If there is a registered listener */ public void registerListener(KvStateRegistryListener listener) { - if (!this.listener.compareAndSet(null, listener)) { + if (!listenerRef.compareAndSet(null, listener)) { throw new IllegalStateException("Listener already registered."); } } @@ -63,20 +63,10 @@ public class KvStateRegistry { * Unregisters the listener with the registry. */ public void unregisterListener() { - listener.set(null); + listenerRef.set(null); } /** -* Registers the KvState instance identified by the given 4-tuple of JobID, -* JobVertexID, key group index, and registration name. -* -* @param kvStateId KvStateID to identify the KvState instance -* @param kvState KvState instance to register -* @throws IllegalStateException If there is a KvState instance registered -* with the same ID. -*/ - - /** * Registers the KvState instance and returns the assigned ID. * * @param jobIdJobId the KvState instance belongs to @@ -96,7 +86,7 @@ public class KvStateRegistry { KvStateID kvStateId = new KvStateID(); if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) { - KvStateRegistryListener listener = this.listener.get(); + final KvStateRegistryListener listener = listenerRef.get(); if (listener != null) { listener.notifyKvStateRegistered( jobId, @@ -108,7 +98,8 @@ public class KvStateRegistry { return kvStateId; } else { - throw new IllegalStateException(kvStateId + " is already registered."); + throw new IllegalStateException( + "State \"" + registrationName + " \"(id=" + kvStateId + ") appears registered although it should not."); } } @@ -127,7 +118,7 @@ public class KvStateRegistry { KvStateID kvStateId) { if (registeredKvStates.remove(kvStateId) != null) { - KvStateRegistryListener listener = this.listener.get(); + final KvStateRegistryListener listener = listenerRef.get(); if (listener != null) { listener.notifyKvStateUnregistered( jobId,
[4/6] flink git commit: [FLINK-8055][QS] Deduplicate logging messages about QS start.
[FLINK-8055][QS] Deduplicate logging messages about QS start. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/12b0c58f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/12b0c58f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/12b0c58f Branch: refs/heads/release-1.4 Commit: 12b0c58f6780376ac2da0f02c6e7eb8a24ab8a13 Parents: 6314e48 Author: kkloudas Authored: Thu Nov 16 17:02:16 2017 +0100 Committer: kkloudas Committed: Fri Nov 17 11:19:35 2017 +0100 -- .../network/AbstractServerBase.java | 20 ++-- .../flink/queryablestate/network/Client.java| 20 ++-- .../server/KvStateServerImpl.java | 5 - .../HAAbstractQueryableStateTestBase.java | 2 +- .../network/AbstractServerTest.java | 2 +- .../network/KvStateServerHandlerTest.java | 2 +- .../runtime/io/network/NetworkEnvironment.java | 2 -- 7 files changed, 27 insertions(+), 26 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/12b0c58f/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java -- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java index 07ca26d..82a05f2 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java @@ -60,7 +60,7 @@ import java.util.concurrent.TimeUnit; @Internal public abstract class AbstractServerBase { - private static final Logger LOG = LoggerFactory.getLogger(AbstractServerBase.class); + protected final Logger log = LoggerFactory.getLogger(getClass()); /** AbstractServerBase config: low water mark. */ private static final int LOW_WATER_MARK = 8 * 1024; @@ -180,16 +180,16 @@ public abstract class AbstractServerBase portIterator = bindPortRange.iterator(); while (portIterator.hasNext() && !attemptToBind(portIterator.next())) {} if (serverAddress != null) { - LOG.info("Started the {} @ {}.", serverName, serverAddress); + log.info("Started {} @ {}.", serverName, serverAddress); } else { - LOG.info("Unable to start the {}. All ports in provided range are occupied.", serverName); - throw new FlinkRuntimeException("Unable to start the " + serverName + ". All ports in provided range are occupied."); + log.info("Unable to start {}. All ports in provided range are occupied.", serverName); + throw new FlinkRuntimeException("Unable to start " + serverName + ". All ports in provided range are occupied."); } } @@ -203,7 +203,7 @@ public abstract class AbstractServerBasehttp://git-wip-us.apache.org/repos/asf/flink/blob/12b0c58f/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java -- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java index e21145b..12286fa 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java @@ -19,6 +19,7 @@ package org.apache.flink.queryablestate.network; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.queryablestate.FutureUtils; import org.apache.flink.queryablestate.network.messages.MessageBody; import org.apache.flink.queryablestate.network.messages.MessageSerializer; @@ -282,12 +283,14 @@ public class Client { while (!queuedRequests.isEmpty()) { final PendingRequest pending = queuedRequests.poll(); - established.sendRequest(pending.request) -
flink git commit: [FLINK-8061][QS] Remove trailing * in QSClient javadocs.
Repository: flink Updated Branches: refs/heads/master a4d869759 -> 3edbb7bce [FLINK-8061][QS] Remove trailing * in QSClient javadocs. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3edbb7bc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3edbb7bc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3edbb7bc Branch: refs/heads/master Commit: 3edbb7bce5b30386a67b1b01ef1591a681601219 Parents: a4d8697 Author: Vetriselvan1187 Authored: Mon Nov 13 22:24:43 2017 +0530 Committer: kkloudas Committed: Fri Nov 17 15:28:46 2017 +0100 -- .../apache/flink/queryablestate/client/QueryableStateClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/3edbb7bc/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java -- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java index 03e02e1..7abf6bc 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java @@ -132,7 +132,7 @@ public class QueryableStateClient { } /** -* Returns a future holding the request result. * +* Returns a future holding the request result. * @param jobId JobID of the job the queryable state belongs to. * @param queryableStateNameName under which the state is queryable. * @param key The key we are interested in. @@ -155,7 +155,7 @@ public class QueryableStateClient { } /** -* Returns a future holding the request result. * +* Returns a future holding the request result. * @param jobId JobID of the job the queryable state belongs to. * @param queryableStateNameName under which the state is queryable. * @param key The key we are interested in.
flink git commit: [FLINK-8061][QS] Remove trailing * in QSClient javadocs.
Repository: flink Updated Branches: refs/heads/release-1.4 3753ae251 -> e784f3a18 [FLINK-8061][QS] Remove trailing * in QSClient javadocs. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e784f3a1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e784f3a1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e784f3a1 Branch: refs/heads/release-1.4 Commit: e784f3a184ecc35ea26fdfa6bcb4fa74520fa9df Parents: 3753ae2 Author: Vetriselvan1187 Authored: Mon Nov 13 17:54:43 2017 +0100 Committer: kkloudas Committed: Fri Nov 17 15:31:15 2017 +0100 -- .../apache/flink/queryablestate/client/QueryableStateClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/e784f3a1/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java -- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java index 03e02e1..7abf6bc 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java @@ -132,7 +132,7 @@ public class QueryableStateClient { } /** -* Returns a future holding the request result. * +* Returns a future holding the request result. * @param jobId JobID of the job the queryable state belongs to. * @param queryableStateNameName under which the state is queryable. * @param key The key we are interested in. @@ -155,7 +155,7 @@ public class QueryableStateClient { } /** -* Returns a future holding the request result. * +* Returns a future holding the request result. * @param jobId JobID of the job the queryable state belongs to. * @param queryableStateNameName under which the state is queryable. * @param key The key we are interested in.
[1/2] flink git commit: [FLINK-7266] [core] Prevent attempt for parent directory deletion for object stores
Repository: flink Updated Branches: refs/heads/master 3edbb7bce -> b00f1b326 [FLINK-7266] [core] Prevent attempt for parent directory deletion for object stores This closes #4397 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b00f1b32 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b00f1b32 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b00f1b32 Branch: refs/heads/master Commit: b00f1b326c1ab4221a555200a4d5798e1565b821 Parents: f29f805 Author: Stephan Ewen Authored: Tue Jul 25 17:26:38 2017 +0200 Committer: Aljoscha Krettek Committed: Fri Nov 17 16:48:29 2017 +0100 -- .../flink/runtime/state/filesystem/FileStateHandle.java | 10 ++ 1 file changed, 6 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/b00f1b32/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java index bdf3f42..7655f0b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.filesystem; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemKind; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.FileUtils; @@ -77,14 +78,15 @@ public class FileStateHandle implements StreamStateHandle { */ @Override public void discardState() throws Exception { - FileSystem fs = getFileSystem(); fs.delete(filePath, false); - try { - FileUtils.deletePathIfEmpty(fs, filePath.getParent()); - } catch (Exception ignored) {} + if (fs.getKind() == FileSystemKind.FILE_SYSTEM) { + try { + FileUtils.deletePathIfEmpty(fs, filePath.getParent()); + } catch (Exception ignored) {} + } } /**
[2/2] flink git commit: [FLINK-7265] [core] Introduce FileSystemKind to differentiate between FileSystem and ObjectStore
[FLINK-7265] [core] Introduce FileSystemKind to differentiate between FileSystem and ObjectStore Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f29f8057 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f29f8057 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f29f8057 Branch: refs/heads/master Commit: f29f80575dac1c7e59dd7074118953b8be26520f Parents: 3edbb7b Author: Stephan Ewen Authored: Tue Jul 25 17:19:25 2017 +0200 Committer: Aljoscha Krettek Committed: Fri Nov 17 16:48:29 2017 +0100 -- .../org/apache/flink/core/fs/FileSystem.java| 5 + .../apache/flink/core/fs/FileSystemKind.java| 40 .../core/fs/SafetyNetWrapperFileSystem.java | 5 + .../flink/core/fs/local/LocalFileSystem.java| 10 +- .../core/fs/local/LocalFileSystemTest.java | 7 ++ .../flink/runtime/fs/hdfs/HadoopFileSystem.java | 47 + .../flink/runtime/fs/maprfs/MapRFileSystem.java | 6 ++ .../flink/runtime/fs/hdfs/HdfsKindTest.java | 101 +++ 8 files changed, 219 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java -- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index d66a893..982e496 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -633,6 +633,11 @@ public abstract class FileSystem { */ public abstract boolean isDistributedFS(); + /** +* Gets a description of the characteristics of this file system. +*/ + public abstract FileSystemKind getKind(); + // // output directory initialization // http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java -- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java new file mode 100644 index 000..52f58ac --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * An enumeration defining the kind and characteristics of a {@link FileSystem}. + */ +@PublicEvolving +public enum FileSystemKind { + + /** +* An actual file system, with files and directories. +*/ + FILE_SYSTEM, + + /** +* An Object store. Files correspond to objects. +* There are not really directories, but a directory-like structure may be mimicked +* by hierarchical naming of files. +*/ + OBJECT_STORE +} http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java -- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java index a1167dd..e7f43a4 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java @@ -141,6 +141,11 @@ public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingPr } @Override + public FileSystemKind getKind() { + return uns
[1/2] flink git commit: [FLINK-7266] [core] Prevent attempt for parent directory deletion for object stores
Repository: flink Updated Branches: refs/heads/release-1.4 e784f3a18 -> 666b1b2e6 [FLINK-7266] [core] Prevent attempt for parent directory deletion for object stores This closes #4397 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/666b1b2e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/666b1b2e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/666b1b2e Branch: refs/heads/release-1.4 Commit: 666b1b2e62463ae4985d237535d56c9e0ab9dba9 Parents: a0dbe18 Author: Stephan Ewen Authored: Tue Jul 25 17:26:38 2017 +0200 Committer: Aljoscha Krettek Committed: Fri Nov 17 17:22:24 2017 +0100 -- .../flink/runtime/state/filesystem/FileStateHandle.java | 10 ++ 1 file changed, 6 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/666b1b2e/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java index bdf3f42..7655f0b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.filesystem; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemKind; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.FileUtils; @@ -77,14 +78,15 @@ public class FileStateHandle implements StreamStateHandle { */ @Override public void discardState() throws Exception { - FileSystem fs = getFileSystem(); fs.delete(filePath, false); - try { - FileUtils.deletePathIfEmpty(fs, filePath.getParent()); - } catch (Exception ignored) {} + if (fs.getKind() == FileSystemKind.FILE_SYSTEM) { + try { + FileUtils.deletePathIfEmpty(fs, filePath.getParent()); + } catch (Exception ignored) {} + } } /**
[2/2] flink git commit: [FLINK-7265] [core] Introduce FileSystemKind to differentiate between FileSystem and ObjectStore
[FLINK-7265] [core] Introduce FileSystemKind to differentiate between FileSystem and ObjectStore Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a0dbe182 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a0dbe182 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a0dbe182 Branch: refs/heads/release-1.4 Commit: a0dbe182fa677a87f601cbedc4115e63fff9fe4f Parents: e784f3a Author: Stephan Ewen Authored: Tue Jul 25 17:19:25 2017 +0200 Committer: Aljoscha Krettek Committed: Fri Nov 17 17:22:24 2017 +0100 -- .../org/apache/flink/core/fs/FileSystem.java| 5 + .../apache/flink/core/fs/FileSystemKind.java| 40 .../core/fs/SafetyNetWrapperFileSystem.java | 5 + .../flink/core/fs/local/LocalFileSystem.java| 10 +- .../core/fs/local/LocalFileSystemTest.java | 7 ++ .../flink/runtime/fs/hdfs/HadoopFileSystem.java | 47 + .../flink/runtime/fs/maprfs/MapRFileSystem.java | 6 ++ .../flink/runtime/fs/hdfs/HdfsKindTest.java | 101 +++ 8 files changed, 219 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/a0dbe182/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java -- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index d66a893..982e496 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -633,6 +633,11 @@ public abstract class FileSystem { */ public abstract boolean isDistributedFS(); + /** +* Gets a description of the characteristics of this file system. +*/ + public abstract FileSystemKind getKind(); + // // output directory initialization // http://git-wip-us.apache.org/repos/asf/flink/blob/a0dbe182/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java -- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java new file mode 100644 index 000..52f58ac --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * An enumeration defining the kind and characteristics of a {@link FileSystem}. + */ +@PublicEvolving +public enum FileSystemKind { + + /** +* An actual file system, with files and directories. +*/ + FILE_SYSTEM, + + /** +* An Object store. Files correspond to objects. +* There are not really directories, but a directory-like structure may be mimicked +* by hierarchical naming of files. +*/ + OBJECT_STORE +} http://git-wip-us.apache.org/repos/asf/flink/blob/a0dbe182/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java -- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java index a1167dd..e7f43a4 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java @@ -141,6 +141,11 @@ public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingPr } @Override + public FileSystemKind getKind() { + retur