ibessonov commented on a change in pull request #114:
URL: https://github.com/apache/ignite-3/pull/114#discussion_r627190228



##########
File path: 
modules/configuration/src/main/java/org/apache/ignite/configuration/ConfigurationChanger.java
##########
@@ -428,7 +428,7 @@ private void updateFromListener(
 
         ConfigurationStorage storage = storageInstances.get(storageType);
 
-        long storageRevision = changedEntries.storageRevision();
+        long storageRevision = changedEntries.cfgVersion();

Review comment:
       Can we rename it to "changeId" to avoid further confusion?

##########
File path: 
modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -19,24 +19,47 @@
 
 import java.io.Serializable;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.configuration.storage.ConfigurationStorage;
 import org.apache.ignite.configuration.storage.ConfigurationStorageListener;
 import org.apache.ignite.configuration.storage.ConfigurationType;
 import org.apache.ignite.configuration.storage.Data;
 import org.apache.ignite.configuration.storage.StorageException;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operation;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Distributed configuration storage.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class 
DistributedConfigurationStorage implements ConfigurationStorage {
+public class DistributedConfigurationStorage implements ConfigurationStorage {
+    /** Prefix that we add to configuration keys to distinguish them in 
metastorage. */
+    private static String DISTRIBUTED_PREFIX = 
ConfigurationType.DISTRIBUTED.name() + "-cfg";

Review comment:
       not final

##########
File path: 
modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -19,24 +19,47 @@
 
 import java.io.Serializable;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.configuration.storage.ConfigurationStorage;
 import org.apache.ignite.configuration.storage.ConfigurationStorageListener;
 import org.apache.ignite.configuration.storage.ConfigurationType;
 import org.apache.ignite.configuration.storage.Data;
 import org.apache.ignite.configuration.storage.StorageException;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operation;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Distributed configuration storage.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class 
DistributedConfigurationStorage implements ConfigurationStorage {
+public class DistributedConfigurationStorage implements ConfigurationStorage {
+    /** Prefix that we add to configuration keys to distinguish them in 
metastorage. */
+    private static String DISTRIBUTED_PREFIX = 
ConfigurationType.DISTRIBUTED.name() + "-cfg";
+
+    /** Logger. */
+    private static final IgniteLogger LOG = 
IgniteLogger.forClass(DistributedConfigurationStorage.class);
+
+    /** Key for CAS-ing configuration keys to metastorage. */
+    private static Key masterKey = new Key(DISTRIBUTED_PREFIX + ".");

Review comment:
       not final

##########
File path: 
modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -49,52 +72,152 @@ public DistributedConfigurationStorage(MetaStorageManager 
metaStorageMgr) {
         this.metaStorageMgr = metaStorageMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new 
CopyOnWriteArrayList<>();
 
-    /** Storage version. */
-    private AtomicLong version = new AtomicLong(0);
+    /** Storage version. It stores actual metastorage revision, that is 
applied to configuration manager. */
+    private AtomicLong version = new AtomicLong(0L);
 
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+
+        Cursor<Entry> cur = metaStorageMgr.rangeWithAppliedRevision(new 
Key(DISTRIBUTED_PREFIX + "."),

Review comment:
       can we reuse "masterKey" constant here?

##########
File path: 
modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -49,52 +72,152 @@ public DistributedConfigurationStorage(MetaStorageManager 
metaStorageMgr) {
         this.metaStorageMgr = metaStorageMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new 
CopyOnWriteArrayList<>();
 
-    /** Storage version. */
-    private AtomicLong version = new AtomicLong(0);
+    /** Storage version. It stores actual metastorage revision, that is 
applied to configuration manager. */
+    private AtomicLong version = new AtomicLong(0L);
 
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+
+        Cursor<Entry> cur = metaStorageMgr.rangeWithAppliedRevision(new 
Key(DISTRIBUTED_PREFIX + "."),
+            new Key(DISTRIBUTED_PREFIX + (char)('.' + 1)));
+
+        HashMap<String, Serializable> data = new HashMap<>();
+
+        long maxRevision = 0L;

Review comment:
       this should be a revision of master key, right? You don't need to 
explicitly compute max value.

##########
File path: 
modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -49,52 +72,152 @@ public DistributedConfigurationStorage(MetaStorageManager 
metaStorageMgr) {
         this.metaStorageMgr = metaStorageMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new 
CopyOnWriteArrayList<>();
 
-    /** Storage version. */
-    private AtomicLong version = new AtomicLong(0);
+    /** Storage version. It stores actual metastorage revision, that is 
applied to configuration manager. */
+    private AtomicLong version = new AtomicLong(0L);
 
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+
+        Cursor<Entry> cur = metaStorageMgr.rangeWithAppliedRevision(new 
Key(DISTRIBUTED_PREFIX + "."),
+            new Key(DISTRIBUTED_PREFIX + (char)('.' + 1)));
+
+        HashMap<String, Serializable> data = new HashMap<>();
+
+        long maxRevision = 0L;
+
+        Entry entryForMasterKey = null;
+
+        for (Entry entry : cur) {
+            if (!entry.key().equals(masterKey)) {
+                
data.put(entry.key().toString().replaceFirst(DISTRIBUTED_PREFIX + ".", ""),
+                    (Serializable)ByteUtils.fromBytes(entry.value()));

Review comment:
       How exactly do you serialize these values? You should do this 
effectively!

##########
File path: 
modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
##########
@@ -49,47 +69,121 @@ public LocalConfigurationStorage(VaultManager vaultMgr) {
         this.vaultMgr = vaultMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new 
CopyOnWriteArrayList<>();
 
     /** Storage version. */
     private AtomicLong version = new AtomicLong(0);
 
+    /** Start key in ragne for searching local configuration keys. */
+    private ByteArray localKeysStartRange = ByteArray.fromString(LOCAL_PREFIX 
+ ".");
+
+    /** End key in range for searching local configuration keys. */
+    private ByteArray localKeysEndRange = ByteArray.fromString(LOCAL_PREFIX + 
(char)('.' + 1));

Review comment:
       Cool, make similar constant for metastorage

##########
File path: 
modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
##########
@@ -49,47 +69,121 @@ public LocalConfigurationStorage(VaultManager vaultMgr) {
         this.vaultMgr = vaultMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new 
CopyOnWriteArrayList<>();
 
     /** Storage version. */
     private AtomicLong version = new AtomicLong(0);
 
+    /** Start key in ragne for searching local configuration keys. */
+    private ByteArray localKeysStartRange = ByteArray.fromString(LOCAL_PREFIX 
+ ".");
+
+    /** End key in range for searching local configuration keys. */
+    private ByteArray localKeysEndRange = ByteArray.fromString(LOCAL_PREFIX + 
(char)('.' + 1));
+
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+        Iterator<Entry> iter =
+            vaultMgr.range(localKeysStartRange, localKeysEndRange);
+
+        HashMap<String, Serializable> data = new HashMap<>();
+
+        while (iter.hasNext()) {
+            Entry val = iter.next();
+
+            data.put(val.key().toString().replaceFirst(LOCAL_PREFIX + ".", ""),
+                (Serializable)ByteUtils.fromBytes(val.value()));
+        }
+
+        return new Data(data, version.get());
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized CompletableFuture<Boolean> write(Map<String, 
Serializable> newValues, long version) {
-        if (version != this.version.get())
+    @Override public synchronized CompletableFuture<Boolean> write(Map<String, 
Serializable> newValues, long sentVersion) {
+        if (sentVersion != version.get())
             return CompletableFuture.completedFuture(false);
 
+        CompletableFuture[] futs = new CompletableFuture[newValues.size()];
+
+        int i = 0;
+
         for (Map.Entry<String, Serializable> entry : newValues.entrySet()) {
+            ByteArray key = ByteArray.fromString(LOCAL_PREFIX + "." + 
entry.getKey());
+
             if (entry.getValue() != null)
-                map.put(entry.getKey(), entry.getValue());
+                futs[i++] = vaultMgr.put(key, 
ByteUtils.toBytes(entry.getValue()));

Review comment:
       I told you why I don't like this solution.

##########
File path: 
modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
##########
@@ -49,47 +69,121 @@ public LocalConfigurationStorage(VaultManager vaultMgr) {
         this.vaultMgr = vaultMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new 
CopyOnWriteArrayList<>();
 
     /** Storage version. */
     private AtomicLong version = new AtomicLong(0);
 
+    /** Start key in ragne for searching local configuration keys. */
+    private ByteArray localKeysStartRange = ByteArray.fromString(LOCAL_PREFIX 
+ ".");
+
+    /** End key in range for searching local configuration keys. */
+    private ByteArray localKeysEndRange = ByteArray.fromString(LOCAL_PREFIX + 
(char)('.' + 1));
+
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+        Iterator<Entry> iter =
+            vaultMgr.range(localKeysStartRange, localKeysEndRange);
+
+        HashMap<String, Serializable> data = new HashMap<>();
+
+        while (iter.hasNext()) {
+            Entry val = iter.next();
+
+            data.put(val.key().toString().replaceFirst(LOCAL_PREFIX + ".", ""),
+                (Serializable)ByteUtils.fromBytes(val.value()));
+        }
+
+        return new Data(data, version.get());
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized CompletableFuture<Boolean> write(Map<String, 
Serializable> newValues, long version) {
-        if (version != this.version.get())
+    @Override public synchronized CompletableFuture<Boolean> write(Map<String, 
Serializable> newValues, long sentVersion) {
+        if (sentVersion != version.get())
             return CompletableFuture.completedFuture(false);
 
+        CompletableFuture[] futs = new CompletableFuture[newValues.size()];
+
+        int i = 0;
+
         for (Map.Entry<String, Serializable> entry : newValues.entrySet()) {
+            ByteArray key = ByteArray.fromString(LOCAL_PREFIX + "." + 
entry.getKey());
+
             if (entry.getValue() != null)
-                map.put(entry.getKey(), entry.getValue());
+                futs[i++] = vaultMgr.put(key, 
ByteUtils.toBytes(entry.getValue()));
             else
-                map.remove(entry.getKey());
+                futs[i++] = vaultMgr.remove(key);
         }
 
-        this.version.incrementAndGet();
+        try {
+            CompletableFuture.allOf(futs).get();
+
+            latch = new CountDownLatch(newValues.size());
+
+            latch.await();
+
+            for (Map.Entry<String, Serializable> entry : newValues.entrySet()) 
{
+                ByteArray key = ByteArray.fromString(LOCAL_PREFIX + "." + 
entry.getKey());
 
-        listeners.forEach(listener -> listener.onEntriesChanged(new 
Data(newValues, this.version.get(), 0)));
+                Entry e = vaultMgr.get(key).get();
+
+                if (e.value() != ByteUtils.toBytes(entry.getValue()))

Review comment:
       We should discuss this in person, current approach doesn't look right.

##########
File path: 
modules/configuration/src/main/java/org/apache/ignite/configuration/storage/ConfigurationStorage.java
##########
@@ -44,6 +44,8 @@
      * Add listener to the storage that notifies of data changes.
      * @param listener Listener.
      */
+    // TODO: seems that it's not needed to have an ability to set several 
listeners to storage, as far as only one is responsible

Review comment:
       Please assign ticket to every TODO that yo put in the code, otherwise 
we'll lose them

##########
File path: 
modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -49,52 +72,152 @@ public DistributedConfigurationStorage(MetaStorageManager 
metaStorageMgr) {
         this.metaStorageMgr = metaStorageMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new 
CopyOnWriteArrayList<>();
 
-    /** Storage version. */
-    private AtomicLong version = new AtomicLong(0);
+    /** Storage version. It stores actual metastorage revision, that is 
applied to configuration manager. */
+    private AtomicLong version = new AtomicLong(0L);
 
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+
+        Cursor<Entry> cur = metaStorageMgr.rangeWithAppliedRevision(new 
Key(DISTRIBUTED_PREFIX + "."),
+            new Key(DISTRIBUTED_PREFIX + (char)('.' + 1)));

Review comment:
       I know why you did this, but please extract it into method and add 
comment about range :)

##########
File path: 
modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -49,52 +72,152 @@ public DistributedConfigurationStorage(MetaStorageManager 
metaStorageMgr) {
         this.metaStorageMgr = metaStorageMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new 
CopyOnWriteArrayList<>();
 
-    /** Storage version. */
-    private AtomicLong version = new AtomicLong(0);
+    /** Storage version. It stores actual metastorage revision, that is 
applied to configuration manager. */
+    private AtomicLong version = new AtomicLong(0L);
 
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+
+        Cursor<Entry> cur = metaStorageMgr.rangeWithAppliedRevision(new 
Key(DISTRIBUTED_PREFIX + "."),
+            new Key(DISTRIBUTED_PREFIX + (char)('.' + 1)));
+
+        HashMap<String, Serializable> data = new HashMap<>();
+
+        long maxRevision = 0L;
+
+        Entry entryForMasterKey = null;
+
+        for (Entry entry : cur) {
+            if (!entry.key().equals(masterKey)) {
+                
data.put(entry.key().toString().replaceFirst(DISTRIBUTED_PREFIX + ".", ""),
+                    (Serializable)ByteUtils.fromBytes(entry.value()));
+
+                if (maxRevision < entry.revision())
+                    maxRevision = entry.revision();
+            } else
+                entryForMasterKey = entry;
+        }
+
+        if (!data.isEmpty()) {
+            assert entryForMasterKey != null;
+
+            assert maxRevision == entryForMasterKey.revision();
+
+            assert maxRevision >= version.get();
+
+            return new Data(data, maxRevision);
+        }
+
+        return new Data(data, version.get());
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized CompletableFuture<Boolean> write(Map<String, 
Serializable> newValues, long version) {
-        if (version != this.version.get())
+    @Override public synchronized CompletableFuture<Boolean> write(Map<String, 
Serializable> newValues, long sentVersion) {
+        assert sentVersion <= version.get();
+
+        if (sentVersion != version.get())
+            // This means that sentVersion is less than version and other node 
has already updated configuration and
+            // write should be retried. Actual version will be set when watch 
and corresponding configuration listener
+            // updates configuration and notifyApplied is triggered afterwards.
             return CompletableFuture.completedFuture(false);
 
+        HashSet<Operation> operations = new HashSet<>();
+
+        HashSet<Operation> failures = new HashSet<>();
+
         for (Map.Entry<String, Serializable> entry : newValues.entrySet()) {
+            Key key = new Key(DISTRIBUTED_PREFIX + "." + entry.getKey());
+
             if (entry.getValue() != null)
-                map.put(entry.getKey(), entry.getValue());
+                operations.add(Operations.put(key, 
ByteUtils.toBytes(entry.getValue())));
             else
-                map.remove(entry.getKey());
-        }
+                operations.add(Operations.remove(key));
 
-        this.version.incrementAndGet();
+            failures.add(Operations.noop());
+        }
 
-        listeners.forEach(listener -> listener.onEntriesChanged(new 
Data(newValues, this.version.get(), 0)));
+        operations.add(Operations.put(masterKey, new byte[1]));
 
-        return CompletableFuture.completedFuture(true);
+        return 
metaStorageMgr.invoke(Conditions.key(masterKey).revision().eq(version.get()), 
operations, failures);
     }
 
     /** {@inheritDoc} */
-    @Override public void addListener(ConfigurationStorageListener listener) {
+    @Override public synchronized void 
addListener(ConfigurationStorageListener listener) {
         listeners.add(listener);
+
+        if (watchId == null) {
+            watchId = metaStorageMgr.registerWatchByPrefix(masterKey, new 
WatchListener() {
+                @Override public boolean onUpdate(@NotNull 
Iterable<WatchEvent> events) {
+                    HashMap<String, Serializable> data = new HashMap<>();
+
+                    long maxRevision = 0L;
+
+                    Entry entryForMasterKey = null;
+
+                    for (WatchEvent event : events) {
+                        Entry e = event.newEntry();
+
+                        if (!e.key().equals(masterKey)) {
+                            
data.put(e.key().toString().replaceFirst(DISTRIBUTED_PREFIX + ".", ""),

Review comment:
       Same here

##########
File path: 
modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -49,52 +72,152 @@ public DistributedConfigurationStorage(MetaStorageManager 
metaStorageMgr) {
         this.metaStorageMgr = metaStorageMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new 
CopyOnWriteArrayList<>();
 
-    /** Storage version. */
-    private AtomicLong version = new AtomicLong(0);
+    /** Storage version. It stores actual metastorage revision, that is 
applied to configuration manager. */
+    private AtomicLong version = new AtomicLong(0L);
 
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+
+        Cursor<Entry> cur = metaStorageMgr.rangeWithAppliedRevision(new 
Key(DISTRIBUTED_PREFIX + "."),
+            new Key(DISTRIBUTED_PREFIX + (char)('.' + 1)));
+
+        HashMap<String, Serializable> data = new HashMap<>();
+
+        long maxRevision = 0L;
+
+        Entry entryForMasterKey = null;
+
+        for (Entry entry : cur) {
+            if (!entry.key().equals(masterKey)) {
+                
data.put(entry.key().toString().replaceFirst(DISTRIBUTED_PREFIX + ".", ""),
+                    (Serializable)ByteUtils.fromBytes(entry.value()));
+
+                if (maxRevision < entry.revision())
+                    maxRevision = entry.revision();
+            } else
+                entryForMasterKey = entry;
+        }
+
+        if (!data.isEmpty()) {
+            assert entryForMasterKey != null;
+
+            assert maxRevision == entryForMasterKey.revision();
+
+            assert maxRevision >= version.get();
+
+            return new Data(data, maxRevision);
+        }
+
+        return new Data(data, version.get());
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized CompletableFuture<Boolean> write(Map<String, 
Serializable> newValues, long version) {
-        if (version != this.version.get())
+    @Override public synchronized CompletableFuture<Boolean> write(Map<String, 
Serializable> newValues, long sentVersion) {
+        assert sentVersion <= version.get();
+
+        if (sentVersion != version.get())
+            // This means that sentVersion is less than version and other node 
has already updated configuration and
+            // write should be retried. Actual version will be set when watch 
and corresponding configuration listener
+            // updates configuration and notifyApplied is triggered afterwards.
             return CompletableFuture.completedFuture(false);
 
+        HashSet<Operation> operations = new HashSet<>();
+
+        HashSet<Operation> failures = new HashSet<>();
+
         for (Map.Entry<String, Serializable> entry : newValues.entrySet()) {
+            Key key = new Key(DISTRIBUTED_PREFIX + "." + entry.getKey());
+
             if (entry.getValue() != null)
-                map.put(entry.getKey(), entry.getValue());
+                operations.add(Operations.put(key, 
ByteUtils.toBytes(entry.getValue())));
             else
-                map.remove(entry.getKey());
-        }
+                operations.add(Operations.remove(key));
 
-        this.version.incrementAndGet();
+            failures.add(Operations.noop());
+        }
 
-        listeners.forEach(listener -> listener.onEntriesChanged(new 
Data(newValues, this.version.get(), 0)));
+        operations.add(Operations.put(masterKey, new byte[1]));

Review comment:
       Will metastorage update revision if value is the same every time?

##########
File path: 
modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -49,52 +72,152 @@ public DistributedConfigurationStorage(MetaStorageManager 
metaStorageMgr) {
         this.metaStorageMgr = metaStorageMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new 
CopyOnWriteArrayList<>();
 
-    /** Storage version. */
-    private AtomicLong version = new AtomicLong(0);
+    /** Storage version. It stores actual metastorage revision, that is 
applied to configuration manager. */
+    private AtomicLong version = new AtomicLong(0L);
 
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+
+        Cursor<Entry> cur = metaStorageMgr.rangeWithAppliedRevision(new 
Key(DISTRIBUTED_PREFIX + "."),
+            new Key(DISTRIBUTED_PREFIX + (char)('.' + 1)));
+
+        HashMap<String, Serializable> data = new HashMap<>();
+
+        long maxRevision = 0L;
+
+        Entry entryForMasterKey = null;
+
+        for (Entry entry : cur) {
+            if (!entry.key().equals(masterKey)) {
+                
data.put(entry.key().toString().replaceFirst(DISTRIBUTED_PREFIX + ".", ""),
+                    (Serializable)ByteUtils.fromBytes(entry.value()));
+
+                if (maxRevision < entry.revision())
+                    maxRevision = entry.revision();
+            } else
+                entryForMasterKey = entry;
+        }
+
+        if (!data.isEmpty()) {
+            assert entryForMasterKey != null;
+
+            assert maxRevision == entryForMasterKey.revision();
+
+            assert maxRevision >= version.get();
+
+            return new Data(data, maxRevision);
+        }
+
+        return new Data(data, version.get());
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized CompletableFuture<Boolean> write(Map<String, 
Serializable> newValues, long version) {
-        if (version != this.version.get())
+    @Override public synchronized CompletableFuture<Boolean> write(Map<String, 
Serializable> newValues, long sentVersion) {
+        assert sentVersion <= version.get();
+
+        if (sentVersion != version.get())
+            // This means that sentVersion is less than version and other node 
has already updated configuration and
+            // write should be retried. Actual version will be set when watch 
and corresponding configuration listener
+            // updates configuration and notifyApplied is triggered afterwards.
             return CompletableFuture.completedFuture(false);
 
+        HashSet<Operation> operations = new HashSet<>();
+
+        HashSet<Operation> failures = new HashSet<>();
+
         for (Map.Entry<String, Serializable> entry : newValues.entrySet()) {
+            Key key = new Key(DISTRIBUTED_PREFIX + "." + entry.getKey());
+
             if (entry.getValue() != null)
-                map.put(entry.getKey(), entry.getValue());
+                operations.add(Operations.put(key, 
ByteUtils.toBytes(entry.getValue())));
             else
-                map.remove(entry.getKey());
-        }
+                operations.add(Operations.remove(key));
 
-        this.version.incrementAndGet();
+            failures.add(Operations.noop());
+        }
 
-        listeners.forEach(listener -> listener.onEntriesChanged(new 
Data(newValues, this.version.get(), 0)));
+        operations.add(Operations.put(masterKey, new byte[1]));
 
-        return CompletableFuture.completedFuture(true);
+        return 
metaStorageMgr.invoke(Conditions.key(masterKey).revision().eq(version.get()), 
operations, failures);
     }
 
     /** {@inheritDoc} */
-    @Override public void addListener(ConfigurationStorageListener listener) {
+    @Override public synchronized void 
addListener(ConfigurationStorageListener listener) {
         listeners.add(listener);
+
+        if (watchId == null) {
+            watchId = metaStorageMgr.registerWatchByPrefix(masterKey, new 
WatchListener() {
+                @Override public boolean onUpdate(@NotNull 
Iterable<WatchEvent> events) {
+                    HashMap<String, Serializable> data = new HashMap<>();
+
+                    long maxRevision = 0L;
+
+                    Entry entryForMasterKey = null;
+
+                    for (WatchEvent event : events) {
+                        Entry e = event.newEntry();
+
+                        if (!e.key().equals(masterKey)) {
+                            
data.put(e.key().toString().replaceFirst(DISTRIBUTED_PREFIX + ".", ""),
+                                (Serializable)ByteUtils.fromBytes(e.value()));
+
+                            if (maxRevision < e.revision())
+                                maxRevision = e.revision();
+                        } else
+                            entryForMasterKey = e;
+                    }
+
+                    // Contract of metastorage ensures that all updates of one 
revision will come in one batch.
+                    // Also masterKey should be updated every time when we 
update cfg.
+                    // That means that masterKey update must be included in 
the batch.
+                    assert entryForMasterKey != null;
+
+                    assert maxRevision == entryForMasterKey.revision();
+
+                    assert maxRevision >= version.get();
+
+                    long finalMaxRevision = maxRevision;
+
+                    listeners.forEach(listener -> 
listener.onEntriesChanged(new Data(data, finalMaxRevision)));
+
+                    return true;
+                }
+
+                @Override public void onError(@NotNull Throwable e) {
+                    LOG.error("Metastorage listener issue", e);
+                }
+            });
+
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public void removeListener(ConfigurationStorageListener 
listener) {
+    @Override public synchronized void 
removeListener(ConfigurationStorageListener listener) {

Review comment:
       I'm not sure that we even need this method :( No one uses it

##########
File path: 
modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
##########
@@ -49,47 +69,121 @@ public LocalConfigurationStorage(VaultManager vaultMgr) {
         this.vaultMgr = vaultMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new 
CopyOnWriteArrayList<>();
 
     /** Storage version. */
     private AtomicLong version = new AtomicLong(0);
 
+    /** Start key in ragne for searching local configuration keys. */
+    private ByteArray localKeysStartRange = ByteArray.fromString(LOCAL_PREFIX 
+ ".");
+
+    /** End key in range for searching local configuration keys. */
+    private ByteArray localKeysEndRange = ByteArray.fromString(LOCAL_PREFIX + 
(char)('.' + 1));
+
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+        Iterator<Entry> iter =
+            vaultMgr.range(localKeysStartRange, localKeysEndRange);
+
+        HashMap<String, Serializable> data = new HashMap<>();
+
+        while (iter.hasNext()) {
+            Entry val = iter.next();
+
+            data.put(val.key().toString().replaceFirst(LOCAL_PREFIX + ".", ""),
+                (Serializable)ByteUtils.fromBytes(val.value()));
+        }
+
+        return new Data(data, version.get());
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized CompletableFuture<Boolean> write(Map<String, 
Serializable> newValues, long version) {
-        if (version != this.version.get())
+    @Override public synchronized CompletableFuture<Boolean> write(Map<String, 
Serializable> newValues, long sentVersion) {
+        if (sentVersion != version.get())
             return CompletableFuture.completedFuture(false);
 
+        CompletableFuture[] futs = new CompletableFuture[newValues.size()];
+
+        int i = 0;
+
         for (Map.Entry<String, Serializable> entry : newValues.entrySet()) {
+            ByteArray key = ByteArray.fromString(LOCAL_PREFIX + "." + 
entry.getKey());
+
             if (entry.getValue() != null)
-                map.put(entry.getKey(), entry.getValue());
+                futs[i++] = vaultMgr.put(key, 
ByteUtils.toBytes(entry.getValue()));
             else
-                map.remove(entry.getKey());
+                futs[i++] = vaultMgr.remove(key);
         }
 
-        this.version.incrementAndGet();
+        try {
+            CompletableFuture.allOf(futs).get();
+
+            latch = new CountDownLatch(newValues.size());
+
+            latch.await();

Review comment:
       WHAT?
   Let me explain, how do you avoid races?

##########
File path: 
modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
##########
@@ -49,47 +69,121 @@ public LocalConfigurationStorage(VaultManager vaultMgr) {
         this.vaultMgr = vaultMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new 
CopyOnWriteArrayList<>();
 
     /** Storage version. */
     private AtomicLong version = new AtomicLong(0);
 
+    /** Start key in ragne for searching local configuration keys. */
+    private ByteArray localKeysStartRange = ByteArray.fromString(LOCAL_PREFIX 
+ ".");
+
+    /** End key in range for searching local configuration keys. */
+    private ByteArray localKeysEndRange = ByteArray.fromString(LOCAL_PREFIX + 
(char)('.' + 1));
+
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+        Iterator<Entry> iter =
+            vaultMgr.range(localKeysStartRange, localKeysEndRange);
+
+        HashMap<String, Serializable> data = new HashMap<>();
+
+        while (iter.hasNext()) {
+            Entry val = iter.next();
+
+            data.put(val.key().toString().replaceFirst(LOCAL_PREFIX + ".", ""),
+                (Serializable)ByteUtils.fromBytes(val.value()));
+        }
+
+        return new Data(data, version.get());
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized CompletableFuture<Boolean> write(Map<String, 
Serializable> newValues, long version) {
-        if (version != this.version.get())
+    @Override public synchronized CompletableFuture<Boolean> write(Map<String, 
Serializable> newValues, long sentVersion) {
+        if (sentVersion != version.get())
             return CompletableFuture.completedFuture(false);
 
+        CompletableFuture[] futs = new CompletableFuture[newValues.size()];
+
+        int i = 0;
+
         for (Map.Entry<String, Serializable> entry : newValues.entrySet()) {
+            ByteArray key = ByteArray.fromString(LOCAL_PREFIX + "." + 
entry.getKey());
+
             if (entry.getValue() != null)
-                map.put(entry.getKey(), entry.getValue());
+                futs[i++] = vaultMgr.put(key, 
ByteUtils.toBytes(entry.getValue()));
             else
-                map.remove(entry.getKey());
+                futs[i++] = vaultMgr.remove(key);
         }
 
-        this.version.incrementAndGet();
+        try {
+            CompletableFuture.allOf(futs).get();
+
+            latch = new CountDownLatch(newValues.size());
+
+            latch.await();
+
+            for (Map.Entry<String, Serializable> entry : newValues.entrySet()) 
{
+                ByteArray key = ByteArray.fromString(LOCAL_PREFIX + "." + 
entry.getKey());
 
-        listeners.forEach(listener -> listener.onEntriesChanged(new 
Data(newValues, this.version.get(), 0)));
+                Entry e = vaultMgr.get(key).get();
+
+                if (e.value() != ByteUtils.toBytes(entry.getValue()))
+                    // value by some key was overwritten, that means that 
changes not
+                    // from LocalConfigurationStorage.write overlapped with 
current changes, so write should be retried.
+                    return CompletableFuture.completedFuture(false);
+            }
+        }
+        catch (InterruptedException | ExecutionException e) {
+            return CompletableFuture.completedFuture(false);

Review comment:
       hm...

##########
File path: 
modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
##########
@@ -49,47 +69,121 @@ public LocalConfigurationStorage(VaultManager vaultMgr) {
         this.vaultMgr = vaultMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new 
CopyOnWriteArrayList<>();
 
     /** Storage version. */
     private AtomicLong version = new AtomicLong(0);
 
+    /** Start key in ragne for searching local configuration keys. */
+    private ByteArray localKeysStartRange = ByteArray.fromString(LOCAL_PREFIX 
+ ".");
+
+    /** End key in range for searching local configuration keys. */
+    private ByteArray localKeysEndRange = ByteArray.fromString(LOCAL_PREFIX + 
(char)('.' + 1));
+
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+        Iterator<Entry> iter =
+            vaultMgr.range(localKeysStartRange, localKeysEndRange);
+
+        HashMap<String, Serializable> data = new HashMap<>();
+
+        while (iter.hasNext()) {
+            Entry val = iter.next();
+
+            data.put(val.key().toString().replaceFirst(LOCAL_PREFIX + ".", ""),
+                (Serializable)ByteUtils.fromBytes(val.value()));
+        }
+
+        return new Data(data, version.get());
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized CompletableFuture<Boolean> write(Map<String, 
Serializable> newValues, long version) {
-        if (version != this.version.get())
+    @Override public synchronized CompletableFuture<Boolean> write(Map<String, 
Serializable> newValues, long sentVersion) {
+        if (sentVersion != version.get())
             return CompletableFuture.completedFuture(false);
 
+        CompletableFuture[] futs = new CompletableFuture[newValues.size()];
+
+        int i = 0;
+
         for (Map.Entry<String, Serializable> entry : newValues.entrySet()) {
+            ByteArray key = ByteArray.fromString(LOCAL_PREFIX + "." + 
entry.getKey());
+
             if (entry.getValue() != null)
-                map.put(entry.getKey(), entry.getValue());
+                futs[i++] = vaultMgr.put(key, 
ByteUtils.toBytes(entry.getValue()));
             else
-                map.remove(entry.getKey());
+                futs[i++] = vaultMgr.remove(key);
         }
 
-        this.version.incrementAndGet();
+        try {
+            CompletableFuture.allOf(futs).get();

Review comment:
       What? 

##########
File path: 
modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
##########
@@ -49,47 +69,121 @@ public LocalConfigurationStorage(VaultManager vaultMgr) {
         this.vaultMgr = vaultMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new 
CopyOnWriteArrayList<>();
 
     /** Storage version. */
     private AtomicLong version = new AtomicLong(0);
 
+    /** Start key in ragne for searching local configuration keys. */
+    private ByteArray localKeysStartRange = ByteArray.fromString(LOCAL_PREFIX 
+ ".");
+
+    /** End key in range for searching local configuration keys. */
+    private ByteArray localKeysEndRange = ByteArray.fromString(LOCAL_PREFIX + 
(char)('.' + 1));
+
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+        Iterator<Entry> iter =
+            vaultMgr.range(localKeysStartRange, localKeysEndRange);
+
+        HashMap<String, Serializable> data = new HashMap<>();
+
+        while (iter.hasNext()) {
+            Entry val = iter.next();
+
+            data.put(val.key().toString().replaceFirst(LOCAL_PREFIX + ".", ""),
+                (Serializable)ByteUtils.fromBytes(val.value()));
+        }
+
+        return new Data(data, version.get());

Review comment:
       version is not persisted, right?

##########
File path: 
modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -49,52 +72,152 @@ public DistributedConfigurationStorage(MetaStorageManager 
metaStorageMgr) {
         this.metaStorageMgr = metaStorageMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new 
CopyOnWriteArrayList<>();
 
-    /** Storage version. */
-    private AtomicLong version = new AtomicLong(0);
+    /** Storage version. It stores actual metastorage revision, that is 
applied to configuration manager. */
+    private AtomicLong version = new AtomicLong(0L);
 
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+
+        Cursor<Entry> cur = metaStorageMgr.rangeWithAppliedRevision(new 
Key(DISTRIBUTED_PREFIX + "."),
+            new Key(DISTRIBUTED_PREFIX + (char)('.' + 1)));
+
+        HashMap<String, Serializable> data = new HashMap<>();
+
+        long maxRevision = 0L;
+
+        Entry entryForMasterKey = null;
+
+        for (Entry entry : cur) {
+            if (!entry.key().equals(masterKey)) {
+                
data.put(entry.key().toString().replaceFirst(DISTRIBUTED_PREFIX + ".", ""),

Review comment:
       This method expects regexp, so there are two issues:
   - it is very slow;
   - "." symbol is not properly escaped.
   Why won't you use "substring"?

##########
File path: 
modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -19,24 +19,47 @@
 
 import java.io.Serializable;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.configuration.storage.ConfigurationStorage;
 import org.apache.ignite.configuration.storage.ConfigurationStorageListener;
 import org.apache.ignite.configuration.storage.ConfigurationType;
 import org.apache.ignite.configuration.storage.Data;
 import org.apache.ignite.configuration.storage.StorageException;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operation;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Distributed configuration storage.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class 
DistributedConfigurationStorage implements ConfigurationStorage {
+public class DistributedConfigurationStorage implements ConfigurationStorage {
+    /** Prefix that we add to configuration keys to distinguish them in 
metastorage. */
+    private static String DISTRIBUTED_PREFIX = 
ConfigurationType.DISTRIBUTED.name() + "-cfg";

Review comment:
       I'd prefer shorter string, because this part will be duplicated 
literally hundreds of times in many messages




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to