lucasbru commented on code in PR #21535:
URL: https://github.com/apache/kafka/pull/21535#discussion_r2879053806
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -311,33 +311,103 @@ private void reprocessState(final List<TopicPartition> topicPartitions,
record.headers());
globalProcessorContext.setRecordContext(recordContext);
- try {
- if (record.key() != null) {
- source.process(new Record(
+ if (record.key() != null) {
+ // Deserialization phase
+ final Record<?, ?> deserializedRecord;
+ try {
+ deserializedRecord = new Record<>(
reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),
reprocessFactory.valueDeserializer().deserialize(record.topic(), record.value()),
record.timestamp(),
- record.headers()));
+ record.headers());
+ } catch (final Exception deserializationException) {
+ // while Java distinguishes checked vs unchecked exceptions, other languages
+ // like Scala or Kotlin do not, and thus we need to catch `Exception`
+ // (instead of `RuntimeException`) to work well with those languages
+ handleDeserializationFailure(
+ deserializationExceptionHandler,
+ globalProcessorContext,
+ deserializationException,
+ record,
+ log,
+ droppedRecordsSensor(
+ Thread.currentThread().getName(),
+ globalProcessorContext.taskId().toString(),
+ globalProcessorContext.metrics()
+ ),
+ null
+ );
+ continue; // Skip this record
+ }
+ final ProcessingExceptionHandler.Response response;
+ // Processing phase
+ try {
+ @SuppressWarnings("unchecked")
+ final Processor<Object, Object, Object, Object> typedSource =
+ (Processor<Object, Object, Object, Object>) source;
+ @SuppressWarnings("unchecked")
+ final Record<Object, Object> typedRecord = (Record<Object, Object>) deserializedRecord;
+ typedSource.process(typedRecord);
restoreCount++;
batchRestoreCount++;
+ } catch (final Exception processingException) {
+ // while Java distinguishes checked vs unchecked exceptions, other languages
+ // like Scala or Kotlin do not, and thus we need to catch `Exception`
+ // (instead of `RuntimeException`) to work well with those languages
+ if (processingExceptionHandler != null) {
+ final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
+ globalProcessorContext,
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ record.headers(),
+ storeName,
Review Comment:
`globalProcessorContext.currentNode()` is likely null during
`reprocessState()`: `setCurrentNode()` is never called here, and `source` is a
raw `Processor`, not a `ProcessorNode`. Calling `.name()` on the null result
will throw an NPE. Either revert to `storeName` or add a `setCurrentNode()`
call before processing.
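A minimal sketch of a third, null-safe variant of the first option, in case it
helps the discussion. It is not the PR's code: `Node`, `Context`, and
`processorNodeId` are hypothetical stand-ins rather than Kafka classes, and the
only assumption carried over from the comment above is that `currentNode()`
can return null when `setCurrentNode()` was never called.

```java
public class NodeNameFallback {

    // Hypothetical stand-in for a processor node; only name() matters here.
    public interface Node {
        String name();
    }

    // Hypothetical stand-in for the processor context. currentNode() returns
    // null until setCurrentNode() has been called.
    public static final class Context {
        private Node currentNode;

        public Node currentNode() {
            return currentNode;
        }

        public void setCurrentNode(final Node node) {
            this.currentNode = node;
        }
    }

    // Use the current node's name when a node is set; otherwise fall back to
    // the store name instead of dereferencing null.
    public static String processorNodeId(final Context context, final String storeName) {
        final Node node = context.currentNode();
        return node != null ? node.name() : storeName;
    }

    public static void main(final String[] args) {
        final Context context = new Context();
        // No node set yet: falls back to the store name.
        System.out.println(processorNodeId(context, "global-store"));
        // Node set: its name is used.
        context.setCurrentNode(() -> "source-node");
        System.out.println(processorNodeId(context, "global-store"));
    }
}
```

Whether a fallback like this is preferable to simply passing `storeName`
depends on what downstream handlers expect the node id to contain.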
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java:
##########
Review Comment:
The remaining comment still says "e.g., for global threads" as the example
of when the handler is not set, but with this PR global threads can have the
handler when enabled. Maybe just drop the example parenthetical.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -404,6 +405,7 @@ private StateConsumer initialize() {
globalProcessorContext,
stateMgr,
config.deserializationExceptionHandler(),
+ processingExceptionHandler,
Review Comment:
Following the existing pattern for DeserializationExceptionHandler makes
sense for consistency. Fine to keep as-is.