Repository: kafka
Updated Branches:
  refs/heads/trunk 690575ec4 -> ea533f0c5


KAFKA-5913; Add hasOffset() and hasTimestamp() methods to RecordMetadata

These methods help users check for cases in which this metadata was not 
returned by the broker (e.g. in the case of acks=0 or a duplicate error when 
idempotence is enabled).

Author: Apurva Mehta <apu...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #3878 from apurvam/KAFKA-5913-add-record-metadata-not-available-exception


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ea533f0c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ea533f0c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ea533f0c

Branch: refs/heads/trunk
Commit: ea533f0c5e2438f0aef070dbadb023bffb9ce009
Parents: 690575e
Author: Apurva Mehta <apu...@confluent.io>
Authored: Thu Sep 21 14:30:37 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Sep 21 14:39:00 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/RecordMetadata.java  | 23 +++++++++++++++++++-
 .../clients/producer/RecordMetadataTest.java    |  2 ++
 .../clients/producer/internals/SenderTest.java  |  6 ++++-
 .../kafka/api/ProducerFailureHandlingTest.scala |  6 ++++-
 4 files changed, 34 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ea533f0c/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
index 6757a6d..0924244 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
@@ -18,6 +18,8 @@ package org.apache.kafka.clients.producer;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.DefaultRecord;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.ProduceResponse;
 
 /**
  * The metadata for a record that has been acknowledged by the server
@@ -65,17 +67,36 @@ public final class RecordMetadata {
     }
 
     /**
+     * Indicates whether the record metadata includes the offset.
+     * @return true if the offset is included in the metadata, false otherwise.
+     */
+    public boolean hasOffset() {
+        return this.offset != ProduceResponse.INVALID_OFFSET;
+    }
+
+    /**
      * The offset of the record in the topic/partition.
+     * @return the offset of the record, or -1 if {@link #hasOffset()} returns false.
      */
     public long offset() {
         return this.offset;
     }
 
     /**
+     * Indicates whether the record metadata includes the timestamp.
+     * @return true if a valid timestamp exists, false otherwise.
+     */
+    public boolean hasTimestamp() {
+        return this.timestamp != RecordBatch.NO_TIMESTAMP;
+    }
+
+    /**
      * The timestamp of the record in the topic/partition.
+     *
+     * @return the timestamp of the record, or -1 if {@link #hasTimestamp()} returns false.
      */
     public long timestamp() {
-        return timestamp;
+        return this.timestamp;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/ea533f0c/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java
index a735a61..bc3ffc7 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.record.DefaultRecord;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 public class RecordMetadataTest {
 
@@ -37,6 +38,7 @@ public class RecordMetadataTest {
         assertEquals(tp.topic(), metadata.topic());
         assertEquals(tp.partition(), metadata.partition());
         assertEquals(timestamp, metadata.timestamp());
+        assertFalse(metadata.hasOffset());
         assertEquals(-1L, metadata.offset());
         assertEquals(checksum.longValue(), metadata.checksum());
         assertEquals(keySize, metadata.serializedKeySize());

http://git-wip-us.apache.org/repos/asf/kafka/blob/ea533f0c/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 0995e36..e1ea10a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -1154,7 +1154,7 @@ public class SenderTest {
         assertEquals(1000, transactionManager.lastAckedOffset(tp0));
         assertEquals(1, transactionManager.lastAckedSequence(tp0));
 
-        client.respondToRequest(firstClientRequest, produceResponse(tp0, -1, 
Errors.DUPLICATE_SEQUENCE_NUMBER, -1));
+        client.respondToRequest(firstClientRequest, produceResponse(tp0, 
ProduceResponse.INVALID_OFFSET, Errors.DUPLICATE_SEQUENCE_NUMBER, 0));
 
         sender.run(time.milliseconds()); // receive response 0
 
@@ -1162,6 +1162,10 @@ public class SenderTest {
         assertEquals(1, transactionManager.lastAckedSequence(tp0));
         assertEquals(1000, transactionManager.lastAckedOffset(tp0));
         assertFalse(client.hasInFlightRequests());
+
+        RecordMetadata unknownMetadata = request1.get();
+        assertFalse(unknownMetadata.hasOffset());
+        assertEquals(-1L, unknownMetadata.offset());
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/ea533f0c/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 49a096a..e1b1c9d 100644
--- 
a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -92,7 +92,11 @@ class ProducerFailureHandlingTest extends 
KafkaServerTestHarness {
 
     // send a too-large record
     val record = new ProducerRecord(topic1, null, "key".getBytes, new 
Array[Byte](serverMessageMaxBytes + 1))
-    assertEquals("Returned metadata should have offset -1", 
producer1.send(record).get.offset, -1L)
+
+    val recordMetadata = producer1.send(record).get()
+    assertNotNull(recordMetadata)
+    assertFalse(recordMetadata.hasOffset)
+    assertEquals(-1L, recordMetadata.offset)
   }
 
   /**

Reply via email to