junrao commented on code in PR #19489:
URL: https://github.com/apache/kafka/pull/19489#discussion_r2590017754
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -403,9 +367,7 @@ private long sendProducerData(long now) {
}
accumulator.resetNextBatchExpiryTime();
- List<ProducerBatch> expiredInflightBatches =
getExpiredInflightBatches(now);
Review Comment:
One impact of this change is that a record could now take longer than the
delivery timeout to complete. One way to address this is to set the timeout
passed to `client.newClientRequest` to the lower of the request timeout and
the delivery time remaining since the batch was created.
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIntegrationTest.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.producer.internals.BufferPool;
+import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
+import org.apache.kafka.clients.producer.internals.ProducerMetadata;
+import org.apache.kafka.clients.producer.internals.ProducerMetrics;
+import org.apache.kafka.clients.producer.internals.RecordAccumulator;
+import org.apache.kafka.clients.producer.internals.Sender;
+import org.apache.kafka.common.compress.NoCompression;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+
+public class ProducerIntegrationTest {
+
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key = "offsets.topic.replication.factor", value
= "1"),
+ })
+ public void testInFlightBatchShouldNotBeCorrupted(ClusterInstance cluster)
throws InterruptedException,
+ ExecutionException {
+ String topic = "test-topic";
+ cluster.createTopic("test-topic", 1, (short) 1);
+ try (var producer = expireProducer(cluster)) {
+ producer.send(new ProducerRecord<>(topic, "key".getBytes(),
"value".getBytes())).get();
+ }
+ try (var consumer = cluster.consumer()) {
+ consumer.subscribe(List.of(topic));
+ TestUtils.waitForCondition(() ->
consumer.poll(Duration.ofSeconds(1)).count() == 1, 5000, "failed to poll data");
+ }
+
+ }
+
+
+ @SuppressWarnings({"unchecked", "this-escape"})
+ private Producer<byte[], byte[]> expireProducer(ClusterInstance cluster) {
+ Map<String, Object> config = Map.of(
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName(),
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName(),
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
cluster.bootstrapServers(),
+ ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false,
+ ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 2000,
+ ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1500
+ );
+ return new EvilKafkaProducerBuilder().build(config);
+ }
+
+ static class EvilKafkaProducerBuilder {
+
+ Serializer<byte[]> serializer = new ByteArraySerializer();
+ ApiVersions apiVersions = new ApiVersions();
+ LogContext logContext = new LogContext("[expire Producer test ]");
+ Metrics metrics = new Metrics(Time.SYSTEM);
+
+ String clientId;
+ String transactionalId;
+ ProducerConfig config;
+ ProducerMetadata metadata;
+ RecordAccumulator accumulator;
+ Partitioner partitioner;
+ Sender sender;
+ ProducerInterceptors<String, String> interceptors;
+
+ @SuppressWarnings({"unchecked", "this-escape"})
+ Producer<byte[], byte[]> build(Map<String, Object> configs) {
+ this.config = new
ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, null, null));
Review Comment:
Could we be consistent with the usage of `this.`? We don't use `this.` for
other instance variables. Ditto in a few other places.
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIntegrationTest.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.producer.internals.BufferPool;
+import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
+import org.apache.kafka.clients.producer.internals.ProducerMetadata;
+import org.apache.kafka.clients.producer.internals.ProducerMetrics;
+import org.apache.kafka.clients.producer.internals.RecordAccumulator;
+import org.apache.kafka.clients.producer.internals.Sender;
+import org.apache.kafka.common.compress.NoCompression;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+
+public class ProducerIntegrationTest {
+
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key = "offsets.topic.replication.factor", value
= "1"),
+ })
+ public void testInFlightBatchShouldNotBeCorrupted(ClusterInstance cluster)
throws InterruptedException,
+ ExecutionException {
+ String topic = "test-topic";
+ cluster.createTopic("test-topic", 1, (short) 1);
+ try (var producer = expireProducer(cluster)) {
+ producer.send(new ProducerRecord<>(topic, "key".getBytes(),
"value".getBytes())).get();
+ }
+ try (var consumer = cluster.consumer()) {
+ consumer.subscribe(List.of(topic));
+ TestUtils.waitForCondition(() ->
consumer.poll(Duration.ofSeconds(1)).count() == 1, 5000, "failed to poll data");
+ }
+
+ }
+
+
+ @SuppressWarnings({"unchecked", "this-escape"})
+ private Producer<byte[], byte[]> expireProducer(ClusterInstance cluster) {
+ Map<String, Object> config = Map.of(
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName(),
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName(),
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
cluster.bootstrapServers(),
+ ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false,
+ ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 2000,
+ ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1500
+ );
+ return new EvilKafkaProducerBuilder().build(config);
+ }
+
+ static class EvilKafkaProducerBuilder {
+
+ Serializer<byte[]> serializer = new ByteArraySerializer();
+ ApiVersions apiVersions = new ApiVersions();
+ LogContext logContext = new LogContext("[expire Producer test ]");
+ Metrics metrics = new Metrics(Time.SYSTEM);
+
+ String clientId;
+ String transactionalId;
+ ProducerConfig config;
+ ProducerMetadata metadata;
+ RecordAccumulator accumulator;
+ Partitioner partitioner;
+ Sender sender;
+ ProducerInterceptors<String, String> interceptors;
+
+ @SuppressWarnings({"unchecked", "this-escape"})
+ Producer<byte[], byte[]> build(Map<String, Object> configs) {
+ this.config = new
ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, null, null));
+ transactionalId =
config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+ clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
+ return new KafkaProducer<>(
+ config,
+ logContext,
+ metrics,
+ serializer,
+ serializer,
+ buildMetadata(),
+ buildAccumulator(),
+ null,
+ buildSender(),
+ buildInterceptors(),
+ buildPartition(),
+ Time.SYSTEM,
+ ioThread(),
+ Optional.empty()
+ );
+ }
+
+
+ private ProducerInterceptors buildInterceptors() {
+ this.interceptors = new ProducerInterceptors<>(List.of(), metrics);
+ return this.interceptors;
+ }
+
+ private Partitioner buildPartition() {
+ this.partitioner = config.getConfiguredInstance(
+ ProducerConfig.PARTITIONER_CLASS_CONFIG,
+ Partitioner.class,
+ Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG,
clientId));
+ return this.partitioner;
+ }
+
+ private Sender buildSender() {
+ int maxInflightRequests =
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
+ int requestTimeoutMs =
config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+ ProducerMetrics metricsRegistry = new
ProducerMetrics(this.metrics);
+ Sensor throttleTimeSensor =
Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
+ KafkaClient client = ClientUtils.createNetworkClient(config,
+ this.metrics,
+ "producer",
+ logContext,
+ apiVersions,
+ Time.SYSTEM,
+ maxInflightRequests,
+ metadata,
+ throttleTimeSensor,
+ null);
+
+ short acks =
Short.parseShort(config.getString(ProducerConfig.ACKS_CONFIG));
+ this.sender = new Sender(logContext,
+ client,
+ metadata,
+ this.accumulator,
+ maxInflightRequests == 1,
+ config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
+ acks,
+ config.getInt(ProducerConfig.RETRIES_CONFIG),
+ metricsRegistry.senderMetrics,
+ Time.SYSTEM,
+ requestTimeoutMs,
+ config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
+ null) {
+ @Override
+ protected long sendProducerData(long now) {
+ long result = super.sendProducerData(now);
+ try {
+ // Ensure the batch expires.
+ Thread.sleep(500);
Review Comment:
Hmm, the delivery timeout is 2000ms. Is a 500ms sleep enough to cause the
batch to expire? Also, the integration test doesn't control when the in-flight
request is actually sent at the socket layer. So the test may not be very
effective, since the in-flight request is likely written to the socket before
the delivery timeout expires.
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIntegrationTest.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.producer.internals.BufferPool;
+import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
+import org.apache.kafka.clients.producer.internals.ProducerMetadata;
+import org.apache.kafka.clients.producer.internals.ProducerMetrics;
+import org.apache.kafka.clients.producer.internals.RecordAccumulator;
+import org.apache.kafka.clients.producer.internals.Sender;
+import org.apache.kafka.common.compress.NoCompression;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+
+public class ProducerIntegrationTest {
+
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key = "offsets.topic.replication.factor", value
= "1"),
+ })
+ public void testInFlightBatchShouldNotBeCorrupted(ClusterInstance cluster)
throws InterruptedException,
+ ExecutionException {
+ String topic = "test-topic";
+ cluster.createTopic("test-topic", 1, (short) 1);
+ try (var producer = expireProducer(cluster)) {
+ producer.send(new ProducerRecord<>(topic, "key".getBytes(),
"value".getBytes())).get();
+ }
+ try (var consumer = cluster.consumer()) {
+ consumer.subscribe(List.of(topic));
+ TestUtils.waitForCondition(() ->
consumer.poll(Duration.ofSeconds(1)).count() == 1, 5000, "failed to poll data");
+ }
+
Review Comment:
Extra blank line.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]