chia7712 commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1662903374


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##########
@@ -106,6 +108,17 @@ public void stop() {
         Utils.closeQuietly(targetAdminClient, "target admin client");
     }
 
+    @Override
+    public Config validate(Map<String, String> connectorConfigs) {
+        List<ConfigValue> configValues = 
super.validate(connectorConfigs).configValues();
+        MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, 
errorMsg) ->
+                configValues.stream()
+                        .filter(conf -> conf.name().equals(config))
+                        .forEach(conf -> conf.errorMessages().add(errorMsg)));

Review Comment:
   could you please use `addErrorMessage` instead of `errorMessages().add`?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##########
@@ -106,6 +108,17 @@ public void stop() {
         Utils.closeQuietly(targetAdminClient, "target admin client");
     }
 
+    @Override
+    public Config validate(Map<String, String> connectorConfigs) {
+        List<ConfigValue> configValues = 
super.validate(connectorConfigs).configValues();
+        MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, 
errorMsg) ->
+                configValues.stream()
+                        .filter(conf -> conf.name().equals(config))

Review Comment:
   `EMIT_OFFSET_SYNCS_ENABLED` is in `MirrorSourceConfig`, so it is not a part 
of config def of `MirrorCheckpointConnector`. Hence, the error related to 
`EMIT_OFFSET_SYNCS_ENABLED` can't be propagated.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -234,6 +234,30 @@ public ConfigDef config() {
     @Override
     public org.apache.kafka.common.config.Config validate(Map<String, String> 
props) {
         List<ConfigValue> configValues = super.validate(props).configValues();
+        validateExactlyOnceConfigs(props, configValues);
+        validateEmitOffsetSyncConfigs(props, configValues);
+
+        return new org.apache.kafka.common.config.Config(configValues);
+    }
+
+    private static void validateEmitOffsetSyncConfigs(Map<String, String> 
props, List<ConfigValue> configValues) {
+        boolean offsetSyncsConfigured = props.keySet().stream()
+                .anyMatch(conf -> 
conf.startsWith(OFFSET_SYNCS_CLIENT_ROLE_PREFIX) || 
conf.startsWith(OFFSET_SYNCS_TOPIC_CONFIG_PREFIX));
+
+        if 
("false".equals(props.get(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED)) && 
offsetSyncsConfigured) {

Review Comment:
   what if `EMIT_OFFSET_SYNCS_ENABLED=true` and `offsetSyncsConfigured=false`? 
Should we add error message for it?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##########
@@ -106,6 +108,17 @@ public void stop() {
         Utils.closeQuietly(targetAdminClient, "target admin client");
     }
 
+    @Override
+    public Config validate(Map<String, String> connectorConfigs) {
+        List<ConfigValue> configValues = 
super.validate(connectorConfigs).configValues();
+        MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, 
errorMsg) ->
+                configValues.stream()
+                        .filter(conf -> conf.name().equals(config))

Review Comment:
   The following test shows my concern:
   ```java
       @Test
       public void test() {
           Map<String, String> props = new HashMap<>();
           props.put(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED, "false");
           MirrorCheckpointConnector connector = new 
MirrorCheckpointConnector();
           Config config = connector.validate(props);
           assertEquals(1, config.configValues().stream().filter(c -> 
c.name().equals(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED)).count());
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to