[ 
https://issues.apache.org/jira/browse/KAFKA-20604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ankur Sinha updated KAFKA-20604:
--------------------------------
    Description: 
*This ticket tracks the implementation of 
[KIP-1348|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1348%3A+Complete+Built-in+Exception+Handler+Symmetry+with+LogAndContinueProductionExceptionHandler].
 Do not start work before the KIP is approved.*
h3. Problem

Kafka Streams ships built-in log-and-continue exception handlers for 
deserialization ({{LogAndContinueExceptionHandler}}) and processing 
({{LogAndContinueProcessingExceptionHandler}}), but not for 
production/serialization errors. Users who want to skip poison-pill records on 
the output side must write a custom {{ProductionExceptionHandler}} — something 
not required for the other two error types.

Additionally, dead letter queue (DLQ) records produced by any exception handler are currently sent 
through the same {{RecordCollectorImpl.send()}} path as regular records. With a 
continue handler configured, this creates:
 * An *infinite loop* in the production path (DLQ failure triggers handler 
again, which produces another DLQ record, etc.)
 * *Silent data loss* in the deserialization/processing paths (DLQ failure is 
swallowed by the continue handler, original record is lost)

h3. Changes

This KIP makes three changes:
 # *{{LogAndContinueProductionExceptionHandler}}* — new built-in handler that 
logs at WARN level and returns {{Response.resume()}}. Supports DLQ via 
{{errors.dead.letter.queue.topic.name}}.
 # *{{LogAndFailProductionExceptionHandler}}* — rename of 
{{DefaultProductionExceptionHandler}} for naming consistency. 
{{DefaultProductionExceptionHandler}} is deprecated as a subclass alias.
 # *sendDlqRecord()* method on RecordCollector interface — dedicated DLQ send 
path that bypasses the production exception handler, preventing the infinite 
loop and silent data loss described above. All five DLQ call sites 
({{RecordCollectorImpl.handleException()}}, {{RecordCollectorImpl.recordSendError()}}, 
{{RecordDeserializer.handleDeserializationFailure()}}, the {{StreamTask}} processing error 
handler, and the {{ProcessorNode.process()}} processing error handler) are updated to 
use it.
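
The loop and the dedicated path can be illustrated with a toy model. This is only a sketch under stated assumptions: {{DlqPathSketch}}, {{ToyCollector}}, {{ContinueHandler}}, and {{sendViaDedicatedDlqPath()}} are hypothetical stand-ins and not Kafka Streams classes, the cap on handler calls exists only so the demo terminates, and throwing from {{sendDlqRecord()}} on failure is an assumption, since the ticket does not say how the real method reports a failed DLQ send.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Toy model of the two DLQ send paths. Every type here is a stand-in, not a Kafka Streams class. */
final class DlqPathSketch {

    /** Stand-in for a "continue" production handler: it names the DLQ topic for a failed record. */
    interface ContinueHandler {
        String dlqTopicFor(String failedTopic, String value);
    }

    /** Stand-in collector in which every send to a topic listed in brokenTopics fails. */
    static final class ToyCollector {
        private final Set<String> brokenTopics;
        private final ContinueHandler handler;
        private final int maxHandlerCalls; // safety cap so the looping path terminates in this demo
        int handlerCalls = 0;
        final List<String> delivered = new ArrayList<>();

        ToyCollector(Set<String> brokenTopics, ContinueHandler handler, int maxHandlerCalls) {
            this.brokenTopics = brokenTopics;
            this.handler = handler;
            this.maxHandlerCalls = maxHandlerCalls;
        }

        /** Current behavior: the DLQ record re-enters send(), so a failing DLQ topic re-triggers the handler. */
        void send(String topic, String value) {
            if (!brokenTopics.contains(topic)) {
                delivered.add(topic + ":" + value);
                return;
            }
            if (handlerCalls >= maxHandlerCalls) {
                return; // without this cap the recursion would never end
            }
            handlerCalls++;
            send(handler.dlqTopicFor(topic, value), value);
        }

        /** Proposed behavior: the handler's DLQ record goes through the dedicated path instead. */
        void sendViaDedicatedDlqPath(String topic, String value) {
            if (!brokenTopics.contains(topic)) {
                delivered.add(topic + ":" + value);
                return;
            }
            handlerCalls++;
            sendDlqRecord(handler.dlqTopicFor(topic, value), value);
        }

        /** Dedicated DLQ path: never consults the handler. Throwing on failure is an assumption of this sketch. */
        void sendDlqRecord(String dlqTopic, String value) {
            if (brokenTopics.contains(dlqTopic)) {
                throw new IllegalStateException("DLQ send failed for topic " + dlqTopic);
            }
            delivered.add(dlqTopic + ":" + value);
        }
    }
}
```

When both the output topic and the DLQ topic fail, {{send()}} keeps invoking the handler until the cap is reached, whereas {{sendViaDedicatedDlqPath()}} invokes it once and then surfaces the DLQ failure instead of swallowing it.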

h3. Configuration

No new configuration keys. Users opt in via existing configs:
{code:java}
production.exception.handler=org.apache.kafka.streams.errors.LogAndContinueProductionExceptionHandler
errors.dead.letter.queue.topic.name=my-app-dlq
{code}
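For applications that build their configuration in code, the same opt-in might look like the sketch below. It uses plain string keys so that it has no Kafka dependency; {{ContinueHandlerConfigSketch}} and {{build()}} are hypothetical names, and the handler class itself only exists once this KIP is implemented. A real application would add these entries to the properties it already passes to Kafka Streams.

```java
import java.util.Properties;

/** Hypothetical helper that assembles the two opt-in settings named in this ticket. */
final class ContinueHandlerConfigSketch {

    /** Returns properties selecting the proposed handler; dlqTopic may be null to skip DLQ forwarding. */
    static Properties build(String dlqTopic) {
        Properties props = new Properties();
        // Class name as proposed by KIP-1348; it does not exist before the KIP is implemented.
        props.setProperty(
            "production.exception.handler",
            "org.apache.kafka.streams.errors.LogAndContinueProductionExceptionHandler");
        if (dlqTopic != null) {
            props.setProperty("errors.dead.letter.queue.topic.name", dlqTopic);
        }
        return props;
    }
}
```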
h3. Compatibility

Fully backward compatible. Default behavior unchanged. 
{{DefaultProductionExceptionHandler}} remains as a deprecated alias. The 
internal default class reference in {{StreamsConfig}} is updated from 
{{DefaultProductionExceptionHandler}} to 
{{LogAndFailProductionExceptionHandler}} to avoid {{-Werror}} deprecation 
compile errors; runtime behavior is identical.
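
The deprecated-alias arrangement can be sketched as follows. The class names carry a {{Sketch}} suffix to make clear that they are hypothetical stand-ins, and the {{String}} return value replaces the real handler response type purely for illustration.

```java
/** Stand-in for the renamed handler; the real class would return a handler response, not a String. */
class LogAndFailHandlerSketch {
    String handle(Exception exception) {
        return "FAIL";
    }
}

/**
 * Stand-in for the old name, kept as an empty subclass so existing configurations
 * that reference it keep working with identical behavior.
 *
 * @deprecated use {@link LogAndFailHandlerSketch} instead.
 */
@Deprecated
class DefaultHandlerSketch extends LogAndFailHandlerSketch {
}
```

Because the alias adds no behavior of its own, code that still names the old class gets exactly the same result, while new code and the internal default can reference the new name without triggering deprecation warnings.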
h3. Files Changed

*New files:*
 * 
{{streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProductionExceptionHandler.java}}
 * 
{{streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProductionExceptionHandler.java}}

*Modified source:*
 * 
{{streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java}}
 — deprecated, extends LogAndFail
 * {{streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java}} — 
default class reference updated
 * 
{{streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java}}
 — added {{sendDlqRecord()}}
 * 
{{streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java}}
 — implemented {{sendDlqRecord()}}, updated DLQ send in 
{{handleException()}} and {{recordSendError()}}
 * 
{{streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java}}
 — DLQ send updated to {{sendDlqRecord()}}
 * 
{{streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java}}
 — DLQ send updated to {{sendDlqRecord()}}
 * ProcessorNode.java — DLQ send updated to sendDlqRecord()

*Modified tests:*
 * 
{{streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java}}
 — renamed existing DLQ tests, added 2 new LogAndContinue DLQ tests
 * {{streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java}} — 
added default handler test
 * {{streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java}} — 
added {{sendDlqRecord()}} mock
 * KeyValueStoreTestDriver.java — updated from 
DefaultProductionExceptionHandler to LogAndFailProductionExceptionHandler

*Modified docs:*
 * {{docs/streams/upgrade-guide.md}} — added 4.4.0 section
 * {{docs/streams/developer-guide/config-streams.md}} — updated handler 
references

  was:
This ticket tracks the implementation of 
[KIP-1348|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1348%3A+Complete+Built-in+Exception+Handler+Symmetry+with+LogAndContinueProductionExceptionHandler].
h3. Problem

Kafka Streams ships built-in log-and-continue exception handlers for 
deserialization ({{LogAndContinueExceptionHandler}}) and processing 
({{LogAndContinueProcessingExceptionHandler}}), but not for 
production/serialization errors. Users who want to skip poison-pill records on 
the output side must write a custom {{ProductionExceptionHandler}} — something 
not required for the other two error types.

Additionally, DLQ records produced by any exception handler are currently sent 
through the same {{RecordCollectorImpl.send()}} path as regular records. With a 
continue handler configured, this creates:
 * An *infinite loop* in the production path (DLQ failure triggers handler 
again, which produces another DLQ record, etc.)
 * *Silent data loss* in the deserialization/processing paths (DLQ failure is 
swallowed by the continue handler, original record is lost)

h3. Changes

This KIP adds:
 # *{{LogAndContinueProductionExceptionHandler}}* — new built-in handler that 
logs at WARN level and returns {{Response.resume()}}. Supports DLQ via 
{{errors.dead.letter.queue.topic.name}}.
 # *{{LogAndFailProductionExceptionHandler}}* — rename of 
{{DefaultProductionExceptionHandler}} for naming consistency. 
{{DefaultProductionExceptionHandler}} is deprecated as a subclass alias.
 # *sendDlqRecord()* method on RecordCollector interface — dedicated DLQ send 
path that bypasses the production exception handler, preventing the infinite 
loop and silent data loss described above. All five DLQ call sites 
(RecordCollectorImpl.handleException(), RecordCollectorImpl.recordSendError(), 
RecordDeserializer.handleDeserializationFailure(), StreamTask processing error 
handler, and ProcessorNode.process() processing error handler) are updated to 
use it.

h3. Configuration

No new configuration keys. Users opt in via existing configs:
{code:java}
production.exception.handler=org.apache.kafka.streams.errors.LogAndContinueProductionExceptionHandler
errors.dead.letter.queue.topic.name=my-app-dlq
{code}
h3. Compatibility

Fully backward compatible. Default behavior unchanged. 
{{DefaultProductionExceptionHandler}} remains as a deprecated alias. The 
internal default class reference in {{StreamsConfig}} is updated from 
{{DefaultProductionExceptionHandler}} to 
{{LogAndFailProductionExceptionHandler}} to avoid {{-Werror}} deprecation 
compile errors; runtime behavior is identical.
h3. Files Changed

*New files:*
 * 
{{streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProductionExceptionHandler.java}}
 * 
{{streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProductionExceptionHandler.java}}

*Modified source:*
 * 
{{streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java}}
 — deprecated, extends LogAndFail
 * {{streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java}} — 
default class reference updated
 * 
{{streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java}}
 — added {{sendDlqRecord()}}
 * 
{{streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java}}
 — implemented {{sendDlqRecord()}}, updated DLQ send in 
{{handleException()}} and {{recordSendError()}}
 * 
{{streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java}}
 — DLQ send updated to {{sendDlqRecord()}}
 * 
{{streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java}}
 — DLQ send updated to {{sendDlqRecord()}}
 * ProcessorNode.java — DLQ send updated to sendDlqRecord()

*Modified tests:*
 * 
{{streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java}}
 — renamed existing DLQ tests, added 2 new LogAndContinue DLQ tests
 * {{streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java}} — 
added default handler test
 * {{streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java}} — 
added {{sendDlqRecord()}} mock
 * KeyValueStoreTestDriver.java — updated from 
DefaultProductionExceptionHandler to LogAndFailProductionExceptionHandler

*Modified docs:*
 * {{docs/streams/upgrade-guide.md}} — added 4.4.0 section
 * {{docs/streams/developer-guide/config-streams.md}} — updated handler 
references


> KIP-1348: Complete Built-in Exception Handler Symmetry with 
> LogAndContinueProductionExceptionHandler
> ----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-20604
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20604
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: Ankur Sinha
>            Assignee: Ankur Sinha
>            Priority: Minor
>              Labels: KIP-1348



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
