[ https://issues.apache.org/jira/browse/KAFKA-17099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Loïc Greffier updated KAFKA-17099:
----------------------------------
    Description: 
h3. Current Behaviour

 
When an exception is thrown in a processor node, the error logged by the task 
executor does not name the processor node in which it was thrown.
 
For example, consider the following topology:
 
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [PERSON_TOPIC])
      --> KSTREAM-PEEK-0000000001
    Processor: KSTREAM-PEEK-0000000001 (stores: [])
      --> KSTREAM-MAP-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MAP-0000000002 (stores: [])
      --> KSTREAM-SINK-0000000003
      <-- KSTREAM-PEEK-0000000001
    Sink: KSTREAM-SINK-0000000003 (topic: PERSON_MAP_TOPIC)
      <-- KSTREAM-MAP-0000000002
 
When an exception is thrown in the processor KSTREAM-MAP-0000000002, the 
following information will be logged by the task executor:
 
2024-07-08T22:17:19.926+02:00  INFO 10552 --- [-StreamThread-1] i.g.l.s.map.app.KafkaStreamsTopology     : Received key = 0, value = \{"id": 0, "firstName": "Ethan", "lastName": "Moore", "nationality": "CH", "birthDate": "2011-02-21T15:45:12Z"}
2024-07-08T22:17:30.082+02:00 ERROR 10552 --- [-StreamThread-1] o.a.k.s.p.internals.TaskExecutor         : stream-thread [streams-map-StreamThread-1] Failed to process stream task 0_0 due to the following error:
 
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=PERSON_TOPIC, partition=0, offset=0, stacktrace=java.lang.RuntimeException: Something bad happened...
at io.github.loicgreffier.streams.map.app.KafkaStreamsTopology.lambda$topology$1(KafkaStreamsTopology.java:33)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:46)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741)
at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1765)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
 
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:767) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1765) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) ~[kafka-streams-3.6.1.jar:na]
Caused by: java.lang.RuntimeException: Something bad happened...
at io.github.loicgreffier.streams.map.app.KafkaStreamsTopology.lambda$topology$1(KafkaStreamsTopology.java:33) ~[classes/:na]
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:46) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741) ~[kafka-streams-3.6.1.jar:na]
... 6 common frames omitted
 
The first line of the stack trace reports that the exception was caught in the 
processor KSTREAM-SOURCE-0000000000, whereas it was actually thrown in 
KSTREAM-MAP-0000000002.
h3. Expected Behaviour

 
The stack trace should name the exact processor node in which the exception 
occurred (e.g., KSTREAM-MAP-0000000002).
h3. Current Limitation

 
Processing exceptions are currently caught in 
[StreamTask#process|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L802],
 where it is not possible to determine the exact processor node in which the 
exception occurred.
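 
The limitation can be illustrated with a small, self-contained model. It is 
only a sketch and not the Kafka Streams code: the class CurrentNodeSketch, the 
static field currentNode and the helper forward are hypothetical, and the 
sketch assumes that the context restores the previous node once a downstream 
call returns or throws. Under that assumption, the only node still visible when 
the task catches the exception is the source.

```java
// Toy model, not the Kafka Streams implementation. All names are hypothetical.
final class CurrentNodeSketch {

    // Stand-in for the "current node" tracked by a processing context.
    static String currentNode;

    // Runs a downstream node, then restores the previous node (assumed behaviour).
    static void forward(String childName, Runnable childLogic) {
        String previous = currentNode;
        currentNode = childName;
        try {
            childLogic.run();
        } finally {
            currentNode = previous;
        }
    }

    // Stand-in for the task: enters the topology at the source and catches failures.
    static String processRecord() {
        currentNode = "KSTREAM-SOURCE-0000000000";
        try {
            forward("KSTREAM-PEEK-0000000001", () ->
                forward("KSTREAM-MAP-0000000002", () -> {
                    throw new RuntimeException("Something bad happened...");
                }));
            return "processed";
        } catch (RuntimeException e) {
            // By now every forward() call has restored its previous node.
            return "processor=" + currentNode;
        } finally {
            currentNode = null;
        }
    }

    public static void main(String[] args) {
        System.out.println(processRecord());
    }
}
```

In this model the message names the source node even though the exception was 
thrown two nodes downstream, which matches the log shown above.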
h3. Improvement Proposal

 
With the changes brought by KAFKA-16448, processing exceptions will be caught 
at the processor node level and wrapped in an internal exception named 
*FailedProcessingException* before being rethrown to the stream task.
 
This change should make it possible to identify the precise processor node 
where a processing exception occurs and to pass its name up to the stream task, 
where it will be used to build the 
[StreamsException|https://github.com/apache/kafka/blob/25230b538841a5e7256b1b51725361dd59435901/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L803]
 that will appear in the logs.
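 
A minimal sketch of the idea follows, assuming a simplified node model. The 
classes Node and FailedProcessingSketch, the field failedNodeName and the 
method runTask are hypothetical; the FailedProcessingException declared here 
is only a stand-in for the internal exception mentioned above and does not 
reproduce its real definition.

```java
// Toy model of "wrap at the node, unwrap at the task". All names are hypothetical.
final class FailedProcessingSketch {

    // Stand-in for the internal exception: carries the name of the failing node.
    static final class FailedProcessingException extends RuntimeException {
        final String failedNodeName;

        FailedProcessingException(String failedNodeName, Throwable cause) {
            super(cause);
            this.failedNodeName = failedNodeName;
        }
    }

    // A node runs its own logic, then forwards to an optional child node.
    static final class Node {
        final String name;
        final Runnable logic;
        final Node child;

        Node(String name, Runnable logic, Node child) {
            this.name = name;
            this.logic = logic;
            this.child = child;
        }

        void process() {
            try {
                logic.run();
                if (child != null) {
                    child.process();
                }
            } catch (FailedProcessingException e) {
                throw e; // already tagged by the downstream node that failed
            } catch (RuntimeException e) {
                throw new FailedProcessingException(name, e);
            }
        }
    }

    // Stand-in for the task: builds the log message from the wrapped exception.
    static String runTask(Node source) {
        try {
            source.process();
            return "processed";
        } catch (FailedProcessingException e) {
            return "Exception caught in process. processor=" + e.failedNodeName
                + ", cause=" + e.getCause().getMessage();
        }
    }

    static String demo() {
        Node map = new Node("KSTREAM-MAP-0000000002", () -> {
            throw new RuntimeException("Something bad happened...");
        }, null);
        Node peek = new Node("KSTREAM-PEEK-0000000001", () -> { }, map);
        Node source = new Node("KSTREAM-SOURCE-0000000000", () -> { }, peek);
        return runTask(source);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The pass-through catch matters in this sketch: without it, each upstream node 
would wrap the exception again and the task would once more see the name of the 
source node.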

  was:
h3. Current Behaviour
 
When an exception occurs in a processor node, the task executor does not log 
the actual processor node where the exception occurs.
 
For example, considering the following topology:
 
```
 Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [PERSON_TOPIC])
      --> KSTREAM-PEEK-0000000001
    Processor: KSTREAM-PEEK-0000000001 (stores: [])
      --> KSTREAM-MAP-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MAP-0000000002 (stores: [])
      --> KSTREAM-SINK-0000000003
      <-- KSTREAM-PEEK-0000000001
    Sink: KSTREAM-SINK-0000000003 (topic: PERSON_MAP_TOPIC)
      <-- KSTREAM-MAP-0000000002
```
 
When an exception is thrown in the processor KSTREAM-MAP-0000000002, the 
following information will be logged by the task executor:
 
2024-07-08T22:17:19.926+02:00  INFO 10552 --- [-StreamThread-1] i.g.l.s.map.app.KafkaStreamsTopology     : Received key = 0, value = \{"id": 0, "firstName": "Ethan", "lastName": "Moore", "nationality": "CH", "birthDate": "2011-02-21T15:45:12Z"}
2024-07-08T22:17:30.082+02:00 ERROR 10552 --- [-StreamThread-1] o.a.k.s.p.internals.TaskExecutor         : stream-thread [streams-map-StreamThread-1] Failed to process stream task 0_0 due to the following error:
 
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=PERSON_TOPIC, partition=0, offset=0, stacktrace=java.lang.RuntimeException: Something bad happened...
at io.github.loicgreffier.streams.map.app.KafkaStreamsTopology.lambda$topology$1(KafkaStreamsTopology.java:33)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:46)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741)
at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1765)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
 
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:767) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1765) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) ~[kafka-streams-3.6.1.jar:na]
Caused by: java.lang.RuntimeException: Something bad happened...
at io.github.loicgreffier.streams.map.app.KafkaStreamsTopology.lambda$topology$1(KafkaStreamsTopology.java:33) ~[classes/:na]
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:46) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810) ~[kafka-streams-3.6.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741) ~[kafka-streams-3.6.1.jar:na]
... 6 common frames omitted
 
On line #1 of the stack trace, it appears that an exception has been caught in 
the processor KSTREAM-SOURCE-0000000000 while the exception actually occurred 
in KSTREAM-MAP-0000000002.
h3. Expected Behaviour
 
The stack trace should provide the precise node in which the exception occurred 
(e.g., KSTREAM-MAP-0000000002).
h3. Current Limitation
 
The current limitation is that processing exceptions are caught in the [stream 
task#process|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L802]
 where it is not possible to get the exact processor node where the exception 
occurred.
h3. Improvement Proposal
 
With the changes brought by 
[KAFKA-16448|https://issues.apache.org/jira/browse/KAFKA-16448], processing 
exceptions will be caught at the processor node level and wrapped into an 
internal exception named *FailedProcessingException* before being thrown to the 
stream task.
 
This change should allow to identify the precise processor node where a 
processing exception occurs and bring its name up to the stream task where it 
will be used to build the 
[StreamsException|https://github.com/apache/kafka/blob/25230b538841a5e7256b1b51725361dd59435901/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L803]
 that will appear in the logs.


> Improve the process exception logs with the exact processor node where the 
> exception occurs
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-17099
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17099
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Loïc Greffier
>            Priority: Minor



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
