rpuch commented on code in PR #677:
URL: https://github.com/apache/ignite-3/pull/677#discussion_r855213465


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java:
##########
@@ -720,17 +725,20 @@ public synchronized CompletableFuture<Void> 
unregisterWatch(long id) {
         try {
             var rangeCriterion = 
KeyCriterion.RangeCriterion.fromPrefixKey(keyPrefix);
 
-            return new CursorWrapper<>(
-                    metaStorageSvcFut.thenApply(svc -> 
svc.range(rangeCriterion.from(), rangeCriterion.to(), appliedRevision()))
+            CompletableFuture<Cursor<Entry>> cursor = 
metaStorageSvcFut.thenCombine(

Review Comment:
   `cursorFuture` again?



##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java:
##########
@@ -111,88 +120,95 @@ public DistributedConfigurationStorage(MetaStorageManager 
metaStorageMgr, VaultM
         this.vaultMgr = vaultMgr;
     }
 
+    @Override
+    public void close() throws Exception {
+        IgniteUtils.shutdownAndAwaitTermination(threadPool, 10, 
TimeUnit.SECONDS);
+    }
+
     /** {@inheritDoc} */
     @Override
-    public Map<String, Serializable> readAllLatest(String prefix) {
-        var data = new HashMap<String, Serializable>();
+    public CompletableFuture<Map<String, ? extends Serializable>> 
readAllLatest(String prefix) {
+        return supplyAsync(() -> {
+            var data = new HashMap<String, Serializable>();
 
-        var rangeStart = new ByteArray(DISTRIBUTED_PREFIX + prefix);
+            var rangeStart = new ByteArray(DISTRIBUTED_PREFIX + prefix);
 
-        var rangeEnd = new ByteArray(incrementLastChar(DISTRIBUTED_PREFIX + 
prefix));
+            var rangeEnd = new ByteArray(incrementLastChar(DISTRIBUTED_PREFIX 
+ prefix));
 
-        try (Cursor<Entry> entries = metaStorageMgr.range(rangeStart, 
rangeEnd)) {
-            for (Entry entry : entries) {
-                ByteArray key = entry.key();
-                byte[] value = entry.value();
+            try (Cursor<Entry> entries = metaStorageMgr.range(rangeStart, 
rangeEnd)) {
+                for (Entry entry : entries) {
+                    ByteArray key = entry.key();
+                    byte[] value = entry.value();
 
-                if (entry.tombstone()) {
-                    continue;
-                }
+                    if (entry.tombstone()) {
+                        continue;
+                    }
 
-                // Meta Storage should not return nulls as values
-                assert value != null;
+                    // Meta Storage should not return nulls as values
+                    assert value != null;
 
-                if (key.equals(MASTER_KEY)) {
-                    continue;
-                }
+                    if (key.equals(MASTER_KEY)) {
+                        continue;
+                    }
 
-                String dataKey = 
key.toString().substring(DISTRIBUTED_PREFIX.length());
+                    String dataKey = 
key.toString().substring(DISTRIBUTED_PREFIX.length());
 
-                data.put(dataKey, 
ConfigurationSerializationUtil.fromBytes(value));
+                    data.put(dataKey, 
ConfigurationSerializationUtil.fromBytes(value));
+                }
+            } catch (Exception e) {
+                throw new StorageException("Exception when closing a Meta 
Storage cursor", e);
             }
-        } catch (Exception e) {
-            throw new StorageException("Exception when closing a Meta Storage 
cursor", e);
-        }
 
-        return data;
+            return data;
+        }, threadPool);
     }
 
     /** {@inheritDoc} */
     @Override
-    public Serializable readLatest(String key) throws StorageException {
-        try {
-            Entry entry = metaStorageMgr.get(new ByteArray(DISTRIBUTED_PREFIX 
+ key)).join();
+    public CompletableFuture<Serializable> readLatest(String key) {
+        return metaStorageMgr.get(new ByteArray(DISTRIBUTED_PREFIX + key))
+                .thenApply(entry -> {
+                    byte[] value = entry.value();
 
-            return entry.value() == null ? null : 
ConfigurationSerializationUtil.fromBytes(entry.value());
-        } catch (Exception e) {
-            throw new StorageException("Exception while reading data from Meta 
Storage", e);

Review Comment:
   Should the exception (if one occurs) be wrapped in `StorageException` in the new 
code as well?



##########
modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java:
##########
@@ -54,9 +56,9 @@ public class PersistentVaultService implements VaultService {
         RocksDB.loadLibrary();
     }
 
-    private final ExecutorService threadPool = Executors.newFixedThreadPool(2);
+    private final ExecutorService threadPool = 
Executors.newCachedThreadPool(new NamedThreadFactory("vault"));

Review Comment:
   One more unbounded pool



##########
modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java:
##########
@@ -94,59 +92,70 @@ public void start() {
                                 .setFilterPolicy(new BloomFilter(10, false))
                                 .setOptimizeFiltersForMemory(true)
                 );
-
-        try {
-            db = RocksDB.open(options, path.toString());
-        } catch (RocksDBException e) {
-            throw new IgniteInternalException(e);
-        }
     }
 
-    /** {@inheritDoc} */
     @Override
-    public void stop() {
-        // TODO: IGNITE-15161 Implement component's stop.
+    public void start() {
         try {
-            close();
+            db = RocksDB.open(options, path.toString());
         } catch (RocksDBException e) {
             throw new IgniteInternalException(e);
         }
     }
 
     /** {@inheritDoc} */
     @Override
-    public void close() throws RocksDBException {
-        db.syncWal();
-
+    public void close() throws Exception {
         IgniteUtils.shutdownAndAwaitTermination(threadPool, 10, 
TimeUnit.SECONDS);

Review Comment:
   `InFlightFutures` can be used to track and, on stop, cancel the futures that 
will never be resolved due to not having a chance to be executed



##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java:
##########
@@ -65,58 +74,60 @@ public LocalConfigurationStorage(VaultManager vaultMgr) {
         this.vaultMgr = vaultMgr;
     }
 
+    @Override
+    public void close() throws Exception {
+        IgniteUtils.shutdownAndAwaitTermination(threadPool, 10, 
TimeUnit.SECONDS);

Review Comment:
   `InFlightFutures` can be used to track and, on stop, cancel the futures that 
will never be resolved due to not having a chance to be executed



##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java:
##########
@@ -111,88 +120,95 @@ public DistributedConfigurationStorage(MetaStorageManager 
metaStorageMgr, VaultM
         this.vaultMgr = vaultMgr;
     }
 
+    @Override
+    public void close() throws Exception {
+        IgniteUtils.shutdownAndAwaitTermination(threadPool, 10, 
TimeUnit.SECONDS);

Review Comment:
   `InFlightFutures` can be used to track and, on stop, cancel the futures that 
will never be resolved due to not having a chance to be executed



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java:
##########
@@ -693,7 +693,12 @@ public synchronized CompletableFuture<Void> 
unregisterWatch(long id) {
         }
 
         try {
-            return new CursorWrapper<>(metaStorageSvcFut.thenApply(svc -> 
svc.range(keyFrom, keyTo, appliedRevision())));
+            CompletableFuture<Cursor<Entry>> cursor = 
metaStorageSvcFut.thenCombine(

Review Comment:
   `cursorFuture`?



##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java:
##########
@@ -56,6 +63,8 @@ public class LocalConfigurationStorage implements 
ConfigurationStorage {
     /** End key in range for searching local configuration keys. */
     private static final ByteArray LOC_KEYS_END_RANGE = 
ByteArray.fromString(incrementLastChar(LOC_PREFIX));
 
+    private final ExecutorService threadPool = 
Executors.newCachedThreadPool(new NamedThreadFactory("loc-cfg"));

Review Comment:
   Again, an unbounded thread pool: should we set some limit (like 10 for now)?



##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java:
##########
@@ -99,6 +106,8 @@ public class DistributedConfigurationStorage implements 
ConfigurationStorage {
      */
     private final AtomicLong changeId = new AtomicLong(0L);
 
+    private final ExecutorService threadPool = 
Executors.newCachedThreadPool(new NamedThreadFactory("dst-cfg"));

Review Comment:
   I wonder if it's ok to have an unbounded thread pool. Tasks submitted to it 
should be pretty short-lived, but it is still a bit scary to have a thread pool 
which has no limit on the number of threads. How about introducing a limit of 
10 threads for starters?



##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java:
##########
@@ -65,58 +74,60 @@ public LocalConfigurationStorage(VaultManager vaultMgr) {
         this.vaultMgr = vaultMgr;
     }
 
+    @Override
+    public void close() throws Exception {
+        IgniteUtils.shutdownAndAwaitTermination(threadPool, 10, 
TimeUnit.SECONDS);
+    }
+
     /** {@inheritDoc} */
     @Override
-    public synchronized Map<String, ? extends Serializable> 
readAllLatest(String prefix) {
+    public CompletableFuture<Map<String, ? extends Serializable>> 
readAllLatest(String prefix) {
         var rangeStart = new ByteArray(LOC_PREFIX + prefix);
 
         var rangeEnd = new ByteArray(incrementLastChar(LOC_PREFIX + prefix));
 
-        return readAll(rangeStart, rangeEnd).values();
+        return readAll(rangeStart, rangeEnd).thenApply(Data::values);
     }
 
     /** {@inheritDoc} */
     @Override
-    public synchronized Serializable readLatest(String key) throws 
StorageException {
-        try {
-            VaultEntry vaultEntry = vaultMgr.get(new ByteArray(LOC_PREFIX + 
key)).join();
-
-            return vaultEntry.empty() ? null : 
ConfigurationSerializationUtil.fromBytes(vaultEntry.value());
-        } catch (Exception e) {
-            throw new StorageException("Exception while reading vault entry", 
e);

Review Comment:
   The new code does not wrap an Exception with a StorageException anymore; is 
this ok?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to