cadonna commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1663943150


##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -219,6 +222,13 @@ public TopologyConfig(final String topologyName, final 
StreamsConfig globalAppCo
             timestampExtractorSupplier = () -> 
globalAppConfigs.getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
 TimestampExtractor.class);
         }
 
+        if (isTopologyOverride(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, 
topologyOverrides)) {
+            processingExceptionHandlerSupplier = 
getClass(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG);
+            log.info("Topology {} is overriding {} to {}", topologyName, 
PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, 
getClass(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG));
+        } else {
+            processingExceptionHandlerSupplier = 
globalAppConfigs.getClass(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG);
+        }

Review Comment:
   Why do you not call the same methods for creating the instance as for the 
`DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG` on line 232?
   In your approach, I think `configure()` is never called and the variable 
names are really confusing, because `processingExceptionHanlderSupplier` is not 
a supplier.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -372,6 +374,10 @@ public final synchronized void setNamedTopology(final 
NamedTopology namedTopolog
         this.namedTopology = namedTopology;
     }
 
+    public final synchronized void setProcessingExceptionHandler(final 
ProcessingExceptionHandler processingExceptionHandler) {

Review Comment:
   @loicgreffier What do you think of passing a supplier `() -> 
getConfiguredInstance(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, 
ProcessingExceptionHandler.class)` for the processing exception handler to the 
node similar how you passed in the processing exception handler instance, 
`node.setProcessingExceptionHandler(...)`. Then get the instance from the 
supplier during node initialization.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to