C0urante commented on a change in pull request #11775:
URL: https://github.com/apache/kafka/pull/11775#discussion_r818837032



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
##########
@@ -58,23 +119,87 @@ public Object get(String key) {
         }
     }
 
-    private static final ConfigDef CONFIG = SourceConnectorConfig.configDef();
+    private final TransactionBoundary transactionBoundary;
+    private final Long transactionBoundaryInterval;
     private final EnrichedSourceConnectorConfig enrichedSourceConfig;
+    private final String offsetsTopic;
 
     public static ConfigDef configDef() {
+        ConfigDef.Validator atLeastZero = ConfigDef.Range.atLeast(0);
         int orderInGroup = 0;
         return new ConfigDef(ConnectorConfig.configDef())
-                .define(TOPIC_CREATION_GROUPS_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(),
-                        ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), ConfigDef.LambdaValidator.with(
+                .define(
+                        TOPIC_CREATION_GROUPS_CONFIG,
+                        ConfigDef.Type.LIST,
+                        Collections.emptyList(),
+                        ConfigDef.CompositeValidator.of(
+                                new ConfigDef.NonNullValidator(),
+                                ConfigDef.LambdaValidator.with(
+                                    (name, value) -> {
+                                        List<?> groupAliases = (List<?>) value;
+                                        if (groupAliases.size() > new HashSet<>(groupAliases).size()) {
+                                            throw new ConfigException(name, value, "Duplicate alias provided.");
+                                        }
+                                    },
+                                    () -> "unique topic creation groups")),
+                        ConfigDef.Importance.LOW,
+                        TOPIC_CREATION_GROUPS_DOC,
+                        TOPIC_CREATION_GROUP,
+                        ++orderInGroup,
+                        ConfigDef.Width.LONG,
+                        TOPIC_CREATION_GROUPS_DISPLAY)
+                .define(
+                        EXACTLY_ONCE_SUPPORT_CONFIG,
+                        ConfigDef.Type.STRING,
+                        REQUESTED.toString(),
+                        ConfigDef.CaseInsensitiveValidString.in(enumOptions(ExactlyOnceSupportLevel.class)),

Review comment:
       Definitely agree! Some of the rough edges around, e.g., the consumer 
`isolation.level` property and its case-sensitive parsing have been a little 
troublesome. It'd be great to abstract+simplify some of the logic there and 
also add some user-facing consistency with how these types of properties are 
parsed.
   
   
[KIP-713](https://cwiki.apache.org/confluence/display/KAFKA/KIP-713%3A+Validation+of+Enums+in+configuration) may be the place to discuss this kind of effort. It's unclear how much we 
really care about protecting the various out-of-the-box `Validator` instances 
that we ship as "public interface", but I chose to err on the side of caution 
here since the second we add one for enums, people are going to start using it, 
so we'd better get it right the first time. This seems like exactly the sort of 
thing that the KIP process helps facilitate: making careful, 
measure-twice-cut-once changes to public-facing parts of the project.
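
To make the idea concrete, here is a rough, standalone sketch of case-insensitive enum parsing for a config value. It deliberately does not use Kafka's `ConfigDef` API, and it is not what this PR or KIP-713 implements: the class `EnumConfigSketch`, the enum `SupportLevel` with its two values, and the helper `parseIgnoringCase` are all hypothetical names made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

// Hypothetical sketch: not part of Kafka, and not the validator proposed in KIP-713.
public class EnumConfigSketch {

    // Stand-in enum for illustration only; the values are assumptions.
    enum SupportLevel { REQUESTED, REQUIRED }

    // Lists the accepted values in lower case, e.g. for use in an error message.
    static <E extends Enum<E>> List<String> enumOptions(Class<E> enumClass) {
        return Arrays.stream(enumClass.getEnumConstants())
                .map(constant -> constant.name().toLowerCase(Locale.ROOT))
                .collect(Collectors.toList());
    }

    // Maps a user-supplied string to an enum constant, ignoring case and
    // surrounding whitespace. Rejects null and unknown values.
    static <E extends Enum<E>> E parseIgnoringCase(Class<E> enumClass, String name, String value) {
        if (value != null) {
            String trimmed = value.trim();
            for (E constant : enumClass.getEnumConstants()) {
                if (constant.name().equalsIgnoreCase(trimmed)) {
                    return constant;
                }
            }
        }
        throw new IllegalArgumentException("Invalid value " + value + " for configuration "
                + name + ": expected one of " + enumOptions(enumClass));
    }

    public static void main(String[] args) {
        // The property name here is only an example key.
        System.out.println(parseIgnoringCase(SupportLevel.class, "exactly.once.support", "Required"));
    }
}
```

Doing the parsing and the validation in one shared place like this is one way to give users the same case handling across every enum-backed property.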




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

