Repository: kafka
Updated Branches:
  refs/heads/trunk a16475eb3 -> c4193cd1a


MINOR: Rename baseTimestamp to firstTimestamp to clarify usage

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Jun Rao <jun...@gmail.com>, Ismael Juma <ism...@juma.me.uk>

Closes #3457 from hachikuji/rename-base-timestamp


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c4193cd1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c4193cd1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c4193cd1

Branch: refs/heads/trunk
Commit: c4193cd1ad8eeed8383a1c2660383c455e848929
Parents: a16475e
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Jun 30 06:48:45 2017 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Fri Jun 30 06:48:45 2017 +0100

----------------------------------------------------------------------
 .../kafka/common/record/DefaultRecordBatch.java | 80 ++++++++++++--------
 .../common/record/MemoryRecordsBuilder.java     | 12 +--
 .../kafka/common/record/MutableRecordBatch.java |  2 +-
 .../common/record/DefaultRecordBatchTest.java   |  2 +-
 .../scala/unit/kafka/log/LogValidatorTest.scala |  2 +-
 5 files changed, 57 insertions(+), 41 deletions(-)
----------------------------------------------------------------------

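A note on the context for this rename: the expanded DefaultRecordBatch javadoc below explains that the broker de-duplicates incoming Produce requests by comparing a batch's first and last sequence numbers against the last batch from the same producer, which is why the sequence fields (unlike the first timestamp) must survive compaction. A toy sketch of that check follows; the class and method names are hypothetical and greatly simplified from the broker's actual producer state tracking.

    // Toy duplicate check: compare an incoming batch's first/last sequence
    // numbers with those of the last batch appended by the same producer.
    class ProducerDedupSketch {
        private int lastFirstSeq = -1;
        private int lastLastSeq = -1;

        boolean isDuplicate(int firstSeq, int lastSeq) {
            return firstSeq == lastFirstSeq && lastSeq == lastLastSeq;
        }

        void onAppend(int firstSeq, int lastSeq) {
            lastFirstSeq = firstSeq;
            lastLastSeq = lastSeq;
        }
    }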

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4193cd1/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 2262b33..ff8d3b9 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -48,7 +48,7 @@ import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
  *  CRC => Uint32
  *  Attributes => Int16
  *  LastOffsetDelta => Int32 // also serves as LastSequenceDelta
- *  BaseTimestamp => Int64
+ *  FirstTimestamp => Int64
  *  MaxTimestamp => Int64
  *  ProducerId => Int64
  *  ProducerEpoch => Int16
@@ -64,12 +64,28 @@ import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
  * computation to avoid the need to recompute the CRC when this field is assigned for every batch that is received by
  * the broker. The CRC-32C (Castagnoli) polynomial is used for the computation.
  *
- * On compaction: unlike the older message formats, magic v2 and above preserves the first and last offset/sequence
+ * On Compaction: Unlike the older message formats, magic v2 and above preserves the first and last offset/sequence
  * numbers from the original batch when the log is cleaned. This is required in order to be able to restore the
- * producer's state when the log is reloaded. If we did not retain the last sequence number, for example, then
- * after a partition leader failure, the producer might see an OutOfSequence error. The base sequence number must
- * be preserved for duplicate checking (the broker checks incoming Produce requests for duplicates by verifying
- * that the first and last sequence numbers of the incoming batch match the last from that producer).
+ * producer's state when the log is reloaded. If we did not retain the last sequence number, then following
+ * a partition leader failure, once the new leader has rebuilt the producer state from the log, the next expected
+ * sequence number would no longer be in sync with what was written by the client. This would cause an
+ * unexpected OutOfOrderSequence error, which is typically fatal. The base sequence number must be preserved for
+ * duplicate checking: the broker checks incoming Produce requests for duplicates by verifying that the first and
+ * last sequence numbers of the incoming batch match the last from that producer.
+ *
+ * Note that if all of the records in a batch are removed during compaction, the broker may still retain an empty
+ * batch header in order to preserve the producer sequence information as described above. These empty batches
+ * are retained only until either a new sequence number is written by the corresponding producer or the producerId
+ * is expired due to lack of activity.
+ *
+ * There is no similar need to preserve the timestamp from the original batch after compaction. The FirstTimestamp
+ * field therefore always reflects the timestamp of the first record in the batch. If the batch is empty, the
+ * FirstTimestamp will be set to -1 (NO_TIMESTAMP).
+ *
+ * Similarly, the MaxTimestamp field reflects the maximum timestamp of the current records if the timestamp type
+ * is CREATE_TIME. For LOG_APPEND_TIME, on the other hand, the MaxTimestamp field reflects the timestamp set
+ * by the broker and is preserved after compaction. Additionally, the MaxTimestamp of an empty batch always retains
+ * the previous value prior to becoming empty.
  *
  * The current attributes are given below:
  *
@@ -92,9 +108,9 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
     static final int ATTRIBUTE_LENGTH = 2;
     static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
     static final int LAST_OFFSET_DELTA_LENGTH = 4;
-    static final int BASE_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
-    static final int BASE_TIMESTAMP_LENGTH = 8;
-    static final int MAX_TIMESTAMP_OFFSET = BASE_TIMESTAMP_OFFSET + BASE_TIMESTAMP_LENGTH;
+    static final int FIRST_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
+    static final int FIRST_TIMESTAMP_LENGTH = 8;
+    static final int MAX_TIMESTAMP_OFFSET = FIRST_TIMESTAMP_OFFSET + FIRST_TIMESTAMP_LENGTH;
     static final int MAX_TIMESTAMP_LENGTH = 8;
     static final int PRODUCER_ID_OFFSET = MAX_TIMESTAMP_OFFSET + MAX_TIMESTAMP_LENGTH;
     static final int PRODUCER_ID_LENGTH = 8;
@@ -138,10 +154,10 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
     * Get the timestamp of the first record in this batch. It is always the create time of the record even if the
      * timestamp type of the batch is log append time.
      *
-     * @return The base timestamp
+     * @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
      */
-    public long baseTimestamp() {
-        return buffer.getLong(BASE_TIMESTAMP_OFFSET);
+    public long firstTimestamp() {
+        return buffer.getLong(FIRST_TIMESTAMP_OFFSET);
     }
 
     @Override
@@ -243,9 +259,9 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
 
         return new RecordIterator() {
             @Override
-            protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
+            protected Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) {
                 try {
-                    return DefaultRecord.readFrom(inputStream, baseOffset, baseTimestamp, baseSequence, logAppendTime);
+                    return DefaultRecord.readFrom(inputStream, baseOffset, firstTimestamp, baseSequence, logAppendTime);
                 } catch (EOFException e) {
                     throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached");
                 } catch (IOException e) {
                 } catch (IOException e) {
@@ -278,9 +294,9 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
         buffer.position(RECORDS_OFFSET);
         return new RecordIterator() {
             @Override
-            protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
+            protected Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) {
                 try {
-                    return DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, logAppendTime);
+                    return DefaultRecord.readFrom(buffer, baseOffset, firstTimestamp, baseSequence, logAppendTime);
                 } catch (BufferUnderflowException e) {
                     throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached");
                 }
@@ -420,7 +436,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
                             byte magic,
                             CompressionType compressionType,
                             TimestampType timestampType,
-                            long baseTimestamp,
+                            long firstTimestamp,
                             long maxTimestamp,
                             long producerId,
                             short epoch,
@@ -431,8 +447,8 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
                             int numRecords) {
         if (magic < RecordBatch.CURRENT_MAGIC_VALUE)
             throw new IllegalArgumentException("Invalid magic value " + magic);
-        if (baseTimestamp < 0 && baseTimestamp != NO_TIMESTAMP)
-            throw new IllegalArgumentException("Invalid message timestamp " + baseTimestamp);
+        if (firstTimestamp < 0 && firstTimestamp != NO_TIMESTAMP)
+            throw new IllegalArgumentException("Invalid message timestamp " + firstTimestamp);
 
         short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch);
 
@@ -442,7 +458,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
         buffer.putInt(position + PARTITION_LEADER_EPOCH_OFFSET, partitionLeaderEpoch);
         buffer.put(position + MAGIC_OFFSET, magic);
         buffer.putShort(position + ATTRIBUTES_OFFSET, attributes);
-        buffer.putLong(position + BASE_TIMESTAMP_OFFSET, baseTimestamp);
+        buffer.putLong(position + FIRST_TIMESTAMP_OFFSET, firstTimestamp);
         buffer.putLong(position + MAX_TIMESTAMP_OFFSET, maxTimestamp);
         buffer.putInt(position + LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta);
         buffer.putLong(position + PRODUCER_ID_OFFSET, producerId);
@@ -466,13 +482,13 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
             return 0;
 
         int size = RECORD_BATCH_OVERHEAD;
-        Long baseTimestamp = null;
+        Long firstTimestamp = null;
         while (iterator.hasNext()) {
             Record record = iterator.next();
             int offsetDelta = (int) (record.offset() - baseOffset);
-            if (baseTimestamp == null)
-                baseTimestamp = record.timestamp();
-            long timestampDelta = record.timestamp() - baseTimestamp;
+            if (firstTimestamp == null)
+                firstTimestamp = record.timestamp();
+            long timestampDelta = record.timestamp() - firstTimestamp;
             size += DefaultRecord.sizeInBytes(offsetDelta, timestampDelta, record.key(), record.value(),
                     record.headers());
         }
@@ -486,12 +502,12 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
 
         int size = RECORD_BATCH_OVERHEAD;
         int offsetDelta = 0;
-        Long baseTimestamp = null;
+        Long firstTimestamp = null;
         while (iterator.hasNext()) {
             SimpleRecord record = iterator.next();
-            if (baseTimestamp == null)
-                baseTimestamp = record.timestamp();
-            long timestampDelta = record.timestamp() - baseTimestamp;
+            if (firstTimestamp == null)
+                firstTimestamp = record.timestamp();
+            long timestampDelta = record.timestamp() - firstTimestamp;
             size += DefaultRecord.sizeInBytes(offsetDelta++, timestampDelta, record.key(), record.value(),
                     record.headers());
         }
@@ -516,7 +532,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
     private abstract class RecordIterator implements CloseableIterator<Record> {
         private final Long logAppendTime;
         private final long baseOffset;
-        private final long baseTimestamp;
+        private final long firstTimestamp;
         private final int baseSequence;
         private final int numRecords;
         private int readRecords = 0;
@@ -524,7 +540,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
         public RecordIterator() {
             this.logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null;
             this.baseOffset = baseOffset();
-            this.baseTimestamp = baseTimestamp();
+            this.firstTimestamp = firstTimestamp();
             this.baseSequence = baseSequence();
             int numRecords = count();
             if (numRecords < 0)
@@ -544,7 +560,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
                 throw new NoSuchElementException();
 
             readRecords++;
-            Record rec = readNext(baseOffset, baseTimestamp, baseSequence, logAppendTime);
+            Record rec = readNext(baseOffset, firstTimestamp, baseSequence, logAppendTime);
             if (readRecords == numRecords) {
                 // Validate that the actual size of the batch is equal to declared size
                 // by checking that after reading the declared number of items, there are no items left
@@ -555,7 +571,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
             return rec;
         }
 
-        protected abstract Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime);
+        protected abstract Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime);
 
         protected abstract boolean ensureNoneRemaining();
 
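For orientation, the renamed constants above pin FirstTimestamp to a fixed position in the v2 batch header. A standalone sketch (not part of this commit) of reading the field from a raw buffer; the offset arithmetic simply follows the constant definitions in the hunk above and resolves to byte 27:

    import java.nio.ByteBuffer;

    class FirstTimestampSketch {
        // BaseOffset(8) + Length(4) + PartitionLeaderEpoch(4) + Magic(1)
        // + CRC(4) + Attributes(2) + LastOffsetDelta(4) = 27
        static final int FIRST_TIMESTAMP_OFFSET = 27;

        // Create time of the first record, or -1 (NO_TIMESTAMP) if the
        // batch is empty, per the javadoc added in this commit.
        static long firstTimestamp(ByteBuffer batch) {
            return batch.getLong(FIRST_TIMESTAMP_OFFSET);
        }
    }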

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4193cd1/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 68b0bf0..19d25d7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -71,7 +71,7 @@ public class MemoryRecordsBuilder {
     private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
     private long offsetOfMaxTimestamp = -1;
     private Long lastOffset = null;
-    private Long baseTimestamp = null;
+    private Long firstTimestamp = null;
 
     private MemoryRecords builtRecords;
     private boolean aborted = false;
@@ -342,7 +342,7 @@ public class MemoryRecordsBuilder {
             maxTimestamp = this.maxTimestamp;
 
         DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType,
-                baseTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch,
+                firstTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch,
                 partitionLeaderEpoch, numRecords);
 
         buffer.position(pos);
@@ -389,8 +389,8 @@ public class MemoryRecordsBuilder {
             if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0)
                 throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");
 
-            if (baseTimestamp == null)
-                baseTimestamp = timestamp;
+            if (firstTimestamp == null)
+                firstTimestamp = timestamp;
 
             if (magic > RecordBatch.MAGIC_VALUE_V1) {
                 appendDefaultRecord(offset, timestamp, key, value, headers);
@@ -605,7 +605,7 @@ public class MemoryRecordsBuilder {
                                      Header[] headers) throws IOException {
         ensureOpenForRecordAppend();
         int offsetDelta = (int) (offset - baseOffset);
-        long timestampDelta = timestamp - baseTimestamp;
+        long timestampDelta = timestamp - firstTimestamp;
        int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
         recordWritten(offset, timestamp, sizeInBytes);
     }
@@ -710,7 +710,7 @@ public class MemoryRecordsBuilder {
             recordSize = Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
         } else {
             int nextOffsetDelta = lastOffset == null ? 0 : (int) (lastOffset - baseOffset + 1);
-            long timestampDelta = baseTimestamp == null ? 0 : timestamp - baseTimestamp;
+            long timestampDelta = firstTimestamp == null ? 0 : timestamp - firstTimestamp;
             recordSize = DefaultRecord.sizeInBytes(nextOffsetDelta, timestampDelta, key, value, headers);
         }
 
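The builder change is mechanical, but it shows why "first" is the more accurate name: the first appended record's timestamp becomes the header's FirstTimestamp, and every subsequent record stores only a delta against it, whereas baseOffset may end up preceding the first remaining record after compaction. A minimal sketch of the delta encoding, using a hypothetical class outside the Kafka API:

    import java.util.ArrayList;
    import java.util.List;

    class TimestampDeltaSketch {
        private Long firstTimestamp = null;            // written to the batch header
        private final List<Long> deltas = new ArrayList<>();

        void append(long timestamp) {
            if (firstTimestamp == null)
                firstTimestamp = timestamp;            // first record fixes the reference point
            deltas.add(timestamp - firstTimestamp);    // per-record delta, as in appendDefaultRecord
        }

        long timestampOf(int i) {
            return firstTimestamp + deltas.get(i);     // readers reconstruct absolute timestamps
        }
    }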

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4193cd1/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
index 8049469..c13bb5a 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
@@ -33,7 +33,7 @@ public interface MutableRecordBatch extends RecordBatch {
     /**
     * Set the max timestamp for this batch. When using log append time, this effectively overrides the individual
     * timestamps of all the records contained in the batch. To avoid recompression, the record fields are not updated
-     * by this method, but clients ignore them if the timestamp type is log append time. Note that baseTimestamp is not
+     * by this method, but clients ignore them if the timestamp type is log append time. Note that firstTimestamp is not
      * updated by this method.
      *
      * This typically requires re-computation of the batch's CRC.
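
Because setMaxTimestamp rewrites a field covered by the batch checksum (the CRC spans everything from the attributes to the end of the batch), the CRC must be recomputed afterwards, as the javadoc notes. A sketch of that computation using the JDK's java.util.zip.CRC32C, available since Java 9; Kafka itself ships its own Crc32C utility:

    import java.nio.ByteBuffer;
    import java.util.zip.CRC32C;

    class BatchCrcSketch {
        static final int ATTRIBUTES_OFFSET = 21;   // checksum covers from here to the end

        static int checksum(ByteBuffer batch) {
            CRC32C crc = new CRC32C();
            ByteBuffer data = batch.duplicate();   // leave the caller's position untouched
            data.position(ATTRIBUTES_OFFSET);
            crc.update(data);                      // Checksum.update(ByteBuffer), Java 9+
            return (int) crc.getValue();
        }
    }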

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4193cd1/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
index 587bf14..ab8cbb7 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
@@ -63,7 +63,7 @@ public class DefaultRecordBatchTest {
                     assertEquals(isTransactional, batch.isTransactional());
                     assertEquals(timestampType, batch.timestampType());
                     assertEquals(timestamp, batch.maxTimestamp());
-                    assertEquals(RecordBatch.NO_TIMESTAMP, batch.baseTimestamp());
+                    assertEquals(RecordBatch.NO_TIMESTAMP, batch.firstTimestamp());
                     assertEquals(isControlBatch, batch.isControlBatch());
                 }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4193cd1/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 2225ca6..a5d415e 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -991,7 +991,7 @@ class LogValidatorTest {
   def maybeCheckBaseTimestamp(expected: Long, batch: RecordBatch): Unit = {
     batch match {
       case b: DefaultRecordBatch =>
-        assertEquals(s"Unexpected base timestamp of batch $batch", expected, 
b.baseTimestamp)
+        assertEquals(s"Unexpected base timestamp of batch $batch", expected, 
b.firstTimestamp)
       case _ => // no-op
     }
   }
