hachikuji commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r665730032
##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -90,11 +90,15 @@
  * by the broker and is preserved after compaction. Additionally, the MaxTimestamp of an empty batch always retains
  * the previous value prior to becoming empty.
  *
+ * The delete horizon flag for the sixth bit is used to determine if the first timestamp of the batch had been set to
+ * the time for which tombstones / transaction markers needs to be removed. If it is true, then the first timestamp is

Review comment:
nit: needs -> need?

##########
File path: checkstyle/checkstyle.xml
##########
@@ -133,7 +133,7 @@
     </module>
     <module name="CyclomaticComplexity">
       <!-- default is 10-->
-      <property name="max" value="16"/>
+      <property name="max" value="18"/>

Review comment:
nit: usually we prefer to add exclusions rather than change the limit for everything

##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -156,13 +161,27 @@ public void ensureValid() {
     }
     /**
-     * Get the timestamp of the first record in this batch. It is always the create time of the record even if the
+     * Gets the base timestamp of the batch which is used to calculate the timestamp deltas.
+     *
+     * @return The base timestamp or
+     *         {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
+     */
+    public long baseTimestamp() {
+        return buffer.getLong(FIRST_TIMESTAMP_OFFSET);
+    }
+
+    /**
+     * Get the timestamp of the first record in this batch. It is usually the create time of the record even if the
      * timestamp type of the batch is log append time.
-     *
-     * @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
+     *
+     * @return The first timestamp if a record has been appended, unless the delete horizon has been set
+     *         {@link RecordBatch#NO_TIMESTAMP} if the batch is empty or if the delete horizon is set
      */
     public long firstTimestamp() {

Review comment:
As far as I can tell, the only use of this method is in tests (after we fix `DefaultRecordBatch.RecordIterator`). Maybe we can remove it or give it default access? Otherwise, the implementation is a little dangerous because it does not accurately return the first timestamp in all cases.

##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -156,13 +161,27 @@ public void ensureValid() {
     }
     /**
-     * Get the timestamp of the first record in this batch. It is always the create time of the record even if the
+     * Gets the base timestamp of the batch which is used to calculate the timestamp deltas.
+     *
+     * @return The base timestamp or
+     *         {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
+     */
+    public long baseTimestamp() {

Review comment:
I think we need to update `DefaultRecordBatch.RecordIterator` to use `baseTimestamp()` instead of `firstTimestamp()`. We should also make sure we have a test case to ensure that the record timestamps remain valid even when the delete horizon has been set.

##########
File path: clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
##########
@@ -246,6 +265,19 @@ public boolean isTransactional() {
         return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0;
     }
+    @Override
+    public boolean hasDeleteHorizonMs() {
+        return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0;
+    }
+
+    @Override
+    public long deleteHorizonMs() {

Review comment:
Maybe we could return `OptionalLong` and get rid of `hasDeleteHorizonMs`?

-- 
This is an automated message from the Apache Git Service.
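The last review comment suggests collapsing `hasDeleteHorizonMs()` and `deleteHorizonMs()` into a single `OptionalLong` accessor. A minimal sketch of that shape follows. It is a hypothetical stand-in, not `DefaultRecordBatch`: the class name `DeleteHorizonSketch`, its constructor, and the mask value `0x40` (reading "sixth bit" as zero-based bit 6) are assumptions. `NO_TIMESTAMP` only copies the `-1L` value of `RecordBatch#NO_TIMESTAMP`.

```java
import java.util.OptionalLong;

/**
 * Hypothetical stand-in for two batch header fields discussed in the review:
 * the attributes and the first/base timestamp field. Not Kafka's DefaultRecordBatch.
 */
public class DeleteHorizonSketch {
    // Assumption: the delete horizon flag is attribute bit 6 (zero-based), i.e. mask 0x40.
    static final short DELETE_HORIZON_FLAG_MASK = 0x40;
    // Same value as RecordBatch.NO_TIMESTAMP in Kafka.
    static final long NO_TIMESTAMP = -1L;

    private final short attributes;
    // Raw header field: the first record's timestamp, or the delete horizon when the flag is set.
    private final long baseTimestamp;

    DeleteHorizonSketch(short attributes, long baseTimestamp) {
        this.attributes = attributes;
        this.baseTimestamp = baseTimestamp;
    }

    /** Raw value of the header field, whatever it currently holds. */
    long baseTimestamp() {
        return baseTimestamp;
    }

    /** One accessor in place of the hasDeleteHorizonMs() + deleteHorizonMs() pair. */
    OptionalLong deleteHorizonMs() {
        if ((attributes & DELETE_HORIZON_FLAG_MASK) != 0) {
            return OptionalLong.of(baseTimestamp);
        }
        return OptionalLong.empty();
    }

    /** First record timestamp, or NO_TIMESTAMP once the field has been reused for the horizon. */
    long firstTimestamp() {
        return deleteHorizonMs().isPresent() ? NO_TIMESTAMP : baseTimestamp;
    }

    public static void main(String[] args) {
        DeleteHorizonSketch plain = new DeleteHorizonSketch((short) 0, 1_000L);
        DeleteHorizonSketch compacted = new DeleteHorizonSketch(DELETE_HORIZON_FLAG_MASK, 5_000L);

        System.out.println(plain.deleteHorizonMs());     // OptionalLong.empty
        System.out.println(compacted.deleteHorizonMs()); // OptionalLong[5000]
        System.out.println(compacted.firstTimestamp());  // -1
    }
}
```

With this shape a caller cannot read the horizon without handling the empty case, which is the usual argument for `OptionalLong` over a flag-plus-getter pair. The trade-off is creating a small `OptionalLong` object on each call that finds the flag set.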
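The `RecordIterator` comment turns on how record timestamps are rebuilt: each record stores a delta against the batch's base timestamp field, so the iterator has to add that delta to `baseTimestamp()`. `firstTimestamp()` returns `NO_TIMESTAMP` once the delete horizon has been written into the field, so using it corrupts every timestamp. The sketch below models only that arithmetic. It assumes, as the review implies, that the deltas are computed against whatever value the field holds. `TimestampDeltaSketch`, `encodeDeltas`, and `decode` are hypothetical names, and plain `long` arrays stand in for Kafka's encoded records.

```java
import java.util.Arrays;

/**
 * Hypothetical model of timestamp-delta encoding in a batch; not Kafka code.
 * Only the arithmetic is modelled: timestamp = reference + delta.
 */
public class TimestampDeltaSketch {
    // Same value as RecordBatch.NO_TIMESTAMP in Kafka.
    static final long NO_TIMESTAMP = -1L;

    /** Stores each absolute timestamp as a delta against the batch's base timestamp field. */
    static long[] encodeDeltas(long baseTimestamp, long[] timestamps) {
        long[] deltas = new long[timestamps.length];
        for (int i = 0; i < timestamps.length; i++) {
            deltas[i] = timestamps[i] - baseTimestamp;
        }
        return deltas;
    }

    /** Rebuilds absolute timestamps the way an iterator would: reference + delta. */
    static long[] decode(long reference, long[] deltas) {
        long[] timestamps = new long[deltas.length];
        for (int i = 0; i < deltas.length; i++) {
            timestamps[i] = reference + deltas[i];
        }
        return timestamps;
    }

    public static void main(String[] args) {
        long[] original = {1_000L, 1_005L, 1_010L};
        // Assumed example value: a delete horizon written into the base field,
        // with the deltas computed against it (they are negative here).
        long deleteHorizonMs = 90_000L;
        long[] deltas = encodeDeltas(deleteHorizonMs, original);

        // Decoding against the raw field (what baseTimestamp() returns) recovers the originals.
        System.out.println(Arrays.toString(decode(deleteHorizonMs, deltas))); // [1000, 1005, 1010]
        // Decoding against firstTimestamp(), i.e. NO_TIMESTAMP, yields meaningless values.
        System.out.println(Arrays.toString(decode(NO_TIMESTAMP, deltas)));    // [-89001, -88996, -88991]
    }
}
```

Running the same round-trip check against a real batch whose delete horizon has been set is roughly the test case the comment asks for.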