OmniaGM commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1617187677


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##########
@@ -105,6 +108,49 @@ public void stop() {
         Utils.closeQuietly(targetAdminClient, "target admin client");
     }
 
+    @Override
+    public Config validate(Map<String, String> props) {
+        List<ConfigValue> configValues = super.validate(props).configValues();
+        String emitCheckpointsValue = 
Optional.ofNullable(props.get(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED)).orElse(Boolean.toString(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED_DEFAULT));
+        String syncGroupOffsetsValue = 
Optional.ofNullable(props.get(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED)).orElse(Boolean.toString(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED_DEFAULT));
+        String emitOffsetSyncsValue = 
Optional.ofNullable(props.get(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED)).orElse(Boolean.toString(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED_DEFAULT));
+
+        if ("false".equals(emitCheckpointsValue) && 
"false".equals(syncGroupOffsetsValue)) {
+            ConfigValue syncGroupOffsets = configValues.stream().filter(prop 
-> MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED.equals(prop.name()))
+                    .findAny()
+                    .orElseGet(() -> {
+                        ConfigValue result = new 
ConfigValue(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED);
+                        configValues.add(result);
+                        return result;
+                    });
+
+            ConfigValue emitCheckpoints = configValues.stream().filter(prop -> 
MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED.equals(prop.name()))
+                    .findAny()
+                    .orElseGet(() -> {
+                        ConfigValue result = new 
ConfigValue(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED);
+                        configValues.add(result);
+                        return result;
+                    });
+
+            String errorMessage = "MirrorCheckpointConnector can't run with 
both" +
+                    MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED + ", " + 
MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED + "set to false";
+            syncGroupOffsets.addErrorMessage(errorMessage);
+            emitCheckpoints.addErrorMessage(errorMessage);
+        }
+        if ("false".equals(emitOffsetSyncsValue) && 
("true".equals(emitCheckpointsValue) || "true".equals(syncGroupOffsetsValue))) {
+            ConfigValue emitOffsetSyncs = configValues.stream().filter(prop -> 
MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED.equals(prop.name()))
+                    .findAny()
+                    .orElseGet(() -> {
+                        ConfigValue result = new 
ConfigValue(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED);
+                        configValues.add(result);
+                        return result;
+                    });
+
+            emitOffsetSyncs.addErrorMessage("MirrorCheckpointConnector can't 
run while MirrorSourceConnector configured with" +
+                    MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED + "set to 
false");
+        }
+        return new Config(configValues);

Review Comment:
   Just want to clarify some points regarding connect:
   1. `Connector::validate` get called by the runtime before `Connector::start` 
which will get us to fail faster
   2. `Connector::validate` get called in 
`AbstractHerder::validateConnectorConfig` which handles connector/task 
lifecycle tracking. 
   3. While `Connector::start` get called during the reconfiguration of the 
plugin in connect as well as at the start
   
   So we can't move the validation to `Connector::start` and we can't move the 
initialisation of `MirrorConnectorConfig` into `Connector::validate`. 
   
   If we want to use `.getBoolean` then I would suggest having a look into my 
last commit where I temporary initialised a connect configs to use the getters 
from `AbstractConfig` which all mirror connector configs extends. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to