AndrewJSchofield commented on code in PR #16727:
URL: https://github.com/apache/kafka/pull/16727#discussion_r1698096040
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -50,20 +50,21 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
/**
- * {@code ShareConsumeRequestManager} is responsible for generating {@link
ShareFetchRequest} and
- * {@link ShareAcknowledgeRequest} to fetch and acknowledge records being
delivered for a consumer
+ * {@code ShareConsumeRequestManager} is responsible for generating
Review Comment:
nit: Comment formatting messed up without changing the words at all.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -1022,9 +1194,12 @@ private List<UnsentRequest>
removeUnsentRequestByNode(Node node) {
@Override
protected void checkDisconnects(final long currentTimeMs) {
- // any disconnects affecting requests that have already been
transmitted will be handled
- // by NetworkClient, so we just need to check whether connections
for any of the unsent
- // requests have been disconnected; if they have, then we complete
the corresponding future
+ // any disconnects affecting requests that have already been
transmitted will be
Review Comment:
formatting
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -259,11 +250,71 @@ private PollResult processAcknowledgements(long
currentTimeMs) {
return pollResult;
}
+ private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState
acknowledgeRequestState,
+ long currentTimeMs, boolean onCommitAsync) {
+ if (acknowledgeRequestState == null || (!closing &&
acknowledgeRequestState.acknowledgementsToSend.isEmpty()
+ &&
acknowledgeRequestState.incompleteAcknowledgements.isEmpty())) {
+ if (onCommitAsync) {
+ isAsyncDone = true;
+ }
+ return Optional.empty();
+ } else if (!acknowledgeRequestState.maybeExpire()) {
+ if
(nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) {
+ log.debug("Skipping acknowledge request because previous
request to {} has not been processed",
+ acknowledgeRequestState.nodeId);
+ } else {
+ if (acknowledgeRequestState.canSendRequest(currentTimeMs)) {
+ acknowledgeRequestState.onSendAttempt(currentTimeMs);
+ if (onCommitAsync) {
+ isAsyncDone = true;
+ }
+ return
Optional.of(acknowledgeRequestState.buildRequest(currentTimeMs));
+ }
+ }
+ } else {
+ // Fill in TimeoutException
+ for (TopicIdPartition tip :
acknowledgeRequestState.incompleteAcknowledgements.keySet()) {
+ metricsManager
+
.recordFailedAcknowledgements(acknowledgeRequestState.getIncompleteAcknowledgementsCount(tip));
+ acknowledgeRequestState.handleAcknowledgeTimedOut(tip);
+ }
+ acknowledgeRequestState.incompleteAcknowledgements.clear();
+ isAsyncDone = true;
+ }
+ return Optional.empty();
+ }
+
+ private boolean areAnyAcknowledgementsLeft() {
+ boolean areAnyAcksLeft = false;
+ Iterator<Map.Entry<Integer, Pair<AcknowledgeRequestState>>> iterator =
acknowledgeRequestStates.entrySet()
+ .iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Integer, Pair<AcknowledgeRequestState>>
acknowledgeRequestStatePair = iterator.next();
+ if
(isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getFirst())
+ &&
isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getSecond())) {
+ areAnyAcksLeft = true;
+ } else if (!closing) {
+
acknowledgeRequestStates.remove(acknowledgeRequestStatePair.getKey());
+ }
+ }
+ if (!acknowledgeRequestStates.isEmpty())
+ areAnyAcksLeft = true;
+ return areAnyAcksLeft;
+ }
+
+ private boolean isRequestStateEmpty(AcknowledgeRequestState
acknowledgeRequestState) {
+ return acknowledgeRequestState == null
+ || !acknowledgeRequestState.acknowledgementsToSend.isEmpty()
+ ||
!acknowledgeRequestState.incompleteAcknowledgements.isEmpty()
+ || !acknowledgeRequestState.inFlightAcknowledgements.isEmpty();
+ }
+
/**
* Enqueue an AcknowledgeRequestState to be picked up on the next poll
*
* @param acknowledgementsMap The acknowledgements to commit
- * @param deadlineMs Time until which the request will be retried
if it fails with
+ * @param deadlineMs Time until which the request will be retried
if it
Review Comment:
Formatting
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -449,7 +531,8 @@ private void handleShareFetchSuccess(Node fetchTarget,
}
acks.setAcknowledgeErrorCode(Errors.forCode(partitionData.acknowledgeErrorCode()));
Map<TopicIdPartition, Acknowledgements> acksMap =
Collections.singletonMap(tip, acks);
- ShareAcknowledgementCommitCallbackEvent event = new
ShareAcknowledgementCommitCallbackEvent(acksMap);
+ ShareAcknowledgementCommitCallbackEvent event = new
ShareAcknowledgementCommitCallbackEvent(
Review Comment:
If you want to make the line shorter, it's more readable to break it before
"new".
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -1034,7 +1209,8 @@ protected void checkDisconnects(final long currentTimeMs)
{
FutureCompletionHandler handler =
unsentRequest.handler();
AuthenticationException authenticationException =
client.authenticationException(node);
long startMs = unsentRequest.timer().currentTimeMs() -
unsentRequest.timer().elapsedMs();
- handler.onComplete(new
ClientResponse(makeHeader(unsentRequest.requestBuilder().latestAllowedVersion()),
+ handler.onComplete(new ClientResponse(
+
makeHeader(unsentRequest.requestBuilder().latestAllowedVersion()),
Review Comment:
nit: This would be neater with the `new ClientResponse(` on the new line in
its entirety.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -207,46 +212,32 @@ public void fetch(Map<TopicIdPartition, Acknowledgements>
acknowledgementsMap) {
}
/**
- * Process acknowledgeRequestStates and prepares a list of
acknowledgements to be sent in the poll().
+ * Process acknowledgeRequestStates and prepares a list of
acknowledgements to
+ * be sent in the poll().
*
* @param currentTimeMs the current time in ms.
*
* @return the PollResult containing zero or more acknowledgements.
*/
private PollResult processAcknowledgements(long currentTimeMs) {
List<UnsentRequest> unsentRequests = new ArrayList<>();
- Iterator<AcknowledgeRequestState> iterator =
acknowledgeRequestStates.iterator();
- while (iterator.hasNext()) {
- AcknowledgeRequestState acknowledgeRequestState = iterator.next();
- if (acknowledgeRequestState.isProcessed()) {
- iterator.remove();
- } else if (!acknowledgeRequestState.maybeExpire()) {
- if
(nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) {
- log.trace("Skipping acknowledge request because previous
request to {} has not been processed", acknowledgeRequestState.nodeId);
- } else {
- if (acknowledgeRequestState.canSendRequest(currentTimeMs))
{
- acknowledgeRequestState.onSendAttempt(currentTimeMs);
- UnsentRequest request =
acknowledgeRequestState.buildRequest(currentTimeMs);
- if (request != null) {
- unsentRequests.add(request);
- }
- }
- }
- } else {
- // Fill in TimeoutException
- for (TopicIdPartition tip :
acknowledgeRequestState.acknowledgementsMap.keySet()) {
-
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip));
- acknowledgeRequestState.handleAcknowledgeErrorCode(tip,
Errors.REQUEST_TIMED_OUT);
- }
- iterator.remove();
- }
+ for (Pair<AcknowledgeRequestState> requestStates :
acknowledgeRequestStates.values()) {
+ isAsyncDone = false;
+ // For commitAsync
+ maybeBuildRequest(requestStates.getFirst(), currentTimeMs,
true).ifPresent(unsentRequests::add);
+
+ // Check to ensure we start processing commitSync/close only if
there are no
+ // commitAsync requests left to process.
+ if (isAsyncDone)
+ maybeBuildRequest(requestStates.getSecond(), currentTimeMs,
false).ifPresent(unsentRequests::add);
}
PollResult pollResult = null;
if (!unsentRequests.isEmpty()) {
pollResult = new PollResult(unsentRequests);
- } else if (!acknowledgeRequestStates.isEmpty()) {
- // Return empty result until all the acknowledgement request
states are processed
+ } else if (areAnyAcknowledgementsLeft()) {
+ // Return empty result until all the acknowledgement request
states are
Review Comment:
formatting
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -50,20 +50,21 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
/**
- * {@code ShareConsumeRequestManager} is responsible for generating {@link
ShareFetchRequest} and
- * {@link ShareAcknowledgeRequest} to fetch and acknowledge records being
delivered for a consumer
+ * {@code ShareConsumeRequestManager} is responsible for generating
Review Comment:
nit: Comment formatting messed up without changing the words at all.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]