mimaison commented on a change in pull request #8921:
URL: https://github.com/apache/kafka/pull/8921#discussion_r448872352



##########
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -67,6 +67,8 @@
     protected static final String SYNC_TOPIC_ACLS = "sync.topic.acls";
     protected static final String EMIT_HEARTBEATS = "emit.heartbeats";
     protected static final String EMIT_CHECKPOINTS = "emit.checkpoints";
+    protected static final String AUTH_OFFSET_RESET = "auto.offset.reset";
+    protected static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

Review comment:
       Can we reuse `ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG` instead of 
defining a new constant?

##########
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -229,8 +235,8 @@ Duration adminTimeout() {
         props.putAll(originalsWithPrefix(SOURCE_CLUSTER_PREFIX));
         props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
         props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX));
-        props.put("enable.auto.commit", "false");
-        props.put("auto.offset.reset", "earliest");
+        props.put(ENABLE_AUTO_COMMIT, "false");
+        props.put(AUTH_OFFSET_RESET, CONSUMER_AUTO_OFFSET_RESET);

Review comment:
       If a user sets 
`somesource->sometarget.consumer.auto.offset.reset=latest`, that should be 
picked up by lines 235-237.
   
   So instead of defining a new config on `MirrorConnectorConfig`, could we do 
something like:
   `props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");`
   
   WDYT?
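
To illustrate the `putIfAbsent` suggestion above, here is a minimal, self-contained sketch. It uses a plain `Map` and literal keys rather than the real `MirrorConnectorConfig` or `ConsumerConfig` classes, and the class name `OffsetResetDefaultSketch` and the helper `consumerProps` are hypothetical names introduced only for this example. The idea is that a user-supplied `auto.offset.reset` survives, while `earliest` is applied only as a fallback.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetResetDefaultSketch {

    // Hypothetical stand-in for the consumer config assembly in the PR:
    // userOverrides plays the role of the properties collected from the
    // "consumer." prefix, not the actual MirrorConnectorConfig internals.
    static Map<String, Object> consumerProps(Map<String, Object> userOverrides) {
        Map<String, Object> props = new HashMap<>(userOverrides);
        // Always forced, regardless of what the user supplied.
        props.put("enable.auto.commit", "false");
        // Applied only when the user did not set a value.
        props.putIfAbsent("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> overrides = new HashMap<>();
        overrides.put("auto.offset.reset", "latest");

        // The user's override is kept.
        System.out.println(consumerProps(overrides).get("auto.offset.reset"));
        // With no override, the default is used.
        System.out.println(consumerProps(new HashMap<>()).get("auto.offset.reset"));
    }
}
```

Run as written, this prints `latest` and then `earliest`. In the actual patch the literal keys would presumably be replaced by `ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG` and `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG`, as suggested in the other comments.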
   

##########
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -67,6 +67,8 @@
     protected static final String SYNC_TOPIC_ACLS = "sync.topic.acls";
     protected static final String EMIT_HEARTBEATS = "emit.heartbeats";
     protected static final String EMIT_CHECKPOINTS = "emit.checkpoints";
+    protected static final String AUTH_OFFSET_RESET = "auto.offset.reset";

Review comment:
       Can we reuse `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` instead of 
defining a new constant?




