junrao commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1830199934
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -51,21 +55,26 @@ public class DelayedShareFetch extends DelayedOperation {
     private static final Logger log = LoggerFactory.getLogger(DelayedShareFetch.class);
-    private final ShareFetchData shareFetchData;
+    private final ShareFetchData partitionsToComplete;
     private final ReplicaManager replicaManager;
-    private Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionDataFromTryComplete;
+    private Map<TopicIdPartition, FetchRequest.PartitionData> partitionsAcquired;
+    private Map<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;
     private final SharePartitionManager sharePartitionManager;
+    private final Map<TopicIdPartition, SharePartition> sharePartitions;
     DelayedShareFetch(
-        ShareFetchData shareFetchData,
+        ShareFetchData partitionsToComplete,
Review Comment:
This is actually a superset of the partitions to complete, so it's better
to just keep the name shareFetchData. It would also be useful to add a comment
that the partitions to be completed are given by sharePartitions and are a
subset of the partitions in shareFetchData; see the sketch below.
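For illustration only, the suggested naming and comment could look roughly like this (a sketch; the exact wording is up to the author):
```
// shareFetchData describes the whole share fetch request. The partitions to be
// completed are given by sharePartitions and are a subset of the partitions in
// shareFetchData.
private final ShareFetchData shareFetchData;
private final Map<TopicIdPartition, SharePartition> sharePartitions;
```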
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -51,21 +55,26 @@ public class DelayedShareFetch extends DelayedOperation {
     private static final Logger log = LoggerFactory.getLogger(DelayedShareFetch.class);
-    private final ShareFetchData shareFetchData;
+    private final ShareFetchData partitionsToComplete;
     private final ReplicaManager replicaManager;
-    private Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionDataFromTryComplete;
+    private Map<TopicIdPartition, FetchRequest.PartitionData> partitionsAcquired;
+    private Map<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;
     private final SharePartitionManager sharePartitionManager;
+    private final Map<TopicIdPartition, SharePartition> sharePartitions;
     DelayedShareFetch(
-        ShareFetchData shareFetchData,
+        ShareFetchData partitionsToComplete,
         ReplicaManager replicaManager,
-        SharePartitionManager sharePartitionManager) {
-        super(shareFetchData.fetchParams().maxWaitMs, Option.empty());
-        this.shareFetchData = shareFetchData;
+        SharePartitionManager sharePartitionManager,
+        Map<TopicIdPartition, SharePartition> sharePartitions) {
+        super(partitionsToComplete.fetchParams().maxWaitMs, Option.empty());
+        this.partitionsToComplete = partitionsToComplete;
         this.replicaManager = replicaManager;
-        this.topicPartitionDataFromTryComplete = new LinkedHashMap<>();
+        this.partitionsAcquired = new LinkedHashMap<>();
Review Comment:
Why is this a LinkedHashMap instead of just a HashMap?
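For reference, the simpler initialization being suggested would be (a sketch, assuming the iteration order of the acquired partitions is not relied on later):
```
this.partitionsAcquired = new HashMap<>();
```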
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +147,49 @@ public void onComplete() {
      */
     @Override
     public boolean tryComplete() {
-        topicPartitionDataFromTryComplete = acquirablePartitions();
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
-        if (!topicPartitionDataFromTryComplete.isEmpty()) {
+        try {
+            if (!topicPartitionData.isEmpty()) {
+                // In case, fetch offset metadata doesn't exist for one or more topic partitions, we do a
+                // replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
+                // those topic partitions.
+                Map<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
+                maybeUpdateFetchOffsetMetadata(replicaManagerReadResponse);
+                if (anyTopicIdPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData)) {
+                    partitionsAcquired = topicPartitionData;
+                    partitionsAlreadyFetched = replicaManagerReadResponse;
+                    boolean completedByMe = forceComplete();
+                    // If invocation of forceComplete is not successful, then that means the request is already completed
+                    // hence release the acquired locks.
+                    if (!completedByMe) {
+                        releasePartitionLocks(partitionsAcquired.keySet());
+                        partitionsAlreadyFetched.clear();
+                        partitionsAcquired.clear();
Review Comment:
We can skip clearing these two maps here since the operation is completed at
this point. In `onComplete()`, we don't clear these two maps. So, this will
make the behavior more consistent.
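In other words, the branch could be reduced to something like this (sketch only):
```
if (!completedByMe) {
    releasePartitionLocks(partitionsAcquired.keySet());
}
return completedByMe;
```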
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -207,13 +233,139 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
         return topicPartitionData;
     }
-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> topicIdPartitions) {
-        topicIdPartitions.forEach(tp -> {
-            SharePartition sharePartition = sharePartitionManager.sharePartition(groupId, tp);
-            if (sharePartition == null) {
-                log.error("Encountered null share partition for groupId={}, topicIdPartition={}. Skipping it.", shareFetchData.groupId(), tp);
-                return;
+    private Map<TopicIdPartition, LogReadResult> maybeReadFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+        Map<TopicIdPartition, FetchRequest.PartitionData> missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+        topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+            SharePartition sharePartition = sharePartitions.get(topicIdPartition);
+            if (sharePartition.fetchOffsetMetadata().isEmpty()) {
+                missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition, partitionData);
             }
+        });
+        if (missingFetchOffsetMetadataTopicPartitions.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        // We fetch data from replica manager corresponding to the topic partitions that have missing fetch offset metadata.
+        return readFromLog(missingFetchOffsetMetadataTopicPartitions);
+    }
+
+    private void maybeUpdateFetchOffsetMetadata(
+        Map<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponseData.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            SharePartition sharePartition = sharePartitions.get(topicIdPartition);
+            LogReadResult replicaManagerLogReadResult = entry.getValue();
+            if (replicaManagerLogReadResult.error().code() != Errors.NONE.code()) {
+                log.debug("Replica manager read log result {} errored out for topic partition {}",
+                    replicaManagerLogReadResult, topicIdPartition);
+                continue;
+            }
+            sharePartition.updateFetchOffsetMetadata(Optional.of(replicaManagerLogReadResult.info().fetchOffsetMetadata));
+        }
+    }
+
+    private boolean isMinBytesSatisfied(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+        long accumulatedSize = 0;
+        for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            FetchRequest.PartitionData partitionData = entry.getValue();
+            LogOffsetMetadata endOffsetMetadata = endOffsetMetadataForTopicPartition(topicIdPartition);
+
+            if (endOffsetMetadata == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
+                continue;
+
+            SharePartition sharePartition = sharePartitions.get(topicIdPartition);
+
+            Optional<LogOffsetMetadata> optionalFetchOffsetMetadata = sharePartition.fetchOffsetMetadata();
+            if (optionalFetchOffsetMetadata.isEmpty() || optionalFetchOffsetMetadata.get() == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
+                continue;
+            LogOffsetMetadata fetchOffsetMetadata = optionalFetchOffsetMetadata.get();
+
+            if (fetchOffsetMetadata.messageOffset > endOffsetMetadata.messageOffset) {
+                log.debug("Satisfying delayed share fetch request for group {}, member {} since it is fetching later segments of " +
+                    "topicIdPartition {}", partitionsToComplete.groupId(), partitionsToComplete.memberId(), topicIdPartition);
+                return true;
+            } else if (fetchOffsetMetadata.messageOffset < endOffsetMetadata.messageOffset) {
+                if (fetchOffsetMetadata.onOlderSegment(endOffsetMetadata)) {
+                    // This can happen when the fetch operation is falling behind the current segment or the partition
+                    // has just rolled a new segment.
+                    log.debug("Satisfying delayed share fetch request for group {}, member {} immediately since it is fetching older " +
+                        "segments of topicIdPartition {}", partitionsToComplete.groupId(), partitionsToComplete.memberId(), topicIdPartition);
+                    return true;
+                } else if (fetchOffsetMetadata.onSameSegment(endOffsetMetadata)) {
+                    // we take the partition fetch size as upper bound when accumulating the bytes.
+                    long bytesAvailable = Math.min(endOffsetMetadata.positionDiff(fetchOffsetMetadata), partitionData.maxBytes);
+                    accumulatedSize += bytesAvailable;
+                }
+            }
+        }
+        return accumulatedSize >= partitionsToComplete.fetchParams().minBytes;
+    }
+
+    private LogOffsetMetadata endOffsetMetadataForTopicPartition(TopicIdPartition topicIdPartition) {
+        Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+        LogOffsetSnapshot offsetSnapshot = partition.fetchOffsetSnapshot(Optional.empty(), true);
+        // The FetchIsolation type that we use for share fetch is FetchIsolation.HIGH_WATERMARK. In the future, we can
+        // extend it to support other FetchIsolation types.
+        FetchIsolation isolationType = partitionsToComplete.fetchParams().isolation;
+        if (isolationType == FetchIsolation.LOG_END)
+            return offsetSnapshot.logEndOffset;
+        else if (isolationType == FetchIsolation.HIGH_WATERMARK)
+            return offsetSnapshot.highWatermark;
+        else
+            return offsetSnapshot.lastStableOffset;
+
+    }
+
+    private Map<TopicIdPartition, LogReadResult> readFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+        Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
+            partitionsToComplete.fetchParams(),
+            CollectionConverters.asScala(
+                topicPartitionData.entrySet().stream().map(entry ->
+                    new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList())
+            ),
+            QuotaFactory.UNBOUNDED_QUOTA,
+            true);
+
+        Map<TopicIdPartition, LogReadResult> responseData = new HashMap<>();
+        responseLogResult.foreach(tpLogResult -> {
+            responseData.put(tpLogResult._1(), tpLogResult._2());
+            return BoxedUnit.UNIT;
+        });
+
+        log.trace("Data successfully retrieved by replica manager: {}", responseData);
+        return responseData;
+    }
+
+    private boolean anyTopicIdPartitionHasLogReadError(Map<TopicIdPartition, LogReadResult> replicaManagerReadResponse) {
+        for (LogReadResult logReadResult : replicaManagerReadResponse.values()) {
+            if (logReadResult.error().code() != Errors.NONE.code()) {
+                return true;
+            }
+        }
+        return false;
Review Comment:
This can be a bit simpler.
```
return replicaManagerReadResponse.values().stream()
    .anyMatch(logReadResult -> logReadResult.error().code() != Errors.NONE.code());
```
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -207,13 +233,139 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
         return topicPartitionData;
     }
-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> topicIdPartitions) {
-        topicIdPartitions.forEach(tp -> {
-            SharePartition sharePartition = sharePartitionManager.sharePartition(groupId, tp);
-            if (sharePartition == null) {
-                log.error("Encountered null share partition for groupId={}, topicIdPartition={}. Skipping it.", shareFetchData.groupId(), tp);
-                return;
+    private Map<TopicIdPartition, LogReadResult> maybeReadFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+        Map<TopicIdPartition, FetchRequest.PartitionData> missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+        topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+            SharePartition sharePartition = sharePartitions.get(topicIdPartition);
+            if (sharePartition.fetchOffsetMetadata().isEmpty()) {
+                missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition, partitionData);
             }
+        });
+        if (missingFetchOffsetMetadataTopicPartitions.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        // We fetch data from replica manager corresponding to the topic partitions that have missing fetch offset metadata.
+        return readFromLog(missingFetchOffsetMetadataTopicPartitions);
+    }
+
+    private void maybeUpdateFetchOffsetMetadata(
+        Map<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponseData.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            SharePartition sharePartition = sharePartitions.get(topicIdPartition);
+            LogReadResult replicaManagerLogReadResult = entry.getValue();
+            if (replicaManagerLogReadResult.error().code() != Errors.NONE.code()) {
+                log.debug("Replica manager read log result {} errored out for topic partition {}",
+                    replicaManagerLogReadResult, topicIdPartition);
+                continue;
+            }
+            sharePartition.updateFetchOffsetMetadata(Optional.of(replicaManagerLogReadResult.info().fetchOffsetMetadata));
+        }
+    }
+
+    private boolean isMinBytesSatisfied(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+        long accumulatedSize = 0;
+        for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            FetchRequest.PartitionData partitionData = entry.getValue();
+            LogOffsetMetadata endOffsetMetadata = endOffsetMetadataForTopicPartition(topicIdPartition);
+
+            if (endOffsetMetadata == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
+                continue;
+
+            SharePartition sharePartition = sharePartitions.get(topicIdPartition);
+
+            Optional<LogOffsetMetadata> optionalFetchOffsetMetadata = sharePartition.fetchOffsetMetadata();
+            if (optionalFetchOffsetMetadata.isEmpty() || optionalFetchOffsetMetadata.get() == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
+                continue;
+            LogOffsetMetadata fetchOffsetMetadata = optionalFetchOffsetMetadata.get();
+
+            if (fetchOffsetMetadata.messageOffset > endOffsetMetadata.messageOffset) {
+                log.debug("Satisfying delayed share fetch request for group {}, member {} since it is fetching later segments of " +
+                    "topicIdPartition {}", partitionsToComplete.groupId(), partitionsToComplete.memberId(), topicIdPartition);
+                return true;
+            } else if (fetchOffsetMetadata.messageOffset < endOffsetMetadata.messageOffset) {
+                if (fetchOffsetMetadata.onOlderSegment(endOffsetMetadata)) {
+                    // This can happen when the fetch operation is falling behind the current segment or the partition
+                    // has just rolled a new segment.
+                    log.debug("Satisfying delayed share fetch request for group {}, member {} immediately since it is fetching older " +
+                        "segments of topicIdPartition {}", partitionsToComplete.groupId(), partitionsToComplete.memberId(), topicIdPartition);
+                    return true;
+                } else if (fetchOffsetMetadata.onSameSegment(endOffsetMetadata)) {
+                    // we take the partition fetch size as upper bound when accumulating the bytes.
+                    long bytesAvailable = Math.min(endOffsetMetadata.positionDiff(fetchOffsetMetadata), partitionData.maxBytes);
+                    accumulatedSize += bytesAvailable;
+                }
+            }
+        }
+        return accumulatedSize >= partitionsToComplete.fetchParams().minBytes;
+    }
+
+    private LogOffsetMetadata endOffsetMetadataForTopicPartition(TopicIdPartition topicIdPartition) {
+        Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+        LogOffsetSnapshot offsetSnapshot = partition.fetchOffsetSnapshot(Optional.empty(), true);
+        // The FetchIsolation type that we use for share fetch is FetchIsolation.HIGH_WATERMARK. In the future, we can
+        // extend it to support other FetchIsolation types.
+        FetchIsolation isolationType = partitionsToComplete.fetchParams().isolation;
+        if (isolationType == FetchIsolation.LOG_END)
+            return offsetSnapshot.logEndOffset;
+        else if (isolationType == FetchIsolation.HIGH_WATERMARK)
+            return offsetSnapshot.highWatermark;
+        else
+            return offsetSnapshot.lastStableOffset;
+
+    }
+
+    private Map<TopicIdPartition, LogReadResult> readFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+        Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
+            partitionsToComplete.fetchParams(),
+            CollectionConverters.asScala(
+                topicPartitionData.entrySet().stream().map(entry ->
+                    new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList())
+            ),
+            QuotaFactory.UNBOUNDED_QUOTA,
+            true);
+
+        Map<TopicIdPartition, LogReadResult> responseData = new HashMap<>();
+        responseLogResult.foreach(tpLogResult -> {
+            responseData.put(tpLogResult._1(), tpLogResult._2());
+            return BoxedUnit.UNIT;
+        });
+
+        log.trace("Data successfully retrieved by replica manager: {}", responseData);
+        return responseData;
+    }
+
+    private boolean anyTopicIdPartitionHasLogReadError(Map<TopicIdPartition, LogReadResult> replicaManagerReadResponse) {
Review Comment:
anyTopicIdPartitionHasLogReadError => anyPartitionHasLogReadError ?
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +147,49 @@ public void onComplete() {
      */
     @Override
     public boolean tryComplete() {
-        topicPartitionDataFromTryComplete = acquirablePartitions();
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
-        if (!topicPartitionDataFromTryComplete.isEmpty()) {
+        try {
+            if (!topicPartitionData.isEmpty()) {
+                // In case, fetch offset metadata doesn't exist for one or more topic partitions, we do a
+                // replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
+                // those topic partitions.
+                Map<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
+                maybeUpdateFetchOffsetMetadata(replicaManagerReadResponse);
+                if (anyTopicIdPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData)) {
+                    partitionsAcquired = topicPartitionData;
+                    partitionsAlreadyFetched = replicaManagerReadResponse;
+                    boolean completedByMe = forceComplete();
+                    // If invocation of forceComplete is not successful, then that means the request is already completed
+                    // hence release the acquired locks.
+                    if (!completedByMe) {
+                        releasePartitionLocks(partitionsAcquired.keySet());
+                        partitionsAlreadyFetched.clear();
+                        partitionsAcquired.clear();
+                    }
+                    return completedByMe;
+                } else {
+                    log.debug("minBytes is not satisfied for the share fetch request for group {}, member {}, " +
+                        "topic partitions {}", partitionsToComplete.groupId(), partitionsToComplete.memberId(),
+                        sharePartitions.keySet());
+                    releasePartitionLocks(topicPartitionData.keySet());
+                }
+            } else {
+                log.trace("Can't acquire records for any partition in the share fetch request for group {}, member {}, " +
+                    "topic partitions {}", partitionsToComplete.groupId(), partitionsToComplete.memberId(),
+                    sharePartitions.keySet());
+            }
+            return false;
+        } catch (Exception e) {
+            log.error("Error processing delayed share fetch request", e);
             boolean completedByMe = forceComplete();
             // If invocation of forceComplete is not successful, then that means the request is already completed
             // hence release the acquired locks.
             if (!completedByMe) {
-                releasePartitionLocks(shareFetchData.groupId(), topicPartitionDataFromTryComplete.keySet());
+                releasePartitionLocks(topicPartitionData.keySet());
Review Comment:
This part of the exception handling is still quite confusing to me. If we hit the
exception, should we release the locks before calling forceComplete()?
Otherwise, `forceComplete` will try to acquire the same locks again, but can't.
It would also help to narrow the try/catch to just the code that can actually throw;
see the sketch below.
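To make the intent concrete, the catch block could look roughly like this (a sketch only; it assumes releasing before `forceComplete()` is safe at this point and that the try has been narrowed to the code that can actually throw):
```
} catch (Exception e) {
    log.error("Error processing delayed share fetch request", e);
    // Release the acquired locks first so that forceComplete() -> onComplete() does not
    // try to acquire the same partition locks this thread is still holding.
    releasePartitionLocks(topicPartitionData.keySet());
    return forceComplete();
}
```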
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -574,6 +575,7 @@ void processShareFetch(ShareFetchData shareFetchData) {
         // Initialize lazily, if required.
         Map<TopicIdPartition, Throwable> erroneous = null;
         Set<DelayedShareFetchKey> delayedShareFetchWatchKeys = new HashSet<>();
+        Map<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
Review Comment:
Why is this a LinkedHashMap instead of just a HashMap?
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -207,13 +233,139 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
         return topicPartitionData;
     }
-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> topicIdPartitions) {
-        topicIdPartitions.forEach(tp -> {
-            SharePartition sharePartition = sharePartitionManager.sharePartition(groupId, tp);
-            if (sharePartition == null) {
-                log.error("Encountered null share partition for groupId={}, topicIdPartition={}. Skipping it.", shareFetchData.groupId(), tp);
-                return;
+    private Map<TopicIdPartition, LogReadResult> maybeReadFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+        Map<TopicIdPartition, FetchRequest.PartitionData> missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
Review Comment:
missingFetchOffsetMetadataTopicPartitions => partitionsMissingFetchOffsetMetadata?
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -80,64 +89,56 @@ public void onExpiration() {
     @Override
     public void onComplete() {
         log.trace("Completing the delayed share fetch request for group {}, member {}, "
-            + "topic partitions {}", shareFetchData.groupId(), shareFetchData.memberId(),
-            topicPartitionDataFromTryComplete.keySet());
+            + "topic partitions {}", partitionsToComplete.groupId(), partitionsToComplete.memberId(),
+            partitionsAcquired.keySet());
-        if (shareFetchData.future().isDone())
+        if (partitionsToComplete.future().isDone())
             return;
         Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
         // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
-        if (topicPartitionDataFromTryComplete.isEmpty())
+        if (partitionsAcquired.isEmpty())
             topicPartitionData = acquirablePartitions();
         // tryComplete invoked forceComplete, so we can use the data from tryComplete.
         else
-            topicPartitionData = topicPartitionDataFromTryComplete;
+            topicPartitionData = partitionsAcquired;
         if (topicPartitionData.isEmpty()) {
             // No locks for share partitions could be acquired, so we complete the request with an empty response.
-            shareFetchData.future().complete(Collections.emptyMap());
+            partitionsToComplete.future().complete(Collections.emptyMap());
             return;
         }
         log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
-            topicPartitionData, shareFetchData.groupId(), shareFetchData.fetchParams());
+            topicPartitionData, partitionsToComplete.groupId(), partitionsToComplete.fetchParams());
         try {
-            Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
-                shareFetchData.fetchParams(),
-                CollectionConverters.asScala(
-                    topicPartitionData.entrySet().stream().map(entry ->
-                        new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList())
-                ),
-                QuotaFactory.UNBOUNDED_QUOTA,
-                true);
-
-            Map<TopicIdPartition, FetchPartitionData> responseData = new HashMap<>();
-            responseLogResult.foreach(tpLogResult -> {
-                TopicIdPartition topicIdPartition = tpLogResult._1();
-                LogReadResult logResult = tpLogResult._2();
-                FetchPartitionData fetchPartitionData = logResult.toFetchPartitionData(false);
-                responseData.put(topicIdPartition, fetchPartitionData);
-                return BoxedUnit.UNIT;
-            });
-
-            log.trace("Data successfully retrieved by replica manager: {}", responseData);
-            Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result =
-                ShareFetchUtils.processFetchResponse(shareFetchData, responseData, sharePartitionManager, replicaManager);
-            shareFetchData.future().complete(result);
+            Map<TopicIdPartition, LogReadResult> responseData;
+            if (partitionsAlreadyFetched.isEmpty())
+                responseData = readFromLog(topicPartitionData);
+            else
+                // There shouldn't be a case when we have a partitionsAlreadyFetched value here and this variable is getting
+                // updated in a different tryComplete thread.
Review Comment:
Hmm, Apoorv had a good point in his comment
(https://github.com/apache/kafka/pull/17539#discussion_r1828579315). There
seems to be a potential problem: thread1 calls `tryComplete`, finds `completed`
to be false, and is about to set `partitionsAlreadyFetched`. The expiration
thread then calls `forceComplete`, sets `completed` to true, and proceeds to
here. Now thread1 continues and updates `partitionsAlreadyFetched`, so the
expiration thread will pick up the wrong `partitionsAlreadyFetched`. See the
interleaving sketch below.
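To spell out the interleaving (an illustration of the scenario above, not actual code):
```
// 1. thread1, in tryComplete(): minBytes is satisfied, completed is still false, and it
//    is about to run:
//        partitionsAcquired = topicPartitionData;
//        partitionsAlreadyFetched = replicaManagerReadResponse;
// 2. expiration thread: calls forceComplete(), flips completed to true, and reaches this
//    point in onComplete() while partitionsAlreadyFetched is still empty.
// 3. thread1 resumes and assigns the maps.
// 4. The expiration thread can then read maps that thread1 is writing (or has just
//    written), i.e. the wrong partitionsAlreadyFetched.
```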
##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -64,11 +64,7 @@ static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchR
             TopicIdPartition topicIdPartition = entry.getKey();
             FetchPartitionData fetchPartitionData = entry.getValue();
-            SharePartition sharePartition = sharePartitionManager.sharePartition(shareFetchData.groupId(), topicIdPartition);
-            if (sharePartition == null) {
-                log.error("Encountered null share partition for groupId={}, topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
-                continue;
-            }
+            SharePartition sharePartition = sharePartitions.get(topicIdPartition);
Review Comment:
This is an existing issue. In the line below, we are logging for each
partition, but shareFetchData contains the full request.
```
log.trace("Acquired records for topicIdPartition: {} with share fetch data: {}, records: {}",
    topicIdPartition, shareFetchData, shareAcquiredRecords);
```
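For illustration, the per-partition log line could be trimmed to the identifying fields instead of the whole request, e.g. (a sketch; which fields to keep is the author's call):
```
log.trace("Acquired records for topicIdPartition: {} with groupId: {}, memberId: {}, records: {}",
    topicIdPartition, shareFetchData.groupId(), shareFetchData.memberId(), shareAcquiredRecords);
```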
##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -107,23 +109,130 @@ public void testDelayedShareFetchTryCompleteReturnsFalse() {
         when(sp0.maybeAcquireFetchLock()).thenReturn(true);
         when(sp1.maybeAcquireFetchLock()).thenReturn(true);
-        SharePartitionManager sharePartitionManager = mock(SharePartitionManager.class);
-        when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0);
-        when(sharePartitionManager.sharePartition(groupId, tp1)).thenReturn(sp1);
+        Map<TopicIdPartition, SharePartition> sharePartitions = new HashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
         ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
             new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
         when(sp0.canAcquireRecords()).thenReturn(false);
         when(sp1.canAcquireRecords()).thenReturn(false);
-        DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
+        DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetchData)
-            .withSharePartitionManager(sharePartitionManager)
-            .build();
+            .withSharePartitions(sharePartitions)
+            .build());
         // Since there is no partition that can be acquired, tryComplete should return false.
         assertFalse(delayedShareFetch.tryComplete());
         assertFalse(delayedShareFetch.isCompleted());
+        Mockito.verify(delayedShareFetch, times(0)).releasePartitionLocks(any());
+    }
+
+    @Test
+    public void testTryCompleteReturnsFalseWhenMinBytesNotSatisfiedOnFirstFetch() {
+        String groupId = "grp";
+        Uuid topicId = Uuid.randomUuid();
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
+        partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
+        partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+
+        Map<TopicIdPartition, SharePartition> sharePartitions = new HashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        ShareFetchData shareFetchData = new ShareFetchData(
+            new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+                2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
+            new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
+
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(false);
+        when(sp0.acquire(any(), anyInt(), any())).thenReturn(
+            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+
+        // We are testing the case when the share partition is getting fetched for the first time, so for the first time
+        // the fetchOffsetMetadata will return empty. Post the readFromLog call, the fetchOffsetMetadata will be
+        // populated for the share partition, which has 1 as the positional difference, so it doesn't satisfy the minBytes(2).
+        when(sp0.fetchOffsetMetadata())
+            .thenReturn(Optional.empty())
+            .thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
+        LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, 1);
+        mockTopicIdPartitionFetchBytes(replicaManager, tp0, hwmOffsetMetadata);
+
+        doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetchData)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .build());
+        assertFalse(delayedShareFetch.isCompleted());
+
+        // Since sp1 cannot be acquired, tryComplete should return false.
+        assertFalse(delayedShareFetch.tryComplete());
+        assertFalse(delayedShareFetch.isCompleted());
+        Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any());
+    }
+
+    @Test
+    public void testTryCompleteReturnsFalseWhenMinBytesNotSatisfiedOnLatestFetch() {
Review Comment:
testTryCompleteReturnsFalseWhenMinBytesNotSatisfiedOnLatestFetch => testTryCompleteWhenMinBytesNotSatisfiedOnSubsequentFetch ?
##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -107,23 +109,130 @@ public void testDelayedShareFetchTryCompleteReturnsFalse() {
         when(sp0.maybeAcquireFetchLock()).thenReturn(true);
         when(sp1.maybeAcquireFetchLock()).thenReturn(true);
-        SharePartitionManager sharePartitionManager = mock(SharePartitionManager.class);
-        when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0);
-        when(sharePartitionManager.sharePartition(groupId, tp1)).thenReturn(sp1);
+        Map<TopicIdPartition, SharePartition> sharePartitions = new HashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
         ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
             new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
         when(sp0.canAcquireRecords()).thenReturn(false);
         when(sp1.canAcquireRecords()).thenReturn(false);
-        DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
+        DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetchData)
-            .withSharePartitionManager(sharePartitionManager)
-            .build();
+            .withSharePartitions(sharePartitions)
+            .build());
         // Since there is no partition that can be acquired, tryComplete should return false.
         assertFalse(delayedShareFetch.tryComplete());
         assertFalse(delayedShareFetch.isCompleted());
+        Mockito.verify(delayedShareFetch, times(0)).releasePartitionLocks(any());
+    }
+
+    @Test
+    public void testTryCompleteReturnsFalseWhenMinBytesNotSatisfiedOnFirstFetch() {
Review Comment:
testTryCompleteReturnsFalseWhenMinBytesNotSatisfiedOnFirstFetch => testTryCompleteWhenMinBytesNotSatisfiedOnFirstFetch?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]