chia7712 commented on code in PR #20389: URL: https://github.com/apache/kafka/pull/20389#discussion_r2301926271
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java: ########## @@ -784,7 +783,7 @@ public List<Object> validValues(String name, Map<String, Object> parsedConfig) { for (PluginDesc<T> plugin : plugins()) { result.add(plugin.pluginClass()); } - return Collections.unmodifiableList(result); + return List.copyOf(result); Review Comment: ```java public List<Object> validValues(String name, Map<String, Object> parsedConfig) { return plugins().stream().map(p -> (Object) p.pluginClass()).toList(); } ``` ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java: ########## @@ -253,7 +252,7 @@ public MetricGroupId(String groupName, Map<String, String> tags) { Objects.requireNonNull(groupName); Objects.requireNonNull(tags); this.groupName = groupName; - this.tags = Collections.unmodifiableMap(new LinkedHashMap<>(tags)); + this.tags = Map.copyOf(new LinkedHashMap<>(tags)); Review Comment: IIRC, `Map.copyOf` does not guarantee insertion order, so we should keep using `Collections.unmodifiableMap` instead. By the way, it would be useful to add a comment to prevent unintended changes in the future. 
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java: ########## @@ -58,7 +57,7 @@ protected Collection<Class<?>> regularResources() { @Override protected Collection<Class<?>> adminResources() { - return Collections.singletonList( + return List.of( Review Comment: ```java return List.of(LoggingResource.class); ``` ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java: ########## @@ -137,7 +136,7 @@ public Future<Void> executeFailed(ProcessingContext<T> context, Stage stage, Cla // Visible for testing synchronized Future<Void> report(ProcessingContext<T> context) { if (reporters.size() == 1) { - return new WorkerErrantRecordReporter.ErrantRecordFuture(Collections.singletonList(reporters.iterator().next().report(context))); + return new WorkerErrantRecordReporter.ErrantRecordFuture(List.of(reporters.iterator().next().report(context))); Review Comment: ```java return new WorkerErrantRecordReporter.ErrantRecordFuture(List.of(reporters.get(0).report(context))); ``` ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java: ########## @@ -134,11 +134,11 @@ private Map<String, ByteBuffer> fillAssignmentsAndSerialize(Collection<String> m for (String member : members) { Collection<String> connectors = connectorAssignments.get(member); Review Comment: ```java Collection<String> connectors = connectorAssignments.getOrDefault(member, List.of()); Collection<ConnectorTaskId> tasks = taskAssignments.getOrDefault(member, List.of()); ``` ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java: ########## @@ -335,12 +333,12 @@ public static List<Path> pluginUrls(Path topPath) throws IOException { if (containsClassFiles) { if (archives.isEmpty()) { - return Collections.singletonList(topPath); + return List.of(topPath); } log.warn("Plugin path contains both java archives and class files. 
Returning only the" + " archives"); } - return Arrays.asList(archives.toArray(new Path[0])); + return List.of(archives.toArray(new Path[0])); Review Comment: ```java return List.copyOf(archives); ``` ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java: ########## @@ -395,7 +394,7 @@ public ConnectMetricsRegistry(Set<String> tags) { connectorStatusMetrics.put(connectorUnassignedTaskCount, TaskStatus.State.UNASSIGNED); connectorStatusMetrics.put(connectorDestroyedTaskCount, TaskStatus.State.DESTROYED); connectorStatusMetrics.put(connectorRestartingTaskCount, TaskStatus.State.RESTARTING); - connectorStatusMetrics = Collections.unmodifiableMap(connectorStatusMetrics); + connectorStatusMetrics = Map.copyOf(connectorStatusMetrics); Review Comment: `connectorStatusMetrics` could be initialized with `Map.of`. Also, it could be a "final" variable. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java: ########## @@ -131,11 +131,11 @@ public void resume(TopicPartition... partitions) { throw new IllegalWorkerStateException("SinkTaskContext may not be used to resume consumption until the task is initialized"); } try { - pausedPartitions.removeAll(Arrays.asList(partitions)); + List.of(partitions).forEach(pausedPartitions::remove); Review Comment: Could you please create a `List<TopicPartition>` at the beginning and reuse it in the log message? Otherwise, the log message will just call `toString` on the array object. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org