[FLINK-8057][QS] Change error message in KvStateRegistry.registerKvState().


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3753ae25
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3753ae25
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3753ae25

Branch: refs/heads/release-1.4
Commit: 3753ae2517fbc940c05ea54e3eb0a960fecdf879
Parents: 1a68d75
Author: kkloudas <kklou...@gmail.com>
Authored: Fri Nov 17 09:26:10 2017 +0100
Committer: kkloudas <kklou...@gmail.com>
Committed: Fri Nov 17 11:21:18 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/query/KvStateRegistry.java    | 23 ++++++--------------
 1 file changed, 7 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3753ae25/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index af19d81..ed1f92e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -45,7 +45,7 @@ public class KvStateRegistry {
                        new ConcurrentHashMap<>();
 
        /** Registry listener to be notified on registration/unregistration. */
-       private final AtomicReference<KvStateRegistryListener> listener = new 
AtomicReference<>();
+       private final AtomicReference<KvStateRegistryListener> listenerRef = 
new AtomicReference<>();
 
        /**
         * Registers a listener with the registry.
@@ -54,7 +54,7 @@ public class KvStateRegistry {
         * @throws IllegalStateException If there is a registered listener
         */
        public void registerListener(KvStateRegistryListener listener) {
-               if (!this.listener.compareAndSet(null, listener)) {
+               if (!listenerRef.compareAndSet(null, listener)) {
                        throw new IllegalStateException("Listener already 
registered.");
                }
        }
@@ -63,20 +63,10 @@ public class KvStateRegistry {
         * Unregisters the listener with the registry.
         */
        public void unregisterListener() {
-               listener.set(null);
+               listenerRef.set(null);
        }
 
        /**
-        * Registers the KvState instance identified by the given 4-tuple of 
JobID,
-        * JobVertexID, key group index, and registration name.
-        *
-        * @param kvStateId KvStateID to identify the KvState instance
-        * @param kvState   KvState instance to register
-        * @throws IllegalStateException If there is a KvState instance 
registered
-        *                               with the same ID.
-        */
-
-       /**
         * Registers the KvState instance and returns the assigned ID.
         *
         * @param jobId            JobId the KvState instance belongs to
@@ -96,7 +86,7 @@ public class KvStateRegistry {
                KvStateID kvStateId = new KvStateID();
 
                if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) 
{
-                       KvStateRegistryListener listener = this.listener.get();
+                       final KvStateRegistryListener listener = 
listenerRef.get();
                        if (listener != null) {
                                listener.notifyKvStateRegistered(
                                                jobId,
@@ -108,7 +98,8 @@ public class KvStateRegistry {
 
                        return kvStateId;
                } else {
-                       throw new IllegalStateException(kvStateId + " is 
already registered.");
+                       throw new IllegalStateException(
+                                       "State \"" + registrationName + " 
\"(id=" + kvStateId + ") appears registered although it should not.");
                }
        }
 
@@ -127,7 +118,7 @@ public class KvStateRegistry {
                        KvStateID kvStateId) {
 
                if (registeredKvStates.remove(kvStateId) != null) {
-                       KvStateRegistryListener listener = this.listener.get();
+                       final KvStateRegistryListener listener = 
listenerRef.get();
                        if (listener != null) {
                                listener.notifyKvStateUnregistered(
                                                jobId,

Reply via email to