tkalkirill commented on code in PR #3189:
URL: https://github.com/apache/ignite-3/pull/3189#discussion_r1483897277
##########
modules/vault/src/main/java/org/apache/ignite/internal/vault/persistence/PersistentVaultService.java:
##########
@@ -112,59 +93,44 @@ public void start() {
}
}
- /** {@inheritDoc} */
@Override
public void close() {
- IgniteUtils.shutdownAndAwaitTermination(threadPool, 10,
TimeUnit.SECONDS);
-
- futureTracker.cancelInFlightFutures();
-
- RocksUtils.closeAll(options, db);
+ RocksUtils.closeAll(db, options);
}
- /** {@inheritDoc} */
@Override
- public CompletableFuture<VaultEntry> get(ByteArray key) {
- return supplyAsync(() -> {
- try {
- byte[] value = db.get(key.bytes());
-
- return value == null ? null : new VaultEntry(key, value);
- } catch (RocksDBException e) {
- throw new IgniteInternalException("Unable to read data from
RocksDB", e);
- }
- });
+ public VaultEntry get(ByteArray key) {
Review Comment:
```suggestion
public @Nullable VaultEntry get(ByteArray key) {
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java:
##########
@@ -117,50 +121,47 @@ public LowWatermark(
/**
* Starts the watermark manager.
*/
- public void start() {
- inBusyLock(busyLock, () -> {
- vaultManager.get(LOW_WATERMARK_VAULT_KEY)
- .thenCompose(vaultEntry -> inBusyLock(busyLock, () -> {
- if (vaultEntry == null) {
- scheduleUpdateLowWatermarkBusy();
-
- return nullCompletedFuture();
- }
-
- HybridTimestamp lowWatermark =
ByteUtils.fromBytes(vaultEntry.value());
-
- return txManager.updateLowWatermark(lowWatermark)
- .thenApply(unused -> {
- this.lowWatermark.set(lowWatermark);
-
-
runGcAndScheduleUpdateLowWatermarkBusy(lowWatermark);
-
- return lowWatermark;
- });
- }))
- .whenComplete((lowWatermark, throwable) -> {
- if (throwable != null) {
- if (!(throwable instanceof NodeStoppingException))
{
- LOG.error("Error getting low watermark",
throwable);
+ public CompletableFuture<Void> start() {
+ return inBusyLockAsync(busyLock, () -> readLowWatermarkFromVault()
+ .thenCompose(lowWatermark -> inBusyLock(busyLock, () -> {
+ if (lowWatermark == null) {
+ LOG.info("Previous value of the low watermark was not
found, will schedule to update it");
+
+ scheduleUpdateLowWatermarkBusy();
+
+ return nullCompletedFuture();
+ }
+
+ LOG.info(
+ "Low watermark has been successfully retrieved
from the vault and is scheduled to be updated: {}",
+ lowWatermark
+ );
+
+ return txManager.updateLowWatermark(lowWatermark)
+ .thenRun(() -> inBusyLock(busyLock, () -> {
+ this.lowWatermark = lowWatermark;
+
+
runGcAndScheduleUpdateLowWatermarkBusy(lowWatermark);
+ }));
+ }))
+ .whenComplete((unused, throwable) -> {
+ if (throwable != null && !(throwable instanceof
NodeStoppingException)) {
+ LOG.error("Error during the Watermark manager start",
throwable);
+
+ failureProcessor.process(new
FailureContext(CRITICAL_ERROR, throwable));
+
+ inBusyLock(busyLock,
this::scheduleUpdateLowWatermarkBusy);
Review Comment:
Why schedule if the node is likely to die?
##########
modules/vault/src/testFixtures/java/org/apache/ignite/internal/vault/inmemory/InMemoryVaultService.java:
##########
@@ -37,85 +36,58 @@
*/
public class InMemoryVaultService implements VaultService {
/** Map to store values. */
- private final NavigableMap<ByteArray, byte[]> storage = new TreeMap<>();
-
- /** Mutex. */
- private final Object mux = new Object();
+ private final NavigableMap<ByteArray, byte[]> storage =
synchronizedNavigableMap(new TreeMap<>());
@Override
public void start() {
// No-op.
}
- /** {@inheritDoc} */
@Override
public void close() {
// No-op.
}
- /** {@inheritDoc} */
@Override
- public CompletableFuture<VaultEntry> get(ByteArray key) {
- return supplyAsync(() -> {
- synchronized (mux) {
- byte[] value = storage.get(key);
+ public VaultEntry get(ByteArray key) {
+ byte[] value = storage.get(key);
- return value == null ? null : new VaultEntry(key,
storage.get(key));
- }
- });
+ return value == null ? null : new VaultEntry(key, value);
}
- /** {@inheritDoc} */
@Override
- public CompletableFuture<Void> put(ByteArray key, byte @Nullable [] val) {
- return runAsync(() -> {
- synchronized (mux) {
- storage.put(key, val);
- }
- });
+ public void put(ByteArray key, byte @Nullable [] val) {
+ storage.put(key, val);
}
- /** {@inheritDoc} */
@Override
- public CompletableFuture<Void> remove(ByteArray key) {
- return runAsync(() -> {
- synchronized (mux) {
- storage.remove(key);
- }
- });
+ public void remove(ByteArray key) {
+ storage.remove(key);
}
- /** {@inheritDoc} */
@Override
public Cursor<VaultEntry> range(ByteArray fromKey, ByteArray toKey) {
- Iterator<VaultEntry> it;
-
if (fromKey.compareTo(toKey) >= 0) {
- it = Collections.emptyIterator();
- } else {
- synchronized (mux) {
- it = storage.subMap(fromKey, toKey).entrySet().stream()
- .map(e -> new VaultEntry(new ByteArray(e.getKey()),
e.getValue()))
- .iterator();
- }
+ return CursorUtils.emptyCursor();
}
- return Cursor.fromBareIterator(it);
+ synchronized (storage) {
+ return storage.subMap(fromKey, toKey).entrySet().stream()
+ .map(e -> new VaultEntry(new ByteArray(e.getKey()),
e.getValue()))
+ .collect(collectingAndThen(toList(),
Cursor::fromIterable));
+ }
}
- /** {@inheritDoc} */
@Override
- public CompletableFuture<Void> putAll(Map<ByteArray, byte[]> vals) {
- return runAsync(() -> {
- synchronized (mux) {
- for (var entry : vals.entrySet()) {
- if (entry.getValue() == null) {
- storage.remove(entry.getKey());
- } else {
- storage.put(entry.getKey(), entry.getValue());
- }
+ public void putAll(Map<ByteArray, byte[]> vals) {
+ synchronized (storage) {
Review Comment:
Same concern as for the `synchronized (storage)` block in `range`.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java:
##########
@@ -117,50 +121,47 @@ public LowWatermark(
/**
* Starts the watermark manager.
*/
- public void start() {
- inBusyLock(busyLock, () -> {
- vaultManager.get(LOW_WATERMARK_VAULT_KEY)
- .thenCompose(vaultEntry -> inBusyLock(busyLock, () -> {
- if (vaultEntry == null) {
- scheduleUpdateLowWatermarkBusy();
-
- return nullCompletedFuture();
- }
-
- HybridTimestamp lowWatermark =
ByteUtils.fromBytes(vaultEntry.value());
-
- return txManager.updateLowWatermark(lowWatermark)
- .thenApply(unused -> {
- this.lowWatermark.set(lowWatermark);
-
-
runGcAndScheduleUpdateLowWatermarkBusy(lowWatermark);
-
- return lowWatermark;
- });
- }))
- .whenComplete((lowWatermark, throwable) -> {
- if (throwable != null) {
- if (!(throwable instanceof NodeStoppingException))
{
- LOG.error("Error getting low watermark",
throwable);
+ public CompletableFuture<Void> start() {
Review Comment:
Why async? I don't see the need for asynchronous code.
##########
modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultService.java:
##########
@@ -37,9 +36,10 @@ public interface VaultService extends ManuallyCloseable {
* Retrieves an entry for the given key.
*
* @param key Key. Cannot be {@code null}.
- * @return Future that resolves into an entry for the given key, or {@code
null} no such mapping exists.
+ * @return Entry for the given key, or {@code null} no such mapping exists.
*/
- CompletableFuture<VaultEntry> get(ByteArray key);
+ @Nullable
+ VaultEntry get(ByteArray key);
Review Comment:
```suggestion
@Nullable VaultEntry get(ByteArray key);
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java:
##########
@@ -117,50 +121,47 @@ public LowWatermark(
/**
* Starts the watermark manager.
*/
- public void start() {
- inBusyLock(busyLock, () -> {
- vaultManager.get(LOW_WATERMARK_VAULT_KEY)
- .thenCompose(vaultEntry -> inBusyLock(busyLock, () -> {
- if (vaultEntry == null) {
- scheduleUpdateLowWatermarkBusy();
-
- return nullCompletedFuture();
- }
-
- HybridTimestamp lowWatermark =
ByteUtils.fromBytes(vaultEntry.value());
-
- return txManager.updateLowWatermark(lowWatermark)
- .thenApply(unused -> {
- this.lowWatermark.set(lowWatermark);
-
-
runGcAndScheduleUpdateLowWatermarkBusy(lowWatermark);
-
- return lowWatermark;
- });
- }))
- .whenComplete((lowWatermark, throwable) -> {
- if (throwable != null) {
- if (!(throwable instanceof NodeStoppingException))
{
- LOG.error("Error getting low watermark",
throwable);
+ public CompletableFuture<Void> start() {
+ return inBusyLockAsync(busyLock, () -> readLowWatermarkFromVault()
+ .thenCompose(lowWatermark -> inBusyLock(busyLock, () -> {
+ if (lowWatermark == null) {
+ LOG.info("Previous value of the low watermark was not
found, will schedule to update it");
+
+ scheduleUpdateLowWatermarkBusy();
+
+ return nullCompletedFuture();
+ }
+
+ LOG.info(
+ "Low watermark has been successfully retrieved
from the vault and is scheduled to be updated: {}",
+ lowWatermark
+ );
+
+ return txManager.updateLowWatermark(lowWatermark)
+ .thenRun(() -> inBusyLock(busyLock, () -> {
+ this.lowWatermark = lowWatermark;
+
+
runGcAndScheduleUpdateLowWatermarkBusy(lowWatermark);
+ }));
+ }))
+ .whenComplete((unused, throwable) -> {
+ if (throwable != null && !(throwable instanceof
NodeStoppingException)) {
+ LOG.error("Error during the Watermark manager start",
throwable);
+
+ failureProcessor.process(new
FailureContext(CRITICAL_ERROR, throwable));
+
+ inBusyLock(busyLock,
this::scheduleUpdateLowWatermarkBusy);
+ }
+ })
+ );
+ }
- failureProcessor.process(new
FailureContext(CRITICAL_ERROR, throwable));
+ private CompletableFuture<HybridTimestamp> readLowWatermarkFromVault() {
Review Comment:
Why a complex asynchronous call?
It is enough to simply read the key from the vault.
##########
modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java:
##########
@@ -65,9 +63,10 @@ public void stop() {
* See {@link VaultService#get}.
*
* @param key Key. Cannot be {@code null}.
- * @return Future that resolves into an entry for the given key, or {@code
null} if no such mapping exists.
+ * @return Entry for the given key, or {@code null} if no such mapping
exists.
*/
- public CompletableFuture<VaultEntry> get(ByteArray key) {
+ @Nullable
+ public VaultEntry get(ByteArray key) {
Review Comment:
```suggestion
public @Nullable VaultEntry get(ByteArray key) {
```
##########
modules/vault/src/testFixtures/java/org/apache/ignite/internal/vault/inmemory/InMemoryVaultService.java:
##########
@@ -37,85 +36,58 @@
*/
public class InMemoryVaultService implements VaultService {
/** Map to store values. */
- private final NavigableMap<ByteArray, byte[]> storage = new TreeMap<>();
-
- /** Mutex. */
- private final Object mux = new Object();
+ private final NavigableMap<ByteArray, byte[]> storage =
synchronizedNavigableMap(new TreeMap<>());
@Override
public void start() {
// No-op.
}
- /** {@inheritDoc} */
@Override
public void close() {
// No-op.
}
- /** {@inheritDoc} */
@Override
- public CompletableFuture<VaultEntry> get(ByteArray key) {
- return supplyAsync(() -> {
- synchronized (mux) {
- byte[] value = storage.get(key);
+ public VaultEntry get(ByteArray key) {
Review Comment:
```suggestion
public @Nullable VaultEntry get(ByteArray key) {
```
##########
modules/vault/src/testFixtures/java/org/apache/ignite/internal/vault/inmemory/InMemoryVaultService.java:
##########
@@ -37,85 +36,58 @@
*/
public class InMemoryVaultService implements VaultService {
/** Map to store values. */
- private final NavigableMap<ByteArray, byte[]> storage = new TreeMap<>();
-
- /** Mutex. */
- private final Object mux = new Object();
+ private final NavigableMap<ByteArray, byte[]> storage =
synchronizedNavigableMap(new TreeMap<>());
@Override
public void start() {
// No-op.
}
- /** {@inheritDoc} */
@Override
public void close() {
// No-op.
}
- /** {@inheritDoc} */
@Override
- public CompletableFuture<VaultEntry> get(ByteArray key) {
- return supplyAsync(() -> {
- synchronized (mux) {
- byte[] value = storage.get(key);
+ public VaultEntry get(ByteArray key) {
+ byte[] value = storage.get(key);
- return value == null ? null : new VaultEntry(key,
storage.get(key));
- }
- });
+ return value == null ? null : new VaultEntry(key, value);
}
- /** {@inheritDoc} */
@Override
- public CompletableFuture<Void> put(ByteArray key, byte @Nullable [] val) {
- return runAsync(() -> {
- synchronized (mux) {
- storage.put(key, val);
- }
- });
+ public void put(ByteArray key, byte @Nullable [] val) {
+ storage.put(key, val);
}
- /** {@inheritDoc} */
@Override
- public CompletableFuture<Void> remove(ByteArray key) {
- return runAsync(() -> {
- synchronized (mux) {
- storage.remove(key);
- }
- });
+ public void remove(ByteArray key) {
+ storage.remove(key);
}
- /** {@inheritDoc} */
@Override
public Cursor<VaultEntry> range(ByteArray fromKey, ByteArray toKey) {
- Iterator<VaultEntry> it;
-
if (fromKey.compareTo(toKey) >= 0) {
- it = Collections.emptyIterator();
- } else {
- synchronized (mux) {
- it = storage.subMap(fromKey, toKey).entrySet().stream()
- .map(e -> new VaultEntry(new ByteArray(e.getKey()),
e.getValue()))
- .iterator();
- }
+ return CursorUtils.emptyCursor();
}
- return Cursor.fromBareIterator(it);
+ synchronized (storage) {
Review Comment:
This is not equivalent to the previous code; `Collections#synchronizedNavigableMap` has its own
mutex inside.
You may end up with concurrent modifications from put/remove, for example.
I think we need to revert to the previous code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]