mimaison commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1639833636


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##########
@@ -166,6 +169,34 @@ Duration consumerPollTimeout() {
         return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
     }
 
+    public List<ConfigValue> validate() {
+        Boolean emitCheckpointsValue = 
this.getBoolean(EMIT_CHECKPOINTS_ENABLED);
+        Boolean syncGroupOffsetsValue = 
this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED);
+
+        List<ConfigValue> invalidConfigs = new ArrayList<>();
+        if (!emitCheckpointsValue && !syncGroupOffsetsValue) {
+            ConfigValue syncGroupOffsets = new 
ConfigValue(SYNC_GROUP_OFFSETS_ENABLED);
+            ConfigValue emitCheckpoints = new 
ConfigValue(EMIT_CHECKPOINTS_ENABLED);
+
+            String errorMessage = "MirrorCheckpointConnector can't run without 
both" + SYNC_GROUP_OFFSETS_ENABLED + ", " +
+                    EMIT_CHECKPOINTS_ENABLED + "set to false";
+            syncGroupOffsets.addErrorMessage(errorMessage);
+            emitCheckpoints.addErrorMessage(errorMessage);
+            invalidConfigs.add(syncGroupOffsets);
+            invalidConfigs.add(emitCheckpoints);
+        }
+
+        boolean configuredWithDependincesOnOffsetSyncs = emitCheckpointsValue 
|| syncGroupOffsetsValue;

Review Comment:
   There's a typo in `configuredWithDependincesOnOffsetSyncs`. I propose 
`requireOffsetSyncs` as an alternative name.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##########
@@ -166,6 +169,34 @@ Duration consumerPollTimeout() {
         return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
     }
 
