[GitHub] [kafka] kirktrue commented on a diff in pull request #12813: KAFKA-14317: ProduceRequest timeouts are logged as network exceptions
kirktrue commented on code in PR #12813:
URL: https://github.com/apache/kafka/pull/12813#discussion_r1129642084

## clients/src/main/java/org/apache/kafka/clients/NetworkClient.java: ##

    @@ -324,11 +324,14 @@ public void disconnect(String nodeId) {
             log.info("Client requested disconnect from node {}", nodeId);
             selector.close(nodeId);
             long now = time.milliseconds();
    -        cancelInFlightRequests(nodeId, now, abortedSends);
    +        cancelInFlightRequests(nodeId, now, abortedSends, false);
             connectionStates.disconnected(nodeId, now);
         }
    -    private void cancelInFlightRequests(String nodeId, long now, Collection<ClientResponse> responses) {
    +    private void cancelInFlightRequests(String nodeId,

Review Comment:
I'm marking this as "resolved" for now. Please reopen in protest if you strongly disagree.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
kirktrue commented on code in PR #12813:
URL: https://github.com/apache/kafka/pull/12813#discussion_r1129641185

## clients/src/main/java/org/apache/kafka/clients/NetworkClient.java: ##

    @@ -753,7 +756,8 @@ public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestH
         private void processDisconnection(List<ClientResponse> responses,

Review Comment:
Done.
kirktrue commented on code in PR #12813:
URL: https://github.com/apache/kafka/pull/12813#discussion_r1128855661

## clients/src/main/java/org/apache/kafka/clients/NetworkClient.java: ##

    @@ -1279,9 +1283,10 @@ public ClientResponse completed(AbstractResponse response, long timeMs) {
                     false, null, null, response);
         }
    -    public ClientResponse disconnected(long timeMs, AuthenticationException authenticationException) {
    +    public ClientResponse disconnected(long timeMs, AuthenticationException authenticationException, boolean timedOut) {

Review Comment:
Done.
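The diff above adds a `boolean timedOut` parameter to `disconnected(...)`, and the earlier `disconnect()` hunk passes `false` into `cancelInFlightRequests`. As a minimal sketch of how such a flag can be threaded from a cancellation path into each cancelled response, assuming simplified stand-ins rather than Kafka's real classes: `CancelledResponse`, the list of correlation ids, and the static `cancelInFlightRequests`/`disconnected` helpers are hypothetical, and the authentication exception is left out.

```java
import java.util.ArrayList;
import java.util.List;

class TimedOutFlagSketch {
    public static void main(String[] args) {
        List<CancelledResponse> responses = new ArrayList<>();
        // Client-requested disconnect, as in disconnect(nodeId): not a timeout.
        cancelInFlightRequests(List.of(1, 2), 100L, responses, false);
        // Request-timeout path: the cancelled responses are marked as timed out.
        cancelInFlightRequests(List.of(3), 200L, responses, true);
        for (CancelledResponse r : responses) {
            System.out.println("correlationId=" + r.correlationId + " timedOut=" + r.timedOut);
        }
    }

    // Hypothetical stand-in for ClientResponse, reduced to the fields used here.
    static final class CancelledResponse {
        final int correlationId;
        final long receivedTimeMs;
        final boolean disconnected;
        final boolean timedOut;

        CancelledResponse(int correlationId, long receivedTimeMs, boolean disconnected, boolean timedOut) {
            this.correlationId = correlationId;
            this.receivedTimeMs = receivedTimeMs;
            this.disconnected = disconnected;
            this.timedOut = timedOut;
        }
    }

    // Plays the role of disconnected(timeMs, authenticationException, timedOut);
    // every cancelled response is disconnected, and the caller decides timedOut.
    static CancelledResponse disconnected(int correlationId, long timeMs, boolean timedOut) {
        return new CancelledResponse(correlationId, timeMs, true, timedOut);
    }

    // Plays the role of cancelInFlightRequests(nodeId, now, responses, timedOut):
    // each in-flight request becomes a disconnected response carrying the flag.
    static void cancelInFlightRequests(List<Integer> inFlightCorrelationIds, long now,
                                       List<CancelledResponse> responses, boolean timedOut) {
        for (int correlationId : inFlightCorrelationIds) {
            responses.add(disconnected(correlationId, now, timedOut));
        }
    }
}
```

Passing the flag explicitly keeps the decision with the caller: only the path that actually detected an expired request sets `timedOut`, while a client-requested disconnect stays a plain disconnect.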
kirktrue commented on code in PR #12813:
URL: https://github.com/apache/kafka/pull/12813#discussion_r1127204081

## clients/src/main/java/org/apache/kafka/clients/NetworkClient.java: ##

    @@ -324,11 +324,14 @@ public void disconnect(String nodeId) {
             log.info("Client requested disconnect from node {}", nodeId);
             selector.close(nodeId);
             long now = time.milliseconds();
    -        cancelInFlightRequests(nodeId, now, abortedSends);
    +        cancelInFlightRequests(nodeId, now, abortedSends, false);
             connectionStates.disconnected(nodeId, now);
         }
    -    private void cancelInFlightRequests(String nodeId, long now, Collection<ClientResponse> responses) {
    +    private void cancelInFlightRequests(String nodeId,

Review Comment:
I didn't want to introduce any inconsistency in this change, since the rest of the code doesn't follow that pattern.
kirktrue commented on code in PR #12813:
URL: https://github.com/apache/kafka/pull/12813#discussion_r1123911988

## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ##

    @@ -641,7 +643,7 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons
                 // thus it is not safe to reassign the sequence.
                 failBatch(batch, response, batch.attempts() < this.retries);
             }
    -        if (error.exception() instanceof InvalidMetadataException) {
    +        if (error.exception() instanceof InvalidMetadataException || error.exception() instanceof TimeoutException) {

Review Comment:
@dajac I removed the `error.exception() instanceof TimeoutException` check in `Sender` as you suggested. The unit test for ensuring that the metadata is updated on request timeouts has been added to the pre-existing request timeout tests in `NetworkClientTest`.
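With the `TimeoutException` check gone from `Sender`, the metadata refresh relies on the network layer: a timed-out request is handled as a disconnection, and the disconnect handler (per the call stack kirktrue lays out in his reply on this same hunk, `DefaultMetadataUpdater.handleServerDisconnect()` then `Metadata.requestUpdate()`) asks for new metadata. The sketch below models that flow under loose assumptions; `SimpleMetadata`, `SketchNetworkClient`, and `SketchMockClient` are hypothetical stand-ins, not Kafka classes. It also illustrates why a Sender-level test built on a mock that reports disconnections without going through disconnect handling never observes the refresh.

```java
import java.util.ArrayList;
import java.util.List;

class MetadataRefreshSketch {
    public static void main(String[] args) {
        // Network-client path: disconnect handling requests a metadata update.
        SimpleMetadata viaClient = new SimpleMetadata();
        new SketchNetworkClient(viaClient).handleTimedOutNodes(List.of("node-1"));
        System.out.println("network client requested update: " + viaClient.updateRequested());

        // Mock path: disconnected nodes are reported, but nothing touches metadata.
        SimpleMetadata viaMock = new SimpleMetadata();
        new SketchMockClient().handleTimedOutNodes(List.of("node-1"));
        System.out.println("mock client requested update: " + viaMock.updateRequested());
    }
}

// Hypothetical stand-in for Metadata: only records whether an update was requested.
class SimpleMetadata {
    private boolean updateRequested = false;

    void requestUpdate() { updateRequested = true; }

    boolean updateRequested() { return updateRequested; }
}

// Hypothetical stand-in for NetworkClient: nodes with timed-out requests are
// processed as disconnections, and disconnect handling requests new metadata.
class SketchNetworkClient {
    private final SimpleMetadata metadata;

    SketchNetworkClient(SimpleMetadata metadata) { this.metadata = metadata; }

    List<String> handleTimedOutNodes(List<String> nodeIds) {
        List<String> disconnected = new ArrayList<>();
        for (String nodeId : nodeIds) {
            processDisconnection(nodeId);
            disconnected.add(nodeId);
        }
        return disconnected;
    }

    private void processDisconnection(String nodeId) {
        handleServerDisconnect(nodeId);
    }

    // Plays the role of DefaultMetadataUpdater.handleServerDisconnect().
    private void handleServerDisconnect(String nodeId) {
        metadata.requestUpdate();
    }
}

// Hypothetical stand-in for a mock client: it reports the same disconnected
// nodes but skips disconnect handling, so no refresh is ever requested.
class SketchMockClient {
    List<String> handleTimedOutNodes(List<String> nodeIds) {
        return new ArrayList<>(nodeIds);
    }
}
```

Under this model, the refresh is best verified where it actually happens (the network client), which matches moving the test into `NetworkClientTest`.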
kirktrue commented on code in PR #12813:
URL: https://github.com/apache/kafka/pull/12813#discussion_r1117823636

## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ##

    @@ -641,7 +643,7 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons
                 // thus it is not safe to reassign the sequence.
                 failBatch(batch, response, batch.attempts() < this.retries);
             }
    -        if (error.exception() instanceof InvalidMetadataException) {
    +        if (error.exception() instanceof InvalidMetadataException || error.exception() instanceof TimeoutException) {

Review Comment:
@dajac For disconnected or timed-out connections, this is the call stack:

```java
Sender.poll()
    NetworkClient.poll()
        DefaultMetadataUpdater.handleServerDisconnect()
            Metadata.requestUpdate()
```

However, `SenderTest` uses `MockClient`, which doesn't follow that same logic flow. So if I remove that clause of the `if` statement, the test, as written, fails.
kirktrue commented on code in PR #12813:
URL: https://github.com/apache/kafka/pull/12813#discussion_r1098825578

## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ##

    @@ -549,7 +549,13 @@ private boolean awaitNodeReady(Node node, FindCoordinatorRequest.CoordinatorType
         private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
             RequestHeader requestHeader = response.requestHeader();
             int correlationId = requestHeader.correlationId();
    -        if (response.wasDisconnected()) {
    +        if (response.wasTimedOut()) {
    +            log.trace("Cancelled request with header {} due to node {} being disconnected due to timeout",

Review Comment:
Changed to:

> Cancelled request with header {} due to the last request to node {} timed out
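The order of the checks in `handleProduceResponse` matters: a timed-out response is also marked as disconnected, so `wasTimedOut()` has to be tested before `wasDisconnected()` or the timeout branch would never run. A minimal sketch of that branching, assuming a hypothetical `Response` stand-in with only the two flags; `String.format` replaces the SLF4J `{}` placeholders, and the plain-disconnect wording is an assumption modeled on the original message in the diff.

```java
class ProduceResponseTraceSketch {
    // Hypothetical stand-in for ClientResponse: only the two flags matter here.
    static final class Response {
        final boolean disconnected;
        final boolean timedOut;

        Response(boolean disconnected, boolean timedOut) {
            this.disconnected = disconnected;
            this.timedOut = timedOut;
        }

        boolean wasDisconnected() { return disconnected; }

        boolean wasTimedOut() { return timedOut; }
    }

    // Returns the trace message for a cancelled request, or null when the
    // response was neither timed out nor disconnected.
    static String cancellationMessage(Response response, String header, String nodeId) {
        // Timeout first: a timed-out response is also disconnected.
        if (response.wasTimedOut()) {
            return String.format(
                "Cancelled request with header %s due to the last request to node %s timed out", header, nodeId);
        } else if (response.wasDisconnected()) {
            return String.format(
                "Cancelled request with header %s due to node %s being disconnected", header, nodeId);
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(cancellationMessage(new Response(true, true), "hdr-7", "1"));
        System.out.println(cancellationMessage(new Response(true, false), "hdr-8", "1"));
    }
}
```

Swapping the two branches would make every timeout log as an ordinary disconnect, which is exactly the misleading behavior KAFKA-14317 describes.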
kirktrue commented on code in PR #12813:
URL: https://github.com/apache/kafka/pull/12813#discussion_r1082998126

## clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java: ##

    @@ -2519,7 +2519,7 @@ public void testInflightBatchesExpireOnDeliveryTimeout() throws InterruptedExcep
             Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
             responseMap.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
    -        client.respond(new ProduceResponse(responseMap));
    +        client.respond(new ProduceResponse(responseMap), true, true);

Review Comment:
Ah! I reverted that change and added a dedicated `testMetadataRefreshOnRequestTimeout` test method to `SenderTest`. Let me know if that new test is sufficient.
kirktrue commented on code in PR #12813:
URL: https://github.com/apache/kafka/pull/12813#discussion_r1082997268

## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ##

    @@ -552,9 +552,11 @@ private void handleProduceResponse(ClientResponse response, Map
kirktrue commented on code in PR #12813:
URL: https://github.com/apache/kafka/pull/12813#discussion_r1082997576

## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ##

    @@ -641,7 +643,7 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons
                 // thus it is not safe to reassign the sequence.
                 failBatch(batch, response, batch.attempts() < this.retries);
             }
    -        if (error.exception() instanceof InvalidMetadataException) {
    +        if (error.exception() instanceof InvalidMetadataException || error.exception() instanceof TimeoutException) {

Review Comment:
I believe it is necessary and have added a comment to the code explaining why. LMK if that makes sense. Thanks!
kirktrue commented on code in PR #12813:
URL: https://github.com/apache/kafka/pull/12813#discussion_r1057895364

## clients/src/main/java/org/apache/kafka/clients/NetworkClient.java: ##

    @@ -1284,6 +1292,11 @@ public ClientResponse disconnected(long timeMs, AuthenticationException authenti
                     true, null, authenticationException, null);
         }
    +    public ClientResponse timedOut(long timeMs, AuthenticationException authenticationException) {
    +        return new ClientResponse(header, callback, destination, createdTimeMs, timeMs,
    +                true, true, null, authenticationException, null);

Review Comment:
Correct. I have added a check in the `ClientResponse` constructor to ensure that a response is only marked as timed out when it is also marked as disconnected.
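A minimal sketch of the kind of constructor check described here, assuming a stripped-down response class that carries only the two flags. `Response`, the choice of `IllegalArgumentException`, and the message text are hypothetical; Kafka's actual check may use a different exception or wording.

```java
class ClientResponseInvariantSketch {
    // Hypothetical stand-in for ClientResponse, reduced to the two flags.
    static final class Response {
        private final boolean disconnected;
        private final boolean timedOut;

        Response(boolean disconnected, boolean timedOut) {
            // A timed-out request is torn down with its connection, so
            // "timed out but still connected" is rejected as an invalid state.
            if (timedOut && !disconnected) {
                throw new IllegalArgumentException(
                    "A response cannot be timed out without also being disconnected");
            }
            this.disconnected = disconnected;
            this.timedOut = timedOut;
        }

        boolean wasDisconnected() { return disconnected; }

        boolean wasTimedOut() { return timedOut; }
    }

    public static void main(String[] args) {
        Response ok = new Response(true, true);
        System.out.println("disconnected=" + ok.wasDisconnected() + " timedOut=" + ok.wasTimedOut());
        try {
            new Response(false, true);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Enforcing the invariant at construction time means callers such as `handleProduceResponse` can rely on `wasTimedOut()` implying `wasDisconnected()` without re-checking it.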
kirktrue commented on code in PR #12813:
URL: https://github.com/apache/kafka/pull/12813#discussion_r1057895052

## clients/src/main/java/org/apache/kafka/clients/NetworkClient.java: ##

    @@ -346,8 +346,16 @@ private void cancelInFlightRequests(String nodeId, long now, Collection request.requestTimeoutMs)

Review Comment:
Done.
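The surviving fragment of this hunk ends in a comparison involving `request.requestTimeoutMs`. As a hedged illustration of that kind of check, not a reconstruction of the missing diff lines, the sketch below flags an in-flight request once more than its timeout has elapsed since it was sent. The `InFlightRequest` class with `sendTimeMs`/`requestTimeoutMs` fields and the strict `>` comparison are assumptions.

```java
class RequestTimeoutSketch {
    // Hypothetical in-flight request: when it was sent and how long it may wait.
    static final class InFlightRequest {
        final long sendTimeMs;
        final long requestTimeoutMs;

        InFlightRequest(long sendTimeMs, long requestTimeoutMs) {
            this.sendTimeMs = sendTimeMs;
            this.requestTimeoutMs = requestTimeoutMs;
        }
    }

    // Timed out once strictly more than requestTimeoutMs has elapsed since the
    // send. Math.max clamps a negative elapsed time (clock moved backwards) to 0.
    static boolean isTimedOut(InFlightRequest request, long nowMs) {
        long timeSinceSend = Math.max(0L, nowMs - request.sendTimeMs);
        return timeSinceSend > request.requestTimeoutMs;
    }

    public static void main(String[] args) {
        InFlightRequest request = new InFlightRequest(1_000L, 30_000L);
        System.out.println(isTimedOut(request, 31_000L)); // exactly 30 000 ms elapsed: false
        System.out.println(isTimedOut(request, 31_001L)); // 30 001 ms elapsed: true
        System.out.println(isTimedOut(request, 500L));    // clock behind send time: false
    }
}
```

A per-request check like this is what lets the cancellation path tell a genuine request timeout apart from other reasons for tearing down the connection, so only the former gets the `timedOut` flag.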