OmniaGM commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1609631370


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -51,58 +48,52 @@
 public class MirrorSourceTask extends SourceTask {
 
     private static final Logger log = 
LoggerFactory.getLogger(MirrorSourceTask.class);
-
-    private static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10;
-
     private KafkaConsumer<byte[], byte[]> consumer;
-    private KafkaProducer<byte[], byte[]> offsetProducer;
     private String sourceClusterAlias;
-    private String offsetSyncsTopic;
     private Duration pollTimeout;
-    private long maxOffsetLag;
     private Map<TopicPartition, PartitionState> partitionStates;
     private ReplicationPolicy replicationPolicy;
     private MirrorSourceMetrics metrics;
     private boolean stopping = false;
-    private final Map<TopicPartition, OffsetSync> delayedOffsetSyncs = new 
LinkedHashMap<>();
-    private final Map<TopicPartition, OffsetSync> pendingOffsetSyncs = new 
LinkedHashMap<>();
-    private Semaphore outstandingOffsetSyncs;
     private Semaphore consumerAccess;
+    private OffsetSyncWriter offsetSyncWriter;
+    private boolean emitOffsetSyncEnabled;
 
     public MirrorSourceTask() {}
 
     // for testing
     MirrorSourceTask(KafkaConsumer<byte[], byte[]> consumer, 
MirrorSourceMetrics metrics, String sourceClusterAlias,
                      ReplicationPolicy replicationPolicy, long maxOffsetLag, 
KafkaProducer<byte[], byte[]> producer,
                      Semaphore outstandingOffsetSyncs, Map<TopicPartition, 
PartitionState> partitionStates,
-                     String offsetSyncsTopic) {
+                     String offsetSyncsTopic, boolean emitOffsetSyncEnabled) {
         this.consumer = consumer;
         this.metrics = metrics;
         this.sourceClusterAlias = sourceClusterAlias;
         this.replicationPolicy = replicationPolicy;
-        this.maxOffsetLag = maxOffsetLag;
         consumerAccess = new Semaphore(1);
-        this.offsetProducer = producer;
-        this.outstandingOffsetSyncs = outstandingOffsetSyncs;
+        if (emitOffsetSyncEnabled) {
+            this.offsetSyncWriter = new OffsetSyncWriter(producer, 
offsetSyncsTopic, outstandingOffsetSyncs, maxOffsetLag);
+            offsetSyncWriter.clearPendingOffsetSyncs();
+        }
+        this.emitOffsetSyncEnabled = emitOffsetSyncEnabled;
         this.partitionStates = partitionStates;
-        this.offsetSyncsTopic = offsetSyncsTopic;
     }
 
     @Override
     public void start(Map<String, String> props) {
         MirrorSourceTaskConfig config = new MirrorSourceTaskConfig(props);
-        pendingOffsetSyncs.clear();
-        outstandingOffsetSyncs = new Semaphore(MAX_OUTSTANDING_OFFSET_SYNCS);
+        emitOffsetSyncEnabled = config.emitOffsetSyncEnabled();
         consumerAccess = new Semaphore(1);  // let one thread at a time access 
the consumer
         sourceClusterAlias = config.sourceClusterAlias();
         metrics = config.metrics();
         pollTimeout = config.consumerPollTimeout();
-        maxOffsetLag = config.maxOffsetLag();
         replicationPolicy = config.replicationPolicy();
         partitionStates = new HashMap<>();
-        offsetSyncsTopic = config.offsetSyncsTopic();
+        if (this.emitOffsetSyncEnabled) {
+            offsetSyncWriter = new OffsetSyncWriter(config);
+            offsetSyncWriter.clearPendingOffsetSyncs();

Review Comment:
   Good point I did a translation and didn't consider this. removed it now



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -51,58 +48,52 @@
 public class MirrorSourceTask extends SourceTask {
 
     private static final Logger log = 
LoggerFactory.getLogger(MirrorSourceTask.class);
-
-    private static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10;
-
     private KafkaConsumer<byte[], byte[]> consumer;
-    private KafkaProducer<byte[], byte[]> offsetProducer;
     private String sourceClusterAlias;
-    private String offsetSyncsTopic;
     private Duration pollTimeout;
-    private long maxOffsetLag;
     private Map<TopicPartition, PartitionState> partitionStates;
     private ReplicationPolicy replicationPolicy;
     private MirrorSourceMetrics metrics;
     private boolean stopping = false;
-    private final Map<TopicPartition, OffsetSync> delayedOffsetSyncs = new 
LinkedHashMap<>();
-    private final Map<TopicPartition, OffsetSync> pendingOffsetSyncs = new 
LinkedHashMap<>();
-    private Semaphore outstandingOffsetSyncs;
     private Semaphore consumerAccess;
+    private OffsetSyncWriter offsetSyncWriter;
+    private boolean emitOffsetSyncEnabled;
 
     public MirrorSourceTask() {}
 
     // for testing
     MirrorSourceTask(KafkaConsumer<byte[], byte[]> consumer, 
MirrorSourceMetrics metrics, String sourceClusterAlias,
                      ReplicationPolicy replicationPolicy, long maxOffsetLag, 
KafkaProducer<byte[], byte[]> producer,
                      Semaphore outstandingOffsetSyncs, Map<TopicPartition, 
PartitionState> partitionStates,
-                     String offsetSyncsTopic) {
+                     String offsetSyncsTopic, boolean emitOffsetSyncEnabled) {
         this.consumer = consumer;
         this.metrics = metrics;
         this.sourceClusterAlias = sourceClusterAlias;
         this.replicationPolicy = replicationPolicy;
-        this.maxOffsetLag = maxOffsetLag;
         consumerAccess = new Semaphore(1);
-        this.offsetProducer = producer;
-        this.outstandingOffsetSyncs = outstandingOffsetSyncs;
+        if (emitOffsetSyncEnabled) {
+            this.offsetSyncWriter = new OffsetSyncWriter(producer, 
offsetSyncsTopic, outstandingOffsetSyncs, maxOffsetLag);
+            offsetSyncWriter.clearPendingOffsetSyncs();

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to