+    public List<ConfigValue> validate() {
+        Boolean emitCheckpointsValue = 
this.getBoolean(EMIT_CHECKPOINTS_ENABLED);
+        Boolean syncGroupOffsetsValue = 
this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED);
+
+        List<ConfigValue> invalidConfigs = new ArrayList<>();
+        if (!emitCheckpointsValue && !syncGroupOffsetsValue) {
+            ConfigValue syncGroupOffsets = new 
ConfigValue(SYNC_GROUP_OFFSETS_ENABLED);
+            ConfigValue emitCheckpoints = new 
ConfigValue(EMIT_CHECKPOINTS_ENABLED);
+
+            String errorMessage = "MirrorCheckpointConnector can't run without 
both" + SYNC_GROUP_OFFSETS_ENABLED + ", " +

Review Comment:
   I guess running the connector with both these feature disable does not make 
much sense, but is it an error?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##########
@@ -166,6 +169,34 @@ Duration consumerPollTimeout() {
         return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
     }
 
+    public List<ConfigValue> validate() {
+        Boolean emitCheckpointsValue = 
this.getBoolean(EMIT_CHECKPOINTS_ENABLED);

Review Comment:
   This can be boolean. Same below



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##########
@@ -166,6 +169,34 @@ Duration consumerPollTimeout() {
         return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
     }
 
+    public List<ConfigValue> validate() {
+        Boolean emitCheckpointsValue = 
this.getBoolean(EMIT_CHECKPOINTS_ENABLED);
+        Boolean syncGroupOffsetsValue = 
this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED);
+
+        List<ConfigValue> invalidConfigs = new ArrayList<>();
+        if (!emitCheckpointsValue && !syncGroupOffsetsValue) {
+            ConfigValue syncGroupOffsets = new 
ConfigValue(SYNC_GROUP_OFFSETS_ENABLED);
+            ConfigValue emitCheckpoints = new 
ConfigValue(EMIT_CHECKPOINTS_ENABLED);
+
+            String errorMessage = "MirrorCheckpointConnector can't run without 
both" + SYNC_GROUP_OFFSETS_ENABLED + ", " +
+                    EMIT_CHECKPOINTS_ENABLED + "set to false";
+            syncGroupOffsets.addErrorMessage(errorMessage);
+            emitCheckpoints.addErrorMessage(errorMessage);
+            invalidConfigs.add(syncGroupOffsets);
+            invalidConfigs.add(emitCheckpoints);
+        }
+
+        boolean configuredWithDependincesOnOffsetSyncs = emitCheckpointsValue 
|| syncGroupOffsetsValue;
+        if 
(!"true".equals(Optional.ofNullable(this.originals().get(EMIT_OFFSET_SYNCS_ENABLED)).orElse("true"))
 & configuredWithDependincesOnOffsetSyncs) {
+            ConfigValue emitOffsetSync = new 
ConfigValue(EMIT_OFFSET_SYNCS_ENABLED);
+            emitOffsetSync.addErrorMessage("MirrorCheckpointConnector can't 
run with " + EMIT_OFFSET_SYNCS_ENABLED + " set to false while, " +
+                    EMIT_CHECKPOINTS_ENABLED  + "and/or" + 
SYNC_GROUP_OFFSETS_ENABLED + "set to true");

Review Comment:
   We need spaces before and after `and/or` and before `set`



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -430,11 +453,16 @@ void syncTopicConfigs()
     }
 
     private void createOffsetSyncsTopic() {
-        MirrorUtils.createSinglePartitionCompactedTopic(
-                config.offsetSyncsTopic(),
-                config.offsetSyncsTopicReplicationFactor(),
-                offsetSyncsAdminClient
-        );
+        if (config.emitOffsetSyncEnabled()) {
+            Admin offsetSyncsAdminClient = 
config.forwardingAdmin(config.offsetSyncsTopicAdminConfig());

Review Comment:
   `Admin` is `AutoCloseable` so we can use a try-with-resource block instead 
of explicitly closing the client below.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -131,7 +120,7 @@ public void stop() {
             log.warn("Interrupted waiting for access to consumer. Will try 
closing anyway."); 
         }
         Utils.closeQuietly(consumer, "source consumer");
-        Utils.closeQuietly(offsetProducer, "offset producer");
+        Utils.closeQuietly(offsetSyncWriter, "offset producer");

Review Comment:
   Can we adjust the name too?



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -534,6 +537,121 @@ private void testOneWayReplicationWithOffsetSyncs(int 
offsetLagMax) throws Inter
         assertMonotonicCheckpoints(backup, PRIMARY_CLUSTER_ALIAS + 
".checkpoints.internal");
     }
 
+    @Test
+    public void testReplicationWithoutOffsetSync() throws Exception {

Review Comment:
   That's a pretty big test just to check the offset-syncs and checkpoints 
topics don't get created. Could we simplify it?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java:
##########
@@ -290,6 +290,11 @@ protected static ConfigDef config() {
                 .define(CLUSTERS_CONFIG, Type.LIST, Importance.HIGH, 
CLUSTERS_DOC)
                 .define(ENABLE_INTERNAL_REST_CONFIG, Type.BOOLEAN, false, 
Importance.HIGH, ENABLE_INTERNAL_REST_DOC)
                 .define(CONFIG_PROVIDERS_CONFIG, Type.LIST, 
Collections.emptyList(), Importance.LOW, CONFIG_PROVIDERS_DOC)
+                .define(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED,

Review Comment:
   Why are we defining the config here? Having it in MirrorSourceConfig should 
be enough.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##########
@@ -166,6 +169,34 @@ Duration consumerPollTimeout() {
         return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
     }
 
+    public List<ConfigValue> validate() {
+        Boolean emitCheckpointsValue = 
this.getBoolean(EMIT_CHECKPOINTS_ENABLED);
+        Boolean syncGroupOffsetsValue = 
this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED);
+
+        List<ConfigValue> invalidConfigs = new ArrayList<>();
+        if (!emitCheckpointsValue && !syncGroupOffsetsValue) {
+            ConfigValue syncGroupOffsets = new 
ConfigValue(SYNC_GROUP_OFFSETS_ENABLED);
+            ConfigValue emitCheckpoints = new 
ConfigValue(EMIT_CHECKPOINTS_ENABLED);
+
+            String errorMessage = "MirrorCheckpointConnector can't run without 
both" + SYNC_GROUP_OFFSETS_ENABLED + ", " +
+                    EMIT_CHECKPOINTS_ENABLED + "set to false";

Review Comment:
   We need spaces after `both` and before `set`



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##########
@@ -166,6 +169,34 @@ Duration consumerPollTimeout() {
         return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
     }
 
+    public List<ConfigValue> validate() {

Review Comment:
   Can we add a test for this new method?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##########
@@ -220,6 +220,10 @@ boolean addSourceAliasToMetrics() {
         return getBoolean(ADD_SOURCE_ALIAS_TO_METRICS);
     }
 
+    boolean emitOffsetSyncEnabled() {

Review Comment:
   Should we name it `emitOffsetSyncsEnabled()` to match the configuration?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -52,57 +48,47 @@ public class MirrorSourceTask extends SourceTask {
 
     private static final Logger log = 
LoggerFactory.getLogger(MirrorSourceTask.class);
 
-    private static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10;
-
     private KafkaConsumer<byte[], byte[]> consumer;
-    private KafkaProducer<byte[], byte[]> offsetProducer;
     private String sourceClusterAlias;
-    private String offsetSyncsTopic;
     private Duration pollTimeout;
-    private long maxOffsetLag;
     private Map<TopicPartition, PartitionState> partitionStates;
     private ReplicationPolicy replicationPolicy;
     private MirrorSourceMetrics metrics;
     private boolean stopping = false;
-    private final Map<TopicPartition, OffsetSync> delayedOffsetSyncs = new 
LinkedHashMap<>();
-    private final Map<TopicPartition, OffsetSync> pendingOffsetSyncs = new 
LinkedHashMap<>();
-    private Semaphore outstandingOffsetSyncs;
     private Semaphore consumerAccess;
+    private OffsetSyncWriter offsetSyncWriter;
 
     public MirrorSourceTask() {}
 
     // for testing
     MirrorSourceTask(KafkaConsumer<byte[], byte[]> consumer, 
MirrorSourceMetrics metrics, String sourceClusterAlias,
                      ReplicationPolicy replicationPolicy, long maxOffsetLag, 
KafkaProducer<byte[], byte[]> producer,
                      Semaphore outstandingOffsetSyncs, Map<TopicPartition, 
PartitionState> partitionStates,
-                     String offsetSyncsTopic) {
+                     String offsetSyncsTopic, boolean emitOffsetSyncEnabled) {

Review Comment:
   Would it make sense to provide an `OffsetSyncWriter` instance instead of 
`KafkaProducer<byte[], byte[]>` to this constructor?



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java:
##########
@@ -51,6 +51,30 @@ public class MirrorCheckpointConnectorTest {
     private static final String CONSUMER_GROUP = "consumer-group-1";
     private static final Map<String, ?> SOURCE_OFFSET = 
MirrorUtils.wrapOffset(0);
 
+    @Test
+    public void testEmitCheckpointsAndSyncGroupOffsetsBothDisabled() {
+        // disable the checkpoint emission
+        MirrorCheckpointConfig config = new MirrorCheckpointConfig(
+                makeProps("emit.checkpoints.enabled", "false",
+                        "sync.group.offsets.enabled", "false"));
+
+        Set<String> knownConsumerGroups = new HashSet<>();
+        knownConsumerGroups.add(CONSUMER_GROUP);
+        assertMirrorCheckpointConnectorDisabled(new 
MirrorCheckpointConnector(knownConsumerGroups, config));
+    }
+
+    @Test
+    public void testEmitOffsetSyncsDisabled() {

Review Comment:
   The value of this test is not clear. It's practically the same as the one 
just below. Basically all it does assert if that setting 
`sync.group.offsets.enabled=false` does not have any impact. 



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -234,6 +234,30 @@ public ConfigDef config() {
     @Override
     public org.apache.kafka.common.config.Config validate(Map<String, String> 
props) {
         List<ConfigValue> configValues = super.validate(props).configValues();
+        validateExactlyOnceConfigs(props, configValues);
+        validateEmitOffsetSyncConfigs(props, configValues);
+
+        return new org.apache.kafka.common.config.Config(configValues);
+    }
+
+    private static void validateEmitOffsetSyncConfigs(Map<String, String> 
props, List<ConfigValue> configValues) {

Review Comment:
   I'm not sure I understand why we have this method. Today you don't need to 
enable exactly once support for the connector to emit offset-syncs. I see no 
mention about this in the KIP, why are we making it a requirement ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to