This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push: new 082e057 KAFKA-8157: fix the incorrect usage of segment.index.bytes (2.2) (#6547) 082e057 is described below commit 082e057e1073c4471c92d4b341f094089598145f Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Sat Apr 6 21:08:41 2019 -0700 KAFKA-8157: fix the incorrect usage of segment.index.bytes (2.2) (#6547) Should be cherry-picked to older branches as well. Reviewers: Bill Bejeck <bbej...@gmail.com> --- .../java/org/apache/kafka/streams/StreamsConfig.java | 19 +++++++------------ .../org/apache/kafka/streams/StreamsConfigTest.java | 12 ++++++++++-- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index b756ba2..d5f30ac 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -1003,22 +1003,17 @@ public class StreamsConfig extends AbstractConfig { // verify that producer batch config is no larger than segment size, then add topic configs required for creating topics final Map<String, Object> topicProps = originalsWithPrefix(TOPIC_PREFIX, false); + final Map<String, Object> producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames()); - if (topicProps.containsKey(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG))) { - final int segmentSize = Integer.parseInt(topicProps.get(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG)).toString()); - final Map<String, Object> producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames()); - final int batchSize; - if (producerProps.containsKey(ProducerConfig.BATCH_SIZE_CONFIG)) { - batchSize = Integer.parseInt(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG).toString()); - } else { - final ProducerConfig producerDefaultConfig = new ProducerConfig(new Properties()); - batchSize = producerDefaultConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG); - } + if (topicProps.containsKey(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)) && + producerProps.containsKey(ProducerConfig.BATCH_SIZE_CONFIG)) { + final int segmentSize = Integer.parseInt(topicProps.get(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)).toString()); + final int batchSize = Integer.parseInt(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG).toString()); if (segmentSize < batchSize) { throw new IllegalArgumentException(String.format("Specified topic segment size %d is is smaller than the configured producer batch size %d, this will cause produced batch not able to be appended to the topic", - segmentSize, - batchSize)); + segmentSize, + batchSize)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 19d5ae0..724cbb5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -49,6 +49,7 @@ import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION; import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix; import static org.apache.kafka.streams.StreamsConfig.consumerPrefix; import static org.apache.kafka.streams.StreamsConfig.producerPrefix; +import static org.apache.kafka.streams.StreamsConfig.topicPrefix; import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig; import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertEquals; @@ -111,7 +112,7 @@ public class StreamsConfigTest { props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "dummy:host"); props.put(StreamsConfig.RETRIES_CONFIG, 10); props.put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG), 5); - props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100); + props.put(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100); final StreamsConfig streamsConfig = new StreamsConfig(props); final String groupId = "example-application"; @@ -125,7 +126,7 @@ public class StreamsConfigTest { assertEquals("dummy:host", returnedProps.get(StreamsConfig.APPLICATION_SERVER_CONFIG)); assertNull(returnedProps.get(StreamsConfig.RETRIES_CONFIG)); assertEquals(5, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG))); - assertEquals(100, returnedProps.get(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG))); + assertEquals(100, returnedProps.get(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG))); } @Test @@ -639,6 +640,13 @@ public class StreamsConfigTest { new StreamsConfig(props); } + @Test(expected = IllegalArgumentException.class) + public void testThrowIllegalArgumentExceptionWhenTopicSegmentSizeSmallerThanProducerBatchSize() { + props.put(topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100); + props.put(producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 101); + new StreamsConfig(props).getMainConsumerConfigs("groupId", "clientId"); + } + static class MisconfiguredSerde implements Serde { @Override public void configure(final Map configs, final boolean isKey) {