sergey-chugunov-1985 commented on code in PR #12729:
URL: https://github.com/apache/ignite/pull/12729#discussion_r3208775218


##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -3903,6 +3977,109 @@ else if (!failedNextNode && sndState != null && 
sndState.isBackward()) {
             }
         }
 
+        /** */
+        protected CrossRingMessageSendState 
createConnectionRecoveryState(TcpDiscoveryNode newNextNode) {
+            CrossRingMessageSendState recoveryState = new 
CrossRingMessageSendState();
+
+            // The corner node case. Next node is from neighbour DC. Another 
DC might be entierly unavailable.
+            // To prevent sequential nodes failure in current DC we have to 
guess whether we can reach neighbour DC.
+            // We ping nodes from another DC within the same timeot in 
parallel.

Review Comment:
   ```suggestion
               // We ping nodes from another DC within the same timeout in 
parallel.
   ```



##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -3903,6 +3977,109 @@ else if (!failedNextNode && sndState != null && 
sndState.isBackward()) {
             }
         }
 
+        /** */
+        protected CrossRingMessageSendState 
createConnectionRecoveryState(TcpDiscoveryNode newNextNode) {
+            CrossRingMessageSendState recoveryState = new 
CrossRingMessageSendState();
+
+            // The corner node case. Next node is from neighbour DC. Another 
DC might be entierly unavailable.
+            // To prevent sequential nodes failure in current DC we have to 
guess whether we can reach neighbour DC.
+            // We ping nodes from another DC within the same timeot in 
parallel.
+            if (!F.isEmpty(locNode.dataCenterId())) {
+                assert !F.isEmpty(next.dataCenterId());
+
+                if (!next.dataCenterId().equals(locNode.dataCenterId())) {
+                    Stream<TcpDiscoveryNode> otherDcsSrvrs = 
ring.serverNodes().stream()
+                        .filter(n -> 
!n.dataCenterId().equals(locNode.dataCenterId()));
+
+                    synchronized (mux) {
+                        otherDcsSrvrs = otherDcsSrvrs.filter(n -> 
!failedNodes.containsKey(n));

Review Comment:
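   We should call the `collect` method under `mux`, because a stream may actually execute the 
filter operation only during the collection phase; as written, the `failedNodes` check runs outside the synchronized block.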



##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -3903,6 +3977,109 @@ else if (!failedNextNode && sndState != null && 
sndState.isBackward()) {
             }
         }
 
+        /** */
+        protected CrossRingMessageSendState 
createConnectionRecoveryState(TcpDiscoveryNode newNextNode) {
+            CrossRingMessageSendState recoveryState = new 
CrossRingMessageSendState();
+
+            // The corner node case. Next node is from neighbour DC. Another 
DC might be entierly unavailable.
+            // To prevent sequential nodes failure in current DC we have to 
guess whether we can reach neighbour DC.
+            // We ping nodes from another DC within the same timeot in 
parallel.
+            if (!F.isEmpty(locNode.dataCenterId())) {
+                assert !F.isEmpty(next.dataCenterId());
+
+                if (!next.dataCenterId().equals(locNode.dataCenterId())) {
+                    Stream<TcpDiscoveryNode> otherDcsSrvrs = 
ring.serverNodes().stream()
+                        .filter(n -> 
!n.dataCenterId().equals(locNode.dataCenterId()));
+
+                    synchronized (mux) {
+                        otherDcsSrvrs = otherDcsSrvrs.filter(n -> 
!failedNodes.containsKey(n));
+                    }
+
+                    
recoveryState.pingRemoteDCs(otherDcsSrvrs.collect(toList()));
+                }
+            }
+
+            return recoveryState;
+        }
+
+        /** @return {@code True} if current node fails to recover the ring 
connection. */
+        private boolean checkConnectionRecoveryFailed(
+            CrossRingMessageSendState connRecoverState,
+            List<TcpDiscoveryNode> failedNodes
+        ) {
+            if (connRecoverState.timeout()) {
+                // Ensure of the ping pool release.
+                connRecoverState.stopRemoteDcPing();
+
+                segmentLocalNodeOnSendFail(failedNodes);
+
+                return true;
+            }
+
+            if (!connRecoverState.remoteDcPingStarted() || 
!connRecoverState.remoteDcPingFinished()
+                || connRecoverState.unavailableDCs != null)
+                return false;
+
+            // Remote DC statuses: avlive or not, Dc id -> true/false.
+            Collection<String> rmtDcIds = 
connRecoverState.rmtDcPingRes.keySet().stream().map(ClusterNode::dataCenterId)
+                .collect(Collectors.toSet());
+
+            Collection<String> failedDCs = U.newHashSet(rmtDcIds.size());
+
+            Map<String, Integer> availablePerDC = 
countPerDC(connRecoverState.availableNodes());
+            Map<String, Integer> unavailablePerDC = 
countPerDC(connRecoverState.unavailableNodes());
+
+            for (String dcId : rmtDcIds) {
+                int availCnt = 
Optional.ofNullable(availablePerDC.get(dcId)).orElse(0);
+                int unavailCnt = 
Optional.ofNullable(unavailablePerDC.get(dcId)).orElse(0);
+
+                if (availCnt <= unavailCnt)
+                    failedDCs.add(dcId);
+            }
+
+            String msg = "During the connection recovery, nodes ping of DCs '" 
+ String.join(", ", rmtDcIds)
+                + "' from current corner node has finished. Responded nodes: " 
+ connRecoverState.availableNodes()

Review Comment:
   ```suggestion
                   + "' from current edge node has finished. Responded nodes: " 
+ connRecoverState.availableNodes()
   ```



##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -3903,6 +3977,109 @@ else if (!failedNextNode && sndState != null && 
sndState.isBackward()) {
             }
         }
 
+        /** */
+        protected CrossRingMessageSendState 
createConnectionRecoveryState(TcpDiscoveryNode newNextNode) {
+            CrossRingMessageSendState recoveryState = new 
CrossRingMessageSendState();
+
+            // The corner node case. Next node is from neighbour DC. Another 
DC might be entierly unavailable.
+            // To prevent sequential nodes failure in current DC we have to 
guess whether we can reach neighbour DC.
+            // We ping nodes from another DC within the same timeot in 
parallel.
+            if (!F.isEmpty(locNode.dataCenterId())) {
+                assert !F.isEmpty(next.dataCenterId());
+
+                if (!next.dataCenterId().equals(locNode.dataCenterId())) {
+                    Stream<TcpDiscoveryNode> otherDcsSrvrs = 
ring.serverNodes().stream()
+                        .filter(n -> 
!n.dataCenterId().equals(locNode.dataCenterId()));
+
+                    synchronized (mux) {
+                        otherDcsSrvrs = otherDcsSrvrs.filter(n -> 
!failedNodes.containsKey(n));
+                    }
+
+                    
recoveryState.pingRemoteDCs(otherDcsSrvrs.collect(toList()));
+                }
+            }
+
+            return recoveryState;
+        }
+
+        /** @return {@code True} if current node fails to recover the ring 
connection. */
+        private boolean checkConnectionRecoveryFailed(
+            CrossRingMessageSendState connRecoverState,
+            List<TcpDiscoveryNode> failedNodes
+        ) {
+            if (connRecoverState.timeout()) {
+                // Ensure of the ping pool release.
+                connRecoverState.stopRemoteDcPing();
+
+                segmentLocalNodeOnSendFail(failedNodes);
+
+                return true;
+            }
+
+            if (!connRecoverState.remoteDcPingStarted() || 
!connRecoverState.remoteDcPingFinished()
+                || connRecoverState.unavailableDCs != null)
+                return false;
+
+            // Remote DC statuses: avlive or not, Dc id -> true/false.

Review Comment:
   ```suggestion
               // Remote DC statuses: alive or not, Dc id -> true/false.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to