tombentley commented on a change in pull request #10221:
URL: https://github.com/apache/kafka/pull/10221#discussion_r658502529



##########
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
##########
@@ -246,4 +248,67 @@ public void testTargetAdminConfigWithSourcePrefix() {
         assertEquals(expectedAdminProps, connectorAdminProps);
     }
 
+    @Test
+    public void testOffsetSyncsTopic() {
+        // Invalid location
+        Map<String, String> connectorProps = 
makeProps("offset-syncs.topic.location", "something");
+        try {
+            new MirrorConnectorConfig(connectorProps);
+            fail("Should have thrown ConfigException");
+        } catch (ConfigException exc) { } // expected
+
+        connectorProps.put("offset-syncs.topic.location", "source");
+        MirrorConnectorConfig config = new 
MirrorConnectorConfig(connectorProps);
+        assertEquals("mm2-offset-syncs.target2.internal", 
config.offsetSyncsTopic());
+        connectorProps.put("offset-syncs.topic.location", "target");
+        config = new MirrorConnectorConfig(connectorProps);
+        assertEquals("mm2-offset-syncs.source1.internal", 
config.offsetSyncsTopic());
+        // Default to source
+        connectorProps.remove("offset-syncs.topic.location");
+        config = new MirrorConnectorConfig(connectorProps);
+        assertEquals("mm2-offset-syncs.target2.internal", 
config.offsetSyncsTopic());
+    }
+
+    @Test
+    public void testConsumerConfigsForOffsetSyncsTopic() {
+        Map<String, String> connectorProps = makeProps(
+                "source.max.partition.fetch.bytes", "1",
+                "target.heartbeat.interval.ms", "1",
+                "consumer.max.poll.interval.ms", "1",
+                "fetch.min.bytes", "1"
+        );
+        MirrorConnectorConfig config = new 
MirrorConnectorConfig(connectorProps);
+        assertEquals(config.sourceConsumerConfig(), 
config.offsetSyncsTopicConsumerConfig());
+        connectorProps.put("offset-syncs.topic.location", "target");

Review comment:
       Adding to `connectorProps` won't change the already instantiated 
`config`.

##########
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
##########
@@ -246,4 +248,67 @@ public void testTargetAdminConfigWithSourcePrefix() {
         assertEquals(expectedAdminProps, connectorAdminProps);
     }
 
+    @Test
+    public void testOffsetSyncsTopic() {
+        // Invalid location
+        Map<String, String> connectorProps = 
makeProps("offset-syncs.topic.location", "something");
+        try {
+            new MirrorConnectorConfig(connectorProps);
+            fail("Should have thrown ConfigException");
+        } catch (ConfigException exc) { } // expected

Review comment:
       Use `assertThrows` here instead of the try/`fail`/catch pattern.

##########
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
##########
@@ -435,7 +438,40 @@ public void testOneWayReplicationWithAutoOffsetSync() 
throws InterruptedExceptio
         assertEquals(0, records.count(), "consumer record size is not zero");
         backupConsumer.close();
     }
-    
+
+    @Test
+    public void testOffsetSyncsTopicsOnTarget() throws Exception {
+        // move offset-syncs topics to target
+        mm2Props.put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + 
".offset-syncs.topic.location", "target");
+        // one way replication from primary to backup
+        mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + 
".enabled", "false");
+
+        mm2Config = new MirrorMakerConfig(mm2Props);
+
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, 
PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+        // Ensure the offset syncs topic is created in the target cluster
+        waitForTopicCreated(backup.kafka(), "mm2-offset-syncs." + 
PRIMARY_CLUSTER_ALIAS + ".internal");
+
+        produceMessages(primary, "test-topic-1");
+
+        // Check offsets are pushed to the checkpoint topic
+        Consumer<byte[], byte[]> backupConsumer = 
backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+                "auto.offset.reset", "earliest"), PRIMARY_CLUSTER_ALIAS + 
".checkpoints.internal");
+        waitForCondition(() -> {
+            ConsumerRecords<byte[], byte[]> records = 
backupConsumer.poll(Duration.ofSeconds(1L));
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
+                if ((PRIMARY_CLUSTER_ALIAS + 
".test-topic-1").equals(checkpoint.topicPartition().topic())) {
+                    return true;
+                }
+            }
+            return false;
+        }, 30_000,
+            "Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + 
"test-topic-1"
+        );

Review comment:
       Perhaps it's overkill, but maybe we should also assert that the 
offset-syncs topic doesn't exist in the source cluster here at the end of the test?

##########
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -202,6 +204,10 @@
     private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote 
partition can be before it is resynced.";
     public static final long OFFSET_LAG_MAX_DEFAULT = 100L;
 
+    private static final String OFFSET_SYNCS_TOPIC_LOCATION = 
"offset-syncs.topic.location";
+    private static final Object OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT = 
SOURCE_CLUSTER_ALIAS_DEFAULT;

Review comment:
       This constant should be declared as `String`, not `Object`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to