This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push: new 43e6504 KAFKA-8134: `linger.ms` must be a long 43e6504 is described below commit 43e650469412afd93faf18f176881a0b6f4d0716 Author: Dhruvil Shah <dhru...@confluent.io> AuthorDate: Mon Apr 29 08:59:18 2019 -0700 KAFKA-8134: `linger.ms` must be a long Reviewers: Ismael Juma <ism...@juma.me.uk>, Colin P. McCabe <cmcc...@apache.org> --- .../kafka/clients/producer/KafkaProducer.java | 13 ++++-- .../kafka/clients/producer/ProducerConfig.java | 2 +- .../producer/internals/RecordAccumulator.java | 8 ++-- .../producer/internals/RecordAccumulatorTest.java | 51 ++++++++++------------ .../clients/producer/internals/SenderTest.java | 8 ++-- .../producer/internals/TransactionManagerTest.java | 4 +- .../kafka/api/BaseProducerSendTest.scala | 13 +++--- .../kafka/api/PlaintextProducerSendTest.scala | 6 ++- .../scala/unit/kafka/server/FetchRequestTest.scala | 1 + 9 files changed, 57 insertions(+), 49 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index bfd6bf3..8b99a9a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -396,7 +396,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { this.accumulator = new RecordAccumulator(logContext, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), this.compressionType, - config.getInt(ProducerConfig.LINGER_MS_CONFIG), + lingerMs(config), retryBackoffMs, deliveryTimeoutMs, metrics, @@ -473,12 +473,17 @@ public class KafkaProducer<K, V> implements Producer<K, V> { apiVersions); } + private static int lingerMs(ProducerConfig config) { + return (int) Math.min(config.getLong(ProducerConfig.LINGER_MS_CONFIG), Integer.MAX_VALUE); + } + private static int configureDeliveryTimeout(ProducerConfig config, Logger log) { int deliveryTimeoutMs = config.getInt(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG); - int lingerMs = config.getInt(ProducerConfig.LINGER_MS_CONFIG); + int lingerMs = lingerMs(config); int requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); + int lingerAndRequestTimeoutMs = (int) Math.min((long) lingerMs + requestTimeoutMs, Integer.MAX_VALUE); - if (deliveryTimeoutMs < Integer.MAX_VALUE && deliveryTimeoutMs < lingerMs + requestTimeoutMs) { + if (deliveryTimeoutMs < Integer.MAX_VALUE && deliveryTimeoutMs < lingerAndRequestTimeoutMs) { if (config.originals().containsKey(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG)) { // throw an exception if the user explicitly set an inconsistent value throw new ConfigException(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG @@ -486,7 +491,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { + " + " + ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); } else { // override deliveryTimeoutMs default value to lingerMs + requestTimeoutMs for backward compatibility - deliveryTimeoutMs = lingerMs + requestTimeoutMs; + deliveryTimeoutMs = lingerAndRequestTimeoutMs; log.warn("{} should be equal to or larger than {} + {}. Setting it to {}.", ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, ProducerConfig.LINGER_MS_CONFIG, ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, deliveryTimeoutMs); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index c63477d..19f0ce9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -260,7 +260,7 @@ public class ProducerConfig extends AbstractConfig { ACKS_DOC) .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC) .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC) - .define(LINGER_MS_CONFIG, Type.INT, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC) + .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC) .define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC) .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC) .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 80a9d0c..e6b29f3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -72,9 +72,9 @@ public final class RecordAccumulator { private final AtomicInteger appendsInProgress; private final int batchSize; private final CompressionType compression; - private final long lingerMs; + private final int lingerMs; private final long retryBackoffMs; - private final long deliveryTimeoutMs; + private final int deliveryTimeoutMs; private final BufferPool free; private final Time time; private final ApiVersions apiVersions; @@ -106,9 +106,9 @@ public final class RecordAccumulator { public RecordAccumulator(LogContext logContext, int batchSize, CompressionType compression, - long lingerMs, + int lingerMs, long retryBackoffMs, - long deliveryTimeoutMs, + int deliveryTimeoutMs, Metrics metrics, String metricGrpName, Time time, diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 13b0d1b..5061447 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -103,7 +103,7 @@ public class RecordAccumulatorTest { int batchSize = 1025; RecordAccumulator accum = createTestRecordAccumulator( - batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10L * batchSize, CompressionType.NONE, 10L); + batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10L * batchSize, CompressionType.NONE, 10); int appends = expectedNumAppends(batchSize); for (int i = 0; i < appends; i++) { // append to the first batch @@ -152,7 +152,7 @@ public class RecordAccumulatorTest { int batchSize = 512; byte[] value = new byte[2 * batchSize]; RecordAccumulator accum = createTestRecordAccumulator( - batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0L); + batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0); accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); @@ -191,7 +191,7 @@ public class RecordAccumulatorTest { new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2)))); RecordAccumulator accum = createTestRecordAccumulator( - batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0L); + batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0); accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); @@ -213,7 +213,7 @@ public class RecordAccumulatorTest { @Test public void testLinger() throws Exception { - long lingerMs = 10L; + int lingerMs = 10; RecordAccumulator accum = createTestRecordAccumulator( 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, lingerMs); accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs); @@ -234,7 +234,7 @@ public class RecordAccumulatorTest { @Test public void testPartialDrain() throws Exception { RecordAccumulator accum = createTestRecordAccumulator( - 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, 10L); + 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, 10); int appends = 1024 / msgSize + 1; List<TopicPartition> partitions = asList(tp1, tp2); for (TopicPartition tp : partitions) { @@ -254,7 +254,7 @@ public class RecordAccumulatorTest { final int msgs = 10000; final int numParts = 2; final RecordAccumulator accum = createTestRecordAccumulator( - 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, 0L); + 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, 0); List<Thread> threads = new ArrayList<>(); for (int i = 0; i < numThreads; i++) { threads.add(new Thread() { @@ -293,7 +293,7 @@ public class RecordAccumulatorTest { @Test public void testNextReadyCheckDelay() throws Exception { // Next check time will use lingerMs since this test won't trigger any retries/backoff - long lingerMs = 10L; + int lingerMs = 10; // test case assumes that the records do not fill the batch completely int batchSize = 1025; @@ -331,10 +331,9 @@ public class RecordAccumulatorTest { @Test public void testRetryBackoff() throws Exception { - long lingerMs = Integer.MAX_VALUE / 16; + int lingerMs = Integer.MAX_VALUE / 16; long retryBackoffMs = Integer.MAX_VALUE / 8; - int requestTimeoutMs = Integer.MAX_VALUE / 4; - long deliveryTimeoutMs = Integer.MAX_VALUE; + int deliveryTimeoutMs = Integer.MAX_VALUE; long totalSize = 10 * 1024; int batchSize = 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD; String metricGrpName = "producer-metrics"; @@ -377,7 +376,7 @@ public class RecordAccumulatorTest { @Test public void testFlush() throws Exception { - long lingerMs = Integer.MAX_VALUE; + int lingerMs = Integer.MAX_VALUE; final RecordAccumulator accum = createTestRecordAccumulator( 4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, lingerMs); @@ -419,7 +418,7 @@ public class RecordAccumulatorTest { @Test public void testAwaitFlushComplete() throws Exception { RecordAccumulator accum = createTestRecordAccumulator( - 4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, Long.MAX_VALUE); + 4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, Integer.MAX_VALUE); accum.append(new TopicPartition(topic, 0), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs); accum.beginFlush(); @@ -515,8 +514,8 @@ public class RecordAccumulatorTest { assertTrue(accum.hasIncomplete()); } - private void doExpireBatchSingle(long deliveryTimeoutMs) throws InterruptedException { - long lingerMs = 300L; + private void doExpireBatchSingle(int deliveryTimeoutMs) throws InterruptedException { + int lingerMs = 300; List<Boolean> muteStates = Arrays.asList(false, true); Set<Node> readyNodes = null; List<ProducerBatch> expiredBatches = new ArrayList<>(); @@ -554,20 +553,20 @@ public class RecordAccumulatorTest { @Test public void testExpiredBatchSingle() throws InterruptedException { - doExpireBatchSingle(3200L); + doExpireBatchSingle(3200); } @Test public void testExpiredBatchSingleMaxValue() throws InterruptedException { - doExpireBatchSingle(Long.MAX_VALUE); + doExpireBatchSingle(Integer.MAX_VALUE); } @Test public void testExpiredBatches() throws InterruptedException { long retryBackoffMs = 100L; - long lingerMs = 30L; + int lingerMs = 30; int requestTimeout = 60; - long deliveryTimeoutMs = 3200L; + int deliveryTimeoutMs = 3200; // test case assumes that the records do not fill the batch completely int batchSize = 1025; @@ -700,9 +699,8 @@ public class RecordAccumulatorTest { // Simulate talking to an older broker, ie. one which supports a lower magic. ApiVersions apiVersions = new ApiVersions(); int batchSize = 1025; - int requestTimeoutMs = 1600; - long deliveryTimeoutMs = 3200L; - long lingerMs = 10L; + int deliveryTimeoutMs = 3200; + int lingerMs = 10; long retryBackoffMs = 100L; long totalSize = 10 * batchSize; String metricGrpName = "producer-metrics"; @@ -777,7 +775,7 @@ public class RecordAccumulatorTest { // First set the compression ratio estimation to be good. CompressionRatioEstimator.setEstimation(tp1.topic(), CompressionType.GZIP, 0.1f); - RecordAccumulator accum = createTestRecordAccumulator(batchSize, bufferCapacity, CompressionType.GZIP, 0L); + RecordAccumulator accum = createTestRecordAccumulator(batchSize, bufferCapacity, CompressionType.GZIP, 0); int numSplitBatches = prepareSplitBatches(accum, seed, 100, 20); assertTrue("There should be some split batches", numSplitBatches > 0); // Drain all the split batches. @@ -829,7 +827,7 @@ public class RecordAccumulatorTest { @Test public void testSoonToExpireBatchesArePickedUpForExpiry() throws InterruptedException { - long lingerMs = 500L; + int lingerMs = 500; int batchSize = 1025; RecordAccumulator accum = createTestRecordAccumulator( @@ -993,17 +991,16 @@ public class RecordAccumulatorTest { } - private RecordAccumulator createTestRecordAccumulator(int batchSize, long totalSize, CompressionType type, long lingerMs) { - long deliveryTimeoutMs = 3200L; + private RecordAccumulator createTestRecordAccumulator(int batchSize, long totalSize, CompressionType type, int lingerMs) { + int deliveryTimeoutMs = 3200; return createTestRecordAccumulator(deliveryTimeoutMs, batchSize, totalSize, type, lingerMs); } /** * Return a test RecordAccumulator instance */ - private RecordAccumulator createTestRecordAccumulator(long deliveryTimeoutMs, int batchSize, long totalSize, CompressionType type, long lingerMs) { + private RecordAccumulator createTestRecordAccumulator(int deliveryTimeoutMs, int batchSize, long totalSize, CompressionType type, int lingerMs) { long retryBackoffMs = 100L; - int requestTimeoutMs = 1600; String metricGrpName = "producer-metrics"; return new RecordAccumulator( diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 16be8fc..1acdfed 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -1903,14 +1903,14 @@ public class SenderTest { TopicPartition tp) throws Exception { int maxRetries = 1; String topic = tp.topic(); - long deliveryTimeoutMs = 3000L; + int deliveryTimeoutMs = 3000; long totalSize = 1024 * 1024; String metricGrpName = "producer-metrics"; // Set a good compression ratio. CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f); try (Metrics m = new Metrics()) { accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.GZIP, - 0L, 0L, deliveryTimeoutMs, m, metricGrpName, time, new ApiVersions(), txnManager, + 0, 0L, deliveryTimeoutMs, m, metricGrpName, time, new ApiVersions(), txnManager, new BufferPool(totalSize, batchSize, metrics, time, "producer-internal-metrics")); SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, @@ -2285,14 +2285,14 @@ public class SenderTest { } private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, BufferPool customPool) { - long deliveryTimeoutMs = 1500L; + int deliveryTimeoutMs = 1500; long totalSize = 1024 * 1024; String metricGrpName = "producer-metrics"; MetricConfig metricConfig = new MetricConfig().tags(Collections.singletonMap("client-id", CLIENT_ID)); this.metrics = new Metrics(metricConfig, time); BufferPool pool = (customPool == null) ? new BufferPool(totalSize, batchSize, metrics, time, metricGrpName) : customPool; - this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0L, 0L, + this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0, 0L, deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager, pool); this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics); this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL, diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 8da76fb..92ae5f0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -116,7 +116,7 @@ public class TransactionManagerTest { Map<String, String> metricTags = new LinkedHashMap<>(); metricTags.put("client-id", CLIENT_ID); int batchSize = 16 * 1024; - long deliveryTimeoutMs = 3000L; + int deliveryTimeoutMs = 3000; long totalSize = 1024 * 1024; String metricGrpName = "producer-metrics"; MetricConfig metricConfig = new MetricConfig().tags(metricTags); @@ -126,7 +126,7 @@ public class TransactionManagerTest { Metrics metrics = new Metrics(metricConfig, time); SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(metrics); - this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0L, 0L, + this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0, 0L, deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager, new BufferPool(totalSize, batchSize, metrics, time, metricGrpName)); this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala index 9d454e9..afa0074 100644 --- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala @@ -71,6 +71,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { protected def createProducer(brokerList: String, lingerMs: Int = 0, + deliveryTimeoutMs: Int = 2 * 60 * 1000, batchSize: Int = 16384, compressionType: String = "none", maxBlockMs: Long = 60 * 1000L): KafkaProducer[Array[Byte],Array[Byte]] = { @@ -80,6 +81,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { trustStoreFile = trustStoreFile, saslProperties = clientSaslProperties, lingerMs = lingerMs, + deliveryTimeoutMs = deliveryTimeoutMs, maxBlockMs = maxBlockMs) registerProducer(producer) } @@ -171,13 +173,14 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { def testSendCompressedMessageWithCreateTime() { val producer = createProducer(brokerList = brokerList, compressionType = "gzip", - lingerMs = Int.MaxValue) + lingerMs = Int.MaxValue, + deliveryTimeoutMs = Int.MaxValue) sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME) } @Test def testSendNonCompressedMessageWithCreateTime() { - val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue) + val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue) sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME) } @@ -413,7 +416,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { */ @Test def testFlush() { - val producer = createProducer(brokerList, lingerMs = Int.MaxValue) + val producer = createProducer(brokerList, lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue) try { createTopic(topic, 2, 2) val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, @@ -442,7 +445,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { // Test closing from caller thread. for (_ <- 0 until 50) { - val producer = createProducer(brokerList, lingerMs = Int.MaxValue) + val producer = createProducer(brokerList, lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue) val responses = (0 until numRecords) map (_ => producer.send(record0)) assertTrue("No request is complete.", responses.forall(!_.isDone())) producer.close(0, TimeUnit.MILLISECONDS) @@ -482,7 +485,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { } } for (i <- 0 until 50) { - val producer = createProducer(brokerList, lingerMs = Int.MaxValue) + val producer = createProducer(brokerList, lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue) try { // send message to partition 0 // Only send the records in the first callback since we close the producer in the callback and no records diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala index 8ab32af..1bb8989 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala @@ -45,6 +45,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { def testBatchSizeZero() { val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue, + deliveryTimeoutMs = Int.MaxValue, batchSize = 0) sendAndVerify(producer) } @@ -53,13 +54,14 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { def testSendCompressedMessageWithLogAppendTime() { val producer = createProducer(brokerList = brokerList, compressionType = "gzip", - lingerMs = Int.MaxValue) + lingerMs = Int.MaxValue, + deliveryTimeoutMs = Int.MaxValue) sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME) } @Test def testSendNonCompressedMessageWithLogAppendTime() { - val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue) + val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue) sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME) } diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala index 86f3314..8ef611a 100644 --- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala @@ -257,6 +257,7 @@ class FetchRequestTest extends BaseRequestTest { val batchSize = 4 * msgValueLen val producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers), lingerMs = Int.MaxValue, + deliveryTimeoutMs = Int.MaxValue, batchSize = batchSize, keySerializer = new StringSerializer, valueSerializer = new ByteArraySerializer)