AndrewJSchofield commented on code in PR #16727:
URL: https://github.com/apache/kafka/pull/16727#discussion_r1699932061
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -876,4 +1020,34 @@ public void completeIfEmpty() {
}
}
}
+
+ static class Pair<V> {
+ private V asyncRequest;
+ private V syncRequest;
+
+ public Pair(V first, V second) {
Review Comment:
nit: The arguments to the constructor should also be `asyncRequest` and
`syncRequest`
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -876,4 +1020,34 @@ public void completeIfEmpty() {
}
}
}
+
+ static class Pair<V> {
+ private V asyncRequest;
+ private V syncRequest;
+
+ public Pair(V first, V second) {
+ this.asyncRequest = first;
+ this.syncRequest = second;
+ }
+
+ public void setAsyncRequest(V asyncRequest) {
+ this.asyncRequest = asyncRequest;
+ }
+
+ public void setSecond(V second) {
Review Comment:
`setSyncRequest`?
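Below is a minimal sketch of `Pair` with the role-specific names these two comments ask for. The constructor takes parameters named `asyncRequest`/`syncRequest`, and `setSyncRequest` replaces `setSecond`. The getters follow the `getAsyncRequest()`/`getSyncRequest()` calls that appear elsewhere in the diff. The class is shown top-level, rather than as a `static` nested class, only so that the snippet compiles on its own.

```java
// Sketch only: top-level so it compiles standalone; in the PR, Pair is a
// static nested class of ShareConsumeRequestManager.
class Pair<V> {
    private V asyncRequest;
    private V syncRequest;

    // Constructor parameters are named after the roles they fill.
    public Pair(V asyncRequest, V syncRequest) {
        this.asyncRequest = asyncRequest;
        this.syncRequest = syncRequest;
    }

    public V getAsyncRequest() {
        return asyncRequest;
    }

    public V getSyncRequest() {
        return syncRequest;
    }

    public void setAsyncRequest(V asyncRequest) {
        this.asyncRequest = asyncRequest;
    }

    // Replaces setSecond(V), matching setAsyncRequest(V).
    public void setSyncRequest(V syncRequest) {
        this.syncRequest = syncRequest;
    }
}
```

With consistent names, `areAnyAcknowledgementsLeft()` would call `getAsyncRequest()`/`getSyncRequest()` instead of the `getFirst()`/`getSecond()` it uses in a later revision of the diff.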
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -332,22 +378,32 @@ public void commitAsync(final Map<TopicIdPartition, Acknowledgements> acknowledg
metricsManager.recordAcknowledgementSent(acknowledgements.size());
log.debug("Added async acknowledge request for partition {} to node {}", tip.topicPartition(), node.id());
resultCount.incrementAndGet();
+ AcknowledgeRequestState asyncRequestState = acknowledgeRequestStates.get(nodeId).getAsyncRequest();
+ if (asyncRequestState == null) {
+ acknowledgeRequestStates.replace(nodeId, new Pair<>(new AcknowledgeRequestState(logContext,
Review Comment:
There's no need to replace or construct a new Pair here. Just use
`setAsyncRequest`.
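The following sketch shows the suggested shape and assumes that the map is keyed by node id, as in the diff. `AckRequestState` is a hypothetical placeholder for `AcknowledgeRequestState`. `computeIfAbsent` is a swap-in for the separate `putIfAbsent`/`get` calls used elsewhere in the PR. The existing `Pair` is mutated with `setAsyncRequest`, so no map entry is replaced and any sync state it holds stays where it is.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for AcknowledgeRequestState.
class AckRequestState {
    final int nodeId;

    AckRequestState(int nodeId) {
        this.nodeId = nodeId;
    }
}

// Minimal Pair mirroring the holder in the diff.
class Pair<V> {
    private V asyncRequest;
    private V syncRequest;

    Pair(V asyncRequest, V syncRequest) {
        this.asyncRequest = asyncRequest;
        this.syncRequest = syncRequest;
    }

    V getAsyncRequest() { return asyncRequest; }
    V getSyncRequest() { return syncRequest; }
    void setAsyncRequest(V asyncRequest) { this.asyncRequest = asyncRequest; }
}

class CommitAsyncSketch {
    final Map<Integer, Pair<AckRequestState>> acknowledgeRequestStates = new HashMap<>();

    // Returns the async state for nodeId and creates it if needed. The Pair
    // already in the map is updated in place, so its sync state is preserved.
    AckRequestState asyncStateFor(int nodeId) {
        Pair<AckRequestState> pair =
            acknowledgeRequestStates.computeIfAbsent(nodeId, id -> new Pair<>(null, null));
        AckRequestState asyncRequestState = pair.getAsyncRequest();
        if (asyncRequestState == null) {
            asyncRequestState = new AckRequestState(nodeId);
            pair.setAsyncRequest(asyncRequestState);
        }
        return asyncRequestState;
    }
}
```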
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -259,6 +242,57 @@ private PollResult processAcknowledgements(long currentTimeMs) {
return pollResult;
}
+ private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState acknowledgeRequestState, long currentTimeMs, boolean onCommitAsync) {
Review Comment:
Thinking about this, I would like you to remove `isAsyncDone` entirely. If
this method returns `Optional.empty()`, then it means that no request was
generated. Then the caller can take an appropriate action based on its return
value, rather than using this piece of shared state which is more fragile.
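The sketch below shows what the caller might look like once `isAsyncDone` is gone. It makes some simplifying assumptions: `AckState` and `AckPoller` are hypothetical, and a request is modelled as a `String`. The async state is tried first. The sync state is tried only when no async request was built, which is the behaviour the shared flag was approximating.

One open question is not settled by the sketch. The real `maybeBuildRequest` also returns `Optional.empty()` when a request is skipped because of a pending request to the node or because of backoff. Whether the caller should treat that case the same way as "nothing to send" is a design decision for the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical, heavily simplified request state: a name and a count of
// acknowledgements still waiting to be sent.
class AckState {
    final String name;
    int pending;

    AckState(String name, int pending) {
        this.name = name;
        this.pending = pending;
    }
}

class AckPoller {
    // Stand-in for maybeBuildRequest(): Optional.empty() means that no
    // request was generated; a present value is the request to send.
    static Optional<String> maybeBuildRequest(AckState state) {
        if (state == null || state.pending == 0) {
            return Optional.empty();
        }
        state.pending = 0; // pretend every pending ack went into this request
        return Optional.of("request:" + state.name);
    }

    // The caller decides what to do from the return value alone, with no
    // shared isAsyncDone flag to reset and read back.
    static List<String> poll(AckState asyncState, AckState syncState) {
        List<String> unsentRequests = new ArrayList<>();
        Optional<String> asyncRequest = maybeBuildRequest(asyncState);
        if (asyncRequest.isPresent()) {
            unsentRequests.add(asyncRequest.get());
        } else {
            // Only consider the commitSync()/close() state once the
            // commitAsync() state produced nothing.
            maybeBuildRequest(syncState).ifPresent(unsentRequests::add);
        }
        return unsentRequests;
    }
}
```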
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -259,11 +250,71 @@ private PollResult processAcknowledgements(long currentTimeMs) {
return pollResult;
}
+ private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState acknowledgeRequestState,
+ long currentTimeMs, boolean onCommitAsync) {
+ if (acknowledgeRequestState == null || (!closing && acknowledgeRequestState.acknowledgementsToSend.isEmpty()
+ && acknowledgeRequestState.incompleteAcknowledgements.isEmpty())) {
+ if (onCommitAsync) {
+ isAsyncDone = true;
+ }
+ return Optional.empty();
+ } else if (!acknowledgeRequestState.maybeExpire()) {
+ if (nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) {
+ log.debug("Skipping acknowledge request because previous request to {} has not been processed",
+ acknowledgeRequestState.nodeId);
+ } else {
+ if (acknowledgeRequestState.canSendRequest(currentTimeMs)) {
+ acknowledgeRequestState.onSendAttempt(currentTimeMs);
+ if (onCommitAsync) {
+ isAsyncDone = true;
+ }
+ return Optional.of(acknowledgeRequestState.buildRequest(currentTimeMs));
+ }
+ }
+ } else {
+ // Fill in TimeoutException
+ for (TopicIdPartition tip : acknowledgeRequestState.incompleteAcknowledgements.keySet()) {
+ metricsManager
+ .recordFailedAcknowledgements(acknowledgeRequestState.getIncompleteAcknowledgementsCount(tip));
+ acknowledgeRequestState.handleAcknowledgeTimedOut(tip);
+ }
+ acknowledgeRequestState.incompleteAcknowledgements.clear();
+ isAsyncDone = true;
+ }
+ return Optional.empty();
+ }
+
+ private boolean areAnyAcknowledgementsLeft() {
+ boolean areAnyAcksLeft = false;
+ Iterator<Map.Entry<Integer, Pair<AcknowledgeRequestState>>> iterator = acknowledgeRequestStates.entrySet()
+ .iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Integer, Pair<AcknowledgeRequestState>> acknowledgeRequestStatePair = iterator.next();
+ if (isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getFirst())
+ && isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getSecond())) {
+ areAnyAcksLeft = true;
+ } else if (!closing) {
+ acknowledgeRequestStates.remove(acknowledgeRequestStatePair.getKey());
Review Comment:
I don't see this update yet.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -259,6 +242,57 @@ private PollResult processAcknowledgements(long currentTimeMs) {
return pollResult;
}
+ private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState acknowledgeRequestState, long currentTimeMs, boolean onCommitAsync) {
+ if (acknowledgeRequestState == null || (!closing && acknowledgeRequestState.acknowledgementsToSend.isEmpty() && acknowledgeRequestState.incompleteAcknowledgements.isEmpty())) {
+ if (onCommitAsync) {
+ isAsyncDone = true;
+ }
+ return Optional.empty();
+ } else if (!acknowledgeRequestState.maybeExpire()) {
+ if (nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) {
+ log.debug("Skipping acknowledge request because previous request to {} has not been processed", acknowledgeRequestState.nodeId);
+ } else {
+ if (acknowledgeRequestState.canSendRequest(currentTimeMs)) {
+ acknowledgeRequestState.onSendAttempt(currentTimeMs);
+ if (onCommitAsync) {
Review Comment:
This part is wrong. `isAsyncDone` should only be set to true if
`onCommitAsync` is true AND we have not built a request.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -215,37 +215,20 @@ public void fetch(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap) {
*/
private PollResult processAcknowledgements(long currentTimeMs) {
List<UnsentRequest> unsentRequests = new ArrayList<>();
- Iterator<AcknowledgeRequestState> iterator = acknowledgeRequestStates.iterator();
- while (iterator.hasNext()) {
- AcknowledgeRequestState acknowledgeRequestState = iterator.next();
- if (acknowledgeRequestState.isProcessed()) {
- iterator.remove();
- } else if (!acknowledgeRequestState.maybeExpire()) {
- if (nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) {
- log.trace("Skipping acknowledge request because previous request to {} has not been processed", acknowledgeRequestState.nodeId);
- } else {
- if (acknowledgeRequestState.canSendRequest(currentTimeMs)) {
- acknowledgeRequestState.onSendAttempt(currentTimeMs);
- UnsentRequest request = acknowledgeRequestState.buildRequest(currentTimeMs);
- if (request != null) {
- unsentRequests.add(request);
- }
- }
- }
- } else {
- // Fill in TimeoutException
- for (TopicIdPartition tip : acknowledgeRequestState.acknowledgementsMap.keySet()) {
- metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip));
- acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.REQUEST_TIMED_OUT);
- }
- iterator.remove();
- }
+ for (Pair<AcknowledgeRequestState> requestStates : acknowledgeRequestStates.values()) {
+ isAsyncDone = false;
+ // For commitAsync
+ maybeBuildRequest(requestStates.getAsyncRequest(), currentTimeMs, true).ifPresent(unsentRequests::add);
+
+ // Check to ensure we start processing commitSync/close only if there are no commitAsync requests left to process.
+ if (isAsyncDone)
+ maybeBuildRequest(requestStates.getSyncRequest(), currentTimeMs, false).ifPresent(unsentRequests::add);
}
PollResult pollResult = null;
if (!unsentRequests.isEmpty()) {
pollResult = new PollResult(unsentRequests);
- } else if (!acknowledgeRequestStates.isEmpty()) {
+ } else if (areAnyAcknowledgementsLeft()) {
Review Comment:
That means a side effect of this predicate check in the else-if is to prune
any remaining acknowledge request states. That seems like a significant
side effect, and it only runs conditionally. I would not make pruning part
of this check at all.
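One way to separate the two concerns is sketched below with hypothetical stand-ins (`AckState`, `AckStates`) rather than the real classes. The predicate only reads the map, and pruning is a separate step that runs unconditionally. In this sketch, the predicate returns true when any state is non-empty, and pruning drops pairs whose states are both empty, which is what the method names suggest. The branch in `areAnyAcknowledgementsLeft()` quoted in the diff appears to be the other way round, so it may be worth re-checking.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for AcknowledgeRequestState.
class AckState {
    final boolean empty;

    AckState(boolean empty) {
        this.empty = empty;
    }
}

class Pair<V> {
    private final V asyncRequest;
    private final V syncRequest;

    Pair(V asyncRequest, V syncRequest) {
        this.asyncRequest = asyncRequest;
        this.syncRequest = syncRequest;
    }

    V getAsyncRequest() { return asyncRequest; }
    V getSyncRequest() { return syncRequest; }
}

class AckStates {
    final Map<Integer, Pair<AckState>> acknowledgeRequestStates = new HashMap<>();

    static boolean isRequestStateEmpty(AckState state) {
        return state == null || state.empty;
    }

    // Pure query: reads the map but never modifies it, so it is safe to call
    // from any condition, however often.
    boolean areAnyAcknowledgementsLeft() {
        return acknowledgeRequestStates.values().stream()
            .anyMatch(pair -> !isRequestStateEmpty(pair.getAsyncRequest())
                || !isRequestStateEmpty(pair.getSyncRequest()));
    }

    // Explicit pruning step, called once per poll regardless of other
    // conditions. Like the diff, it leaves entries alone while closing.
    void pruneEmptyRequestStates(boolean closing) {
        if (closing) {
            return;
        }
        acknowledgeRequestStates.values().removeIf(pair ->
            isRequestStateEmpty(pair.getAsyncRequest()) && isRequestStateEmpty(pair.getSyncRequest()));
    }
}
```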
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -382,22 +438,34 @@ public CompletableFuture<Void> acknowledgeOnClose(final Map<TopicIdPartition, Ac
resultCount.incrementAndGet();
}
}
- acknowledgeRequestStates.add(new AcknowledgeRequestState(logContext,
- ShareConsumeRequestManager.class.getSimpleName() + ":3",
- deadlineMs,
- retryBackoffMs,
- retryBackoffMaxMs,
- sessionHandler,
- nodeId,
- acknowledgementsMapForNode,
- this::handleShareAcknowledgeCloseSuccess,
- this::handleShareAcknowledgeCloseFailure,
- resultHandler,
- true
- ));
+
+ acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, null));
+
+ // Ensure there is no commitSync()/close() request already present as they are blocking calls
+ // and only one request can be active at a time.
+ if (acknowledgeRequestStates.get(nodeId).getSyncRequest() != null) {
+ log.error("Attempt to call close() when there is an existing sync request for node {}", node.id());
Review Comment:
The result needs to be considered here too, although it is perhaps less
critical than for commitSync().
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -291,18 +325,27 @@ public CompletableFuture<Map<TopicIdPartition, Acknowledgements>> commitSync(
resultCount.incrementAndGet();
}
}
- acknowledgeRequestStates.add(new AcknowledgeRequestState(logContext,
- ShareConsumeRequestManager.class.getSimpleName() + ":1",
- deadlineMs,
- retryBackoffMs,
- retryBackoffMaxMs,
- sessionHandler,
- nodeId,
- acknowledgementsMapForNode,
- this::handleShareAcknowledgeSuccess,
- this::handleShareAcknowledgeFailure,
- resultHandler
- ));
+ acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, null));
+
+ // Ensure there is no commitSync()/close() request already present as they are blocking calls
+ // and only one request can be active at a time.
+ if (acknowledgeRequestStates.get(nodeId).getSyncRequest() != null) {
+ log.error("Attempt to call commitSync() when there is an existing sync request for node {}", node.id());
Review Comment:
The resulting future should probably also complete exceptionally.
Otherwise, it will look like commitSync() completed normally when it was
never attempted, and only the log line will reveal the bug.
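The following sketch shows the guard failing the future instead of only logging. It uses these hypothetical simplifications:

- The sync state is a `String` placeholder.
- The future is a `CompletableFuture<Void>` rather than the real `Map<TopicIdPartition, Acknowledgements>` result.
- `IllegalStateException` is an assumed choice of exception.

The `else` branch calls `setSyncRequest` on the existing `Pair`, as another comment on this method suggests. The same shape would apply to the `close()` path.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Minimal Pair mirroring the holder in the diff.
class Pair<V> {
    private V asyncRequest;
    private V syncRequest;

    Pair(V asyncRequest, V syncRequest) {
        this.asyncRequest = asyncRequest;
        this.syncRequest = syncRequest;
    }

    V getAsyncRequest() { return asyncRequest; }
    V getSyncRequest() { return syncRequest; }
    void setSyncRequest(V syncRequest) { this.syncRequest = syncRequest; }
}

class CommitSyncSketch {
    final Map<Integer, Pair<String>> acknowledgeRequestStates = new HashMap<>();

    // Hypothetical: registers syncState for nodeId and returns the caller's future.
    CompletableFuture<Void> commitSync(int nodeId, String syncState) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        Pair<String> pair = acknowledgeRequestStates.computeIfAbsent(nodeId, id -> new Pair<>(null, null));
        if (pair.getSyncRequest() != null) {
            // Surface the problem to the caller instead of only logging it.
            result.completeExceptionally(new IllegalStateException(
                "Attempt to call commitSync() when there is an existing sync request for node " + nodeId));
        } else {
            // Update the existing Pair rather than constructing a new one.
            pair.setSyncRequest(syncState);
            // In real code, result would complete when the request finishes.
        }
        return result;
    }
}
```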
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -565,7 +640,7 @@ private void handleShareAcknowledgeFailure(Node fetchTarget,
TopicIdPartition tip = new TopicIdPartition(topic.topicId(),
partition.partitionIndex(),
metadata.topicNames().get(topic.topicId()));
- metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip));
+ metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
acknowledgeRequestState.handleAcknowledgeErrorCode(tip,
Errors.forException(error));
}));
} finally {
Review Comment:
I think we ought to call `acknowledgeRequestState.onFailedAttempt` or
`acknowledgeRequestState.onSuccessfulAttempt` for every single code path that
handles a response to an AcknowledgeRequestState. I think that's not true at
the moment but it's hard to tell because in some cases we do it in the
ShareConsumeRequestManager methods, and in some cases we do it in the
AcknowledgeRequestState methods. Please can you make this consistent to make it
easier to validate.
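One way to make this easy to validate is to route every response for a request state through a single method. That method records the outcome before any per-partition handling runs. The sketch below illustrates the idea. `AckRequestState` and `AckResponseHandling` are hypothetical. `onFailedAttempt(currentTimeMs)` matches the call in the diff, and `onSuccessfulAttempt` is assumed to take the time in the same way.

```java
// Hypothetical stand-in that only tracks attempt outcomes.
class AckRequestState {
    int successfulAttempts;
    int failedAttempts;
    long lastResponseMs = -1L;

    void onSuccessfulAttempt(long currentTimeMs) {
        successfulAttempts++;
        lastResponseMs = currentTimeMs;
    }

    void onFailedAttempt(long currentTimeMs) {
        failedAttempts++;
        lastResponseMs = currentTimeMs;
    }
}

class AckResponseHandling {
    // Single entry point for every response (or transport failure) for one
    // request state. The attempt outcome is recorded here, exactly once,
    // before any per-partition handling.
    static void onResponse(AckRequestState state, Throwable failure, long currentTimeMs) {
        if (failure == null) {
            state.onSuccessfulAttempt(currentTimeMs);
            // ... per-partition success handling would follow here
        } else {
            state.onFailedAttempt(currentTimeMs);
            // ... per-partition error handling would follow here
        }
    }
}
```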
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -520,31 +588,38 @@ private void handleShareAcknowledgeSuccess(Node fetchTarget,
acknowledgeRequestState.onFailedAttempt(currentTimeMs);
if (response.error().exception() instanceof RetriableException
&& !closing) {
// We retry the request until the timer expires, unless we are closing.
- } else {
- requestData.topics().forEach(topic -> topic.partitions().forEach(partition -> {
- TopicIdPartition tip = new TopicIdPartition(topic.topicId(),
- partition.partitionIndex(),
- metadata.topicNames().get(topic.topicId()));
- metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip));
- acknowledgeRequestState.handleAcknowledgeErrorCode(tip, response.error());
- }));
-
- acknowledgeRequestState.processingComplete();
+ acknowledgeRequestState.retryRequest();
Review Comment:
Doesn't there need to be an else clause here for the case where we are not
retrying? I think that branch should call `handleAcknowledgeErrorCode` and
record the metrics.
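The sketch below shows the branch structure the comment describes. It uses hypothetical stand-ins: `RetriableError` in place of `RetriableException`, and a per-partition counter in place of the failed-acknowledgements metric. The retry branch mirrors the diff. The `else` branch restores the per-partition error handling and metrics that the diff removed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Kafka's RetriableException.
class RetriableError extends RuntimeException {
    RetriableError(String message) {
        super(message);
    }
}

// Hypothetical stand-in for AcknowledgeRequestState plus the failure metric.
class AckRequest {
    final List<String> partitions;
    final List<String> failedPartitions = new ArrayList<>();
    int retries;
    int failedAcknowledgementsMetric;

    AckRequest(List<String> partitions) {
        this.partitions = partitions;
    }

    void retryRequest() {
        retries++;
    }

    void handleAcknowledgeErrorCode(String partition, RuntimeException error) {
        failedPartitions.add(partition);
    }
}

class TopLevelErrorHandling {
    static void onTopLevelError(AckRequest request, RuntimeException error, boolean closing) {
        if (error instanceof RetriableError && !closing) {
            // Retry until the timer expires, unless we are closing.
            request.retryRequest();
        } else {
            // Not retrying: record the metric and fail each partition.
            for (String partition : request.partitions) {
                request.failedAcknowledgementsMetric++;
                request.handleAcknowledgeErrorCode(partition, error);
            }
        }
    }
}
```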
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -291,18 +325,27 @@ public CompletableFuture<Map<TopicIdPartition, Acknowledgements>> commitSync(
resultCount.incrementAndGet();
}
}
- acknowledgeRequestStates.add(new AcknowledgeRequestState(logContext,
- ShareConsumeRequestManager.class.getSimpleName() + ":1",
- deadlineMs,
- retryBackoffMs,
- retryBackoffMaxMs,
- sessionHandler,
- nodeId,
- acknowledgementsMapForNode,
- this::handleShareAcknowledgeSuccess,
- this::handleShareAcknowledgeFailure,
- resultHandler
- ));
+ acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, null));
+
+ // Ensure there is no commitSync()/close() request already present as they are blocking calls
+ // and only one request can be active at a time.
+ if (acknowledgeRequestStates.get(nodeId).getSyncRequest() != null) {
+ log.error("Attempt to call commitSync() when there is an existing sync request for node {}", node.id());
+ } else {
+ // There can only be one commitSync()/close() happening at a time. So per node, there will be one acknowledge request state representing commitSync() and close().
+ acknowledgeRequestStates.put(nodeId, new Pair<>(acknowledgeRequestStates.get(nodeId).getAsyncRequest(), new AcknowledgeRequestState(logContext,
Review Comment:
There's no need to construct a new Pair here. You can use `setSyncRequest()`
instead.