tommystendahl commented on code in PR #4155: URL: https://github.com/apache/cassandra/pull/4155#discussion_r2108485012
########## src/java/org/apache/cassandra/locator/ReplicaPlans.java: ########## @@ -45,7 +49,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import com.carrotsearch.hppc.ObjectIntHashMap; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; Review Comment: I think the imports should be moved back. ########## src/java/org/apache/cassandra/locator/ReplicaPlans.java: ########## @@ -414,13 +446,86 @@ public static Collection<InetAddressAndPort> filterBatchlogEndpointsDynamic(bool if (result.contains(endpoint)) continue; + result.add(endpoint); } } return result; } + private static Optional<InetAddressAndPort> getInternalAddressAndPort(InetAddressAndPort endpoint) + { + EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + if (state == null) + { + logger.debug("No EndpointState found for endpoint: {}", endpoint); + return Optional.empty(); + } + + VersionedValue internal = state.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT); + if (internal == null || internal.value == null || internal.value.trim().isEmpty()) + { + logger.debug("No INTERNAL_ADDRESS_AND_PORT state for endpoint: {}", endpoint); + return Optional.empty(); + } + + try + { + InetAddressAndPort internalIp = InetAddressAndPort.getByName(internal.value.trim()); + return Optional.of(InetAddressAndPort.getByAddressOverrideDefaults(internalIp.getAddress(), internalIp.getPort())); + } + catch (UnknownHostException e) + { + logger.warn("Failed to parse INTERNAL_ADDRESS_AND_PORT [{}] for endpoint: {} due to {}", internal.value, endpoint, e.getMessage()); + return Optional.empty(); + } + } + @VisibleForTesting + public static class CacheEntry Review Comment: indentation ########## src/java/org/apache/cassandra/locator/ReplicaPlans.java: ########## @@ -295,10 +307,30 @@ private static ListMultimap<String, InetAddressAndPort> validate(boolean preferL for (Map.Entry<String, InetAddressAndPort> entry : endpoints.entries()) { InetAddressAndPort addr = entry.getValue(); - if (!addr.equals(FBUtilities.getBroadcastAddressAndPort()) && isAlive.test(addr)) - validated.put(entry.getKey(), entry.getValue()); + + // Skip local address and nodes marked down by FailureDetector + if (addr.equals(FBUtilities.getBroadcastAddressAndPort())) + continue; + + if (!isAlive.test(addr)) + continue; + + // Check for intra dc connectivity on private network. Cache the result for 30 seconds. + // Valid for topology where two newtwork interfaces are used. Review Comment: indentation ########## src/java/org/apache/cassandra/locator/ReplicaPlans.java: ########## @@ -295,10 +307,30 @@ private static ListMultimap<String, InetAddressAndPort> validate(boolean preferL for (Map.Entry<String, InetAddressAndPort> entry : endpoints.entries()) { InetAddressAndPort addr = entry.getValue(); - if (!addr.equals(FBUtilities.getBroadcastAddressAndPort()) && isAlive.test(addr)) - validated.put(entry.getKey(), entry.getValue()); + + // Skip local address and nodes marked down by FailureDetector + if (addr.equals(FBUtilities.getBroadcastAddressAndPort())) + continue; + + if (!isAlive.test(addr)) + continue; + + // Check for intra dc connectivity on private network. Cache the result for 30 seconds. + // Valid for topology where two newtwork interfaces are used. + Optional<InetAddressAndPort> maybeInternal = getInternalAddressAndPort(addr); + if (maybeInternal.isPresent()) { + InetAddressAndPort internal = maybeInternal.get(); + if (isReachableWithCache(internal, CACHE_TTL_MS)) { + validated.put(entry.getKey(), addr); + } + } else { + // No internal address means either it's a single-interface node or gossip isn't set up; + // trust isAlive (failure detector) in this case. + validated.put(entry.getKey(), addr); + } } + Review Comment: Remove line ########## src/java/org/apache/cassandra/locator/ReplicaPlans.java: ########## @@ -414,13 +446,86 @@ public static Collection<InetAddressAndPort> filterBatchlogEndpointsDynamic(bool if (result.contains(endpoint)) continue; + result.add(endpoint); } } return result; } + private static Optional<InetAddressAndPort> getInternalAddressAndPort(InetAddressAndPort endpoint) + { + EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + if (state == null) + { + logger.debug("No EndpointState found for endpoint: {}", endpoint); + return Optional.empty(); + } + + VersionedValue internal = state.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT); + if (internal == null || internal.value == null || internal.value.trim().isEmpty()) + { + logger.debug("No INTERNAL_ADDRESS_AND_PORT state for endpoint: {}", endpoint); + return Optional.empty(); + } + + try + { + InetAddressAndPort internalIp = InetAddressAndPort.getByName(internal.value.trim()); + return Optional.of(InetAddressAndPort.getByAddressOverrideDefaults(internalIp.getAddress(), internalIp.getPort())); + } + catch (UnknownHostException e) + { + logger.warn("Failed to parse INTERNAL_ADDRESS_AND_PORT [{}] for endpoint: {} due to {}", internal.value, endpoint, e.getMessage()); + return Optional.empty(); + } + } + @VisibleForTesting + public static class CacheEntry + { + boolean reachable; + long timestampMillis; + + public CacheEntry(boolean reachable, long timestampMillis) + { + this.reachable = reachable; + this.timestampMillis = timestampMillis; + } + } + + // Check reachability with cache + private static boolean isReachableWithCache(InetAddressAndPort address, int timeoutMs) { + if (address == null) + { + logger.debug("Null address provided to isReachableWithCache, treating as unreachable."); + return false; + } + + long now = FBUtilities.now().toEpochMilli(); + CacheEntry entry = reachabilityCache.get(address); + + if (entry != null && (now - entry.timestampMillis) < CACHE_TTL_MS) { + logger.trace("Using cached reachability for {}: {}", address, entry.reachable); + return entry.reachable; + } + + boolean reachable = isReachableOnce(address, timeoutMs); + reachabilityCache.put(address, new CacheEntry(reachable, now)); + logger.debug("Refreshed reachability for {}: {}", address, reachable); + return reachable; + } + + private static boolean isReachableOnce(InetAddressAndPort address, int timeoutMs) { Review Comment: { on new line ########## src/java/org/apache/cassandra/locator/ReplicaPlans.java: ########## @@ -414,13 +446,86 @@ public static Collection<InetAddressAndPort> filterBatchlogEndpointsDynamic(bool if (result.contains(endpoint)) continue; + result.add(endpoint); } } return result; } + private static Optional<InetAddressAndPort> getInternalAddressAndPort(InetAddressAndPort endpoint) + { + EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + if (state == null) + { + logger.debug("No EndpointState found for endpoint: {}", endpoint); + return Optional.empty(); + } + + VersionedValue internal = state.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT); + if (internal == null || internal.value == null || internal.value.trim().isEmpty()) + { + logger.debug("No INTERNAL_ADDRESS_AND_PORT state for endpoint: {}", endpoint); + return Optional.empty(); + } + + try + { + InetAddressAndPort internalIp = InetAddressAndPort.getByName(internal.value.trim()); + return Optional.of(InetAddressAndPort.getByAddressOverrideDefaults(internalIp.getAddress(), internalIp.getPort())); + } + catch (UnknownHostException e) + { + logger.warn("Failed to parse INTERNAL_ADDRESS_AND_PORT [{}] for endpoint: {} due to {}", internal.value, endpoint, e.getMessage()); + return Optional.empty(); + } + } + @VisibleForTesting + public static class CacheEntry + { + boolean reachable; + long timestampMillis; + + public CacheEntry(boolean reachable, long timestampMillis) + { + this.reachable = reachable; + this.timestampMillis = timestampMillis; + } + } + + // Check reachability with cache + private static boolean isReachableWithCache(InetAddressAndPort address, int timeoutMs) { Review Comment: { on new line. ########## src/java/org/apache/cassandra/locator/ReplicaPlans.java: ########## @@ -414,13 +446,86 @@ public static Collection<InetAddressAndPort> filterBatchlogEndpointsDynamic(bool if (result.contains(endpoint)) continue; + Review Comment: remove line ########## src/java/org/apache/cassandra/locator/ReplicaPlans.java: ########## @@ -269,7 +281,7 @@ public static ReplicaPlan.ForWrite forBatchlogWrite(boolean isAny) throws Unavai } @VisibleForTesting - public static Collection<InetAddressAndPort> filterBatchlogEndpoints(boolean preferLocalRack, String localRack, + public static Collection<InetAddressAndPort> filterBatchlogEndpoints(boolean preferLocalRack, String localRack, Review Comment: Wrong indentation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org