[ https://issues.apache.org/jira/browse/KAFKA-7518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16684267#comment-16684267 ]
ASF GitHub Bot commented on KAFKA-7518:
---
ijuma closed pull request #5815: KAFKA-7518: FutureRecordMetadata.get deadline calculation fix
URL: https://github.com/apache/kafka/pull/5815
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index a38bd04f906..e448d6ebdc3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -29,6 +29,7 @@
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Time;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -254,7 +255,8 @@ private void verifyNoTransactionInFlight() {
partition = partition(record, this.cluster);
TopicPartition topicPartition = new TopicPartition(record.topic(),
partition);
ProduceRequestResult result = new ProduceRequestResult(topicPartition);
-FutureRecordMetadata future = new FutureRecordMetadata(result, 0,
RecordBatch.NO_TIMESTAMP, 0L, 0, 0);
+FutureRecordMetadata future = new FutureRecordMetadata(result, 0,
RecordBatch.NO_TIMESTAMP,
+0L, 0, 0, Time.SYSTEM);
long offset = nextOffset(topicPartition);
Completion completion = new Completion(offset, new
RecordMetadata(topicPartition, 0, offset,
RecordBatch.NO_TIMESTAMP, Long.valueOf(0L), 0, 0), result,
callback);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
index 8fcc46ff3ff..d1a643b3196 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -16,13 +16,14 @@
*/
package org.apache.kafka.clients.producer.internals;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.utils.Time;
+
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.kafka.clients.producer.RecordMetadata;
-
/**
* The future result of a record send
*/
@@ -34,16 +35,18 @@
private final Long checksum;
private final int serializedKeySize;
private final int serializedValueSize;
+private final Time time;
private volatile FutureRecordMetadata nextRecordMetadata = null;
public FutureRecordMetadata(ProduceRequestResult result, long
relativeOffset, long createTimestamp,
-Long checksum, int serializedKeySize, int
serializedValueSize) {
+Long checksum, int serializedKeySize, int
serializedValueSize, Time time) {
this.result = result;
this.relativeOffset = relativeOffset;
this.createTimestamp = createTimestamp;
this.checksum = checksum;
this.serializedKeySize = serializedKeySize;
this.serializedValueSize = serializedValueSize;
+this.time = time;
}
@Override
@@ -67,13 +70,14 @@ public RecordMetadata get() throws InterruptedException,
ExecutionException {
@Override
public RecordMetadata get(long timeout, TimeUnit unit) throws
InterruptedException, ExecutionException, TimeoutException {
// Handle overflow.
-long now = System.currentTimeMillis();
-long deadline = Long.MAX_VALUE - timeout < now ? Long.MAX_VALUE : now
+ timeout;
+long now = time.milliseconds();
+long timeoutMillis = unit.toMillis(timeout);
+long deadline = Long.MAX_VALUE - timeoutMillis < now ? Long.MAX_VALUE
: now + timeoutMillis;
boolean occurred = this.result.await(timeout, unit);
-if (nextRecordMetadata != null)
-return nextRecordMetadata.get(deadline -
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if (!occurred)
-throw new TimeoutException("Timeout after waiting for " +
TimeUnit.MILLISECONDS.convert(timeout, unit) + " ms.");
+throw new TimeoutException("Timeout after waiting for " +
timeoutMillis + " ms.");
+if (nextRecordMetadata