[ https://issues.apache.org/jira/browse/KAFKA-7584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688677#comment-16688677 ]
ASF GitHub Bot commented on KAFKA-7584: --------------------------------------- guozhangwang closed pull request #5874: KAFKA-7584: StreamsConfig throws ClassCastException if max.in.flight.request.per.connect is specified as String URL: https://github.com/apache/kafka/pull/5874 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index b25894c2e04..90523d22bce 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -887,10 +887,30 @@ private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Obje // consumer/producer configurations, log a warning and remove the user defined value from the Map. // Thus the default values for these consumer/producer configurations that are suitable for // Streams will be used instead. 
- final Object maxInFlightRequests = clientProvidedProps.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION); - if (eosEnabled && maxInFlightRequests != null && 5 < (int) maxInFlightRequests) { - throw new ConfigException(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " can't exceed 5 when using the idempotent producer"); + + if (eosEnabled) { + final Object maxInFlightRequests = clientProvidedProps.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION); + + if (maxInFlightRequests != null) { + final int maxInFlightRequestsAsInteger; + if (maxInFlightRequests instanceof Integer) { + maxInFlightRequestsAsInteger = (Integer) maxInFlightRequests; + } else if (maxInFlightRequests instanceof String) { + try { + maxInFlightRequestsAsInteger = Integer.parseInt(((String) maxInFlightRequests).trim()); + } catch (final NumberFormatException e) { + throw new ConfigException(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInFlightRequests, "String value could not be parsed as 32-bit integer"); + } + } else { + throw new ConfigException(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInFlightRequests, "Expected value to be a 32-bit integer, but it was a " + maxInFlightRequests.getClass().getName()); + } + + if (maxInFlightRequestsAsInteger > 5) { + throw new ConfigException(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInFlightRequestsAsInteger, "Can't exceed 5 when exactly-once processing is enabled"); + } + } } + for (final String config: nonConfigurableConfigs) { if (clientProvidedProps.containsKey(config)) { final String eosMessage = PROCESSING_GUARANTEE_CONFIG + " is set to " + EXACTLY_ONCE + ". 
Hence, "; diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index a86c38946df..7b34615cb39 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -123,7 +123,7 @@ public void consumerConfigMustContainStreamPartitionAssignorConfig() { assertEquals(StreamsPartitionAssignor.class.getName(), returnedProps.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)); assertEquals(7L, returnedProps.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)); assertEquals("dummy:host", returnedProps.get(StreamsConfig.APPLICATION_SERVER_CONFIG)); - assertEquals(null, returnedProps.get(StreamsConfig.RETRIES_CONFIG)); + assertNull(returnedProps.get(StreamsConfig.RETRIES_CONFIG)); assertEquals(5, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG))); assertEquals(100, returnedProps.get(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG))); } @@ -233,7 +233,6 @@ public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() { assertEquals("host", producerConfigs.get("interceptor.statsd.host")); } - @Test public void shouldSupportPrefixedProducerConfigs() { props.put(producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 10); @@ -427,7 +426,7 @@ public void testGetGlobalConsumerConfigsWithGlobalConsumerOverridenPrefix() { public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() { final StreamsConfig streamsConfig = new StreamsConfig(props); final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId"); - assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.<Object>equalTo(false)); + assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.equalTo(false)); } @Test @@ -582,15 +581,36 @@ public void 
shouldSpecifyCorrectValueSerdeClassOnError() { } @Test - public void shouldThrowExceptionIfMaxInflightRequestsGreatherThanFiveIfEosEnabled() { - props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 7); + public void shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosEnabled() { props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); + props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 7); final StreamsConfig streamsConfig = new StreamsConfig(props); try { streamsConfig.getProducerConfigs("clientId"); - fail("Should throw ConfigException when Eos is enabled and maxInFlight requests exceeds 5"); + fail("Should throw ConfigException when ESO is enabled and maxInFlight requests exceeds 5"); + } catch (final ConfigException e) { + assertEquals("Invalid value 7 for configuration max.in.flight.requests.per.connection: Can't exceed 5 when exactly-once processing is enabled", e.getMessage()); + } + } + + @Test + public void shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosEnabled() { + props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); + props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3"); + + new StreamsConfig(props).getProducerConfigs("clientId"); + } + + @Test + public void shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosEnabled() { + props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); + props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "not-a-number"); + + try { + new StreamsConfig(props).getProducerConfigs("clientId"); + fail("Should throw ConfigException when EOS is enabled and maxInFlight cannot be paresed into an integer"); } catch (final ConfigException e) { - assertEquals("max.in.flight.requests.per.connection can't exceed 5 when using the idempotent producer", e.getMessage()); + assertEquals("Invalid value not-a-number for configuration max.in.flight.requests.per.connection: String value could not be parsed 
as 32-bit integer", e.getMessage()); } }
----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> StreamsConfig throws ClassCastException if max.in.flight.request.per.connect is specified as String
> ---------------------------------------------------------------------------------------------------
>
> Key: KAFKA-7584
> URL: https://issues.apache.org/jira/browse/KAFKA-7584
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 2.0.0
> Reporter: Matthias J. Sax
> Assignee: Matthias J. Sax
> Priority: Minor
>
> Setting
> {quote}
> {{props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);}}
> {{props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3");}}
> {quote}
> results in
> {quote}
> {{java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer}}
> {{ at org.apache.kafka.streams.StreamsConfig.checkIfUnexpectedUserSpecifiedConsumerConfig(StreamsConfig.java:875)}}
> {{ at org.apache.kafka.streams.StreamsConfig.getProducerConfigs(StreamsConfig.java:1071)}}
> {quote}
>

-- This message was sent by Atlassian JIRA (v7.6.3#76005)