chia7712 commented on code in PR #20567:
URL: https://github.com/apache/kafka/pull/20567#discussion_r2742642083
##########
server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java:
##########
@@ -163,6 +170,82 @@ public static Map<String, String> getMap(String propName,
String propValue) {
}
}
+ public static List<Endpoint> listenerListToEndPoints(List<String>
listeners, Map<ListenerName, SecurityProtocol> securityProtocolMap) {
+ return listenerListToEndPoints(listeners, securityProtocolMap, true);
+ }
+
+ public static List<Endpoint> listenerListToEndPoints(List<String>
listeners, Map<ListenerName, SecurityProtocol> securityProtocolMap, boolean
requireDistinctPorts) {
+ try {
+ List<Endpoint> endPoints =
SocketServerConfigs.listenerListToEndPoints(listeners, securityProtocolMap);
+ validate(endPoints, listeners, requireDistinctPorts);
+ return endPoints;
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format("Error creating
broker listeners from '%s': %s", listeners, e.getMessage()), e);
+ }
+ }
+
+ private static void validate(List<Endpoint> endPoints, List<String>
listeners, boolean requireDistinctPorts) {
+ long distinctListenerNames =
endPoints.stream().map(Endpoint::listener).distinct().count();
+ if (distinctListenerNames != endPoints.size()) {
+ throw new IllegalArgumentException("Each listener must have a
different name, listeners: " + listeners);
+ }
+
+ if (!requireDistinctPorts) return;
+
+ endPoints.stream()
+ .filter(ep -> ep.port() != 0) // filter port 0 for unit tests
+ .collect(Collectors.groupingBy(Endpoint::port))
+ .entrySet().stream()
+ .filter(entry -> entry.getValue().size() > 1)
+ .forEach(entry -> {
+ // Iterate through every grouping of duplicates by port to see
if they are valid
+ int port = entry.getKey();
+ List<Endpoint> eps = entry.getValue();
+ // Exception case, let's allow duplicate ports if one host is
on IPv4 and the other one is on IPv6
+ Map<Boolean, List<Endpoint>> partitionedByValidIp =
eps.stream()
+ .collect(Collectors.partitioningBy(ep -> ep.host() !=
null && INET_ADDRESS_VALIDATOR.isValid(ep.host())));
+
+ List<Endpoint> duplicatesWithIpHosts =
partitionedByValidIp.get(true);
+ List<Endpoint> duplicatesWithoutIpHosts =
partitionedByValidIp.get(false);
+
+ checkDuplicateListenerPorts(duplicatesWithoutIpHosts,
listeners);
+
+ if (duplicatesWithIpHosts.isEmpty()) {
Review Comment:
the empty body does bother me. Would you mind trying following style?
```java
if (duplicatesWithIpHosts.isEmpty()) return;
if (duplicatesWithIpHosts.size() == 2) {
String errorMessage = "If you have two listeners on the
same port then one needs to be IPv4 and the other IPv6, listeners: " +
listeners + ", port: " + port;
Endpoint ep1 = duplicatesWithIpHosts.get(0);
Endpoint ep2 = duplicatesWithIpHosts.get(1);
if (!validateOneIsIpv4AndOtherIpv6(ep1.host(),
ep2.host())) {
throw new IllegalArgumentException(errorMessage);
}
// If we reach this point it means that even though
duplicatesWithIpHosts in isolation can be valid, if
// there happens to be ANOTHER listener on this port
without an IP host (such as a null host) then its
// not valid.
if (!duplicatesWithoutIpHosts.isEmpty()) {
throw new IllegalArgumentException(errorMessage);
}
return;
}
// Having more than 2 duplicate endpoints doesn't make sense
since we only have 2 IP stacks (one is IPv4
// and the other is IPv6)
throw new IllegalArgumentException("Each listener must have
a different port unless exactly one listener has an IPv4 address and the other
IPv6 address, listeners: " + listeners + ", port: " + port);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]