hachikuji commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r665730032



##########
File path: 
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -90,11 +90,15 @@
  * by the broker and is preserved after compaction. Additionally, the 
MaxTimestamp of an empty batch always retains
  * the previous value prior to becoming empty.
  *
+ * The delete horizon flag for the sixth bit is used to determine if the first 
timestamp of the batch had been set to
+ * the time for which tombstones / transaction markers needs to be removed. If 
it is true, then the first timestamp is

Review comment:
       nit: needs -> need?

##########
File path: checkstyle/checkstyle.xml
##########
@@ -133,7 +133,7 @@
     </module>
     <module name="CyclomaticComplexity">
       <!-- default is 10-->
-      <property name="max" value="16"/>
+      <property name="max" value="18"/>

Review comment:
       nit: usually we prefer to add exclusions rather than change the limit 
for everything

##########
File path: 
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -156,13 +161,27 @@ public void ensureValid() {
     }
 
     /**
-     * Get the timestamp of the first record in this batch. It is always the 
create time of the record even if the
+     * Gets the base timestamp of the batch which is used to calculate the 
timestamp deltas.
+     * 
+     * @return The base timestamp or
+     *         {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
+     */
+    public long baseTimestamp() {
+        return buffer.getLong(FIRST_TIMESTAMP_OFFSET);
+    }
+
+    /**
+     * Get the timestamp of the first record in this batch. It is usually the 
create time of the record even if the
      * timestamp type of the batch is log append time.
-     *
-     * @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the 
batch is empty
+     * 
+     * @return The first timestamp if a record has been appended, unless the 
delete horizon has been set
+     *         {@link RecordBatch#NO_TIMESTAMP} if the batch is empty or if 
the delete horizon is set
      */
     public long firstTimestamp() {

Review comment:
       As far as I can tell, the only use of this method is in tests (after we 
fix `DefaultRecordBatch.RecordIterator`). Maybe we can remove it or give it 
default access? Otherwise, the implementation is a little dangerous because it 
does not accurately return the first timestamp in all cases.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -156,13 +161,27 @@ public void ensureValid() {
     }
 
     /**
-     * Get the timestamp of the first record in this batch. It is always the 
create time of the record even if the
+     * Gets the base timestamp of the batch which is used to calculate the 
timestamp deltas.
+     * 
+     * @return The base timestamp or
+     *         {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
+     */
+    public long baseTimestamp() {

Review comment:
       I think we need to update `DefaultRecordBatch.RecordIterator` to use 
`baseTimestamp()` instead of `firstTimestamp()`. We should also make sure we 
have a test case to ensure that the record timestamps remain value even when 
the delete horizon has been set.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -246,6 +265,19 @@ public boolean isTransactional() {
         return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0;
     }
 
+    @Override
+    public boolean hasDeleteHorizonMs() {
+        return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0;
+    }
+
+    @Override
+    public long deleteHorizonMs() {

Review comment:
       Maybe we could return `OptionalLong` and get rid of `hasDeleteHorizonMs`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to