[ 
https://issues.apache.org/jira/browse/KAFKA-20604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ankur Sinha reassigned KAFKA-20604:
-----------------------------------

    Assignee: Ankur Sinha

> KIP-1348: Complete Built-in Exception Handler Symmetry with 
> LogAndContinueProductionExceptionHandler
> ----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-20604
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20604
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: Ankur Sinha
>            Assignee: Ankur Sinha
>            Priority: Major
>              Labels: KIP-1348
>
> This ticket tracks the implementation of 
> [KIP-1348|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1348%3A+Complete+Built-in+Exception+Handler+Symmetry+with+LogAndContinueProductionExceptionHandler].
> h3. Problem
> Kafka Streams ships built-in log-and-continue exception handlers for 
> deserialization ({{LogAndContinueExceptionHandler}}) and processing 
> ({{LogAndContinueProcessingExceptionHandler}}), but not for 
> production/serialization errors. Users who want to skip poison-pill records 
> on the output side must write a custom {{ProductionExceptionHandler}} — 
> something not required for the other two error types.
> Additionally, DLQ records produced by any exception handler are currently 
> sent through the same {{RecordCollectorImpl.send()}} path as regular records. 
> With a continue handler configured, this creates:
> * An *infinite loop* in the production path (DLQ failure triggers handler 
> again, which produces another DLQ record, etc.)
> * *Silent data loss* in the deserialization/processing paths (DLQ failure is 
> swallowed by the continue handler, original record is lost)
> h3. Changes
> This KIP adds:
> # *{{LogAndContinueProductionExceptionHandler}}* — new built-in handler that 
> logs at WARN level and returns {{Response.resume()}}. Supports DLQ via 
> {{errors.dead.letter.queue.topic.name}}.
> # *{{LogAndFailProductionExceptionHandler}}* — rename of 
> {{DefaultProductionExceptionHandler}} for naming consistency. 
> {{DefaultProductionExceptionHandler}} is deprecated as a subclass alias.
> # *{{sendDlqRecord()}}* method on {{RecordCollector}} interface — dedicated 
> DLQ send path that bypasses the production exception handler, preventing the 
> infinite loop and silent data loss described above. All four DLQ call sites 
> ({{RecordCollectorImpl.handleException()}}, 
> {{RecordCollectorImpl.recordSendError()}}, 
> {{RecordDeserializer.handleDeserializationFailure()}}, and {{StreamTask}} 
> processing error handler) are updated to use it.
> h3. Configuration
> No new configuration keys. Users opt in via existing configs:
> {code}
> production.exception.handler=org.apache.kafka.streams.errors.LogAndContinueProductionExceptionHandler
> errors.dead.letter.queue.topic.name=my-app-dlq
> {code}
> h3. Compatibility
> Fully backward compatible. Default behavior unchanged. 
> {{DefaultProductionExceptionHandler}} remains as a deprecated alias. The 
> internal default class reference in {{StreamsConfig}} is updated from 
> {{DefaultProductionExceptionHandler}} to 
> {{LogAndFailProductionExceptionHandler}} to avoid {{-Werror}} deprecation 
> compile errors; runtime behavior is identical.
> h3. Files Changed
> *New files:*
> * 
> {{streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProductionExceptionHandler.java}}
> * 
> {{streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProductionExceptionHandler.java}}
> *Modified source:*
> * 
> {{streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java}}
>  — deprecated, extends LogAndFail
> * {{streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java}} — 
> default class reference updated
> * 
> {{streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java}}
>  — added {{sendDlqRecord()}}
> * 
> {{streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java}}
>  — implemented {{sendDlqRecord()}}, updated DLQ send in {{handleException()}} 
> and {{recordSendError()}}
> * 
> {{streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java}}
>  — DLQ send updated to {{sendDlqRecord()}}
> * 
> {{streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java}}
>  — DLQ send updated to {{sendDlqRecord()}}
> *Modified tests:*
> * 
> {{streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java}}
>  — renamed existing DLQ tests, added 2 new LogAndContinue DLQ tests
> * {{streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java}} — 
> added default handler test
> * {{streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java}} — 
> added {{sendDlqRecord()}} mock
> *Modified docs:*
> * {{docs/streams/upgrade-guide.md}} — added 4.4.0 section
> * {{docs/streams/developer-guide/config-streams.md}} — updated handler 
> references



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to