sergey-chugunov-1985 commented on code in PR #12729:
URL: https://github.com/apache/ignite/pull/12729#discussion_r3208775218
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -3903,6 +3977,109 @@ else if (!failedNextNode && sndState != null &&
sndState.isBackward()) {
}
}
+ /** */
+ protected CrossRingMessageSendState
createConnectionRecoveryState(TcpDiscoveryNode newNextNode) {
+ CrossRingMessageSendState recoveryState = new
CrossRingMessageSendState();
+
+ // The corner node case. Next node is from neighbour DC. Another
DC might be entierly unavailable.
+ // To prevent sequential nodes failure in current DC we have to
guess whether we can reach neighbour DC.
+ // We ping nodes from another DC within the same timeot in
parallel.
Review Comment:
```suggestion
// We ping nodes from another DC within the same timeout in
parallel.
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -3903,6 +3977,109 @@ else if (!failedNextNode && sndState != null &&
sndState.isBackward()) {
}
}
+ /** */
+ protected CrossRingMessageSendState
createConnectionRecoveryState(TcpDiscoveryNode newNextNode) {
+ CrossRingMessageSendState recoveryState = new
CrossRingMessageSendState();
+
+ // The corner node case. Next node is from neighbour DC. Another
DC might be entierly unavailable.
+ // To prevent sequential nodes failure in current DC we have to
guess whether we can reach neighbour DC.
+ // We ping nodes from another DC within the same timeot in
parallel.
+ if (!F.isEmpty(locNode.dataCenterId())) {
+ assert !F.isEmpty(next.dataCenterId());
+
+ if (!next.dataCenterId().equals(locNode.dataCenterId())) {
+ Stream<TcpDiscoveryNode> otherDcsSrvrs =
ring.serverNodes().stream()
+ .filter(n ->
!n.dataCenterId().equals(locNode.dataCenterId()));
+
+ synchronized (mux) {
+ otherDcsSrvrs = otherDcsSrvrs.filter(n ->
!failedNodes.containsKey(n));
Review Comment:
We should call the `collect` method under `mux` as well: stream operations are
lazy, so the filter is actually executed only during the collection phase.
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -3903,6 +3977,109 @@ else if (!failedNextNode && sndState != null &&
sndState.isBackward()) {
}
}
+ /** */
+ protected CrossRingMessageSendState
createConnectionRecoveryState(TcpDiscoveryNode newNextNode) {
+ CrossRingMessageSendState recoveryState = new
CrossRingMessageSendState();
+
+ // The corner node case. Next node is from neighbour DC. Another
DC might be entierly unavailable.
+ // To prevent sequential nodes failure in current DC we have to
guess whether we can reach neighbour DC.
+ // We ping nodes from another DC within the same timeot in
parallel.
+ if (!F.isEmpty(locNode.dataCenterId())) {
+ assert !F.isEmpty(next.dataCenterId());
+
+ if (!next.dataCenterId().equals(locNode.dataCenterId())) {
+ Stream<TcpDiscoveryNode> otherDcsSrvrs =
ring.serverNodes().stream()
+ .filter(n ->
!n.dataCenterId().equals(locNode.dataCenterId()));
+
+ synchronized (mux) {
+ otherDcsSrvrs = otherDcsSrvrs.filter(n ->
!failedNodes.containsKey(n));
+ }
+
+
recoveryState.pingRemoteDCs(otherDcsSrvrs.collect(toList()));
+ }
+ }
+
+ return recoveryState;
+ }
+
+ /** @return {@code True} if current node fails to recover the ring
connection. */
+ private boolean checkConnectionRecoveryFailed(
+ CrossRingMessageSendState connRecoverState,
+ List<TcpDiscoveryNode> failedNodes
+ ) {
+ if (connRecoverState.timeout()) {
+ // Ensure of the ping pool release.
+ connRecoverState.stopRemoteDcPing();
+
+ segmentLocalNodeOnSendFail(failedNodes);
+
+ return true;
+ }
+
+ if (!connRecoverState.remoteDcPingStarted() ||
!connRecoverState.remoteDcPingFinished()
+ || connRecoverState.unavailableDCs != null)
+ return false;
+
+ // Remote DC statuses: avlive or not, Dc id -> true/false.
+ Collection<String> rmtDcIds =
connRecoverState.rmtDcPingRes.keySet().stream().map(ClusterNode::dataCenterId)
+ .collect(Collectors.toSet());
+
+ Collection<String> failedDCs = U.newHashSet(rmtDcIds.size());
+
+ Map<String, Integer> availablePerDC =
countPerDC(connRecoverState.availableNodes());
+ Map<String, Integer> unavailablePerDC =
countPerDC(connRecoverState.unavailableNodes());
+
+ for (String dcId : rmtDcIds) {
+ int availCnt =
Optional.ofNullable(availablePerDC.get(dcId)).orElse(0);
+ int unavailCnt =
Optional.ofNullable(unavailablePerDC.get(dcId)).orElse(0);
+
+ if (availCnt <= unavailCnt)
+ failedDCs.add(dcId);
+ }
+
+ String msg = "During the connection recovery, nodes ping of DCs '"
+ String.join(", ", rmtDcIds)
+ + "' from current corner node has finished. Responded nodes: "
+ connRecoverState.availableNodes()
Review Comment:
```suggestion
+ "' from current edge node has finished. Responded nodes: "
+ connRecoverState.availableNodes()
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -3903,6 +3977,109 @@ else if (!failedNextNode && sndState != null &&
sndState.isBackward()) {
}
}
+ /** */
+ protected CrossRingMessageSendState
createConnectionRecoveryState(TcpDiscoveryNode newNextNode) {
+ CrossRingMessageSendState recoveryState = new
CrossRingMessageSendState();
+
+ // The corner node case. Next node is from neighbour DC. Another
DC might be entierly unavailable.
+ // To prevent sequential nodes failure in current DC we have to
guess whether we can reach neighbour DC.
+ // We ping nodes from another DC within the same timeot in
parallel.
+ if (!F.isEmpty(locNode.dataCenterId())) {
+ assert !F.isEmpty(next.dataCenterId());
+
+ if (!next.dataCenterId().equals(locNode.dataCenterId())) {
+ Stream<TcpDiscoveryNode> otherDcsSrvrs =
ring.serverNodes().stream()
+ .filter(n ->
!n.dataCenterId().equals(locNode.dataCenterId()));
+
+ synchronized (mux) {
+ otherDcsSrvrs = otherDcsSrvrs.filter(n ->
!failedNodes.containsKey(n));
+ }
+
+
recoveryState.pingRemoteDCs(otherDcsSrvrs.collect(toList()));
+ }
+ }
+
+ return recoveryState;
+ }
+
+ /** @return {@code True} if current node fails to recover the ring
connection. */
+ private boolean checkConnectionRecoveryFailed(
+ CrossRingMessageSendState connRecoverState,
+ List<TcpDiscoveryNode> failedNodes
+ ) {
+ if (connRecoverState.timeout()) {
+ // Ensure of the ping pool release.
+ connRecoverState.stopRemoteDcPing();
+
+ segmentLocalNodeOnSendFail(failedNodes);
+
+ return true;
+ }
+
+ if (!connRecoverState.remoteDcPingStarted() ||
!connRecoverState.remoteDcPingFinished()
+ || connRecoverState.unavailableDCs != null)
+ return false;
+
+ // Remote DC statuses: avlive or not, Dc id -> true/false.
Review Comment:
```suggestion
// Remote DC statuses: alive or not, Dc id -> true/false.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]