C0urante commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1659130813


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##########
@@ -106,6 +108,17 @@ public void stop() {
         Utils.closeQuietly(targetAdminClient, "target admin client");
     }
 
+    @Override
+    public Config validate(Map<String, String> connectorConfigs) {
+        List<ConfigValue> configValues = super.validate(connectorConfigs).configValues();
+        new MirrorCheckpointConfig(connectorConfigs).validate().forEach(invalidConfig ->
+                configValues.stream()
+                        .filter(conf -> conf.name().equals(invalidConfig.name()))
+                        .forEach(conf -> invalidConfig.errorMessages().forEach(msg -> conf.addErrorMessage(msg))));

Review Comment:
   Holy Java 8 Batman!
   
   We don't need the inner `forEach` here; we can simplify a bit:
   ```suggestion
                           .forEach(conf -> conf.errorMessages().addAll(invalidConfig.errorMessages())));
   ```
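
The shortcut relies on `ConfigValue.errorMessages()` returning the value's own mutable list, so `addAll` appends in place. A minimal sketch of the same merge, using a hypothetical `FakeConfigValue` stand-in (not Kafka's class) that keeps only `name()`, `errorMessages()`, and `addErrorMessage()`:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Kafka's ConfigValue, reduced to the members used above.
final class FakeConfigValue {
    private final String name;
    private final List<String> errorMessages = new ArrayList<>();

    FakeConfigValue(String name) {
        this.name = name;
    }

    String name() {
        return name;
    }

    // Returns the live list (not a copy), which is what lets addAll(...) work.
    List<String> errorMessages() {
        return errorMessages;
    }

    void addErrorMessage(String message) {
        errorMessages.add(message);
    }
}

final class ErrorMerge {
    // Copies each invalid entry's messages onto the same-named entry in configValues.
    static void merge(List<FakeConfigValue> configValues, List<FakeConfigValue> invalidConfigs) {
        invalidConfigs.forEach(invalidConfig ->
                configValues.stream()
                        .filter(conf -> conf.name().equals(invalidConfig.name()))
                        .forEach(conf -> conf.errorMessages().addAll(invalidConfig.errorMessages())));
    }
}
```

If an accessor like this ever returned a defensive copy instead, `addAll` would modify the copy and the errors would be silently lost, so the shortcut is only safe while the list is exposed directly.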



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##########
@@ -166,6 +173,29 @@ Duration consumerPollTimeout() {
         return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
     }
 
+    public List<ConfigValue> validate() {
+        List<ConfigValue> invalidConfigs = new ArrayList<>();
+
+        if (!this.getBoolean(EMIT_CHECKPOINTS_ENABLED) && !this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED)) {
+            Arrays.asList(new ConfigValue(SYNC_GROUP_OFFSETS_ENABLED), new ConfigValue(EMIT_CHECKPOINTS_ENABLED))
+                    .forEach(configValue -> {
+                        configValue.addErrorMessage("MirrorCheckpointConnector can't run without both " + SYNC_GROUP_OFFSETS_ENABLED + ", " +
+                                EMIT_CHECKPOINTS_ENABLED + " set to false");
+                        invalidConfigs.add(configValue);
+                    });
+        }
+
+        if ("false".equals(Optional.ofNullable(this.originals().get(EMIT_OFFSET_SYNCS_ENABLED)).orElse("true"))) {

Review Comment:
   Why is `getBoolean` used above, but the `EMIT_OFFSET_SYNCS_ENABLED` property 
is manually read and parsed here?
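
One practical difference between the two styles, sketched under assumptions: `manualIsDisabled` copies the check from the diff (using a `Map<String, String>` for simplicity), while `typedGetBoolean` is a hypothetical helper approximating a typed BOOLEAN lookup with a default of `true` (trimmed, case-insensitive, rejecting anything else). The manual comparison treats `FALSE` as enabled and silently accepts typos. If `originals()` was read because `EMIT_OFFSET_SYNCS_ENABLED` isn't defined in the checkpoint connector's `ConfigDef` (an assumption), defining it there would make `getBoolean` usable.

```java
import java.util.Map;
import java.util.Optional;

final class BooleanLookup {
    // The check from the diff: only the exact string "false" counts as disabled.
    static boolean manualIsDisabled(Map<String, String> originals, String key) {
        return "false".equals(Optional.ofNullable(originals.get(key)).orElse("true"));
    }

    // Hypothetical helper approximating a typed BOOLEAN lookup with a default:
    // trims, compares case-insensitively, and rejects anything else.
    static boolean typedGetBoolean(Map<String, String> originals, String key, boolean defaultValue) {
        String raw = originals.get(key);
        if (raw == null) {
            return defaultValue;
        }
        String trimmed = raw.trim();
        if (trimmed.equalsIgnoreCase("true")) {
            return true;
        }
        if (trimmed.equalsIgnoreCase("false")) {
            return false;
        }
        throw new IllegalArgumentException("Expected true or false for " + key + ", got: " + raw);
    }
}
```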



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##########
@@ -319,11 +397,14 @@ private void writeToTopic(EmbeddedKafkaCluster cluster, String topic, int numMes
             cluster.produce(topic, Integer.toString(i));
         }
     }
-
     private void awaitMirrorMakerStart(final MirrorMaker mm, final SourceAndTarget sourceAndTarget) throws InterruptedException {
+        awaitMirrorMakerStart(mm, sourceAndTarget, CONNECTOR_CLASSES);
+    }
+
+    private void awaitMirrorMakerStart(final MirrorMaker mm, final SourceAndTarget sourceAndTarget, final  List<Class<?>> connectorClasses) throws InterruptedException {

Review Comment:
   If we want to be fancy, we can use varargs here:
   ```suggestion
       private void awaitMirrorMakerStart(final MirrorMaker mm, final SourceAndTarget sourceAndTarget, final Class<?>... connectorClasses) throws InterruptedException {
   ```
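
With a varargs parameter, the two-argument overload can no longer pass `CONNECTOR_CLASSES` straight through if it stays a `List<Class<?>>`; it would need converting to an array (or the constant could become an array). A self-contained sketch of the two overloads, with hypothetical names (`awaitStart`, `DEFAULT_CLASSES`) standing in for the test's helpers and the actual waiting left out:

```java
import java.util.Arrays;
import java.util.List;

final class AwaitStartSketch {
    // Hypothetical stand-in for the test's CONNECTOR_CLASSES list.
    static final List<Class<?>> DEFAULT_CLASSES = Arrays.asList(String.class, Integer.class);

    // Two-argument overload: wait for the default set of connectors.
    static List<Class<?>> awaitStart(String sourceAndTarget) {
        // A List can't be passed where Class<?>... is expected, so convert it to an array.
        return awaitStart(sourceAndTarget, DEFAULT_CLASSES.toArray(new Class<?>[0]));
    }

    // Varargs overload: callers list exactly the connector classes they expect.
    static List<Class<?>> awaitStart(String sourceAndTarget, Class<?>... connectorClasses) {
        // The real helper would poll until each connector is running for sourceAndTarget;
        // this sketch only reports which classes it would wait for.
        return Arrays.asList(connectorClasses);
    }
}
```

A call with only the flow picks the fixed-arity overload; a call such as `awaitStart("A->B", SomeConnector.class)` picks the varargs one.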



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java:
##########
@@ -59,12 +59,13 @@ public void testMirrorCheckpointConnectorDisabled() {
 
         Set<String> knownConsumerGroups = new HashSet<>();
         knownConsumerGroups.add(CONSUMER_GROUP);
+        assertMirrorCheckpointConnectorDisabled(new MirrorCheckpointConnector(knownConsumerGroups, config));
+    }
+
+    private void assertMirrorCheckpointConnectorDisabled(MirrorCheckpointConnector connector) {

Review Comment:
   Is this change still necessary? Looks like it might have been left over from 
a previous draft?



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##########
@@ -187,6 +194,77 @@ public void testSingleNodeCluster() throws Exception {
         }
     }
 
+    @Test
+    public void testClusterWithEmitOffsetDisabled() throws Exception {
+        Properties brokerProps = new Properties();
+        EmbeddedKafkaCluster clusterA = startKafkaCluster("A", 1, brokerProps);
+        EmbeddedKafkaCluster clusterB = startKafkaCluster("B", 1, brokerProps);
+
+        try (Admin adminB = clusterB.createAdminClient()) {
+
+            // Cluster aliases
+            final String a = "A";
+            final String b = "B";
+            final String ab = a + "->" + b;
+            final String ba = b + "->" + a;

Review Comment:
   Currently unused. We can either remove it, or if we want to be more explicit 
in our properties setup, we can use it to explicitly disable the b->a flow by 
setting `ba + ".enabled"` to false.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##########
@@ -166,6 +173,29 @@ Duration consumerPollTimeout() {
         return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
     }
 
+    public List<ConfigValue> validate() {
+        List<ConfigValue> invalidConfigs = new ArrayList<>();
+
+        if (!this.getBoolean(EMIT_CHECKPOINTS_ENABLED) && !this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED)) {
+            Arrays.asList(new ConfigValue(SYNC_GROUP_OFFSETS_ENABLED), new ConfigValue(EMIT_CHECKPOINTS_ENABLED))
+                    .forEach(configValue -> {
+                        configValue.addErrorMessage("MirrorCheckpointConnector can't run without both " + SYNC_GROUP_OFFSETS_ENABLED + ", " +
+                                EMIT_CHECKPOINTS_ENABLED + " set to false");
+                        invalidConfigs.add(configValue);
+                    });
+        }

Review Comment:
   I see the motivation here, but won't it become impossible for people to 
safely disable the checkpoint connector? It looks like the connector is failing 
validation in the new 
`DedicatedMirrorIntegrationTest::testClusterWithEmitOffsetDisabled` case, even 
though the test itself passes.
   
   Maybe we can skip validation if the connector is disabled (i.e., 
`getBoolean(ENABLED)` returns `false`)?
   
   Should we add a check 
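
A minimal sketch of that early exit, assuming the validation can read `enabled`, `emit.checkpoints.enabled`, and `sync.group.offsets.enabled` as plain booleans (the class and method names are hypothetical). A disabled connector reports no errors, so disabling it can't fail validation:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; the property names are MirrorMaker 2's checkpoint settings.
final class CheckpointValidation {
    static final String EMIT_CHECKPOINTS_ENABLED = "emit.checkpoints.enabled";
    static final String SYNC_GROUP_OFFSETS_ENABLED = "sync.group.offsets.enabled";

    static Map<String, List<String>> validate(boolean enabled, boolean emitCheckpoints, boolean syncGroupOffsets) {
        // A disabled connector never starts, so there is nothing to reject.
        if (!enabled) {
            return Collections.emptyMap();
        }
        Map<String, List<String>> errors = new HashMap<>();
        if (!emitCheckpoints && !syncGroupOffsets) {
            String msg = "MirrorCheckpointConnector can't run with both "
                    + EMIT_CHECKPOINTS_ENABLED + " and " + SYNC_GROUP_OFFSETS_ENABLED + " set to false";
            errors.computeIfAbsent(EMIT_CHECKPOINTS_ENABLED, k -> new ArrayList<>()).add(msg);
            errors.computeIfAbsent(SYNC_GROUP_OFFSETS_ENABLED, k -> new ArrayList<>()).add(msg);
        }
        return errors;
    }
}
```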



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncWriter.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Semaphore;
+
+/**
+ * Used internally by MirrorMaker to write translated offsets into offset-syncs topic, with some buffering logic to limit the number of in-flight records.
+ */
+class OffsetSyncWriter implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(OffsetSyncWriter.class);
+    private static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10;
+
+    private final Map<TopicPartition, OffsetSync> delayedOffsetSyncs = new LinkedHashMap<>();
+    private final Map<TopicPartition, OffsetSync> pendingOffsetSyncs = new LinkedHashMap<>();
+    private final Semaphore outstandingOffsetSyncs;
+    private final KafkaProducer<byte[], byte[]> offsetProducer;
+    private final String offsetSyncsTopic;
+    private final long maxOffsetLag;
+    private Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();

Review Comment:
   Nit: can be final
   
   ```suggestion
       private final Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
   ```
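
The buffering mentioned in the `OffsetSyncWriter` Javadoc can be pictured as a permit-bounded send loop. A simplified, hypothetical model of one way these fields could fit together (`BoundedSyncSender` is not a class in this PR): queued syncs sit in a `LinkedHashMap` keyed by partition, a `Semaphore` caps how many are in flight, and each completed send returns its permit. The producer callback is simulated by `completeAll()`:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;

// Hypothetical model: partitions are strings, offsets are longs,
// and "sending" just records a completion callback.
final class BoundedSyncSender {
    private final Map<String, Long> pending = new LinkedHashMap<>();
    private final List<Runnable> inFlightCallbacks = new ArrayList<>();
    private final Semaphore outstanding;

    BoundedSyncSender(int maxOutstanding) {
        this.outstanding = new Semaphore(maxOutstanding);
    }

    // A newer sync for the same partition replaces the older, unsent one.
    synchronized void queue(String partition, long offset) {
        pending.put(partition, offset);
    }

    // Sends queued syncs in insertion order until permits run out; returns the number sent.
    synchronized int firePending() {
        int sent = 0;
        Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
        // Check hasNext() first so an empty queue never consumes a permit.
        while (it.hasNext() && outstanding.tryAcquire()) {
            it.next();
            it.remove();
            // Stand-in for producer.send(record, callback): the callback returns the permit.
            inFlightCallbacks.add(outstanding::release);
            sent++;
        }
        return sent;
    }

    // Simulates the producer acknowledging every in-flight send.
    synchronized void completeAll() {
        inFlightCallbacks.forEach(Runnable::run);
        inFlightCallbacks.clear();
    }

    synchronized int pendingCount() {
        return pending.size();
    }
}
```

Using `tryAcquire()` rather than `acquire()` keeps the caller from blocking: anything that doesn't fit simply stays queued until a later call.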



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##########
@@ -166,6 +173,29 @@ Duration consumerPollTimeout() {
         return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
     }
 
+    public List<ConfigValue> validate() {

Review Comment:
   We don't need to return a `List<ConfigValue>` here; something like a `Map<String, List<String>>` (mapping property names to error messages) would be enough.
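
With a `Map<String, List<String>>` return type, the connector side can attach messages with a keyed lookup instead of filtering the whole list once per invalid property. A sketch, using a hypothetical `NamedResult` stand-in for `ConfigValue`:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ConfigValue: a property name plus its error messages.
final class NamedResult {
    final String name;
    final List<String> errorMessages = new ArrayList<>();

    NamedResult(String name) {
        this.name = name;
    }
}

final class KeyedErrorMerge {
    // One map lookup per result, instead of one list scan per invalid property.
    static void attach(List<NamedResult> results, Map<String, List<String>> errorsByProperty) {
        for (NamedResult result : results) {
            result.errorMessages.addAll(errorsByProperty.getOrDefault(result.name, Collections.emptyList()));
        }
    }
}
```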



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##########
@@ -166,6 +173,29 @@ Duration consumerPollTimeout() {
         return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
     }
 
+    public List<ConfigValue> validate() {
+        List<ConfigValue> invalidConfigs = new ArrayList<>();
+
+        if (!this.getBoolean(EMIT_CHECKPOINTS_ENABLED) && !this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED)) {
+            Arrays.asList(new ConfigValue(SYNC_GROUP_OFFSETS_ENABLED), new ConfigValue(EMIT_CHECKPOINTS_ENABLED))
+                    .forEach(configValue -> {
+                        configValue.addErrorMessage("MirrorCheckpointConnector can't run without both " + SYNC_GROUP_OFFSETS_ENABLED + ", " +
+                                EMIT_CHECKPOINTS_ENABLED + " set to false");
+                        invalidConfigs.add(configValue);
+                    });
+        }
+
+        if ("false".equals(Optional.ofNullable(this.originals().get(EMIT_OFFSET_SYNCS_ENABLED)).orElse("true"))) {
+            Arrays.asList(new ConfigValue(SYNC_GROUP_OFFSETS_ENABLED), new ConfigValue(EMIT_CHECKPOINTS_ENABLED), new ConfigValue(EMIT_OFFSET_SYNCS_ENABLED))

Review Comment:
   I can see a rationale for reporting errors on all of these properties, but 1) IMO it's overkill and a bit redundant, and 2) it's also misleading if either group offset syncing or checkpointing is disabled.
   
   Can we just attach the error to the `emit.offset-syncs.enabled` property?
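
A sketch of attaching the error to `emit.offset-syncs.enabled` alone, assuming the flag has already been parsed into a boolean; the class name and message wording are hypothetical:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class OffsetSyncsValidation {
    static final String EMIT_OFFSET_SYNCS_ENABLED = "emit.offset-syncs.enabled";

    static Map<String, List<String>> validate(boolean emitOffsetSyncs) {
        Map<String, List<String>> errors = new HashMap<>();
        if (!emitOffsetSyncs) {
            // Report the problem once, on the property that actually causes it.
            errors.computeIfAbsent(EMIT_OFFSET_SYNCS_ENABLED, k -> new ArrayList<>())
                    .add("MirrorCheckpointConnector can't run with " + EMIT_OFFSET_SYNCS_ENABLED + " set to false");
        }
        return errors;
    }
}
```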



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -197,79 +183,18 @@ public void commitRecord(SourceRecord record, RecordMetadata metadata) {
         long latency = System.currentTimeMillis() - record.timestamp();
         metrics.countRecord(topicPartition);
         metrics.replicationLatency(topicPartition, latency);
-        TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
-        long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
-        long downstreamOffset = metadata.offset();
-        maybeQueueOffsetSyncs(sourceTopicPartition, upstreamOffset, downstreamOffset);
-        // We may be able to immediately publish an offset sync that we've queued up here
-        firePendingOffsetSyncs();
-    }
-
-    // updates partition state and queues up OffsetSync if necessary
-    private void maybeQueueOffsetSyncs(TopicPartition topicPartition, long upstreamOffset,
-                                       long downstreamOffset) {
-        PartitionState partitionState =
-            partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
-        OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
-        if (partitionState.update(upstreamOffset, downstreamOffset)) {
-            // Queue this sync for an immediate send, as downstream state is sufficiently stale
-            synchronized (this) {
-                delayedOffsetSyncs.remove(topicPartition);
-                pendingOffsetSyncs.put(topicPartition, offsetSync);
-            }
-            partitionState.reset();
-        } else {
-            // Queue this sync to be delayed until the next periodic offset commit
-            synchronized (this) {
-                delayedOffsetSyncs.put(topicPartition, offsetSync);
-            }
+        // Queue offset syncs only when offsetWriter is available
+        if (offsetSyncWriter != null) {
+            TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
+            long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
+            long downstreamOffset = metadata.offset();
+            MirrorSourceTask.PartitionState partitionState =
+                    partitionStates.computeIfAbsent(topicPartition, x -> new MirrorSourceTask.PartitionState(offsetSyncWriter.maxOffsetLag()));

Review Comment:
   💯 Thanks, looks great!
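
For readers following the refactor, a simplified model of the queueing logic being moved, with hypothetical names: the immediate-versus-delayed decision that `PartitionState.update` makes is passed in as a boolean, partitions are plain strings, only the downstream offset is kept, and the writer reference is `null` when offset syncs are disabled, mirroring the `offsetSyncWriter != null` guard:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the queueing step; not the PR's OffsetSyncWriter.
final class SyncQueue {
    final Map<String, Long> pending = new LinkedHashMap<>(); // send as soon as possible
    final Map<String, Long> delayed = new LinkedHashMap<>(); // send at the next offset commit

    synchronized void queue(String partition, long downstreamOffset, boolean stale) {
        if (stale) {
            // Downstream state is stale enough to publish right away.
            delayed.remove(partition);
            pending.put(partition, downstreamOffset);
        } else {
            delayed.put(partition, downstreamOffset);
        }
    }

    // At commit time, delayed syncs are promoted so they are sent as well.
    synchronized void promoteDelayed() {
        pending.putAll(delayed);
        delayed.clear();
    }
}

final class SourceTaskSketch {
    private final SyncQueue writer; // null when offset syncs are disabled

    SourceTaskSketch(SyncQueue writer) {
        this.writer = writer;
    }

    void commitRecord(String partition, long downstreamOffset, boolean stale) {
        // Metrics would be recorded here unconditionally; syncs are queued only with a writer.
        if (writer != null) {
            writer.queue(partition, downstreamOffset, stale);
        }
    }
}
```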



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
