[ 
https://issues.apache.org/jira/browse/KAFKA-7584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688677#comment-16688677
 ] 

ASF GitHub Bot commented on KAFKA-7584:
---------------------------------------

guozhangwang closed pull request #5874: KAFKA-7584: StreamsConfig throws 
ClassCastException if max.in.flight.request.per.connect is specified as String
URL: https://github.com/apache/kafka/pull/5874
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index b25894c2e04..90523d22bce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -887,10 +887,30 @@ private void 
checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Obje
         // consumer/producer configurations, log a warning and remove the user 
defined value from the Map.
         // Thus the default values for these consumer/producer configurations 
that are suitable for
         // Streams will be used instead.
-        final Object maxInFlightRequests = 
clientProvidedProps.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
-        if (eosEnabled && maxInFlightRequests != null && 5 < (int) 
maxInFlightRequests) {
-            throw new 
ConfigException(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " can't 
exceed 5 when using the idempotent producer");
+
+        if (eosEnabled) {
+            final Object maxInFlightRequests = 
clientProvidedProps.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
+
+            if (maxInFlightRequests != null) {
+                final int maxInFlightRequestsAsInteger;
+                if (maxInFlightRequests instanceof Integer) {
+                    maxInFlightRequestsAsInteger = (Integer) 
maxInFlightRequests;
+                } else if (maxInFlightRequests instanceof String) {
+                    try {
+                        maxInFlightRequestsAsInteger = 
Integer.parseInt(((String) maxInFlightRequests).trim());
+                    } catch (final NumberFormatException e) {
+                        throw new 
ConfigException(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 
maxInFlightRequests, "String value could not be parsed as 32-bit integer");
+                    }
+                } else {
+                    throw new 
ConfigException(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 
maxInFlightRequests, "Expected value to be a 32-bit integer, but it was a " + 
maxInFlightRequests.getClass().getName());
+                }
+
+                if (maxInFlightRequestsAsInteger > 5) {
+                    throw new 
ConfigException(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 
maxInFlightRequestsAsInteger, "Can't exceed 5 when exactly-once processing is 
enabled");
+                }
+            }
         }
+
         for (final String config: nonConfigurableConfigs) {
             if (clientProvidedProps.containsKey(config)) {
                 final String eosMessage =  PROCESSING_GUARANTEE_CONFIG + " is 
set to " + EXACTLY_ONCE + ". Hence, ";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index a86c38946df..7b34615cb39 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -123,7 +123,7 @@ public void 
consumerConfigMustContainStreamPartitionAssignorConfig() {
         assertEquals(StreamsPartitionAssignor.class.getName(), 
returnedProps.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
         assertEquals(7L, 
returnedProps.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
         assertEquals("dummy:host", 
returnedProps.get(StreamsConfig.APPLICATION_SERVER_CONFIG));
-        assertEquals(null, returnedProps.get(StreamsConfig.RETRIES_CONFIG));
+        assertNull(returnedProps.get(StreamsConfig.RETRIES_CONFIG));
         assertEquals(5, 
returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG)));
         assertEquals(100, 
returnedProps.get(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)));
     }
@@ -233,7 +233,6 @@ public void 
shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() {
         assertEquals("host", producerConfigs.get("interceptor.statsd.host"));
     }
 
-
     @Test
     public void shouldSupportPrefixedProducerConfigs() {
         props.put(producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 10);
@@ -427,7 +426,7 @@ public void 
testGetGlobalConsumerConfigsWithGlobalConsumerOverridenPrefix() {
     public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         final Map<String, Object> consumerConfigs = 
streamsConfig.getMainConsumerConfigs("groupId", "clientId");
-        assertThat(consumerConfigs.get("internal.leave.group.on.close"), 
CoreMatchers.<Object>equalTo(false));
+        assertThat(consumerConfigs.get("internal.leave.group.on.close"), 
CoreMatchers.equalTo(false));
     }
 
     @Test
@@ -582,15 +581,36 @@ public void shouldSpecifyCorrectValueSerdeClassOnError() {
     }
 
     @Test
-    public void 
shouldThrowExceptionIfMaxInflightRequestsGreatherThanFiveIfEosEnabled() {
-        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 7);
+    public void 
shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosEnabled() {
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
+        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 7);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         try {
             streamsConfig.getProducerConfigs("clientId");
-            fail("Should throw ConfigException when Eos is enabled and 
maxInFlight requests exceeds 5");
+            fail("Should throw ConfigException when ESO is enabled and 
maxInFlight requests exceeds 5");
+        } catch (final ConfigException e) {
+            assertEquals("Invalid value 7 for configuration 
max.in.flight.requests.per.connection: Can't exceed 5 when exactly-once 
processing is enabled", e.getMessage());
+        }
+    }
+
+    @Test
+    public void 
shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosEnabled() {
+        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
+        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3");
+
+        new StreamsConfig(props).getProducerConfigs("clientId");
+    }
+
+    @Test
+    public void 
shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosEnabled()
 {
+        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
+        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 
"not-a-number");
+
+        try {
+            new StreamsConfig(props).getProducerConfigs("clientId");
+            fail("Should throw ConfigException when EOS is enabled and 
maxInFlight cannot be paresed into an integer");
         } catch (final ConfigException e) {
-            assertEquals("max.in.flight.requests.per.connection can't exceed 5 
when using the idempotent producer", e.getMessage());
+            assertEquals("Invalid value not-a-number for configuration 
max.in.flight.requests.per.connection: String value could not be parsed as 
32-bit integer", e.getMessage());
         }
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> StreamsConfig throws ClassCastException if max.in.flight.request.per.connect 
> is specified as String
> ---------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7584
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7584
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Matthias J. Sax
>            Priority: Minor
>
> Setting 
> {quote}{{props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);}}
> {{props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3");}}
> {quote}
> results in
> {quote}{{java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Integer}}{{at 
> org.apache.kafka.streams.StreamsConfig.checkIfUnexpectedUserSpecifiedConsumerConfig(StreamsConfig.java:875)}}
> {{ at 
> org.apache.kafka.streams.StreamsConfig.getProducerConfigs(StreamsConfig.java:1071)}}
> {quote}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to