denis-chudov commented on code in PR #2813:
URL: https://github.com/apache/ignite-3/pull/2813#discussion_r1386398224
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -367,10 +367,14 @@ private void
onPlacementDriverMessageReceived(NetworkMessage msg0, String sender
return;
}
+ assert correlationId != null;
+
var msg = (PlacementDriverReplicaMessage) msg0;
if (!busyLock.enterBusy()) {
- throw new IgniteException(new NodeStoppingException());
+ LOG.info("Skipping a Placement Driver message, because the node is
stopping");
Review Comment:
It would be nice to log the message itself; also, don't forget the dot at the end.
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -220,10 +223,14 @@ private void onReplicaMessageReceived(NetworkMessage
message, String senderConsi
return;
}
+ assert correlationId != null;
+
ReplicaRequest request = (ReplicaRequest) message;
if (!busyLock.enterBusy()) {
- throw new IgniteException(new NodeStoppingException());
+ LOG.info("Skipping a Replica message, because the node is
stopping");
Review Comment:
It would be nice to log the message itself; also, don't forget the dot at the end.
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -510,37 +532,57 @@ public CompletableFuture<Boolean>
stopReplica(ReplicationGroupId replicaGrpId) t
* Internal method for stopping a replica.
*
* @param replicaGrpId Replication group id.
+ * @param causalityToken Causality token.
* @return True if the replica is found and closed, false otherwise.
*/
- private CompletableFuture<Boolean> stopReplicaInternal(ReplicationGroupId
replicaGrpId) {
- CompletableFuture<Replica> removed = replicas.remove(replicaGrpId);
-
- if (removed != null) {
- if (!removed.isDone()) {
- removed.completeExceptionally(new ReplicaStoppingException(
- replicaGrpId,
- clusterNetSvc.topologyService().localMember()
- ));
+ private CompletableFuture<Boolean> stopReplicaInternal(ReplicationGroupId
replicaGrpId, long causalityToken) {
+ var isRemovedFuture = new CompletableFuture<Boolean>();
+
+ var eventParams = new LocalReplicaEventParameters(causalityToken,
replicaGrpId);
+
+ fireEvent(BEFORE_REPLICA_STOPPED, eventParams).whenComplete((v, e) -> {
+ if (e != null) {
+ LOG.error("Error when notifying about BEFORE_REPLICA_STOPPED
event", e);
Review Comment:
```suggestion
LOG.error("Error when notifying about BEFORE_REPLICA_STOPPED
event.", e);
```
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -472,35 +481,48 @@ private void startReplicaInternal(
placementDriver
);
- replicas.compute(replicaGrpId, (replicationGroupId, replicaFut) -> {
- if (replicaFut == null) {
+ CompletableFuture<Replica> replicaFuture =
replicas.compute(replicaGrpId, (k, existingReplicaFuture) -> {
+ if (existingReplicaFuture == null ||
existingReplicaFuture.isDone()) {
+ assert existingReplicaFuture == null ||
isCompletedSuccessfully(existingReplicaFuture);
+
return completedFuture(newReplica);
} else {
- if (replicaFut.isDone() && !replicaFut.isCancelled() &&
!replicaFut.isCompletedExceptionally()) {
- return completedFuture(newReplica);
- }
-
- replicaFut.complete(newReplica);
+ existingReplicaFuture.complete(newReplica);
- return replicaFut;
+ return existingReplicaFuture;
}
});
+
+ var eventParams = new LocalReplicaEventParameters(causalityToken,
replicaGrpId);
+
+ return fireEvent(AFTER_REPLICA_STARTED, eventParams)
+ .exceptionally(e -> {
+ LOG.error("Error when notifying about
AFTER_REPLICA_STARTED event", e);
Review Comment:
```suggestion
LOG.error("Error when notifying about
AFTER_REPLICA_STARTED event.", e);
```
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -510,37 +532,57 @@ public CompletableFuture<Boolean>
stopReplica(ReplicationGroupId replicaGrpId) t
* Internal method for stopping a replica.
*
* @param replicaGrpId Replication group id.
+ * @param causalityToken Causality token.
* @return True if the replica is found and closed, false otherwise.
*/
- private CompletableFuture<Boolean> stopReplicaInternal(ReplicationGroupId
replicaGrpId) {
- CompletableFuture<Replica> removed = replicas.remove(replicaGrpId);
-
- if (removed != null) {
- if (!removed.isDone()) {
- removed.completeExceptionally(new ReplicaStoppingException(
- replicaGrpId,
- clusterNetSvc.topologyService().localMember()
- ));
+ private CompletableFuture<Boolean> stopReplicaInternal(ReplicationGroupId
replicaGrpId, long causalityToken) {
+ var isRemovedFuture = new CompletableFuture<Boolean>();
+
+ var eventParams = new LocalReplicaEventParameters(causalityToken,
replicaGrpId);
+
+ fireEvent(BEFORE_REPLICA_STOPPED, eventParams).whenComplete((v, e) -> {
+ if (e != null) {
+ LOG.error("Error when notifying about BEFORE_REPLICA_STOPPED
event", e);
}
- if (!removed.isCompletedExceptionally()) {
- return removed
- .thenCompose(Replica::shutdown)
- .handle((notUsed, throwable) -> {
- if (throwable == null) {
- return true;
- } else {
- LOG.error("Failed to stop replica
[replicaGrpId={}]", throwable, replicaGrpId);
+ if (!busyLock.enterBusy()) {
+ isRemovedFuture.completeExceptionally(new
NodeStoppingException());
- return false;
- }
- });
+ return;
}
- return completedFuture(true);
- }
+ try {
+ replicas.compute(replicaGrpId, (grpId, replicaFuture) -> {
+ if (replicaFuture == null) {
+ isRemovedFuture.complete(false);
+ } else if (!replicaFuture.isDone()) {
+ ClusterNode localMember =
clusterNetSvc.topologyService().localMember();
+
+ replicaFuture.completeExceptionally(new
ReplicaStoppingException(grpId, localMember));
- return completedFuture(false);
+ isRemovedFuture.complete(true);
+ } else if (!isCompletedSuccessfully(replicaFuture)) {
+ isRemovedFuture.complete(true);
+ } else {
+ replicaFuture
+ .thenCompose(Replica::shutdown)
+ .whenComplete((notUsed, throwable) -> {
+ if (throwable != null) {
+ LOG.error("Failed to stop replica
[replicaGrpId={}]", throwable, grpId);
Review Comment:
```suggestion
LOG.error("Failed to stop replica
[replicaGrpId={}].", throwable, grpId);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]