[ 
https://issues.apache.org/jira/browse/KAFKA-7379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16622564#comment-16622564
 ] 

ASF GitHub Bot commented on KAFKA-7379:
---------------------------------------

guozhangwang closed pull request #5643: [KAFKA-7379] [streams] 
send.buffer.bytes should be allowed to set -1 in KafkaStreams
URL: https://github.com/apache/kafka/pull/5643
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 7da7a60b4cb..a08e5b10b3b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -46,9 +46,11 @@
 
     public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
     public static final String SEND_BUFFER_DOC = "The size of the TCP send 
buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default 
will be used.";
+    public static final int SEND_BUFFER_LOWER_BOUND = -1;
 
     public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
     public static final String RECEIVE_BUFFER_DOC = "The size of the TCP 
receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS 
default will be used.";
+    public static final int RECEIVE_BUFFER_LOWER_BOUND = -1;
 
     public static final String CLIENT_ID_CONFIG = "client.id";
     public static final String CLIENT_ID_DOC = "An id string to pass to the 
server when making requests. The purpose of this is to be able to track the 
source of requests beyond just ip/port by allowing a logical application name 
to be included in server-side request logging.";
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 1fcabca134c..ddd6e06c713 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -313,13 +313,13 @@
                                 .define(SEND_BUFFER_CONFIG,
                                         Type.INT,
                                         128 * 1024,
-                                        atLeast(-1),
+                                        atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND),
                                         Importance.MEDIUM,
                                         CommonClientConfigs.SEND_BUFFER_DOC)
                                 .define(RECEIVE_BUFFER_CONFIG,
                                         Type.INT,
                                         64 * 1024,
-                                        atLeast(-1),
+                                        atLeast(CommonClientConfigs.RECEIVE_BUFFER_LOWER_BOUND),
                                         Importance.MEDIUM,
                                         CommonClientConfigs.RECEIVE_BUFFER_DOC)
                                 .define(FETCH_MIN_BYTES_CONFIG,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 0a55303c2d7..1a1bab5127b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -251,8 +251,8 @@
                                 .define(LINGER_MS_CONFIG, Type.INT, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
                                 .define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC)
                                 .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
-                                .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
-                                .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC)
+                                .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
+                                .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(CommonClientConfigs.RECEIVE_BUFFER_LOWER_BOUND), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC)
                                 .define(MAX_REQUEST_SIZE_CONFIG,
                                         Type.INT,
                                         1 * 1024 * 1024,
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 736e9cbd34f..20f2f6923ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -630,7 +630,7 @@
             .define(RECEIVE_BUFFER_CONFIG,
                     Type.INT,
                     32 * 1024,
-                    atLeast(0),
+                    atLeast(CommonClientConfigs.RECEIVE_BUFFER_LOWER_BOUND),
                     Importance.LOW,
                     CommonClientConfigs.RECEIVE_BUFFER_DOC)
             .define(RECONNECT_BACKOFF_MS_CONFIG,
@@ -671,7 +671,7 @@
             .define(SEND_BUFFER_CONFIG,
                     Type.INT,
                     128 * 1024,
-                    atLeast(0),
+                    atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND),
                     Importance.LOW,
                     CommonClientConfigs.SEND_BUFFER_DOC)
             .define(STATE_CLEANUP_DELAY_MS_CONFIG,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index dd5cff7da17..5e07703ddd5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -16,12 +16,15 @@
  */
 package org.apache.kafka.streams;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
@@ -95,6 +98,28 @@ public void cleanup() {
         }
     }
 
+    @Test
+    public void testOsDefaultSocketBufferSizes() {
+        props.put(CommonClientConfigs.SEND_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
+        props.put(CommonClientConfigs.RECEIVE_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        streams.close();
+    }
+
+    @Test(expected = KafkaException.class)
+    public void testInvalidSocketSendBufferSize() {
+        props.put(CommonClientConfigs.SEND_BUFFER_CONFIG, -2);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        streams.close();
+    }
+
+    @Test(expected = KafkaException.class)
+    public void testInvalidSocketReceiveBufferSize() {
+        props.put(CommonClientConfigs.RECEIVE_BUFFER_CONFIG, -2);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        streams.close();
+    }
+
     @Test
     public void testStateChanges() throws InterruptedException {
         final StateListenerStub stateListener = new StateListenerStub();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> send.buffer.bytes should be allowed to set -1 in KafkaStreams
> -------------------------------------------------------------
>
>                 Key: KAFKA-7379
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7379
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>            Reporter: Badai Aqrandista
>            Assignee: Aleksei Izmalkin
>            Priority: Minor
>              Labels: easyfix, newbie
>
> send.buffer.bytes and receive.buffer.bytes are declared with an atLeast(0) 
> constraint in StreamsConfig, whereas -1 should also be allowed. This is 
> similar to KAFKA-6891.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to