AndrewJSchofield commented on code in PR #16727:
URL: https://github.com/apache/kafka/pull/16727#discussion_r1698163085
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -259,11 +250,71 @@ private PollResult processAcknowledgements(long
currentTimeMs) {
return pollResult;
}
+ private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState
acknowledgeRequestState,
+ long currentTimeMs, boolean onCommitAsync) {
+ if (acknowledgeRequestState == null || (!closing &&
acknowledgeRequestState.acknowledgementsToSend.isEmpty()
+ &&
acknowledgeRequestState.incompleteAcknowledgements.isEmpty())) {
+ if (onCommitAsync) {
+ isAsyncDone = true;
+ }
+ return Optional.empty();
+ } else if (!acknowledgeRequestState.maybeExpire()) {
+ if
(nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) {
+ log.debug("Skipping acknowledge request because previous
request to {} has not been processed",
+ acknowledgeRequestState.nodeId);
+ } else {
+ if (acknowledgeRequestState.canSendRequest(currentTimeMs)) {
+ acknowledgeRequestState.onSendAttempt(currentTimeMs);
+ if (onCommitAsync) {
+ isAsyncDone = true;
+ }
+ return
Optional.of(acknowledgeRequestState.buildRequest(currentTimeMs));
+ }
+ }
+ } else {
+ // Fill in TimeoutException
+ for (TopicIdPartition tip :
acknowledgeRequestState.incompleteAcknowledgements.keySet()) {
+ metricsManager
+
.recordFailedAcknowledgements(acknowledgeRequestState.getIncompleteAcknowledgementsCount(tip));
+ acknowledgeRequestState.handleAcknowledgeTimedOut(tip);
+ }
+ acknowledgeRequestState.incompleteAcknowledgements.clear();
+ isAsyncDone = true;
Review Comment:
Should this line be conditional on `onCommitAsync` like the others?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -207,46 +212,32 @@ public void fetch(Map<TopicIdPartition, Acknowledgements>
acknowledgementsMap) {
}
/**
- * Process acknowledgeRequestStates and prepares a list of
acknowledgements to be sent in the poll().
+ * Process acknowledgeRequestStates and prepares a list of
acknowledgements to
+ * be sent in the poll().
*
* @param currentTimeMs the current time in ms.
*
* @return the PollResult containing zero or more acknowledgements.
*/
private PollResult processAcknowledgements(long currentTimeMs) {
List<UnsentRequest> unsentRequests = new ArrayList<>();
- Iterator<AcknowledgeRequestState> iterator =
acknowledgeRequestStates.iterator();
- while (iterator.hasNext()) {
- AcknowledgeRequestState acknowledgeRequestState = iterator.next();
- if (acknowledgeRequestState.isProcessed()) {
- iterator.remove();
- } else if (!acknowledgeRequestState.maybeExpire()) {
- if
(nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) {
- log.trace("Skipping acknowledge request because previous
request to {} has not been processed", acknowledgeRequestState.nodeId);
- } else {
- if (acknowledgeRequestState.canSendRequest(currentTimeMs))
{
- acknowledgeRequestState.onSendAttempt(currentTimeMs);
- UnsentRequest request =
acknowledgeRequestState.buildRequest(currentTimeMs);
- if (request != null) {
- unsentRequests.add(request);
- }
- }
- }
- } else {
- // Fill in TimeoutException
- for (TopicIdPartition tip :
acknowledgeRequestState.acknowledgementsMap.keySet()) {
-
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip));
- acknowledgeRequestState.handleAcknowledgeErrorCode(tip,
Errors.REQUEST_TIMED_OUT);
- }
- iterator.remove();
- }
+ for (Pair<AcknowledgeRequestState> requestStates :
acknowledgeRequestStates.values()) {
+ isAsyncDone = false;
+ // For commitAsync
+ maybeBuildRequest(requestStates.getFirst(), currentTimeMs,
true).ifPresent(unsentRequests::add);
Review Comment:
I think the getFirst()/getSecond() could be better named
getAsyncRequest()/getSyncRequest() since that's what first and second actually
mean.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -378,32 +452,38 @@ public CompletableFuture<Void> acknowledgeOnClose(final
Map<TopicIdPartition, Ac
acknowledgementsMapForNode.put(tip, acknowledgements);
metricsManager.recordAcknowledgementSent(acknowledgements.size());
- log.debug("Added closing acknowledge request for
partition {} to node {}", tip.topicPartition(), node.id());
+ log.debug("Added closing acknowledge request for
partition {} to node {}", tip.topicPartition(),
+ node.id());
resultCount.incrementAndGet();
}
}
- acknowledgeRequestStates.add(new
AcknowledgeRequestState(logContext,
- ShareConsumeRequestManager.class.getSimpleName() +
":3",
- deadlineMs,
- retryBackoffMs,
- retryBackoffMaxMs,
- sessionHandler,
- nodeId,
- acknowledgementsMapForNode,
- this::handleShareAcknowledgeCloseSuccess,
- this::handleShareAcknowledgeCloseFailure,
- resultHandler,
- true
- ));
+
+ acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null,
null));
+ // There can only be one commitSync()/close() happening at a
time. So per node,
Review Comment:
This is true but you could ensure that it's true in the code also.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -259,11 +250,71 @@ private PollResult processAcknowledgements(long
currentTimeMs) {
return pollResult;
}
+ private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState
acknowledgeRequestState,
+ long currentTimeMs, boolean onCommitAsync) {
+ if (acknowledgeRequestState == null || (!closing &&
acknowledgeRequestState.acknowledgementsToSend.isEmpty()
+ &&
acknowledgeRequestState.incompleteAcknowledgements.isEmpty())) {
+ if (onCommitAsync) {
+ isAsyncDone = true;
+ }
+ return Optional.empty();
+ } else if (!acknowledgeRequestState.maybeExpire()) {
+ if
(nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) {
+ log.debug("Skipping acknowledge request because previous
request to {} has not been processed",
+ acknowledgeRequestState.nodeId);
+ } else {
+ if (acknowledgeRequestState.canSendRequest(currentTimeMs)) {
+ acknowledgeRequestState.onSendAttempt(currentTimeMs);
+ if (onCommitAsync) {
+ isAsyncDone = true;
+ }
+ return
Optional.of(acknowledgeRequestState.buildRequest(currentTimeMs));
+ }
+ }
+ } else {
+ // Fill in TimeoutException
+ for (TopicIdPartition tip :
acknowledgeRequestState.incompleteAcknowledgements.keySet()) {
+ metricsManager
+
.recordFailedAcknowledgements(acknowledgeRequestState.getIncompleteAcknowledgementsCount(tip));
+ acknowledgeRequestState.handleAcknowledgeTimedOut(tip);
+ }
+ acknowledgeRequestState.incompleteAcknowledgements.clear();
+ isAsyncDone = true;
+ }
+ return Optional.empty();
+ }
+
+ private boolean areAnyAcknowledgementsLeft() {
+ boolean areAnyAcksLeft = false;
+ Iterator<Map.Entry<Integer, Pair<AcknowledgeRequestState>>> iterator =
acknowledgeRequestStates.entrySet()
+ .iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Integer, Pair<AcknowledgeRequestState>>
acknowledgeRequestStatePair = iterator.next();
+ if
(isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getFirst())
+ &&
isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getSecond())) {
+ areAnyAcksLeft = true;
Review Comment:
This confuses me. It says "if there are no async or sync acks", but then it
sets `areAnyAcksLeft = true`. Seems that the logic and the names of the methods
could be a bit clearer.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -259,11 +250,71 @@ private PollResult processAcknowledgements(long
currentTimeMs) {
return pollResult;
}
+ private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState
acknowledgeRequestState,
+ long currentTimeMs, boolean onCommitAsync) {
+ if (acknowledgeRequestState == null || (!closing &&
acknowledgeRequestState.acknowledgementsToSend.isEmpty()
+ &&
acknowledgeRequestState.incompleteAcknowledgements.isEmpty())) {
+ if (onCommitAsync) {
+ isAsyncDone = true;
+ }
+ return Optional.empty();
+ } else if (!acknowledgeRequestState.maybeExpire()) {
+ if
(nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) {
+ log.debug("Skipping acknowledge request because previous
request to {} has not been processed",
+ acknowledgeRequestState.nodeId);
+ } else {
+ if (acknowledgeRequestState.canSendRequest(currentTimeMs)) {
+ acknowledgeRequestState.onSendAttempt(currentTimeMs);
+ if (onCommitAsync) {
+ isAsyncDone = true;
+ }
+ return
Optional.of(acknowledgeRequestState.buildRequest(currentTimeMs));
+ }
+ }
+ } else {
+ // Fill in TimeoutException
+ for (TopicIdPartition tip :
acknowledgeRequestState.incompleteAcknowledgements.keySet()) {
+ metricsManager
+
.recordFailedAcknowledgements(acknowledgeRequestState.getIncompleteAcknowledgementsCount(tip));
+ acknowledgeRequestState.handleAcknowledgeTimedOut(tip);
+ }
+ acknowledgeRequestState.incompleteAcknowledgements.clear();
+ isAsyncDone = true;
+ }
+ return Optional.empty();
+ }
+
+ private boolean areAnyAcknowledgementsLeft() {
+ boolean areAnyAcksLeft = false;
+ Iterator<Map.Entry<Integer, Pair<AcknowledgeRequestState>>> iterator =
acknowledgeRequestStates.entrySet()
+ .iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Integer, Pair<AcknowledgeRequestState>>
acknowledgeRequestStatePair = iterator.next();
+ if
(isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getFirst())
+ &&
isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getSecond())) {
+ areAnyAcksLeft = true;
+ } else if (!closing) {
+
acknowledgeRequestStates.remove(acknowledgeRequestStatePair.getKey());
Review Comment:
Shouldn't this be `iterator.remove()`?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -378,32 +452,38 @@ public CompletableFuture<Void> acknowledgeOnClose(final
Map<TopicIdPartition, Ac
acknowledgementsMapForNode.put(tip, acknowledgements);
metricsManager.recordAcknowledgementSent(acknowledgements.size());
- log.debug("Added closing acknowledge request for
partition {} to node {}", tip.topicPartition(), node.id());
+ log.debug("Added closing acknowledge request for
partition {} to node {}", tip.topicPartition(),
+ node.id());
resultCount.incrementAndGet();
}
}
- acknowledgeRequestStates.add(new
AcknowledgeRequestState(logContext,
- ShareConsumeRequestManager.class.getSimpleName() +
":3",
- deadlineMs,
- retryBackoffMs,
- retryBackoffMaxMs,
- sessionHandler,
- nodeId,
- acknowledgementsMapForNode,
- this::handleShareAcknowledgeCloseSuccess,
- this::handleShareAcknowledgeCloseFailure,
- resultHandler,
- true
- ));
+
+ acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null,
null));
+ // There can only be one commitSync()/close() happening at a
time. So per node,
+ // there will be one acknowledge request state.
+ acknowledgeRequestStates.put(nodeId, new
Pair<>(acknowledgeRequestStates.get(nodeId).getFirst(),
+ new AcknowledgeRequestState(logContext,
+
ShareConsumeRequestManager.class.getSimpleName() + ":1",
Review Comment:
The name with `:3` at the end has been lost with this PR. Please restore
this.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -287,22 +338,29 @@ public CompletableFuture<Map<TopicIdPartition,
Acknowledgements>> commitSync(
acknowledgementsMapForNode.put(tip, acknowledgements);
metricsManager.recordAcknowledgementSent(acknowledgements.size());
- log.debug("Added sync acknowledge request for
partition {} to node {}", tip.topicPartition(), node.id());
+ log.debug("Added sync acknowledge request for
partition {} to node {}", tip.topicPartition(),
+ node.id());
resultCount.incrementAndGet();
}
}
- acknowledgeRequestStates.add(new
AcknowledgeRequestState(logContext,
- ShareConsumeRequestManager.class.getSimpleName() +
":1",
- deadlineMs,
- retryBackoffMs,
- retryBackoffMaxMs,
- sessionHandler,
- nodeId,
- acknowledgementsMapForNode,
- this::handleShareAcknowledgeSuccess,
- this::handleShareAcknowledgeFailure,
- resultHandler
- ));
+ acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null,
null));
+
+ // There can only be one commitSync()/close() happening at a
time. So per node,
Review Comment:
This is true but it would be simple to ensure that using a bit of defensive
code. If we made an error in the code later, we might end up overwriting
something we didn't expect and not notice.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -324,44 +382,60 @@ public void commitAsync(final Map<TopicIdPartition,
Acknowledgements> acknowledg
Node node = cluster.nodeById(nodeId);
if (node != null) {
Map<TopicIdPartition, Acknowledgements>
acknowledgementsMapForNode = new HashMap<>();
+
+ acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null,
null));
+
for (TopicIdPartition tip :
sessionHandler.sessionPartitions()) {
Acknowledgements acknowledgements =
acknowledgementsMap.get(tip);
if (acknowledgements != null) {
acknowledgementsMapForNode.put(tip, acknowledgements);
metricsManager.recordAcknowledgementSent(acknowledgements.size());
- log.debug("Added async acknowledge request for
partition {} to node {}", tip.topicPartition(), node.id());
+ log.debug("Added async acknowledge request for
partition {} to node {}", tip.topicPartition(),
+ node.id());
resultCount.incrementAndGet();
+ if (acknowledgeRequestStates.get(nodeId).getFirst() ==
null) {
Review Comment:
You should assign the result of
`acknowledgeRequestStates.get(nodeId).getFirst()` to a variable so that you can
avoid repeatedly evaluating this expression. I think you're doing it 3 times
when once would be enough and also make the code clearer.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -259,11 +250,71 @@ private PollResult processAcknowledgements(long
currentTimeMs) {
return pollResult;
}
+ private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState
acknowledgeRequestState,
+ long currentTimeMs, boolean onCommitAsync) {
+ if (acknowledgeRequestState == null || (!closing &&
acknowledgeRequestState.acknowledgementsToSend.isEmpty()
+ &&
acknowledgeRequestState.incompleteAcknowledgements.isEmpty())) {
+ if (onCommitAsync) {
+ isAsyncDone = true;
+ }
+ return Optional.empty();
+ } else if (!acknowledgeRequestState.maybeExpire()) {
+ if
(nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) {
+ log.debug("Skipping acknowledge request because previous
request to {} has not been processed",
+ acknowledgeRequestState.nodeId);
+ } else {
+ if (acknowledgeRequestState.canSendRequest(currentTimeMs)) {
+ acknowledgeRequestState.onSendAttempt(currentTimeMs);
+ if (onCommitAsync) {
+ isAsyncDone = true;
+ }
+ return
Optional.of(acknowledgeRequestState.buildRequest(currentTimeMs));
+ }
+ }
+ } else {
+ // Fill in TimeoutException
+ for (TopicIdPartition tip :
acknowledgeRequestState.incompleteAcknowledgements.keySet()) {
+ metricsManager
+
.recordFailedAcknowledgements(acknowledgeRequestState.getIncompleteAcknowledgementsCount(tip));
+ acknowledgeRequestState.handleAcknowledgeTimedOut(tip);
+ }
+ acknowledgeRequestState.incompleteAcknowledgements.clear();
+ isAsyncDone = true;
+ }
+ return Optional.empty();
+ }
+
+ private boolean areAnyAcknowledgementsLeft() {
+ boolean areAnyAcksLeft = false;
+ Iterator<Map.Entry<Integer, Pair<AcknowledgeRequestState>>> iterator =
acknowledgeRequestStates.entrySet()
+ .iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Integer, Pair<AcknowledgeRequestState>>
acknowledgeRequestStatePair = iterator.next();
+ if
(isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getFirst())
+ &&
isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getSecond())) {
+ areAnyAcksLeft = true;
+ } else if (!closing) {
+
acknowledgeRequestStates.remove(acknowledgeRequestStatePair.getKey());
+ }
+ }
+ if (!acknowledgeRequestStates.isEmpty())
+ areAnyAcksLeft = true;
+ return areAnyAcksLeft;
+ }
+
+ private boolean isRequestStateEmpty(AcknowledgeRequestState
acknowledgeRequestState) {
Review Comment:
This could be clearer. I think `ars == null || (ars.acksToSend.isEmpty() &&
ars.ia.isEmpty() && ars.ifa.isEmpty())`.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -259,11 +250,71 @@ private PollResult processAcknowledgements(long
currentTimeMs) {
return pollResult;
}
+ private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState
acknowledgeRequestState,
+ long currentTimeMs, boolean onCommitAsync) {
+ if (acknowledgeRequestState == null || (!closing &&
acknowledgeRequestState.acknowledgementsToSend.isEmpty()
+ &&
acknowledgeRequestState.incompleteAcknowledgements.isEmpty())) {
+ if (onCommitAsync) {
+ isAsyncDone = true;
+ }
+ return Optional.empty();
+ } else if (!acknowledgeRequestState.maybeExpire()) {
+ if
(nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) {
+ log.debug("Skipping acknowledge request because previous
request to {} has not been processed",
+ acknowledgeRequestState.nodeId);
+ } else {
+ if (acknowledgeRequestState.canSendRequest(currentTimeMs)) {
+ acknowledgeRequestState.onSendAttempt(currentTimeMs);
+ if (onCommitAsync) {
+ isAsyncDone = true;
+ }
+ return
Optional.of(acknowledgeRequestState.buildRequest(currentTimeMs));
+ }
+ }
+ } else {
+ // Fill in TimeoutException
+ for (TopicIdPartition tip :
acknowledgeRequestState.incompleteAcknowledgements.keySet()) {
+ metricsManager
+
.recordFailedAcknowledgements(acknowledgeRequestState.getIncompleteAcknowledgementsCount(tip));
+ acknowledgeRequestState.handleAcknowledgeTimedOut(tip);
+ }
+ acknowledgeRequestState.incompleteAcknowledgements.clear();
+ isAsyncDone = true;
+ }
+ return Optional.empty();
+ }
+
+ private boolean areAnyAcknowledgementsLeft() {
+ boolean areAnyAcksLeft = false;
+ Iterator<Map.Entry<Integer, Pair<AcknowledgeRequestState>>> iterator =
acknowledgeRequestStates.entrySet()
+ .iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Integer, Pair<AcknowledgeRequestState>>
acknowledgeRequestStatePair = iterator.next();
+ if
(isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getFirst())
+ &&
isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getSecond())) {
+ areAnyAcksLeft = true;
+ } else if (!closing) {
+
acknowledgeRequestStates.remove(acknowledgeRequestStatePair.getKey());
+ }
+ }
+ if (!acknowledgeRequestStates.isEmpty())
+ areAnyAcksLeft = true;
+ return areAnyAcksLeft;
+ }
+
+ private boolean isRequestStateEmpty(AcknowledgeRequestState
acknowledgeRequestState) {
Review Comment:
You could even make a method `boolean AcknowledgeRequestState.isEmpty()` so
that this code is not intimately aware of the three maps within.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]