tombentley commented on a change in pull request #10221: URL: https://github.com/apache/kafka/pull/10221#discussion_r658502529
########## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java ########## @@ -246,4 +248,67 @@ public void testTargetAdminConfigWithSourcePrefix() { assertEquals(expectedAdminProps, connectorAdminProps); } + @Test + public void testOffsetSyncsTopic() { + // Invalid location + Map<String, String> connectorProps = makeProps("offset-syncs.topic.location", "something"); + try { + new MirrorConnectorConfig(connectorProps); + fail("Should have thrown ConfigException"); + } catch (ConfigException exc) { } // expected + + connectorProps.put("offset-syncs.topic.location", "source"); + MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps); + assertEquals("mm2-offset-syncs.target2.internal", config.offsetSyncsTopic()); + connectorProps.put("offset-syncs.topic.location", "target"); + config = new MirrorConnectorConfig(connectorProps); + assertEquals("mm2-offset-syncs.source1.internal", config.offsetSyncsTopic()); + // Default to source + connectorProps.remove("offset-syncs.topic.location"); + config = new MirrorConnectorConfig(connectorProps); + assertEquals("mm2-offset-syncs.target2.internal", config.offsetSyncsTopic()); + } + + @Test + public void testConsumerConfigsForOffsetSyncsTopic() { + Map<String, String> connectorProps = makeProps( + "source.max.partition.fetch.bytes", "1", + "target.heartbeat.interval.ms", "1", + "consumer.max.poll.interval.ms", "1", + "fetch.min.bytes", "1" + ); + MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps); + assertEquals(config.sourceConsumerConfig(), config.offsetSyncsTopicConsumerConfig()); + connectorProps.put("offset-syncs.topic.location", "target"); Review comment: Adding to `connectorProps` won't change the already instantiated `config`. ########## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java ########## @@ -246,4 +248,67 @@ public void testTargetAdminConfigWithSourcePrefix() { assertEquals(expectedAdminProps, connectorAdminProps); } + @Test + public void testOffsetSyncsTopic() { + // Invalid location + Map<String, String> connectorProps = makeProps("offset-syncs.topic.location", "something"); + try { + new MirrorConnectorConfig(connectorProps); + fail("Should have thrown ConfigException"); + } catch (ConfigException exc) { } // expected Review comment: assertThrows ########## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java ########## @@ -435,7 +438,40 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio assertEquals(0, records.count(), "consumer record size is not zero"); backupConsumer.close(); } - + + @Test + public void testOffsetSyncsTopicsOnTarget() throws Exception { + // move offset-syncs topics to target + mm2Props.put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + ".offset-syncs.topic.location", "target"); + // one way replication from primary to backup + mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false"); + + mm2Config = new MirrorMakerConfig(mm2Props); + + waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS); + + // Ensure the offset syncs topic is created in the target cluster + waitForTopicCreated(backup.kafka(), "mm2-offset-syncs." + PRIMARY_CLUSTER_ALIAS + ".internal"); + + produceMessages(primary, "test-topic-1"); + + // Check offsets are pushed to the checkpoint topic + Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( + "auto.offset.reset", "earliest"), PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal"); + waitForCondition(() -> { + ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(Duration.ofSeconds(1L)); + for (ConsumerRecord<byte[], byte[]> record : records) { + Checkpoint checkpoint = Checkpoint.deserializeRecord(record); + if ((PRIMARY_CLUSTER_ALIAS + ".test-topic-1").equals(checkpoint.topicPartition().topic())) { + return true; + } + } + return false; + }, 30_000, + "Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + "test-topic-1" + ); Review comment: Perhaps it's overkill, but maybe we should also assert that offset syncs topic doesn't exist in the source cluster here at the end of the test? ########## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java ########## @@ -202,6 +204,10 @@ private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced."; public static final long OFFSET_LAG_MAX_DEFAULT = 100L; + private static final String OFFSET_SYNCS_TOPIC_LOCATION = "offset-syncs.topic.location"; + private static final Object OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT = SOURCE_CLUSTER_ALIAS_DEFAULT; Review comment: `String` not `Object`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org