cadonna commented on code in PR #16432:
URL: https://github.com/apache/kafka/pull/16432#discussion_r1689250700


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java:
##########
@@ -80,12 +81,29 @@ public static void handleDeserializationFailure(final 
DeserializationExceptionHa
                                                     final 
ConsumerRecord<byte[], byte[]> rawRecord,
                                                     final Logger log,
                                                     final Sensor 
droppedRecordsSensor) {
+        handleDeserializationFailure(deserializationExceptionHandler, 
processorContext, deserializationException, rawRecord, log, 
droppedRecordsSensor, null);
+    }
+
+    public static void handleDeserializationFailure(final 
DeserializationExceptionHandler deserializationExceptionHandler,
+                                                    final ProcessorContext<?, 
?> processorContext,
+                                                    final Exception 
deserializationException,
+                                                    final 
ConsumerRecord<byte[], byte[]> rawRecord,
+                                                    final Logger log,
+                                                    final Sensor 
droppedRecordsSensor,
+                                                    final String 
sourceNodeName) {
         final DeserializationExceptionHandler.DeserializationHandlerResponse 
response;
         try {
-            response = deserializationExceptionHandler.handle(
+            final DefaultErrorHandlerContext errorHandlerContext = new 
DefaultErrorHandlerContext(
                 (InternalProcessorContext<?, ?>) processorContext,
-                rawRecord,
-                deserializationException);
+                rawRecord.topic(),
+                rawRecord.partition(),
+                rawRecord.offset(),
+                rawRecord.headers(),
+                rawRecord.key(),
+                rawRecord.value(),
+                sourceNodeName,
+                processorContext.taskId());
+            response = 
deserializationExceptionHandler.handle(errorHandlerContext, rawRecord, 
deserializationException);
         } catch (final Exception fatalUserException) {

Review Comment:
   I think, we misunderstood each other. On this line an exception originating 
from the user-specified handler is caught. The exception can be anything that 
is thrown by user code. We did not consider exceptions thrown from user code 
for the processing exception handler.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to