C0urante commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1659130813
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##########
@@ -106,6 +108,17 @@ public void stop() {
         Utils.closeQuietly(targetAdminClient, "target admin client");
     }
+    @Override
+    public Config validate(Map<String, String> connectorConfigs) {
+        List<ConfigValue> configValues = super.validate(connectorConfigs).configValues();
+        new MirrorCheckpointConfig(connectorConfigs).validate().forEach(invalidConfig ->
+                configValues.stream()
+                        .filter(conf -> conf.name().equals(invalidConfig.name()))
+                        .forEach(conf -> invalidConfig.errorMessages().forEach(msg -> conf.addErrorMessage(msg))));

Review Comment:
Holy Java 8 Batman! We don't need the inner `forEach` here; we can simplify it a bit:
```suggestion
                        .forEach(conf -> conf.errorMessages().addAll(invalidConfig.errorMessages())));
```

##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##########
@@ -166,6 +173,29 @@ Duration consumerPollTimeout() {
         return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
     }
+    public List<ConfigValue> validate() {
+        List<ConfigValue> invalidConfigs = new ArrayList<>();
+
+        if (!this.getBoolean(EMIT_CHECKPOINTS_ENABLED) && !this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED)) {
+            Arrays.asList(new ConfigValue(SYNC_GROUP_OFFSETS_ENABLED), new ConfigValue(EMIT_CHECKPOINTS_ENABLED))
+                    .forEach(configValue -> {
+                        configValue.addErrorMessage("MirrorCheckpointConnector can't run without both " + SYNC_GROUP_OFFSETS_ENABLED + ", " +
+                                EMIT_CHECKPOINTS_ENABLED + " set to false");
+                        invalidConfigs.add(configValue);
+                    });
+        }
+
+        if ("false".equals(Optional.ofNullable(this.originals().get(EMIT_OFFSET_SYNCS_ENABLED)).orElse("true"))) {

Review Comment:
Why is `getBoolean` used above, but the `EMIT_OFFSET_SYNCS_ENABLED` property is manually read and parsed here?

##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##########
@@ -319,11 +397,14 @@ private void writeToTopic(EmbeddedKafkaCluster cluster, String topic, int numMes
             cluster.produce(topic, Integer.toString(i));
         }
     }
-    private void awaitMirrorMakerStart(final MirrorMaker mm, final SourceAndTarget sourceAndTarget) throws InterruptedException {
+        awaitMirrorMakerStart(mm, sourceAndTarget, CONNECTOR_CLASSES);
+    }
+
+    private void awaitMirrorMakerStart(final MirrorMaker mm, final SourceAndTarget sourceAndTarget, final List<Class<?>> connectorClasses) throws InterruptedException {

Review Comment:
If we want to be fancy, we can use varargs here:
```suggestion
    private void awaitMirrorMakerStart(final MirrorMaker mm, final SourceAndTarget sourceAndTarget, final Class<?>... connectorClasses) throws InterruptedException {
```

##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java:
##########
@@ -59,12 +59,13 @@ public void testMirrorCheckpointConnectorDisabled() {
         Set<String> knownConsumerGroups = new HashSet<>();
         knownConsumerGroups.add(CONSUMER_GROUP);
+        assertMirrorCheckpointConnectorDisabled(new MirrorCheckpointConnector(knownConsumerGroups, config));
+    }
+
+    private void assertMirrorCheckpointConnectorDisabled(MirrorCheckpointConnector connector) {

Review Comment:
Is this change still necessary? Looks like it might have been left over from a previous draft?
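For the `getBoolean` question on the `MirrorCheckpointConfig` hunk above, a minimal standalone sketch of a typed read of `emit.offset-syncs.enabled`, assuming the property were declared as a BOOLEAN with a default of `true` in the connector's `ConfigDef`; the class and setup below are illustrative, not the PR's code:
```java
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.util.Collections;

public class EmitOffsetSyncsFlagSketch {
    // Hypothetical: the flag is defined as a BOOLEAN property with a default of true,
    // so it can be read with getBoolean(...) like the other *.enabled flags, instead of
    // pulling the raw original value and comparing strings.
    static final String EMIT_OFFSET_SYNCS_ENABLED = "emit.offset-syncs.enabled";

    static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(EMIT_OFFSET_SYNCS_ENABLED, ConfigDef.Type.BOOLEAN, true,
                    ConfigDef.Importance.LOW, "Whether to emit offset syncs (sketch only).");

    public static void main(String[] args) {
        AbstractConfig config = new AbstractConfig(CONFIG_DEF,
                Collections.singletonMap(EMIT_OFFSET_SYNCS_ENABLED, "false"));
        // Typed read, consistent with the getBoolean(...) calls used for the other flags
        boolean emitOffsetSyncs = config.getBoolean(EMIT_OFFSET_SYNCS_ENABLED);
        System.out.println("emit offset syncs? " + emitOffsetSyncs);
    }
}
```
If the property is intentionally not part of this config's `ConfigDef`, the string parse may be deliberate; the sketch only shows the typed alternative.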
##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##########
@@ -187,6 +194,77 @@ public void testSingleNodeCluster() throws Exception {
         }
     }
+    @Test
+    public void testClusterWithEmitOffsetDisabled() throws Exception {
+        Properties brokerProps = new Properties();
+        EmbeddedKafkaCluster clusterA = startKafkaCluster("A", 1, brokerProps);
+        EmbeddedKafkaCluster clusterB = startKafkaCluster("B", 1, brokerProps);
+
+        try (Admin adminB = clusterB.createAdminClient()) {
+
+            // Cluster aliases
+            final String a = "A";
+            final String b = "B";
+            final String ab = a + "->" + b;
+            final String ba = b + "->" + a;

Review Comment:
Currently unused. We can either remove it, or, if we want to be more explicit in our properties setup, use it to explicitly disable the b->a flow by setting `ba + ".enabled"` to false.

##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##########
@@ -166,6 +173,29 @@ Duration consumerPollTimeout() {
         return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
     }
+    public List<ConfigValue> validate() {
+        List<ConfigValue> invalidConfigs = new ArrayList<>();
+
+        if (!this.getBoolean(EMIT_CHECKPOINTS_ENABLED) && !this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED)) {
+            Arrays.asList(new ConfigValue(SYNC_GROUP_OFFSETS_ENABLED), new ConfigValue(EMIT_CHECKPOINTS_ENABLED))
+                    .forEach(configValue -> {
+                        configValue.addErrorMessage("MirrorCheckpointConnector can't run without both " + SYNC_GROUP_OFFSETS_ENABLED + ", " +
+                                EMIT_CHECKPOINTS_ENABLED + " set to false");
+                        invalidConfigs.add(configValue);
+                    });
+        }

Review Comment:
I see the motivation here, but won't it become impossible for people to safely disable the checkpoint connector? It looks like the connector is failing validation in the new `DedicatedMirrorIntegrationTest::testClusterWithEmitOffsetDisabled` case, even though the test itself passes. Maybe we should add a check that skips this validation when the connector is disabled (i.e., when `getBoolean(ENABLED)` returns `false`)?

##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncWriter.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Semaphore;
+
+/**
+ * Used internally by MirrorMaker to write translated offsets into offset-syncs topic, with some buffering logic to limit the number of in-flight records.
+ */
+class OffsetSyncWriter implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(OffsetSyncWriter.class);
+    private static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10;
+
+    private final Map<TopicPartition, OffsetSync> delayedOffsetSyncs = new LinkedHashMap<>();
+    private final Map<TopicPartition, OffsetSync> pendingOffsetSyncs = new LinkedHashMap<>();
+    private final Semaphore outstandingOffsetSyncs;
+    private final KafkaProducer<byte[], byte[]> offsetProducer;
+    private final String offsetSyncsTopic;
+    private final long maxOffsetLag;
+    private Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();

Review Comment:
Nit: can be final
```suggestion
    private final Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
```

##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##########
@@ -166,6 +173,29 @@ Duration consumerPollTimeout() {
         return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
     }
+    public List<ConfigValue> validate() {

Review Comment:
We don't need to return a `List<ConfigValue>` here; we can just do something like a `Map<String, List<String>>` (mapping property names to error messages).

##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##########
@@ -166,6 +173,29 @@ Duration consumerPollTimeout() {
         return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
     }
+    public List<ConfigValue> validate() {
+        List<ConfigValue> invalidConfigs = new ArrayList<>();
+
+        if (!this.getBoolean(EMIT_CHECKPOINTS_ENABLED) && !this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED)) {
+            Arrays.asList(new ConfigValue(SYNC_GROUP_OFFSETS_ENABLED), new ConfigValue(EMIT_CHECKPOINTS_ENABLED))
+                    .forEach(configValue -> {
+                        configValue.addErrorMessage("MirrorCheckpointConnector can't run without both " + SYNC_GROUP_OFFSETS_ENABLED + ", " +
+                                EMIT_CHECKPOINTS_ENABLED + " set to false");
+                        invalidConfigs.add(configValue);
+                    });
+        }
+
+        if ("false".equals(Optional.ofNullable(this.originals().get(EMIT_OFFSET_SYNCS_ENABLED)).orElse("true"))) {
+            Arrays.asList(new ConfigValue(SYNC_GROUP_OFFSETS_ENABLED), new ConfigValue(EMIT_CHECKPOINTS_ENABLED), new ConfigValue(EMIT_OFFSET_SYNCS_ENABLED))

Review Comment:
I can see a rationale for reporting errors on all of these properties, but 1) IMO it's overkill and a bit redundant, and 2) it's also misleading if either offset syncing or checkpointing is disabled. Can we just attach the error to the `emit.offset-syncs.enabled` property?
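Pulling together the suggestions above (skip the check when the connector itself is disabled, return a `Map<String, List<String>>` instead of a `List<ConfigValue>`, and attach the `emit.offset-syncs.enabled` error only to that property), here is a rough sketch of the shape this could take; the class, signature, and messages are illustrative, not the PR's code:
```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch of the Map-based validation shape discussed above, not the PR's code. */
public class CheckpointValidationSketch {
    // Hypothetical property-name constants, mirroring the ones referenced in the review:
    static final String EMIT_CHECKPOINTS_ENABLED = "emit.checkpoints.enabled";
    static final String SYNC_GROUP_OFFSETS_ENABLED = "sync.group.offsets.enabled";
    static final String EMIT_OFFSET_SYNCS_ENABLED = "emit.offset-syncs.enabled";

    /** Maps property name to error messages; an empty map means the config is valid. */
    static Map<String, List<String>> validate(boolean connectorEnabled,
                                              boolean emitCheckpoints,
                                              boolean syncGroupOffsets,
                                              boolean emitOffsetSyncs) {
        if (!connectorEnabled) {
            // Skip the cross-property checks entirely when the checkpoint connector is disabled,
            // so that disabling it never fails validation.
            return Collections.emptyMap();
        }
        Map<String, List<String>> invalid = new HashMap<>();
        if (!emitCheckpoints && !syncGroupOffsets) {
            String msg = "MirrorCheckpointConnector can't run with both " + EMIT_CHECKPOINTS_ENABLED
                    + " and " + SYNC_GROUP_OFFSETS_ENABLED + " set to false";
            invalid.computeIfAbsent(EMIT_CHECKPOINTS_ENABLED, k -> new ArrayList<>()).add(msg);
            invalid.computeIfAbsent(SYNC_GROUP_OFFSETS_ENABLED, k -> new ArrayList<>()).add(msg);
        }
        if (!emitOffsetSyncs) {
            // Attach the error only to the property it concerns, per the comment above.
            invalid.computeIfAbsent(EMIT_OFFSET_SYNCS_ENABLED, k -> new ArrayList<>())
                    .add("MirrorCheckpointConnector can't run when " + EMIT_OFFSET_SYNCS_ENABLED
                            + " is disabled, because checkpoints are derived from the offset-syncs topic");
        }
        return invalid;
    }

    public static void main(String[] args) {
        System.out.println(validate(true, true, false, false));
        System.out.println(validate(false, false, false, false)); // disabled connector: no errors
    }
}
```
The connector's `validate(Map<String, String>)` override could then merge these messages into the matching `ConfigValue`s with `errorMessages().addAll(...)`, as suggested in the first comment above.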
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -197,79 +183,18 @@ public void commitRecord(SourceRecord record, RecordMetadata metadata) {
         long latency = System.currentTimeMillis() - record.timestamp();
         metrics.countRecord(topicPartition);
         metrics.replicationLatency(topicPartition, latency);
-        TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
-        long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
-        long downstreamOffset = metadata.offset();
-        maybeQueueOffsetSyncs(sourceTopicPartition, upstreamOffset, downstreamOffset);
-        // We may be able to immediately publish an offset sync that we've queued up here
-        firePendingOffsetSyncs();
-    }
-
-    // updates partition state and queues up OffsetSync if necessary
-    private void maybeQueueOffsetSyncs(TopicPartition topicPartition, long upstreamOffset,
-                                       long downstreamOffset) {
-        PartitionState partitionState =
-                partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
-        OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
-        if (partitionState.update(upstreamOffset, downstreamOffset)) {
-            // Queue this sync for an immediate send, as downstream state is sufficiently stale
-            synchronized (this) {
-                delayedOffsetSyncs.remove(topicPartition);
-                pendingOffsetSyncs.put(topicPartition, offsetSync);
-            }
-            partitionState.reset();
-        } else {
-            // Queue this sync to be delayed until the next periodic offset commit
-            synchronized (this) {
-                delayedOffsetSyncs.put(topicPartition, offsetSync);
-            }
+        // Queue offset syncs only when offsetWriter is available
+        if (offsetSyncWriter != null) {
+            TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
+            long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
+            long downstreamOffset = metadata.offset();
+            MirrorSourceTask.PartitionState partitionState =
+                    partitionStates.computeIfAbsent(topicPartition, x -> new MirrorSourceTask.PartitionState(offsetSyncWriter.maxOffsetLag()));

Review Comment:
💯 Thanks, looks great!

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org