cadonna commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1624238381
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -827,26 +808,15 @@ abstract class RetriableRequestState extends RequestState
{
*/
abstract CompletableFuture<?> future();
- /**
- * Complete the request future with a TimeoutException if the request
timeout has been
- * reached, based on the provided current time.
- */
- void maybeExpire(long currentTimeMs) {
- if (retryTimeoutExpired(currentTimeMs)) {
- removeRequest();
- isExpired = true;
- future().completeExceptionally(new
TimeoutException(requestDescription() +
- " could not complete before timeout expired."));
- }
- }
-
/**
* Build request with the given builder, including response handling
logic.
*/
NetworkClientDelegate.UnsentRequest
buildRequestWithResponseHandling(final AbstractRequest.Builder<?> builder) {
NetworkClientDelegate.UnsentRequest request = new
NetworkClientDelegate.UnsentRequest(
builder,
- coordinatorRequestManager.coordinator());
+ coordinatorRequestManager.coordinator(),
+ time.timer(requestTimeoutMs)
+ );
request.whenComplete(
(response, throwable) -> {
long currentTimeMs = request.handler().completionTimeMs();
Review Comment:
Not directly related to this PR: Why is this variable called `currentTimeMs`?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer}
with which to keep track
+ * of the request's expiration.
+ */
+public class TimedRequestState extends RequestState {
+
+ private final Timer timer;
+
+ public TimedRequestState(final LogContext logContext,
+ final String owner,
+ final long retryBackoffMs,
+ final long retryBackoffMaxMs,
+ final Timer timer) {
+ super(logContext, owner, retryBackoffMs, retryBackoffMaxMs);
+ this.timer = timer;
+ }
+
+ // Visible for testing
Review Comment:
This does not seem to be true. The constructor is not called in tests and
also the constructors that call this constructor are not called in tests. That
is one reason I do not like this kind of comments, because they easily start to
lie.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -235,7 +235,6 @@ public void addAll(final List<UnsentRequest> requests) {
public void add(final UnsentRequest r) {
Objects.requireNonNull(r);
- r.setTimer(this.time, this.requestTimeoutMs);
Review Comment:
I guess it does not make a big difference if the timer is set when the
unsent request is created vs. when the unsent request is added to the network
client, right?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -432,7 +436,7 @@ private void commitSyncWithRetries(OffsetCommitRequestState
requestAttempt,
result.complete(null);
} else {
if (error instanceof RetriableException) {
- if (error instanceof TimeoutException &&
requestAttempt.isExpired) {
+ if (error instanceof TimeoutException &&
requestAttempt.isExpired()) {
Review Comment:
Why do we only check expiration in case of a `TimeoutException` but not for
other retriable exceptions?
The `TopicMetadataRequestManager` handles this differently.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -960,7 +960,7 @@ void maybeReconcile() {
// best effort to commit the offsets in the case where the epoch might
have changed while
// the current reconciliation is in process. Note this is using the
rebalance timeout as
// it is the limit enforced by the broker to complete the
reconciliation process.
- commitResult =
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getExpirationTimeForTimeout(rebalanceTimeoutMs));
+ commitResult =
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getDeadlineMsForTimeout(rebalanceTimeoutMs));
Review Comment:
Not directly related to this PR: is it OK to use the rebalance timeout here
instead of using a timer that computes the remaining time until the rebalance
timeout is expired?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]