mjsax commented on code in PR #17710: URL: https://github.com/apache/kafka/pull/17710#discussion_r1831960504
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ########## @@ -248,18 +251,54 @@ private static StreamsConfig createConfig(final String enforcedProcessingValue) } private static StreamsConfig createConfig(final String eosConfig, final String enforcedProcessingValue) { - return createConfig(eosConfig, enforcedProcessingValue, LogAndFailExceptionHandler.class.getName(), LogAndFailProcessingExceptionHandler.class.getName()); + return createConfig( + eosConfig, + enforcedProcessingValue, + LogAndFailExceptionHandler.class, + LogAndFailProcessingExceptionHandler.class, + FailOnInvalidTimestamp.class + ); + } + + private static StreamsConfig createConfig(final Class<? extends DeserializationExceptionHandler> deserializationExceptionHandler) { + return createConfig( + AT_LEAST_ONCE, + "0", + deserializationExceptionHandler, + LogAndFailProcessingExceptionHandler.class, + FailOnInvalidTimestamp.class + ); + } + + private static StreamsConfig createConfigWithTsExtractor(final Class<? extends TimestampExtractor> timestampExtractor) { + return createConfig( + AT_LEAST_ONCE, + "0", + LogAndFailExceptionHandler.class, + LogAndFailProcessingExceptionHandler.class, + timestampExtractor + ); } - private static StreamsConfig createConfig(final String eosConfig, final String enforcedProcessingValue, final String deserializationExceptionHandler) { - return createConfig(eosConfig, enforcedProcessingValue, deserializationExceptionHandler, LogAndFailProcessingExceptionHandler.class.getName()); + private static StreamsConfig createConfig( + final String enforcedProcessingValue, + final Class<? 
extends ProcessingExceptionHandler> processingExceptionHandler + ) { + return createConfig( + AT_LEAST_ONCE, + enforcedProcessingValue, + LogAndFailExceptionHandler.class, + processingExceptionHandler, + FailOnInvalidTimestamp.class + ); } private static StreamsConfig createConfig( final String eosConfig, final String enforcedProcessingValue, - final String deserializationExceptionHandler, - final String processingExceptionHandler) { + final Class<? extends DeserializationExceptionHandler> deserializationExceptionHandler, + final Class<? extends ProcessingExceptionHandler> processingExceptionHandler, + final Class<? extends TimestampExtractor> timestampExtractor) { Review Comment: Adding a new `TimestampExtractor` parameter here, and cleaning up the overloads of `createConfig` a little bit to avoid passing in unnecessary parameters (this creates a little noise in the PR below, but the code is much cleaner now). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org