sashapolo commented on code in PR #1579:
URL: https://github.com/apache/ignite-3/pull/1579#discussion_r1087534666
##########
modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java:
##########
@@ -908,6 +909,32 @@ public static <T> List<T> collectStaticFields(Class<?> sourceCls, Class<? extend
return result;
}
+ /**
+ * Cancels the future and runs a consumer on future's result if it was completed before the cancellation.
+ * Does nothing if future is cancelled or completed exceptionally.
+ *
+ * @param future Future.
+ * @param consumer Consumer that accepts future's result.
+ * @param <T> Future's result type.
+ */
+ public static <T> void cancelOrConsume(CompletableFuture<T> future, Consumer<T> consumer) {
+ assert future != null;
Review Comment:
Why do you need this assertion?
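For context, here is a minimal sketch of a method matching the Javadoc above. The body is an assumption, not the code from the PR (the diff is cut off before the implementation), and the class name `CancelOrConsumeSketch` is hypothetical. It shows that a null `future` already fails with a `NullPointerException` on the first call, with or without the assertion.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class CancelOrConsumeSketch {
    /** Assumed implementation of the contract described in the Javadoc; not the PR's actual code. */
    static <T> void cancelOrConsume(CompletableFuture<T> future, Consumer<T> consumer) {
        // Dereferencing a null future throws NullPointerException right here.
        // cancel() has no effect (and returns false) if the future is already completed.
        future.cancel(true);

        // A cancelled future also reports isCompletedExceptionally() == true.
        if (future.isCancelled() || future.isCompletedExceptionally()) {
            return;
        }

        // The future completed normally before the cancellation attempt, so join() cannot block.
        consumer.accept(future.join());
    }
}
```

With this shape, the consumer runs only for a future that completed normally before the call; a pending future ends up cancelled and the consumer is skipped.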
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -1224,6 +1224,8 @@ private void replayUpdates(long upperRevision) {
long minWatchRevision = watchProcessor.minWatchRevision().orElse(-1);
if (minWatchRevision == -1 || minWatchRevision > upperRevision) {
+ // No events to replay, we can start processing more recent events from the event queue.
+ startWatchExecutor();
Review Comment:
Can you please add a test for this in the `AbstractKeyValueStorageTest`?
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -217,20 +217,24 @@ private Replica startReplicaInternal(
ReplicationGroupId replicaGrpId,
ReplicaListener listener
) {
+ Replica newReplica = new Replica(replicaGrpId, listener);
+
replicas.compute(replicaGrpId, (replicationGroupId, replicaFut) -> {
if (replicaFut == null) {
- replicaFut = CompletableFuture.completedFuture(new Replica(replicaGrpId, listener));
-
- return replicaFut;
+ return CompletableFuture.completedFuture(newReplica);
} else {
- replicaFut.complete(new Replica(replicaGrpId, listener));
+ if (replicaFut.isDone() && !replicaFut.isCancelled() && !replicaFut.isCompletedExceptionally()) {
Review Comment:
When can this happen? Are you sure we can simply ignore the previous replica value?
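To make the question concrete, here is a small stand-alone sketch of the three states the new branch distinguishes. It uses `String` in place of `Replica` and a hypothetical `ReplicaComputeSketch` class, so it only illustrates `CompletableFuture` and `ConcurrentHashMap.compute` semantics, not the actual `ReplicaManager`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class ReplicaComputeSketch {
    // Stand-in for the replicas map; String replaces both ReplicationGroupId and Replica.
    final ConcurrentHashMap<String, CompletableFuture<String>> replicas = new ConcurrentHashMap<>();

    /** Mirrors the shape of the changed lambda and returns the future left in the map. */
    CompletableFuture<String> start(String groupId, String newReplica) {
        return replicas.compute(groupId, (id, fut) -> {
            if (fut == null) {
                return CompletableFuture.completedFuture(newReplica);
            }

            if (fut.isDone() && !fut.isCancelled() && !fut.isCompletedExceptionally()) {
                // Already completed normally: the previous value is dropped
                // and a fresh completed future replaces it in the map.
                return CompletableFuture.completedFuture(newReplica);
            }

            // Pending future: complete() sets the value.
            // Cancelled or failed future: complete() is a no-op and returns false.
            fut.complete(newReplica);

            return fut;
        });
    }
}
```

In this sketch a previously completed value is silently replaced, while a cancelled or failed future stays in the map unchanged, since `complete` has no effect on a future that is already done.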
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java:
##########
@@ -679,6 +694,14 @@ private Peer randomNode(@Nullable Peer excludedPeer) {
int lastPeerIndex = excludedPeer == null ? -1 : peers0.indexOf(excludedPeer);
+ if (peers0.size() == 1) {
+ if (lastPeerIndex != -1) {
Review Comment:
Why do we even need to check this if the peer list has a single element?
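A hedged sketch of the point, assuming the single-peer branch returns that one peer whether or not it is the excluded one (the quoted diff is cut off before the body, so this is a guess). `RandomPeerSketch` and `randomPeer` are hypothetical names, and `String` stands in for `Peer`. Under that assumption the inner check cannot change the result and would only matter for side effects such as logging.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

final class RandomPeerSketch {
    /** Picks a random peer, avoiding {@code excluded} whenever an alternative exists. */
    static String randomPeer(List<String> peers, String excluded) {
        if (peers.isEmpty()) {
            throw new IllegalArgumentException("No peers");
        }

        if (peers.size() == 1) {
            // Assumption: with a single peer there is no alternative,
            // so it is returned even if it is the excluded one.
            return peers.get(0);
        }

        int excludedIndex = excluded == null ? -1 : peers.indexOf(excluded);

        int index;

        // Terminates because at least two peers exist and at most one index is excluded.
        do {
            index = ThreadLocalRandom.current().nextInt(peers.size());
        } while (index == excludedIndex);

        return peers.get(index);
    }
}
```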
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -217,20 +217,24 @@ private Replica startReplicaInternal(
ReplicationGroupId replicaGrpId,
ReplicaListener listener
) {
+ Replica newReplica = new Replica(replicaGrpId, listener);
+
replicas.compute(replicaGrpId, (replicationGroupId, replicaFut) -> {
if (replicaFut == null) {
- replicaFut = CompletableFuture.completedFuture(new Replica(replicaGrpId, listener));
-
- return replicaFut;
+ return CompletableFuture.completedFuture(newReplica);
} else {
- replicaFut.complete(new Replica(replicaGrpId, listener));
+ if (replicaFut.isDone() && !replicaFut.isCancelled() && !replicaFut.isCompletedExceptionally()) {
+ return CompletableFuture.completedFuture(newReplica);
+ }
+
+ replicaFut.complete(newReplica);
return replicaFut;
}
});
// replicaFut is always completed here.
Review Comment:
This comment is no longer relevant