apoorvmittal10 commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1812514360
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1475,6 +1486,10 @@ private Optional<Throwable> acknowledgeCompleteBatch(
return Optional.empty();
}
+ LogOffsetMetadata latestFetchOffsetMetadata() {
Review Comment:
Is it need to be default scoped?
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -303,6 +310,7 @@ public static RecordState forId(byte id) {
this.partitionState = SharePartitionState.EMPTY;
this.replicaManager = replicaManager;
this.groupConfigManager = groupConfigManager;
+ latestFetchOffsetMetadata = null;
Review Comment:
Isn't by default it will be null, do we need to define it here?
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -518,7 +526,8 @@ public long nextFetchOffset() {
*/
public List<AcquiredRecords> acquire(
String memberId,
- FetchPartitionData fetchPartitionData
+ FetchPartitionData fetchPartitionData,
+ LogOffsetMetadata fetchOffsetMetadata
Review Comment:
You made a new class for this, shouldn't we pass that here?
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -532,6 +541,8 @@ public List<AcquiredRecords> acquire(
RecordBatch firstBatch =
fetchPartitionData.records.batches().iterator().next();
lock.writeLock().lock();
try {
+ // Update the latest fetch offset metadata for any future queries.
+ this.latestFetchOffsetMetadata = fetchOffsetMetadata;
Review Comment:
So in a fetch result of 10 batches we should store the last batch info, is
that correct? If yes then shouldn't name of variable be appropriately defined?
##########
core/src/main/java/kafka/server/share/FetchPartitionOffsetData.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.share;
Review Comment:
Can it reside in share module under `fetch`?
##########
core/src/main/java/kafka/server/share/FetchPartitionOffsetData.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.share;
+
+import org.apache.kafka.server.storage.log.FetchPartitionData;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+
+import java.util.Objects;
+
+public class FetchPartitionOffsetData {
+ private final FetchPartitionData fetchPartitionData;
+ private final LogOffsetMetadata logOffsetMetadata;
+
+ public FetchPartitionOffsetData(FetchPartitionData fetchPartitionData,
LogOffsetMetadata logOffsetMetadata) {
+ this.fetchPartitionData = fetchPartitionData;
+ this.logOffsetMetadata = logOffsetMetadata;
+ }
+
+ public FetchPartitionData fetchPartitionData() {
+ return fetchPartitionData;
+ }
+
+ public LogOffsetMetadata logOffsetMetadata() {
+ return logOffsetMetadata;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ FetchPartitionOffsetData that = (FetchPartitionOffsetData) o;
+ return fetchPartitionData.equals(that.fetchPartitionData) &&
logOffsetMetadata == that.logOffsetMetadata;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(fetchPartitionData, logOffsetMetadata);
+ }
Review Comment:
Why do we need this?
##########
core/src/main/java/kafka/server/share/FetchPartitionOffsetData.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.share;
+
+import org.apache.kafka.server.storage.log.FetchPartitionData;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+
+import java.util.Objects;
+
+public class FetchPartitionOffsetData {
+ private final FetchPartitionData fetchPartitionData;
+ private final LogOffsetMetadata logOffsetMetadata;
+
+ public FetchPartitionOffsetData(FetchPartitionData fetchPartitionData,
LogOffsetMetadata logOffsetMetadata) {
+ this.fetchPartitionData = fetchPartitionData;
+ this.logOffsetMetadata = logOffsetMetadata;
+ }
+
+ public FetchPartitionData fetchPartitionData() {
+ return fetchPartitionData;
+ }
+
+ public LogOffsetMetadata logOffsetMetadata() {
+ return logOffsetMetadata;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ FetchPartitionOffsetData that = (FetchPartitionOffsetData) o;
+ return fetchPartitionData.equals(that.fetchPartitionData) &&
logOffsetMetadata == that.logOffsetMetadata;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(fetchPartitionData, logOffsetMetadata);
+ }
+
+ @Override
+ public String toString() {
+ return "FetchPartitionOffsetData(fetchPartitionData=" +
fetchPartitionData +
Review Comment:
Will this log line print anything meaningful? I don't see toString in
`FetchPartitionData`. Should we return what's required here?
##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -84,7 +88,8 @@ static Map<TopicIdPartition,
ShareFetchResponseData.PartitionData> processFetchR
partitionData.setErrorMessage(Errors.NONE.message());
}
} else {
- List<AcquiredRecords> acquiredRecords =
sharePartition.acquire(shareFetchData.memberId(), fetchPartitionData);
+ List<AcquiredRecords> acquiredRecords =
sharePartition.acquire(shareFetchData.memberId(), fetchPartitionData,
+ logOffsetMetadata);
Review Comment:
Merge with previous line.
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +217,29 @@ Map<TopicIdPartition, FetchRequest.PartitionData>
acquirablePartitions() {
return topicPartitionData;
}
- private void releasePartitionLocks(String groupId, Set<TopicIdPartition>
topicIdPartitions) {
+ boolean isMinBytesCriteriaSatisfied(Map<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData) {
Review Comment:
Should it be default scoped? If used for tests then please write // Visible
for testing.
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +217,29 @@ Map<TopicIdPartition, FetchRequest.PartitionData>
acquirablePartitions() {
return topicPartitionData;
}
- private void releasePartitionLocks(String groupId, Set<TopicIdPartition>
topicIdPartitions) {
+ boolean isMinBytesCriteriaSatisfied(Map<TopicIdPartition,
FetchRequest.PartitionData> topicPartitionData) {
+ AtomicLong accumulatedSize = new AtomicLong(0);
+ topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+ Partition partition =
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
Review Comment:
How the exception will be handled with this method? Do we have a test case
when exception is thrown by this replica manager call?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]