[ https://issues.apache.org/jira/browse/KAFKA-7379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16622564#comment-16622564 ]
ASF GitHub Bot commented on KAFKA-7379: --------------------------------------- guozhangwang closed pull request #5643: [KAFKA-7379] [streams] send.buffer.bytes should be allowed to set -1 in KafkaStreams URL: https://github.com/apache/kafka/pull/5643 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index 7da7a60b4cb..a08e5b10b3b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -46,9 +46,11 @@ public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes"; public static final String SEND_BUFFER_DOC = "The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used."; + public static final int SEND_BUFFER_LOWER_BOUND = -1; public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes"; public static final String RECEIVE_BUFFER_DOC = "The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used."; + public static final int RECEIVE_BUFFER_LOWER_BOUND = -1; public static final String CLIENT_ID_CONFIG = "client.id"; public static final String CLIENT_ID_DOC = "An id string to pass to the server when making requests. 
The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging."; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 1fcabca134c..ddd6e06c713 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -313,13 +313,13 @@ .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, - atLeast(-1), + atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC) .define(RECEIVE_BUFFER_CONFIG, Type.INT, 64 * 1024, - atLeast(-1), + atLeast(CommonClientConfigs.RECEIVE_BUFFER_LOWER_BOUND), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC) .define(FETCH_MIN_BYTES_CONFIG, diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 0a55303c2d7..1a1bab5127b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -251,8 +251,8 @@ .define(LINGER_MS_CONFIG, Type.INT, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC) .define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC) .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC) - .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC) - .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC) + .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND), 
Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC) + .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(CommonClientConfigs.RECEIVE_BUFFER_LOWER_BOUND), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC) .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 736e9cbd34f..20f2f6923ba 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -630,7 +630,7 @@ .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, - atLeast(0), + atLeast(CommonClientConfigs.RECEIVE_BUFFER_LOWER_BOUND), Importance.LOW, CommonClientConfigs.RECEIVE_BUFFER_DOC) .define(RECONNECT_BACKOFF_MS_CONFIG, @@ -671,7 +671,7 @@ .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, - atLeast(0), + atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND), Importance.LOW, CommonClientConfigs.SEND_BUFFER_DOC) .define(STATE_CLEANUP_DELAY_MS_CONFIG, diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index dd5cff7da17..5e07703ddd5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -16,12 +16,15 @@ */ package org.apache.kafka.streams; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.network.Selectable; import org.apache.kafka.common.serialization.Serdes; import 
org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Utils; @@ -95,6 +98,28 @@ public void cleanup() { } } + @Test + public void testOsDefaultSocketBufferSizes() { + props.put(CommonClientConfigs.SEND_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE); + props.put(CommonClientConfigs.RECEIVE_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE); + final KafkaStreams streams = new KafkaStreams(builder.build(), props); + streams.close(); + } + + @Test(expected = KafkaException.class) + public void testInvalidSocketSendBufferSize() { + props.put(CommonClientConfigs.SEND_BUFFER_CONFIG, -2); + final KafkaStreams streams = new KafkaStreams(builder.build(), props); + streams.close(); + } + + @Test(expected = KafkaException.class) + public void testInvalidSocketReceiveBufferSize() { + props.put(CommonClientConfigs.RECEIVE_BUFFER_CONFIG, -2); + final KafkaStreams streams = new KafkaStreams(builder.build(), props); + streams.close(); + } + @Test public void testStateChanges() throws InterruptedException { final StateListenerStub stateListener = new StateListenerStub(); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > send.buffer.bytes should be allowed to set -1 in KafkaStreams > ------------------------------------------------------------- > > Key: KAFKA-7379 > URL: https://issues.apache.org/jira/browse/KAFKA-7379 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.1, 2.0.0 > Reporter: Badai Aqrandista > Assignee: Aleksei Izmalkin > Priority: Minor > Labels: easyfix, newbie > > send.buffer.bytes and receive.buffer.bytes are declared with atLeast(0) > constraint in StreamsConfig, whereas -1 should be also allowed to set. 
This is like KAFKA-6891. -- This message was sent by Atlassian JIRA (v7.6.3#76005)