mjsax commented on code in PR #21535:
URL: https://github.com/apache/kafka/pull/21535#discussion_r2898156125


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -311,33 +321,100 @@ private void reprocessState(final List<TopicPartition> topicPartitions,
                             record.headers());
                     globalProcessorContext.setRecordContext(recordContext);
 
-                    try {
-                        if (record.key() != null) {
-                            source.process(new Record(
+                    if (record.key() != null) {
+                        // Deserialization phase
+                        final Record<?, ?> deserializedRecord;
+                        try {
+                            deserializedRecord = new Record<>(
                                 reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),
                                 reprocessFactory.valueDeserializer().deserialize(record.topic(), record.value()),
                                 record.timestamp(),
-                                record.headers()));
+                                record.headers());
+                        } catch (final Exception deserializationException) {
+                            // while Java distinguishes checked vs unchecked exceptions, other languages
+                            // like Scala or Kotlin do not, and thus we need to catch `Exception`
+                            // (instead of `RuntimeException`) to work well with those languages
+                            handleDeserializationFailure(
+                                deserializationExceptionHandler,
+                                globalProcessorContext,
+                                deserializationException,
+                                record,
+                                log,
+                                droppedRecordsSensor(
+                                    Thread.currentThread().getName(),
+                                    globalProcessorContext.taskId().toString(),
+                                    globalProcessorContext.metrics()
+                                ),
+                                null
+                            );
+                            continue; // Skip this record
+                        }
+                        final ProcessingExceptionHandler.Response response;
+                        // Processing phase
+                        try {
+                            ((Processor) source).process(deserializedRecord);
                             restoreCount++;
                             batchRestoreCount++;
+                        } catch (final Exception processingException) {
+                            // while Java distinguishes checked vs unchecked exceptions, other languages
+                            // like Scala or Kotlin do not, and thus we need to catch `Exception`
+                            // (instead of `RuntimeException`) to work well with those languages
+                            if (processingExceptionHandler != null) {
+                                final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
+                                    globalProcessorContext,
+                                    record.topic(),
+                                    record.partition(),
+                                    record.offset(),
+                                    record.headers(),
+                                    null,

Review Comment:
   I was just tracking where the name comes from. Could we change
`ReprocessFactory` to provide the name we need? When `ReprocessFactory` is
set up, we have access to `processorName`, so it should be easy to pass it into
the constructor and add a getter so we can retrieve it here.
   
   Passing in `null` is really bad, as it could lead to an NPE if user code
does not handle `null` gracefully. If we cannot get the real name, the
alternative could be to pass in `"unknown"` or something similar instead (any
good ideas for what a good surrogate could be?).
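
A rough, standalone sketch of the idea, not the actual Kafka classes: `ReprocessFactorySketch`, its `processorName()` getter, and the `UNKNOWN_PROCESSOR_NAME` constant are hypothetical names used only for illustration. It assumes the name is available when the factory is constructed, and it falls back to a non-null surrogate otherwise so that handler code never sees `null`.

```java
// Hypothetical stand-in for ReprocessFactory; only the name handling is shown.
final class ReprocessFactorySketch {

    // Assumed surrogate value; the actual choice is still open for discussion.
    static final String UNKNOWN_PROCESSOR_NAME = "unknown";

    private final String processorName;

    // The processor name is passed in at construction time, where it is known.
    ReprocessFactorySketch(final String processorName) {
        // Never store null: substitute the surrogate if no name is available.
        this.processorName = processorName == null ? UNKNOWN_PROCESSOR_NAME : processorName;
    }

    // Getter that callers can use when building the error handler context.
    String processorName() {
        return processorName;
    }

    public static void main(final String[] args) {
        final ReprocessFactorySketch named = new ReprocessFactorySketch("global-table-processor");
        final ReprocessFactorySketch unnamed = new ReprocessFactorySketch(null);

        // Both calls are safe even if user code dereferences the name directly.
        System.out.println(named.processorName().length());
        System.out.println(unnamed.processorName().length());
    }
}
```

With something like this in place, the call site could pass `reprocessFactory.processorName()` where the diff currently passes `null`, assuming that argument is the processor name.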


