mimaison commented on a change in pull request #9313: URL: https://github.com/apache/kafka/pull/9313#discussion_r495817863
########## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java ########## @@ -119,4 +120,41 @@ public void testNonMutationOfConfigDef() { connectorConfigDef.names().contains(taskSpecificProperty) )); } + + @Test + public void testSourceConsumerConfig() { + Map<String, String> connectorProps = makeProps( + MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX + "max.poll.interval.ms", "120000" + ); + MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps); + Map<String, Object> connectorConsumerProps = config.sourceConsumerConfig(); + Map<String, Object> expectedConsumerProps = new HashMap<>(); + expectedConsumerProps.put("enable.auto.commit", "false"); + expectedConsumerProps.put("auto.offset.reset", "earliest"); + expectedConsumerProps.put("max.poll.interval.ms", "120000"); + assertEquals(expectedConsumerProps, connectorConsumerProps); + + // checking auto.offset.reset override works + connectorProps = makeProps( + MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX + "auto.offset.reset", "latest" + ); + config = new MirrorConnectorConfig(connectorProps); + connectorConsumerProps = config.sourceConsumerConfig(); + expectedConsumerProps.put("auto.offset.reset", "latest"); + expectedConsumerProps.remove("max.poll.interval.ms"); + assertEquals(expectedConsumerProps, connectorConsumerProps); + } + + @Test + public void testSourceProducerConfig() { + Map<String, String> connectorProps = makeProps( + MirrorConnectorConfig.PRODUCER_CLIENT_PREFIX + "acks", "1" + ); + MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps); + Map<String, Object> connectorProducerProps = config.sourceProducerConfig(); + Map<String, Object> expectedProducerProps = new HashMap<>(); + expectedProducerProps.put("acks", "1"); + assertEquals(expectedProducerProps, connectorProducerProps); + } + Review comment: While we're at it, could we also add tests for `targetAdminConfig()` and `sourceAdminConfig()`? 
########## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java ########## @@ -199,8 +199,8 @@ protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX; protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX; - protected static final String PRODUCER_CLIENT_PREFIX = "producer."; - protected static final String CONSUMER_CLIENT_PREFIX = "consumer."; + protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer."; + protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer."; Review comment: These changes are going to break existing users. For example, I have connectors with a few settings prefixed with `consumer.`. I wonder if we could keep the old behaviour (even if partially broken) while adding the proper prefixes. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org