mjsax commented on code in PR #17005:
URL: https://github.com/apache/kafka/pull/17005#discussion_r1731648217


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -555,12 +555,24 @@ public class StreamsConfig extends AbstractConfig {
 
     /** {@code default.deserialization.exception.handler} */

Review Comment:
   We should update the JavaDocs, too. Using `@deprecated` annotation and 
pointing to the new config to be used.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1907,16 +1929,42 @@ public TimestampExtractor defaultTimestampExtractor() {
         return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
TimestampExtractor.class);
     }
 
+    public DeserializationExceptionHandler 
getDeserializationExceptionHandler() {
+        if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) {
+            return deserializationExceptionHandler();
+        } else {
+            return defaultDeserializationExceptionHandler();
+        }
+    }
+
     @SuppressWarnings("WeakerAccess")
-    public DeserializationExceptionHandler 
defaultDeserializationExceptionHandler() {
+    private DeserializationExceptionHandler 
defaultDeserializationExceptionHandler() {
         return 
getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, 
DeserializationExceptionHandler.class);
     }
 
     @SuppressWarnings("WeakerAccess")
-    public ProductionExceptionHandler defaultProductionExceptionHandler() {
+    public DeserializationExceptionHandler deserializationExceptionHandler() {
+        return 
getConfiguredInstance(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, 
DeserializationExceptionHandler.class);
+    }
+
+    public ProductionExceptionHandler getProductionExceptionHandler() {

Review Comment:
   Some comments as above



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -2552,7 +2552,8 @@ public Set<TopicPartition> partitions() {
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
+    @MethodSource("data")
+    @SuppressWarnings("deprecation")

Review Comment:
   This test verifies that the handler is doing the right thing. Thus, we 
should update the test to use the new config and not add this annotation



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -555,12 +555,24 @@ public class StreamsConfig extends AbstractConfig {
 
     /** {@code default.deserialization.exception.handler} */
     @SuppressWarnings("WeakerAccess")
+    @Deprecated
     public static final String 
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = 
"default.deserialization.exception.handler";
+    @Deprecated
     public static final String 
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class 
that implements the 
<code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> 
interface.";
 
+    /** {@code deserialization.exception.handler} */
+    @SuppressWarnings("WeakerAccess")
+    public static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG 
= "deserialization.exception.handler";
+

Review Comment:
   nit: no empty line



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -889,6 +901,11 @@ public class StreamsConfig extends AbstractConfig {
                     LogAndFailExceptionHandler.class.getName(),
                     Importance.MEDIUM,
                     DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
+            .define(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,

Review Comment:
   Please insert at the right place (we keep it ordered alphabetically)



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -555,12 +555,24 @@ public class StreamsConfig extends AbstractConfig {
 
     /** {@code default.deserialization.exception.handler} */
     @SuppressWarnings("WeakerAccess")
+    @Deprecated
     public static final String 
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = 
"default.deserialization.exception.handler";
+    @Deprecated
     public static final String 
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class 
that implements the 
<code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> 
interface.";
 
+    /** {@code deserialization.exception.handler} */
+    @SuppressWarnings("WeakerAccess")
+    public static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG 
= "deserialization.exception.handler";
+
+    protected static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC 
= "Exception handling class that implements the 
<code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> 
interface.";
     /** {@code default.production.exception.handler} */
     @SuppressWarnings("WeakerAccess")
+    @Deprecated
     public static final String 
DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = 
"default.production.exception.handler";
+
+    /** {@code production.exception.handler} */
+    @SuppressWarnings("WeakerAccess")
+    public static final String PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = 
"production.exception.handler";
     private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC 
= "Exception handling class that implements the 
<code>org.apache.kafka.streams.errors.ProductionExceptionHandler</code> 
interface.";

Review Comment:
   nit: rename variable, dropping `DEFAULT_` to align to config name.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1907,16 +1929,42 @@ public TimestampExtractor defaultTimestampExtractor() {
         return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
TimestampExtractor.class);
     }
 
+    public DeserializationExceptionHandler 
getDeserializationExceptionHandler() {
+        if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) {
+            return deserializationExceptionHandler();
+        } else {
+            return defaultDeserializationExceptionHandler();
+        }
+    }
+
     @SuppressWarnings("WeakerAccess")
-    public DeserializationExceptionHandler 
defaultDeserializationExceptionHandler() {
+    private DeserializationExceptionHandler 
defaultDeserializationExceptionHandler() {

Review Comment:
   This is public API, and we cannot just make it private -- we can only 
deprecate it -- as above, must be covered by the KIP



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -555,12 +555,24 @@ public class StreamsConfig extends AbstractConfig {
 
     /** {@code default.deserialization.exception.handler} */
     @SuppressWarnings("WeakerAccess")
+    @Deprecated
     public static final String 
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = 
"default.deserialization.exception.handler";
+    @Deprecated
     public static final String 
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class 
that implements the 
<code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> 
interface.";
 
+    /** {@code deserialization.exception.handler} */
+    @SuppressWarnings("WeakerAccess")
+    public static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG 
= "deserialization.exception.handler";
+
+    protected static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC 
= "Exception handling class that implements the 
<code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> 
interface.";

Review Comment:
   Why `protected` -- should be `private`?
   
   nit: missing empty line below.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -555,12 +555,24 @@ public class StreamsConfig extends AbstractConfig {
 
     /** {@code default.deserialization.exception.handler} */
     @SuppressWarnings("WeakerAccess")
+    @Deprecated
     public static final String 
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = 
"default.deserialization.exception.handler";
+    @Deprecated
     public static final String 
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class 
that implements the 
<code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> 
interface.";
 
+    /** {@code deserialization.exception.handler} */
+    @SuppressWarnings("WeakerAccess")
+    public static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG 
= "deserialization.exception.handler";
+
+    protected static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC 
= "Exception handling class that implements the 
<code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> 
interface.";
     /** {@code default.production.exception.handler} */

Review Comment:
   Update JavaDocs?



##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -93,6 +95,11 @@ public class TopologyConfig extends AbstractConfig {
                 null,
                 Importance.MEDIUM,
                 DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
+            .define(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,

Review Comment:
   please insert at right place (alphabetically)



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -919,6 +936,11 @@ public class StreamsConfig extends AbstractConfig {
                     DefaultProductionExceptionHandler.class.getName(),
                     Importance.MEDIUM,
                     DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC)
+            .define(PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,

Review Comment:
   Please insert at the right place (we keep it ordered alphabetically)



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java:
##########
@@ -113,6 +115,7 @@ public static void handleDeserializationFailure(final 
DeserializationExceptionHa
             throw new StreamsException("Deserialization exception handler is 
set to fail upon" +
                 " a deserialization error. If you would rather have the 
streaming pipeline" +
                 " continue after a deserialization error, please set the " +
+                DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " or the 
deprecated exception handler " +

Review Comment:
   We should just point to the new one and not refer to the deprecated one at 
all. (For this case, we also don't need the `@SuppressWarnings` annotation



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1907,16 +1929,42 @@ public TimestampExtractor defaultTimestampExtractor() {
         return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
TimestampExtractor.class);
     }
 
+    public DeserializationExceptionHandler 
getDeserializationExceptionHandler() {

Review Comment:
   ```suggestion
       public DeserializationExceptionHandler deserializationExceptionHandler() 
{
   ```



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1907,16 +1929,42 @@ public TimestampExtractor defaultTimestampExtractor() {
         return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
TimestampExtractor.class);
     }
 
+    public DeserializationExceptionHandler 
getDeserializationExceptionHandler() {
+        if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) {
+            return deserializationExceptionHandler();
+        } else {
+            return defaultDeserializationExceptionHandler();
+        }
+    }
+
     @SuppressWarnings("WeakerAccess")
-    public DeserializationExceptionHandler 
defaultDeserializationExceptionHandler() {
+    private DeserializationExceptionHandler 
defaultDeserializationExceptionHandler() {
         return 
getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, 
DeserializationExceptionHandler.class);
     }
 
     @SuppressWarnings("WeakerAccess")
-    public ProductionExceptionHandler defaultProductionExceptionHandler() {
+    public DeserializationExceptionHandler deserializationExceptionHandler() {

Review Comment:
   Seems to be redundant to `[get]DeserializationExceptionHandler` above?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java:
##########
@@ -1158,6 +1159,75 @@ public void 
shouldSkipOnDeserializationErrorsWhenReprocessing() {
         assertEquals(0, stateRestoreCallback.restored.size());
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void verifyExceptionHandlerAcceptNewConfigWhenBothArePresent() {

Review Comment:
   This test raised the question if we should log a WARN if both are set?
   
   But why do we test this on `GlobalStateManagerImplTest`? Seems this should 
go into `StreamsConfigTest`?



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1907,16 +1929,42 @@ public TimestampExtractor defaultTimestampExtractor() {
         return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
TimestampExtractor.class);
     }
 
+    public DeserializationExceptionHandler 
getDeserializationExceptionHandler() {

Review Comment:
   We should not use `get` prefix.
   
   Also, this is public API and the KIP must cover it -- can you update the KIP 
accordingly, and send an update to the VOTE thread?
   
   Or: we don't add it as `public` -- I personally always found that having 
these method being `public` is a leaky abstraction.... But there was some 
disagreement about it: https://github.com/apache/kafka/pull/14548



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java:
##########
@@ -1066,6 +1067,41 @@ public void 
shouldOverrideGlobalStreamsConfigWhenGivenNamedTopologyProps() {
         assertThat(topologyBuilder.topologyConfigs().parseStoreType(), 
equalTo(Materialized.StoreType.IN_MEMORY));
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void exceptionHandlerShouldAcceptNewConfig() {
+        final Properties topologyOverrides = new Properties();
+        
topologyOverrides.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
 LogAndFailExceptionHandler.class);
+        
topologyOverrides.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
 LogAndContinueExceptionHandler.class);
+
+        final StreamsConfig config = new 
StreamsConfig(StreamsTestUtils.getStreamsConfig());
+        final InternalTopologyBuilder topologyBuilder = new 
InternalTopologyBuilder(
+            new TopologyConfig(
+                "my-topology",
+                config,
+                topologyOverrides)
+        );
+
+        
assertThat(topologyBuilder.topologyConfigs().getTaskConfig().deserializationExceptionHandler.getClass(),
 equalTo(LogAndContinueExceptionHandler.class));
+    }
+
+    @Test
+    public void 
exceptionHandlerShouldAcceptNewConfigNoOtherDeprecatedConfigPresent() {

Review Comment:
   Instead of adding a new test, should we just update an existing test which 
uses the old config now, and let is use the new config instead (maybe 
`shouldOverrideGlobalStreamsConfigWhenGivenNamedTopologyProps`) ?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -259,6 +259,11 @@ public void setDeserializationExceptionHandler(final 
DeserializationExceptionHan
         this.deserializationExceptionHandler = deserializationExceptionHandler;
     }
 
+    //Visible for testing
+    public DeserializationExceptionHandler 
getDeserializationExceptionHandler() {

Review Comment:
   ```suggestion
       public DeserializationExceptionHandler deserializationExceptionHandler() 
{
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java:
##########
@@ -1066,6 +1067,41 @@ public void 
shouldOverrideGlobalStreamsConfigWhenGivenNamedTopologyProps() {
         assertThat(topologyBuilder.topologyConfigs().parseStoreType(), 
equalTo(Materialized.StoreType.IN_MEMORY));
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void exceptionHandlerShouldAcceptNewConfig() {

Review Comment:
   ```suggestion
       public void 
newDeserializationExceptionHandlerConfigShouldOverwriteOldOne() {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to