kkonstantine commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r741409428



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
+ * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
+ * either {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be
+ * {@link SubmittedRecord#ack() acknowledged} from a different thread.
+ */
+class SubmittedRecords {
+
+    private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
+
+    // Visible for testing
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;

Review comment:
   When I suggested using a single deque to avoid considerably changing the
behavior we have today, I had forgotten that we explicitly set the producer
retries to be infinite.
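
   For readers less familiar with the producer side, here is a minimal sketch of what "infinite retries" amounts to in producer settings. The config keys `retries` and `delivery.timeout.ms` are real Kafka producer settings; the class name and the exact values are assumptions for illustration, since the thread does not show what the Connect worker actually sets.

```java
import java.util.Properties;

// Hypothetical helper, not Connect's own code: builds producer overrides
// that make the producer keep retrying effectively forever.
public class InfiniteRetryProducerProps {

    static Properties producerProps() {
        Properties props = new Properties();
        // Retry failed sends as many times as an int allows (assumed value).
        props.put("retries", Integer.toString(Integer.MAX_VALUE));
        // Retries are also bounded by the delivery timeout, so raise it as well
        // (assumed value).
        props.put("delivery.timeout.ms", Integer.toString(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("retries")); // prints 2147483647
    }
}
```

   With settings like these, a record bound for an unavailable topic partition is never failed back to the task; it simply stays unacknowledged until the partition recovers.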
   
   Having said that, my initial comment was not about finding a hypothetical
connector that this change in behavior would break. It was about whether it is
an intuitive programming model that, while a Connect task produces records in
sequence, only some of them have their offsets persisted, without an error
bubbling up to the task or offset commits stalling for the task as a whole.
   
   That model might hold for most connectors that use different keys per task
and treat each offset key (source partition) as completely independent of any
other offset key. (I can also see how many connectors might use only one or a
few keys per task, since keys are not supposed to scale with the number of
produced messages.) But I wouldn't say that the implications of partition
unavailability for how tasks commit offsets are that obvious to the connector
developer.
   
   My assumption is that Connect sets infinite retries on the view that Kafka
is dependable enough that, given some time to recover, it will bring all the
partitions of a topic back to a functioning state with similar delivery
throughput. And that's a fair assumption to make.
   
   If we don't consider backporting this fix, then multiple deques LGTM. 
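
   To make the trade-off concrete, here is a simplified, self-contained sketch of the per-source-partition deque model. It is not the code in this PR: the class `PerPartitionOffsets`, the nested `Submitted` type, and the use of a `String` key and a `long` offset are assumptions for illustration (the real map is keyed by `Map<String, Object>`).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical, simplified stand-in for the tracking structure under review.
public class PerPartitionOffsets {

    // One submitted record: its source partition, its source offset, and an ack flag.
    static final class Submitted {
        final String partition;
        final long offset;
        volatile boolean acked; // may be set from the producer callback thread

        Submitted(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }

        void ack() {
            acked = true;
        }
    }

    // One deque per source partition, in submission order.
    private final Map<String, Deque<Submitted>> records = new HashMap<>();

    Submitted submit(String partition, long offset) {
        Submitted record = new Submitted(partition, offset);
        records.computeIfAbsent(partition, k -> new ArrayDeque<>()).add(record);
        return record;
    }

    // For each source partition, drain the leading run of acked records and
    // report the offset of the last one drained.
    Map<String, Long> committableOffsets() {
        Map<String, Long> result = new HashMap<>();
        Iterator<Deque<Submitted>> it = records.values().iterator();
        while (it.hasNext()) {
            Deque<Submitted> deque = it.next();
            while (!deque.isEmpty() && deque.peekFirst().acked) {
                Submitted record = deque.pollFirst();
                result.put(record.partition, record.offset);
            }
            if (deque.isEmpty()) {
                it.remove(); // nothing in flight for this source partition
            }
        }
        return result;
    }

    public static void main(String[] args) {
        PerPartitionOffsets tracker = new PerPartitionOffsets();
        Submitted a1 = tracker.submit("A", 1);
        Submitted b1 = tracker.submit("B", 1);
        Submitted a2 = tracker.submit("A", 2);
        Submitted b2 = tracker.submit("B", 2);

        // The first record for source partition A is stuck; everything else is acked.
        b1.ack();
        a2.ack();
        b2.ack();

        System.out.println(tracker.committableOffsets()); // prints {B=2}
    }
}
```

   In this sketch the stuck record `a1` holds back only source partition A. With a single shared deque, `a1` would sit at the head and no offsets at all would be committable until it is acknowledged, which is the stalling behavior discussed above.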




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


