[FLINK-8057][QS] Change error message in KvStateRegistry.registerKvState()
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4d86975 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4d86975 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4d86975 Branch: refs/heads/master Commit: a4d86975967942054d1bd466641e9c835fb014ac Parents: 2fe078f Author: kkloudas <kklou...@gmail.com> Authored: Fri Nov 17 09:26:10 2017 +0100 Committer: kkloudas <kklou...@gmail.com> Committed: Fri Nov 17 10:46:11 2017 +0100 ---------------------------------------------------------------------- .../flink/runtime/query/KvStateRegistry.java | 23 ++++++-------------- 1 file changed, 7 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a4d86975/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java index af19d81..ed1f92e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java @@ -45,7 +45,7 @@ public class KvStateRegistry { new ConcurrentHashMap<>(); /** Registry listener to be notified on registration/unregistration. */ - private final AtomicReference<KvStateRegistryListener> listener = new AtomicReference<>(); + private final AtomicReference<KvStateRegistryListener> listenerRef = new AtomicReference<>(); /** * Registers a listener with the registry. 
@@ -54,7 +54,7 @@ public class KvStateRegistry { * @throws IllegalStateException If there is a registered listener */ public void registerListener(KvStateRegistryListener listener) { - if (!this.listener.compareAndSet(null, listener)) { + if (!listenerRef.compareAndSet(null, listener)) { throw new IllegalStateException("Listener already registered."); } } @@ -63,20 +63,10 @@ public class KvStateRegistry { * Unregisters the listener with the registry. */ public void unregisterListener() { - listener.set(null); + listenerRef.set(null); } /** - * Registers the KvState instance identified by the given 4-tuple of JobID, - * JobVertexID, key group index, and registration name. - * - * @param kvStateId KvStateID to identify the KvState instance - * @param kvState KvState instance to register - * @throws IllegalStateException If there is a KvState instance registered - * with the same ID. - */ - - /** * Registers the KvState instance and returns the assigned ID. * * @param jobId JobId the KvState instance belongs to @@ -96,7 +86,7 @@ public class KvStateRegistry { KvStateID kvStateId = new KvStateID(); if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) { - KvStateRegistryListener listener = this.listener.get(); + final KvStateRegistryListener listener = listenerRef.get(); if (listener != null) { listener.notifyKvStateRegistered( jobId, @@ -108,7 +98,8 @@ public class KvStateRegistry { return kvStateId; } else { - throw new IllegalStateException(kvStateId + " is already registered."); + throw new IllegalStateException( + "State \"" + registrationName + "\" (id=" + kvStateId + ") appears registered although it should not."); } } @@ -127,7 +118,7 @@ public class KvStateRegistry { KvStateID kvStateId) { if (registeredKvStates.remove(kvStateId) != null) { - KvStateRegistryListener listener = this.listener.get(); + final KvStateRegistryListener listener = listenerRef.get(); if (listener != null) { listener.notifyKvStateUnregistered( jobId,