[jira] [Commented] (KAFKA-6677) Remove EOS producer config max.in.flight.request.per.connection=1
[ https://issues.apache.org/jira/browse/KAFKA-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16450337#comment-16450337 ]

ASF GitHub Bot commented on KAFKA-6677:
---------------------------------------

mjsax closed pull request #4868: KAFKA-6677: Fixed streamconfig producer's maxinflight allowed when EOS Enabled.
URL: https://github.com/apache/kafka/pull/4868

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 65b1da6dede..e46d6d086ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -27,6 +27,7 @@
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
@@ -119,7 +120,7 @@
  *
  * {@link ConsumerConfig#ISOLATION_LEVEL_CONFIG "isolation.level"} (read_committed) - Consumers will always read committed data only
  * {@link ProducerConfig#ENABLE_IDEMPOTENCE_CONFIG "enable.idempotence"} (true) - Producer will always have idempotency enabled
- * {@link ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION "max.in.flight.requests.per.connection"} (1) - Producer will always have one in-flight request per connection
+ * {@link ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION "max.in.flight.requests.per.connection"} (5) - Producer will always have one in-flight request per connection
  *
  *
  *
@@ -650,7 +651,6 @@
         final Map tempProducerDefaultOverrides = new HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
         tempProducerDefaultOverrides.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
         tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
-        tempProducerDefaultOverrides.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
         PRODUCER_EOS_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }

@@ -785,6 +785,10 @@ private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map producerConfigs = streamsConfig.getProducerConfigs("clientId");
-        assertThat((Integer) producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), equalTo(1));
-    }
-
-    @Test
-    public void shouldAllowSettingProducerMaxInFlightRequestPerConnectionsWhenEosDisabled() {
-        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 2);
-        final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map producerConfigs = streamsConfig.getProducerConfigs("clientId");
-        assertThat((Integer) producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), equalTo(2));
-    }
-
     @Test
     public void shouldSetDifferentDefaultsIfEosEnabled() {
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
@@ -446,7 +429,6 @@ public void shouldSetDifferentDefaultsIfEosEnabled() {
         assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
         assertTrue((Boolean) producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
         assertThat((Integer) producerConfigs.get(ProducerConfig.RETRIES_CONFIG), equalTo(Integer.MAX_VALUE));
-        assertThat((Integer) producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), equalTo(1));
         assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), equalTo(100L));
     }

@@ -563,6 +545,18 @@ public void shouldSpecifyCorrectValueSerdeClassOnError() {
         }
     }

+    @Test
+    public void shouldThrowExceptionIfMaxInflightRequestsGreatherThanFiveIfEosEnabled() {
+        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 7);
+        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        try {
+            streamsConfig.getProducerConfigs("clientId");
+            fail("Should throw ConfigException when Eos is enabled and maxInFlight requests exceeds 5");
+        } catch (final ConfigException e) {
+            assertEquals("max.in.flight.requests.per.connection can't exceed 5 when using the idempotent producer", e.getMessage());
+        }
+    }
     static class MisconfiguredSerde imp
[jira] [Commented] (KAFKA-6677) Remove EOS producer config max.in.flight.request.per.connection=1
[ https://issues.apache.org/jira/browse/KAFKA-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16437360#comment-16437360 ]

Jagadesh Adireddi commented on KAFKA-6677:
------------------------------------------

Hi @mjsax,
I fixed `StreamsConfig` to set the default to 5, allow users to configure a smaller value if they wish, and throw an exception if they configure a larger value. Could you please review and let me know if any changes are needed?

> Remove EOS producer config max.in.flight.request.per.connection=1
> ------------------------------------------------------------------
>
>                 Key: KAFKA-6677
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6677
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Jagadesh Adireddi
>            Priority: Major
>
> When EOS was introduced in 0.11, it was required to set the producer config max.in.flight.requests.per.connection=1 for the idempotent producer.
> This limitation was fixed in the 1.0 release via KAFKA-5494.
> Thus, we should remove this setting in Kafka Streams if EOS gets enabled.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-6677) Remove EOS producer config max.in.flight.request.per.connection=1
[ https://issues.apache.org/jira/browse/KAFKA-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16437274#comment-16437274 ]

ASF GitHub Bot commented on KAFKA-6677:
---------------------------------------

jadireddi opened a new pull request #4868: KAFKA-6677: Fixed streamconfig producer's maxinflight allowed when EOS Enabled.
URL: https://github.com/apache/kafka/pull/4868

https://issues.apache.org/jira/browse/KAFKA-6677

Modified `StreamsConfig` Producer's default MaxInFlight Request allowed per connection.

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-6677) Remove EOS producer config max.in.flight.request.per.connection=1
[ https://issues.apache.org/jira/browse/KAFKA-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434363#comment-16434363 ]

Matthias J. Sax commented on KAFKA-6677:
----------------------------------------

Thanks for pointing that out! So we might actually set the default to 5, allow users to configure a smaller value if they wish, and throw an exception if they configure a larger value.
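
The behaviour proposed in the comment above (default of 5, smaller values allowed, larger values rejected) can be sketched in plain Java as follows. This is only an illustration, not the code from the pull request: the class `MaxInFlightSketch` and the method `resolveForEos` are hypothetical names, and `IllegalArgumentException` stands in for Kafka's `ConfigException` so that the sketch has no dependencies. The error message is the one asserted in the PR's unit test.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, dependency-free sketch of the proposed check; not the actual StreamsConfig code.
final class MaxInFlightSketch {
    static final String MAX_IN_FLIGHT = "max.in.flight.requests.per.connection";
    // Upper bound for the idempotent producer, as mentioned in the thread.
    static final int EOS_LIMIT = 5;

    // Returns the effective value when EOS is enabled, or throws if the user asked for more than the limit.
    static int resolveForEos(final Map<String, Object> userProps) {
        final Object configured = userProps.get(MAX_IN_FLIGHT);
        if (configured == null) {
            return EOS_LIMIT; // nothing configured: use the proposed default of 5
        }
        final int value = Integer.parseInt(configured.toString().trim());
        if (value > EOS_LIMIT) {
            // The real patch throws ConfigException; IllegalArgumentException is an assumption of this sketch.
            throw new IllegalArgumentException(
                MAX_IN_FLIGHT + " can't exceed " + EOS_LIMIT + " when using the idempotent producer");
        }
        return value; // smaller (or equal) user-supplied values are kept
    }

    public static void main(final String[] args) {
        final Map<String, Object> props = new HashMap<>();
        System.out.println(resolveForEos(props)); // default applies
        props.put(MAX_IN_FLIGHT, 2);
        System.out.println(resolveForEos(props)); // user value is kept
        props.put(MAX_IN_FLIGHT, 7);
        try {
            resolveForEos(props);
        } catch (final IllegalArgumentException e) {
            System.out.println(e.getMessage()); // value above the limit is rejected
        }
    }
}
```

Throwing at configuration time rather than silently clamping the value means a misconfiguration surfaces before the producer is created, which matches the intent of the test added in the pull request.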
[jira] [Commented] (KAFKA-6677) Remove EOS producer config max.in.flight.request.per.connection=1
[ https://issues.apache.org/jira/browse/KAFKA-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434338#comment-16434338 ]

Ismael Juma commented on KAFKA-6677:
------------------------------------

There is a limit of 5 now btw.
[jira] [Commented] (KAFKA-6677) Remove EOS producer config max.in.flight.request.per.connection=1
[ https://issues.apache.org/jira/browse/KAFKA-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434331#comment-16434331 ]

Matthias J. Sax commented on KAFKA-6677:
----------------------------------------

Thanks for picking this up. I think what you suggest should be sufficient.
[jira] [Commented] (KAFKA-6677) Remove EOS producer config max.in.flight.request.per.connection=1
[ https://issues.apache.org/jira/browse/KAFKA-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433723#comment-16433723 ]

Jagadesh Adireddi commented on KAFKA-6677:
------------------------------------------

Hi [~mjsax],
I would like to contribute to this issue. As per the ticket description, will removing `tempProducerDefaultOverrides.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);` from the `StreamsConfig` class resolve the issue, or do I need to dig deeper to fix it? Kindly provide your input.
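
To illustrate why dropping that single `put` matters, here is a minimal sketch of how a map of forced overrides can mask a user-supplied value. It is an assumption-laden illustration: `OverrideLayeringSketch` and `effectiveConfig` are hypothetical names, and the merge order shown (forced entries win over user entries) is assumed for the example rather than taken from the real `StreamsConfig` implementation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of config layering; the merge order is an assumption, not StreamsConfig's actual logic.
final class OverrideLayeringSketch {
    static final String MAX_IN_FLIGHT = "max.in.flight.requests.per.connection";

    // Starts from the user's properties, then applies forced overrides on top of them.
    static Map<String, Object> effectiveConfig(final Map<String, Object> userProps,
                                               final Map<String, Object> forcedOverrides) {
        final Map<String, Object> result = new HashMap<>(userProps);
        result.putAll(forcedOverrides); // a forced entry replaces whatever the user configured
        return Collections.unmodifiableMap(result);
    }

    public static void main(final String[] args) {
        final Map<String, Object> user = new HashMap<>();
        user.put(MAX_IN_FLIGHT, 3);

        // With the entry present in the overrides, the user's value is masked.
        final Map<String, Object> forced = new HashMap<>();
        forced.put(MAX_IN_FLIGHT, 1);
        System.out.println(effectiveConfig(user, forced).get(MAX_IN_FLIGHT));

        // With the entry removed from the overrides, the user's value survives.
        forced.remove(MAX_IN_FLIGHT);
        System.out.println(effectiveConfig(user, forced).get(MAX_IN_FLIGHT));
    }
}
```

Under these assumptions, removing the entry from the overrides map is enough to stop forcing a value of 1, but it does not by itself enforce an upper bound on what the user may configure.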