mjsax commented on a change in pull request #10072: URL: https://github.com/apache/kafka/pull/10072#discussion_r574219892
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java ########## @@ -213,27 +214,26 @@ private void recordSendError(final String topic, final Exception exception, fina "indicating the task may be migrated out"; sendException.set(new TaskMigratedException(errorMessage, exception)); } else { - // TODO: KIP-572 handle `TimeoutException extends RetriableException` - // is seems inappropriate to pass `TimeoutException` into the `ProductionExceptionHander` - // -> should we add `TimeoutException` as `isFatalException` above (maybe not) ? - // -> maybe we should try to reset the task by throwing a `TaskCorruptedException` (including triggering `task.timeout.ms`) ? if (exception instanceof RetriableException) { errorMessage += "\nThe broker is either slow or in bad state (like not having enough replicas) in responding the request, " + "or the connection to broker was interrupted sending the request or receiving the response. " + "\nConsider overwriting `max.block.ms` and /or " + "`delivery.timeout.ms` to a larger value to wait longer for such scenarios and avoid timeout errors"; - } - - if (productionExceptionHandler.handle(serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) { - errorMessage += "\nException handler choose to FAIL the processing, no more records would be sent."; - sendException.set(new StreamsException(errorMessage, exception)); + sendException.set( Review comment: Correct. Before this change, we would fail if the producer stops to retry and gives as a retryable exception. And if we can retry, we can throw a TCE. Thus, I would not say "we handle a retryable exception as timeout", but I would say, we stop failing but throw TCE on any retyable error (what is an orthogonal fix). However, because a timeout is a retryable error, we don't need to do any specific handling for timeout any longer. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org