[
https://issues.apache.org/jira/browse/KAFKA-20604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ankur Sinha reassigned KAFKA-20604:
-----------------------------------
Assignee: Ankur Sinha
> KIP-1348: Complete Built-in Exception Handler Symmetry with
> LogAndContinueProductionExceptionHandler
> ----------------------------------------------------------------------------------------------------
>
> Key: KAFKA-20604
> URL: https://issues.apache.org/jira/browse/KAFKA-20604
> Project: Kafka
> Issue Type: New Feature
> Components: streams
> Reporter: Ankur Sinha
> Assignee: Ankur Sinha
> Priority: Major
> Labels: KIP-1348
>
> This ticket tracks the implementation of
> [KIP-1348|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1348%3A+Complete+Built-in+Exception+Handler+Symmetry+with+LogAndContinueProductionExceptionHandler].
> h3. Problem
> Kafka Streams ships built-in log-and-continue exception handlers for
> deserialization ({{LogAndContinueExceptionHandler}}) and processing
> ({{LogAndContinueProcessingExceptionHandler}}), but not for
> production/serialization errors. Users who want to skip poison-pill records
> on the output side must write a custom {{ProductionExceptionHandler}} —
> something not required for the other two error types.
> Additionally, DLQ records produced by any exception handler are currently
> sent through the same {{RecordCollectorImpl.send()}} path as regular records.
> With a continue handler configured, this creates:
> * An *infinite loop* in the production path (DLQ failure triggers handler
> again, which produces another DLQ record, etc.)
> * *Silent data loss* in the deserialization/processing paths (DLQ failure is
> swallowed by the continue handler, original record is lost)
> h3. Changes
> This KIP adds:
> # *{{LogAndContinueProductionExceptionHandler}}* — new built-in handler that
> logs at WARN level and returns {{Response.resume()}}. Supports DLQ via
> {{errors.dead.letter.queue.topic.name}}.
> # *{{LogAndFailProductionExceptionHandler}}* — rename of
> {{DefaultProductionExceptionHandler}} for naming consistency.
> {{DefaultProductionExceptionHandler}} is deprecated as a subclass alias.
> # *{{sendDlqRecord()}}* method on {{RecordCollector}} interface — dedicated
> DLQ send path that bypasses the production exception handler, preventing the
> infinite loop and silent data loss described above. All four DLQ call sites
> ({{RecordCollectorImpl.handleException()}},
> {{RecordCollectorImpl.recordSendError()}},
> {{RecordDeserializer.handleDeserializationFailure()}}, and {{StreamTask}}
> processing error handler) are updated to use it.
> h3. Configuration
> No new configuration keys. Users opt in via existing configs:
> {code}
> production.exception.handler=org.apache.kafka.streams.errors.LogAndContinueProductionExceptionHandler
> errors.dead.letter.queue.topic.name=my-app-dlq
> {code}
> h3. Compatibility
> Fully backward compatible. Default behavior unchanged.
> {{DefaultProductionExceptionHandler}} remains as a deprecated alias. The
> internal default class reference in {{StreamsConfig}} is updated from
> {{DefaultProductionExceptionHandler}} to
> {{LogAndFailProductionExceptionHandler}} to avoid {{-Werror}} deprecation
> compile errors; runtime behavior is identical.
> h3. Files Changed
> *New files:*
> *
> {{streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProductionExceptionHandler.java}}
> *
> {{streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProductionExceptionHandler.java}}
> *Modified source:*
> *
> {{streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java}}
> — deprecated, extends LogAndFail
> * {{streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java}} —
> default class reference updated
> *
> {{streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java}}
> — added {{sendDlqRecord()}}
> *
> {{streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java}}
> — implemented {{sendDlqRecord()}}, updated DLQ send in {{handleException()}}
> and {{recordSendError()}}
> *
> {{streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java}}
> — DLQ send updated to {{sendDlqRecord()}}
> *
> {{streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java}}
> — DLQ send updated to {{sendDlqRecord()}}
> *Modified tests:*
> *
> {{streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java}}
> — renamed existing DLQ tests, added 2 new LogAndContinue DLQ tests
> * {{streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java}} —
> added default handler test
> * {{streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java}} —
> added {{sendDlqRecord()}} mock
> *Modified docs:*
> * {{docs/streams/upgrade-guide.md}} — added 4.4.0 section
> * {{docs/streams/developer-guide/config-streams.md}} — updated handler
> references
--
This message was sent by Atlassian Jira
(v8.20.10#820010)