mjsax commented on code in PR #17710:
URL: https://github.com/apache/kafka/pull/17710#discussion_r1831960504


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -248,18 +251,54 @@ private static StreamsConfig createConfig(final String 
enforcedProcessingValue)
     }
 
     private static StreamsConfig createConfig(final String eosConfig, final 
String enforcedProcessingValue) {
-        return createConfig(eosConfig, enforcedProcessingValue, 
LogAndFailExceptionHandler.class.getName(), 
LogAndFailProcessingExceptionHandler.class.getName());
+        return createConfig(
+            eosConfig,
+            enforcedProcessingValue,
+            LogAndFailExceptionHandler.class,
+            LogAndFailProcessingExceptionHandler.class,
+            FailOnInvalidTimestamp.class
+        );
+    }
+
+    private static StreamsConfig createConfig(final Class<? extends 
DeserializationExceptionHandler> deserializationExceptionHandler) {
+        return createConfig(
+            AT_LEAST_ONCE,
+            "0",
+            deserializationExceptionHandler,
+            LogAndFailProcessingExceptionHandler.class,
+            FailOnInvalidTimestamp.class
+        );
+    }
+
+    private static StreamsConfig createConfigWithTsExtractor(final Class<? 
extends TimestampExtractor> timestampExtractor) {
+        return createConfig(
+            AT_LEAST_ONCE,
+            "0",
+            LogAndFailExceptionHandler.class,
+            LogAndFailProcessingExceptionHandler.class,
+            timestampExtractor
+        );
     }
 
-    private static StreamsConfig createConfig(final String eosConfig, final 
String enforcedProcessingValue, final String deserializationExceptionHandler) {
-        return createConfig(eosConfig, enforcedProcessingValue, 
deserializationExceptionHandler, 
LogAndFailProcessingExceptionHandler.class.getName());
+    private static StreamsConfig createConfig(
+        final String enforcedProcessingValue,
+        final Class<? extends ProcessingExceptionHandler> 
processingExceptionHandler
+    ) {
+        return createConfig(
+            AT_LEAST_ONCE,
+            enforcedProcessingValue,
+            LogAndFailExceptionHandler.class,
+            processingExceptionHandler,
+            FailOnInvalidTimestamp.class
+        );
     }
 
     private static StreamsConfig createConfig(
         final String eosConfig,
         final String enforcedProcessingValue,
-        final String deserializationExceptionHandler,
-        final String processingExceptionHandler) {
+        final Class<? extends DeserializationExceptionHandler> 
deserializationExceptionHandler,
+        final Class<? extends ProcessingExceptionHandler> 
processingExceptionHandler,
+        final Class<? extends TimestampExtractor> timestampExtractor) {

Review Comment:
   Adding new `TimestampExtractor` parameter here, and cleanup the overload of  
`createConfig` a little bit, to avoid passing in unnecessary parameters (create 
a little noise on the PR below, but code is much cleaner now)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to