cadonna commented on code in PR #16093: URL: https://github.com/apache/kafka/pull/16093#discussion_r1663943150
########## streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java: ########## @@ -219,6 +222,13 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo timestampExtractorSupplier = () -> globalAppConfigs.getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); } + if (isTopologyOverride(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, topologyOverrides)) { + processingExceptionHandlerSupplier = getClass(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG); + log.info("Topology {} is overriding {} to {}", topologyName, PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, getClass(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG)); + } else { + processingExceptionHandlerSupplier = globalAppConfigs.getClass(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG); + } Review Comment: Why do you not call the same methods for creating the instance as for the `DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG` on line 232? In your approach, I think `configure()` is never called and the variable names are really confusing, because `processingExceptionHanlderSupplier` is not a supplier. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java: ########## @@ -372,6 +374,10 @@ public final synchronized void setNamedTopology(final NamedTopology namedTopolog this.namedTopology = namedTopology; } + public final synchronized void setProcessingExceptionHandler(final ProcessingExceptionHandler processingExceptionHandler) { Review Comment: @loicgreffier What do you think of passing a supplier `() -> getConfiguredInstance(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ProcessingExceptionHandler.class)` for the processing exception handler to the node similar how you passed in the processing exception handler instance, `node.setProcessingExceptionHandler(...)`. Then get the instance from the supplier during node initialization. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org