sashapolo commented on a change in pull request #160:
URL: https://github.com/apache/ignite-3/pull/160#discussion_r647570165
##########
File path:
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
##########
@@ -69,6 +70,15 @@
/** Meta storage raft group name. */
private static final String METASTORAGE_RAFT_GROUP_NAME =
"metastorage_raft_group";
+ /** Prefix that we add to configuration keys to distinguish them in meta
storage. Must end with dot. */
Review comment:
```suggestion
/** Prefix added to configuration keys to distinguish them in the meta
storage. Must end with a dot. */
```
##########
File path:
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
##########
@@ -161,14 +176,22 @@ public MetaStorageManager(
public synchronized void deployWatches() {
try {
var watch = watchAggregator.watch(
- vaultMgr.appliedRevision() + 1,
+ appliedRevision() + 1,
this::storeEntries
);
if (watch.isEmpty())
deployFut.complete(Optional.empty());
- else
- dispatchAppropriateMetaStorageWatch(watch.get()).thenAccept(id
-> deployFut.complete(Optional.of(id))).join();
+ else {
+ CompletableFuture<Void> fut =
+
dispatchAppropriateMetaStorageWatch(watch.get()).thenAccept(id ->
deployFut.complete(Optional.of(id)));
+
+ if (metaStorageNodesOnStart)
+ fut.join();
Review comment:
Is it OK to wait on these futures forever? Should we use a version with a
timeout? This pattern is repeated throughout this class.
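For illustration, a bounded wait could look roughly like the sketch below.
It is not taken from the PR: `joinWithTimeout` and the timeout values are
hypothetical, and a real timeout would presumably come from configuration.
Only `CompletableFuture.get(long, TimeUnit)` is standard JDK API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper, not part of the PR. */
final class BoundedJoin {
    /**
     * Waits for the future like join(), but gives up after timeoutMs
     * milliseconds instead of blocking forever.
     */
    static <T> T joinWithTimeout(CompletableFuture<T> fut, long timeoutMs) {
        try {
            return fut.get(timeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException e) {
            throw new IllegalStateException("Timed out after " + timeoutMs + " ms", e);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();

            throw new IllegalStateException("Interrupted while waiting", e);
        }
        catch (ExecutionException e) {
            // Unwrap the original failure of the future.
            throw new IllegalStateException("Future failed", e.getCause());
        }
    }
}
```

With a helper like this, `fut.join()` would become something like
`joinWithTimeout(fut, timeoutMs)`, and a hung watch deployment would surface
as an exception rather than a stuck thread.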
##########
File path:
modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -80,45 +83,45 @@
/**
* Constructor.
*
- * @param metaStorageMgr MetaStorage Manager.
+ * @param metaStorageMgr Meta storage manager.
+ * @param vaultMgr Vault manager.
*/
- public DistributedConfigurationStorage(MetaStorageManager metaStorageMgr) {
+ public DistributedConfigurationStorage(MetaStorageManager metaStorageMgr,
VaultManager vaultMgr) {
this.metaStorageMgr = metaStorageMgr;
+
+ this.vaultMgr = vaultMgr;
}
/** {@inheritDoc} */
@Override public synchronized Data readAll() throws StorageException {
HashMap<String, Serializable> data = new HashMap<>();
- Iterator<Entry> entries = allDistributedConfigKeys().iterator();
+ Iterator<org.apache.ignite.internal.vault.common.Entry> entries =
storedDistributedConfigKeys();
- long maxRevision = 0L;
+ long appliedRevision = 0L;
if (!entries.hasNext())
return new Data(data, ver.get());
- Entry entryForMasterKey = entries.next();
-
- // First key must be the masterKey because it's supposed to be the
first in lexicographical order
- assert entryForMasterKey.key().equals(MASTER_KEY);
-
while (entries.hasNext()) {
- Entry entry = entries.next();
+ var entry = entries.next();
-
data.put(entry.key().toString().substring((DISTRIBUTED_PREFIX).length()),
(Serializable)ByteUtils.fromBytes(entry.value()));
+ if (entry.key().equals(MetaStorageManager.APPLIED_REV)) {
+ appliedRevision =
ByteUtils.bytesToLong(Objects.requireNonNull(entry.value()), 0);
Review comment:
I think it might be a good idea to introduce a `bytesToLong(byte[])`
method.
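A rough sketch of what I mean is below. The class name `ByteUtilsSketch` is
made up, and the two-argument method here is only a stand-in: I am assuming
the second argument is an offset and that the encoding is big-endian, which
would need to be checked against the real `ByteUtils`.

```java
/** Hypothetical sketch, not the real ByteUtils. */
final class ByteUtilsSketch {
    /**
     * Stand-in for the existing two-argument method (assumed semantics):
     * reads 8 big-endian bytes starting at the given offset.
     */
    static long bytesToLong(byte[] bytes, int off) {
        long res = 0;

        for (int i = 0; i < Long.BYTES; i++)
            res = (res << 8) | (bytes[off + i] & 0xFF);

        return res;
    }

    /** Proposed convenience overload: reads from the start of the array. */
    static long bytesToLong(byte[] bytes) {
        return bytesToLong(bytes, 0);
    }
}
```

The call site would then read `ByteUtils.bytesToLong(value)` without the
trailing `0`.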
##########
File path:
modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -235,17 +238,17 @@ public DistributedConfigurationStorage(MetaStorageManager
metaStorageMgr) {
}
/**
- * Method that returns all distributed configuration keys from meta
storage filtered out by the current applied
- * revision as an upper bound. Applied revision is a revision of the last
successful vault update.
+ * Method that returns all distributed configuration keys from meta
storage that were stored in vault filtered out by the
Review comment:
```suggestion
* Method that returns all distributed configuration keys from the meta
storage that were stored in the vault filtered out by the
```
##########
File path:
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
##########
@@ -69,6 +70,15 @@
/** Meta storage raft group name. */
private static final String METASTORAGE_RAFT_GROUP_NAME =
"metastorage_raft_group";
+ /** Prefix that we add to configuration keys to distinguish them in meta
storage. Must end with dot. */
+ public static final String DISTRIBUTED_PREFIX = "dst-cfg.";
+
+ /**
+ * Special key for vault where applied revision for {@link
MetaStorageManager#storeEntries(Collection, long)}
Review comment:
```suggestion
* Special key for the vault where the applied revision for {@link
MetaStorageManager#storeEntries(Collection, long)}
```
##########
File path:
modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -80,45 +83,45 @@
/**
* Constructor.
*
- * @param metaStorageMgr MetaStorage Manager.
+ * @param metaStorageMgr Meta storage manager.
+ * @param vaultMgr Vault manager.
*/
- public DistributedConfigurationStorage(MetaStorageManager metaStorageMgr) {
+ public DistributedConfigurationStorage(MetaStorageManager metaStorageMgr,
VaultManager vaultMgr) {
this.metaStorageMgr = metaStorageMgr;
+
+ this.vaultMgr = vaultMgr;
}
/** {@inheritDoc} */
@Override public synchronized Data readAll() throws StorageException {
HashMap<String, Serializable> data = new HashMap<>();
- Iterator<Entry> entries = allDistributedConfigKeys().iterator();
+ Iterator<org.apache.ignite.internal.vault.common.Entry> entries =
storedDistributedConfigKeys();
- long maxRevision = 0L;
+ long appliedRevision = 0L;
if (!entries.hasNext())
return new Data(data, ver.get());
- Entry entryForMasterKey = entries.next();
-
- // First key must be the masterKey because it's supposed to be the
first in lexicographical order
- assert entryForMasterKey.key().equals(MASTER_KEY);
-
while (entries.hasNext()) {
- Entry entry = entries.next();
+ var entry = entries.next();
-
data.put(entry.key().toString().substring((DISTRIBUTED_PREFIX).length()),
(Serializable)ByteUtils.fromBytes(entry.value()));
+ if (entry.key().equals(MetaStorageManager.APPLIED_REV)) {
+ appliedRevision =
ByteUtils.bytesToLong(Objects.requireNonNull(entry.value()), 0);
- // Move to stream
- if (maxRevision < entry.revision())
- maxRevision = entry.revision();
+ continue;
+ }
+
data.put(entry.key().toString().substring((DISTRIBUTED_PREFIX).length()),
(Serializable)ByteUtils.fromBytes(entry.value()));
Review comment:
This line is too long and does too many things at once. Can you extract some
intermediate variables?
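Something along these lines, as a sketch only. `ReadAllSketch`, `putEntry`,
`cfgKey` and `cfgValue` are names I made up, the key is modelled as a plain
`String`, and `fromBytes` is a stand-in that decodes UTF-8 rather than the
real `ByteUtils.fromBytes`.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Hypothetical sketch of the loop body, not the real storage class. */
final class ReadAllSketch {
    static final String DISTRIBUTED_PREFIX = "dst-cfg.";

    /** Stand-in for ByteUtils.fromBytes: here the bytes are just UTF-8 text. */
    static Serializable fromBytes(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Same logic as the long line, split into named steps. */
    static void putEntry(Map<String, Serializable> data, String rawKey, byte[] rawValue) {
        // Strip the prefix that marks the key as a distributed configuration key.
        String cfgKey = rawKey.substring(DISTRIBUTED_PREFIX.length());

        // Deserialize the stored value.
        Serializable cfgValue = fromBytes(rawValue);

        data.put(cfgKey, cfgValue);
    }
}
```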
##########
File path:
modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -235,17 +238,17 @@ public DistributedConfigurationStorage(MetaStorageManager
metaStorageMgr) {
}
/**
- * Method that returns all distributed configuration keys from meta
storage filtered out by the current applied
- * revision as an upper bound. Applied revision is a revision of the last
successful vault update.
+ * Method that returns all distributed configuration keys from meta
storage that were stored in vault filtered out by the
+ * current applied revision as an upper bound. Applied revision is a
revision of the last successful vault update.
* <p>
* This is possible to distinguish cfg keys from meta storage because we
add special prefix {@link
Review comment:
```suggestion
* This is possible to distinguish cfg keys from meta storage because we
add a special prefix {@link
```
##########
File path:
modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
##########
@@ -37,24 +36,21 @@
* and providing interface for managing local keys.
*/
public class VaultManager {
- /** Special key for vault where applied revision for {@code putAll}
operation is stored. */
- private static ByteArray APPLIED_REV =
ByteArray.fromString("applied_revision");
-
/** Special key, which reserved for storing the name of the current node.
*/
private static final ByteArray NODE_NAME =
ByteArray.fromString("node_name");
/** Mutex. */
private final Object mux = new Object();
/** Instance of vault */
- private VaultService vaultService;
+ private VaultService vaultSvc;
Review comment:
Are the abbreviation rules from Ignite 2 applicable to Ignite 3? I
personally find `vaultService` to be a much better name.
##########
File path:
modules/vault/src/test/java/org/apache/ignite/internal/vault/impl/VaultBaseContractsTest.java
##########
@@ -147,52 +142,44 @@ public void range() throws ExecutionException,
InterruptedException {
}
/**
- * watch contract.
+ * putAll with applied revision contract.
*/
@Test
- public void watch() throws ExecutionException, InterruptedException {
- ByteArray key;
-
- Map<ByteArray, byte[]> values = new HashMap<>();
-
- for (int i = 0; i < 10; i++) {
- key = getKey(i);
+ public void putAllAndRevision() throws ExecutionException,
InterruptedException, IgniteInternalCheckedException {
+ Map<ByteArray, byte[]> entries = new HashMap<>();
- values.put(key, getValue(key, i));
- }
+ int entriesNum = 100;
- values.forEach((k, v) -> vaultManager.put(k, v));
+ ByteArray appRevKey = ByteArray.fromString("test_applied_revision");
- for (Map.Entry<ByteArray, byte[]> entry : values.entrySet())
- assertEquals(entry.getValue(),
vaultManager.get(entry.getKey()).get().value());
+ for (int i = 0; i < entriesNum; i++) {
+ ByteArray key = getKey(i);
- CountDownLatch counter = new CountDownLatch(4);
+ entries.put(key, getValue(key, i));
+ }
- VaultWatch vaultWatch = new VaultWatch(getKey(3), getKey(7), new
VaultListener() {
- @Override public boolean onUpdate(@NotNull Iterable<Entry>
entries) {
- counter.countDown();
+ for (int i = 0; i < entriesNum; i++) {
+ ByteArray key = getKey(i);
- return true;
- }
+ assertNull(vaultManager.get(key).get().value());
+ }
- @Override public void onError(@NotNull Throwable e) {
- // no-op
- }
- });
+ vaultManager.putAll(entries, appRevKey, 1L);
- vaultManager.watch(vaultWatch);
+ for (int i = 0; i < entriesNum; i++) {
+ ByteArray key = getKey(i);
- for (int i = 3; i < 7; i++)
- vaultManager.put(getKey(i), ("new" + i).getBytes());
+ assertEquals(entries.get(key),
vaultManager.get(key).get().value());
Review comment:
This should probably be `assertArrayEquals` if you want to compare the
array contents; otherwise use `assertSame`. Note: this applies to several
places in this class.
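To show the difference without pulling in JUnit, here is a small plain-JDK
sketch (the class name is made up). `assertEquals(Object, Object)` falls
back to `equals`, which for arrays is reference identity, whereas
`assertArrayEquals` compares element by element, much like `Arrays.equals`.

```java
import java.util.Arrays;

/** Hypothetical demo class, not part of the test suite. */
final class ArrayEqualityDemo {
    /** Content comparison, analogous to assertArrayEquals. */
    static boolean sameContents(byte[] a, byte[] b) {
        return Arrays.equals(a, b);
    }

    /** Reference comparison, analogous to assertSame. */
    static boolean sameReference(byte[] a, byte[] b) {
        return a == b;
    }

    public static void main(String[] args) {
        byte[] stored = {1, 2, 3};
        byte[] copy = stored.clone();

        System.out.println(stored.equals(copy));          // false: identity check
        System.out.println(sameContents(stored, copy));   // true: same bytes
        System.out.println(sameReference(stored, copy));  // false: different objects
    }
}
```

So the current assertion only passes if the vault hands back the very same
array instance that was put in.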
##########
File path:
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
##########
@@ -161,14 +176,22 @@ public MetaStorageManager(
public synchronized void deployWatches() {
try {
var watch = watchAggregator.watch(
- vaultMgr.appliedRevision() + 1,
+ appliedRevision() + 1,
this::storeEntries
);
if (watch.isEmpty())
deployFut.complete(Optional.empty());
- else
- dispatchAppropriateMetaStorageWatch(watch.get()).thenAccept(id
-> deployFut.complete(Optional.of(id))).join();
+ else {
+ CompletableFuture<Void> fut =
+
dispatchAppropriateMetaStorageWatch(watch.get()).thenAccept(id ->
deployFut.complete(Optional.of(id)));
Review comment:
Should we fail the `deployFut` if `fut` fails?
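One possible shape, as a sketch under assumptions: `PropagateFailure` and
`forward` are hypothetical names, and the watch id type is left generic
because the hunk does not show it. `whenComplete` and
`completeExceptionally` are standard `CompletableFuture` methods.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper, not part of the PR. */
final class PropagateFailure {
    /**
     * Completes deployFut with the watch id on success, or with the same
     * error when the dispatch future fails. The returned future mirrors
     * the outcome of the dispatch, so a later join() still throws on failure.
     */
    static <T> CompletableFuture<T> forward(
        CompletableFuture<T> dispatch,
        CompletableFuture<Optional<T>> deployFut
    ) {
        return dispatch.whenComplete((id, err) -> {
            if (err != null)
                deployFut.completeExceptionally(err);
            else
                deployFut.complete(Optional.of(id));
        });
    }
}
```

With `thenAccept` alone, a failed dispatch leaves `deployFut` incomplete, so
anything waiting on it would never be notified.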
##########
File path:
modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -235,17 +238,17 @@ public DistributedConfigurationStorage(MetaStorageManager
metaStorageMgr) {
}
/**
- * Method that returns all distributed configuration keys from meta
storage filtered out by the current applied
- * revision as an upper bound. Applied revision is a revision of the last
successful vault update.
+ * Method that returns all distributed configuration keys from meta
storage that were stored in vault filtered out by the
+ * current applied revision as an upper bound. Applied revision is a
revision of the last successful vault update.
* <p>
* This is possible to distinguish cfg keys from meta storage because we
add special prefix {@link
- * DistributedConfigurationStorage#DISTRIBUTED_PREFIX} to all
configuration keys that we put to meta storage.
+ * MetaStorageManager#DISTRIBUTED_PREFIX} to all configuration keys that
we put to meta storage.
Review comment:
```suggestion
* MetaStorageManager#DISTRIBUTED_PREFIX} to all configuration keys that
we put to the meta storage.
```
##########
File path:
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
##########
@@ -414,6 +437,23 @@ public synchronized void deployWatches() {
return metaStorageSvcFut.thenCompose(MetaStorageService::compact);
}
+ /**
+ * @return Applied revision for {@link VaultManager#putAll(Map, ByteArray,
long)} operation.
+ * @throws IgniteInternalCheckedException If couldn't get applied revision
from vault.
+ */
+ @NotNull private Long appliedRevision() throws
IgniteInternalCheckedException {
Review comment:
Why not `long`?